In [4]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.0"
    }
}

In [26]:
from datetime import datetime
from pyspark.sql.types import StructType as R, StructField as Fld, \
     DoubleType as Dbl, LongType as Long, StringType as Str, \
     IntegerType as Int, DecimalType as Dec, DateType as Date, \
     TimestampType as Stamp
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, \
     date_format
import os

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
def get_song_schema():
    """Build the explicit schema used when reading the song metadata JSON files.

    Returns:
        StructType: ten fields, in this order: num_songs (int), artist_id,
        artist_latitude (double), artist_longitude (double), artist_location,
        artist_name, song_id, title (strings unless noted), duration (double)
        and year (int).
    """
    song_schema = R([
        Fld("num_songs", Int()),
        Fld("artist_id", Str()),
        # Coordinates are fractional. A bare DecimalType() is decimal(10, 0),
        # which has no digits after the decimal point, so use doubles instead.
        Fld("artist_latitude", Dbl()),
        Fld("artist_longitude", Dbl()),
        Fld("artist_location", Str()),
        Fld("artist_name", Str()),
        Fld("song_id", Str()),
        Fld("title", Str()),
        Fld("duration", Dbl()),
        Fld("year", Int())
    ])
    return song_schema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Read only the A/A/A slice of the song metadata, using the explicit song schema.
df = (
    spark.read
    .schema(get_song_schema())
    .json("s3://udacity-dend/song_data/A/A/A/*.json")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Songs dimension: one row per song_id, keeping only the song-level columns.
songs_table = (
    df.select(
        col("song_id"),
        col("title"),
        col("artist_id"),
        col("year"),
        col("duration"),
    )
    .drop_duplicates(["song_id"])
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Quick visual check: print at most 17 rows of the songs table.
songs_table.limit(17).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBRKGM12A8C139EF6|Welcome to the Pl...|ARXQBR11187B98A2CC|1985|821.05424|
|SOFRDWL12A58A7CEF7|        Hit Da Scene|AR9Q9YC1187FB5609B|   0|252.94322|
|SOFSOCN12A8C143F5D|      Face the Ashes|ARXR32B1187FB57099|2007|209.60608|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOBLFFE12AF72AA5BA|              Scream|ARJNIUY12298900C91|2009| 213.9424|
|SOERIDA12A6D4F8506|I Want You (Album...|ARBZIN01187FB362CC|2006|192.28689|
|SOABWAP12A8C13F82A|           Take Time|AR5LMPY1187FB573FE|1978|258.89914|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|  192.522|
|SOIGICF12A8

In [6]:
# Base S3 location; each table's parquet path is built by joining onto this.
output_data = "s3://udacity-dend-andreiliphd"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Write the songs table as parquet, partitioned by year then artist_id.
# Any existing output at the same path is replaced.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(os.path.join(output_data, "songs_table.parquet"))
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Artists dimension: one row per artist_id, keeping only the artist-level columns.
artists_table = (
    df.select(
        col("artist_id"),
        col("artist_name"),
        col("artist_location"),
        col("artist_latitude"),
        col("artist_longitude"),
    )
    .drop_duplicates(["artist_id"])
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Write the artists table as unpartitioned parquet, replacing any previous output.
(
    artists_table.write
    .mode("overwrite")
    .parquet(os.path.join(output_data, "artists_table.parquet"))
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
def get_log_schema():
    """Build the explicit schema used when reading the event-log JSON files.

    Returns:
        StructType: eighteen fields in the order listed below. Every field is
        read as a string except length and registration (double) and ts (long).
    """
    # (field name, type constructor) pairs, kept in schema order.
    field_types = [
        ("artist", Str),
        ("auth", Str),
        ("firstName", Str),
        ("gender", Str),
        ("itemInSession", Str),
        ("lastName", Str),
        ("length", Dbl),
        ("level", Str),
        ("location", Str),
        ("method", Str),
        ("page", Str),
        ("registration", Dbl),
        ("sessionId", Str),
        ("song", Str),
        ("status", Str),
        ("ts", Long),
        ("userAgent", Str),
        ("userId", Str),
    ]
    return R([Fld(field_name, make_type()) for field_name, make_type in field_types])


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Glob for the event-log JSON files, two directory levels below log-data/.
log_data = os.path.join("s3://udacity-dend", "log-data/*/*/*.json")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Load the event logs with the explicit log schema.
# Note that this rebinds `df`, which previously held the song data.
df = (
    spark.read
    .schema(get_log_schema())
    .format("json")
    .load(log_data)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Keep only the events whose page is "NextSong".
df = df.where(col("page") == "NextSong")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Users dimension: rename the camelCase log columns to snake_case and keep
# one row per user_id.
users_table = (
    df.select(
        col("userId").alias("user_id"),
        col("firstName").alias("first_name"),
        col("lastName").alias("last_name"),
        col("gender"),
        col("level"),
    )
    .drop_duplicates(["user_id"])
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Write the users table as unpartitioned parquet, replacing any previous output.
(
    users_table.write
    .mode("overwrite")
    .parquet(os.path.join(output_data, "users_table.parquet"))
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
def get_timestamp(ts_col):
    """Convert a column of epoch milliseconds into a timestamp column.

    This replaces a Python UDF built on datetime.fromtimestamp. The native
    expression avoids sending every row through a Python worker, and a null
    input yields a null timestamp instead of raising on ``None / 1000``.

    Args:
        ts_col: Column holding milliseconds since the Unix epoch.

    Returns:
        Column: the same instants as TimestampType.
    """
    # Dividing by 1000 gives fractional seconds since the epoch, which is
    # how Spark interprets a numeric value when casting it to timestamp.
    return (ts_col / 1000).cast(Stamp())


df = df.withColumn("timestamp", get_timestamp(col("ts")))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
def get_datetime(ts_col):
    """Convert a column of epoch milliseconds into a timestamp column.

    This replaces a Python UDF built on datetime.fromtimestamp. The native
    expression avoids sending every row through a Python worker, and a null
    input yields a null value instead of raising on ``None / 1000``.

    Args:
        ts_col: Column holding milliseconds since the Unix epoch.

    Returns:
        Column: the same instants as TimestampType.
    """
    # Dividing by 1000 gives fractional seconds since the epoch, which is
    # how Spark interprets a numeric value when casting it to timestamp.
    return (ts_col / 1000).cast(Stamp())


df = df.withColumn("datetime", get_datetime(col("ts")))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# Time dimension: break each event timestamp into its calendar parts and
# keep one row per distinct start_time.
time_exprs = [
    "timestamp as start_time",
    "hour(timestamp) as hour",
    "dayofmonth(timestamp) as day",
    "weekofyear(timestamp) as week",
    "month(timestamp) as month",
    "year(timestamp) as year",
    "dayofweek(timestamp) as weekday",
]
time_table = df.selectExpr(*time_exprs).drop_duplicates(["start_time"])


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# Write the time table as parquet, partitioned by year then month.
# Any existing output at the same path is replaced.
(
    time_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(os.path.join(output_data, "time_table.parquet"))
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Glob for every song metadata file, three directory levels below song_data/,
# loaded with the explicit song schema.
song_data = os.path.join("s3://udacity-dend", "song_data/*/*/*/*.json")
song_df = (
    spark.read
    .schema(get_song_schema())
    .format("json")
    .load(song_data)
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Register both frames as temp views so they can be queried through spark.sql.
for view_name, frame in (("song_data", song_df), ("log_data", df)):
    frame.createOrReplaceTempView(view_name)


In [None]:
# Songplays fact table: inner-join the log events to the song metadata on
# song title, track length and artist name, and tag each matched row with a
# generated songplay_id. year/month are derived from the event timestamp.
songplays_query = """
                            SELECT monotonically_increasing_id() as songplay_id,
                            ld.timestamp as start_time,
                            year(ld.timestamp) as year,
                            month(ld.timestamp) as month,
                            ld.userId as user_id,
                            ld.level as level,
                            sd.song_id as song_id,
                            sd.artist_id as artist_id,
                            ld.sessionId as session_id,
                            ld.location as location,
                            ld.userAgent as user_agent
                            FROM log_data ld
                            JOIN song_data sd
                            ON (ld.song = sd.title
                            AND ld.length = sd.duration
                            AND ld.artist = sd.artist_name)
                            """
songplays_table = spark.sql(songplays_query)


In [None]:
# Write the songplays table as parquet, partitioned by year then month.
# Any existing output at the same path is replaced.
(
    songplays_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(os.path.join(output_data, "songplays_table.parquet"))
)
