In [1]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, col, monotonically_increasing_id, coalesce, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import StructType as Struct, StructField as Field, DoubleType as Double, StringType as String, IntegerType as Int, LongType as Long, DateType as Date, TimestampType as Timestamp 

In [2]:
config = configparser.ConfigParser()
# ConfigParser.read silently skips files it cannot open and returns the list of
# files it actually parsed; fail fast here rather than with an opaque KeyError
# on config['IAM'] below.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read configuration file 'dl.cfg'")

# Expose the IAM credentials through the environment for the S3 connector.
os.environ['AWS_ACCESS_KEY_ID'] = config['IAM']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['IAM']['AWS_SECRET_ACCESS_KEY']

In [3]:
def create_spark_session():
    """Return the active SparkSession, creating one if none exists yet.

    The session is configured to pull the hadoop-aws package, which provides
    the S3A filesystem used for the s3a:// input and output paths.
    """
    session_builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return session_builder.getOrCreate()


spark = create_spark_session()

In [4]:
# Dataset paths below are built by plain string concatenation
# (input_data + "song_data/...", output_data + "Songs.parquet"), so guarantee
# each root ends with exactly one "/" even if dl.cfg omits it.
input_data = config['S3']['SOURCE'].rstrip('/') + '/'
output_data = config['S3']['DESTINATION'].rstrip('/') + '/'

In [5]:
# Development sample: only the song files under song_data/A/A/.
song_data = "{}song_data/A/A/*".format(input_data)
print(song_data)

s3a://udacity-dend/song_data/A/A/*


In [6]:
# Full song dataset: uncomment to process every song file instead of the
# song_data/A/A/* development sample selected in the previous cell.
#song_data = input_data + "song_data/*/*/*"
#print(song_data)

In [7]:
# Load the raw song metadata (JSON) into `df`.
df = spark.read.json(song_data)

In [8]:
# Inspect the raw song records: schema, a 5-row sample, then the row count.
df.printSchema()
df.show(n=5)
df.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name|  duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|ARSUVLW12454A4C8B8|       35.83073|           Tennessee|       -85.97874|Royal Philharmoni..

604

In [9]:
# Songs dimension: one row per song_id.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs_table = df.select(*song_columns).dropDuplicates(["song_id"])

In [10]:
# Check the songs dimension schema and a small sample.
songs_table.printSchema()
songs_table.show(n=5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOQXJID12A8AE456CD| Domingo astronómico|ARBJ3VU1187B9B472D|2005|264.69832|
|SOCVGQS12A8C13D7DF|When Morning Come...|ARR2TI31187FB380FE|1989|316.73424|
|SODOLVO12B0B80B2F4|                Hell|AR6XPWV1187B9ADAEB|2005|  117.002|
|SOFVOQL12A6D4F7456|The Boy With The ...|ARPN0Y61187B9ABAA0|1985|196.67546|
|SONEDGA12A8C13F068|   Die Dunkelsequenz|ARFA2P91187B9B35F0|1997|430.62812|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [11]:
# Write explicitly as Parquet: DataFrameWriter.save() would use whatever
# spark.sql.sources.default is configured to, despite the .parquet name.
songs_table.write \
    .mode("overwrite") \
    .partitionBy("year", "artist_id") \
    .parquet(output_data + "Songs.parquet")

In [12]:
# Artists dimension: one row per artist_id, with the artist_ prefix stripped
# from the descriptive columns.
artists_table = df.select(
    "artist_id",
    col("artist_name").alias("name"),
    col("artist_location").alias("location"),
    col("artist_latitude").alias("latitude"),
    col("artist_longitude").alias("longitude"),
).dropDuplicates(["artist_id"])

In [13]:
# Check the artists dimension schema and a small sample.
artists_table.printSchema()
artists_table.show(n=5)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+-------------------+--------------------+--------+---------+
|         artist_id|               name|            location|latitude|longitude|
+------------------+-------------------+--------------------+--------+---------+
|AR9ODB41187FB459B2|Organized Konfusion|SPRINGFIELD, Virg...|    null|     null|
|ARC6UC81187B989062|                Hem|  NY - New York City|40.71455|-74.00712|
|ARDDQKN1187FB50651|             Rednex|                    |    null|     null|
|ARV1P811187FB3CFC6|     Jupiter Rising|                    |    null|     null|
|ARZ7ICQ1187B9B76E1|     Fertile Ground|             Alabama|32.61436|-86.68073|
+------------------+-------------------+--------------------+--------+---------+
only showing top 5 rows



In [14]:
# Write explicitly as Parquet rather than relying on spark.sql.sources.default.
artists_table.write \
    .mode("overwrite") \
    .parquet(output_data + "Artists.parquet")

In [15]:
# All event-log files, two directory levels below log_data/.
log_data = "{}log_data/*/*".format(input_data)
print(log_data)

s3a://udacity-dend/log_data/*/*


In [16]:
# Load the raw event logs (JSON); this replaces the song data held in `df`.
df = spark.read.json(log_data)

In [18]:
# Inspect the raw log events: schema, a 5-row sample, then the row count.
df.printSchema()
df.show(n=5)
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

8056

In [19]:
# Keep only song-play events (page == 'NextSong').
df = df.where(col("page") == 'NextSong')

In [20]:
# Re-inspect after filtering: schema, a 5-row sample, then the row count.
df.printSchema()
df.show(n=5)
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|         

6820

In [21]:
# Users dimension: one row per user, taken from that user's most recent
# NextSong event (highest ts). dropDuplicates() would keep an arbitrary row,
# so `level` could reflect an outdated subscription tier.
latest_event_first = Window.partitionBy("userId").orderBy(col("ts").desc())
users_table = (
    df.withColumn("_event_rank", row_number().over(latest_event_first))
    .filter(col("_event_rank") == 1)
    .select(
        col("userId").alias("user_id"),
        col("firstName").alias("first_name"),
        col("lastName").alias("last_name"),
        "gender",
        "level",
    )
)

In [22]:
# Check the users dimension schema and a small sample.
users_table.printSchema()
users_table.show(n=5)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     51|      Maia|    Burke|     F| free|
|      7|    Adelyn|   Jordan|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     54|     Kaleb|     Cook|     M| free|
|    101|    Jayden|      Fox|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [23]:
# Write explicitly as Parquet rather than relying on spark.sql.sources.default.
users_table.write \
    .mode("overwrite") \
    .parquet(output_data + "Users.parquet")

In [24]:
def get_datetime(ts_col):
    """Return a timestamp Column derived from an epoch-milliseconds column.

    Replaces the previous Python UDF, which raised TypeError on a null ``ts``
    (``None / 1000``) and pushed every row through a Python worker. It is
    called the same way the UDF was: ``get_datetime("ts")``.

    ts_col: name of a numeric column holding milliseconds since the epoch.
    """
    # Casting a numeric value to timestamp treats it as seconds since the epoch;
    # the fractional part keeps millisecond precision, and nulls stay null.
    return (col(ts_col) / 1000).cast(Timestamp())


df = df.withColumn("timestamp", get_datetime("ts"))

In [25]:
# Confirm the derived `timestamp` column and preview a few rows.
df.printSchema()
df.show(n=5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|

In [26]:
# Time dimension: one row per distinct event timestamp. start_time keeps the
# raw `ts` value; the calendar parts are derived from `timestamp`.
time_table = (
    df.select("timestamp", "ts")
    .dropDuplicates(["timestamp"])
    .select(
        col("ts").alias("start_time"),
        hour("timestamp").alias("hour"),
        dayofmonth("timestamp").alias("day"),
        weekofyear("timestamp").alias("week"),
        month("timestamp").alias("month"),
        year("timestamp").alias("year"),
        dayofweek("timestamp").alias("weekday"),
    )
)

In [27]:
# Check the time dimension schema and a small sample.
time_table.printSchema()
time_table.show(n=5)

root
 |-- start_time: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------+----+---+----+-----+----+-------+
|   start_time|hour|day|week|month|year|weekday|
+-------------+----+---+----+-----+----+-------+
|1542781092796|   6| 21|  47|   11|2018|      4|
|1542208815796|  15| 14|  46|   11|2018|      4|
|1541435519796|  16|  5|  45|   11|2018|      2|
|1542132026796|  18| 13|  46|   11|2018|      3|
|1543552322796|   4| 30|  48|   11|2018|      6|
+-------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [28]:
# Write explicitly as Parquet rather than relying on spark.sql.sources.default.
time_table.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(output_data + "Time.parquet")

In [29]:
# Read the songs dimension back explicitly as Parquet. Partition columns
# (year, artist_id) are reconstructed from the directory layout.
song_df = spark.read.parquet(output_data + "Songs.parquet")

In [30]:
# Verify the round-tripped songs table: schema, a 5-row sample, row count.
song_df.printSchema()
song_df.show(n=5)
song_df.count()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...| 94.56281|   0|ARSUVLW12454A4C8B8|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|227.10812|2001|ARWUNH81187FB4A3E0|
|SOYVBGZ12A6D4F92A8|Piano Sonata No. ...|221.70077|   0|ARLRWBW1242077EB29|
|SODBHKO12A58A77F36|Fingers Of Love (...|335.93424|   0|ARKGS2Z1187FB494B5|
|SOGXFIF12A58A78CC4|Hanging On (Mediu...|204.06812|   0|AR5LZJD1187FB4C5E5|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows



604

In [31]:
# Resolve each play to a song by title AND artist name AND duration.
# Joining on title alone fans out any play whose title is shared by several
# songs: it duplicates songplay rows and attaches the wrong song/artist IDs.
artist_names = spark.read.parquet(output_data + "Artists.parquet") \
    .select("artist_id", col("name").alias("artist_name"))
# Lookup holds only the columns needed for matching. Deduplicating on the
# match key guarantees each play matches at most one song (no row fan-out).
song_lookup = song_df.join(artist_names, on="artist_id", how="left") \
    .select("song_id", "title", "duration", "artist_id", "artist_name") \
    .dropDuplicates(["title", "artist_name", "duration"])
play_matches_song = (df.song == song_lookup.title) \
    & (df.artist == song_lookup.artist_name) \
    & (df.length == song_lookup.duration)

# Left join keeps every play; unmatched plays get null song_id/artist_id.
songplays_table = (
    df.join(song_lookup, play_matches_song, how="left")
    .withColumn("songplay_id", monotonically_increasing_id())
    .select(
        "songplay_id",
        col("ts").alias("start_time"),
        col("userId").alias("user_id"),
        "level",
        "song_id",
        "artist_id",
        col("sessionId").alias("session_id"),
        "location",
        col("userAgent").alias("user_agent"),
        year("timestamp").alias("year"),
        month("timestamp").alias("month"),
    )
)

In [32]:
# Check the songplays fact table schema and a small sample.
songplays_table.printSchema()
songplays_table.show(n=5)

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+-----------+-------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|songplay_id|   start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+-----------+-------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|          0|1542241826796|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|          1|1542242481796|     26| free|   null|     null|    

In [33]:
# Write explicitly as Parquet rather than relying on spark.sql.sources.default.
songplays_table.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(output_data + "Songplays.parquet")