In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col , column
import os
import time
import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1563990563973_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

VBox()

# Song staging table

In [32]:
#works with col
songPath = "s3://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json"
#songPath = "s3://udacity-dend/song_data/*/*/*/*.json"
song_stage_df = spark.read.json(songPath) \
            .withColumn("artist_id", col("artist_id").cast("int")) \
            .withColumn("year", col("year").cast("int")) \
            .withColumn("artist_latitude", col("artist_latitude").cast("float")) \
            .withColumn("artist_longitude", col("artist_latitude").cast("float"))
            

VBox()

In [33]:
song_stage_df.printSchema()

VBox()

root
 |-- artist_id: integer (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

In [41]:
song_stage_df.show(1)

VBox()

+---------+---------------+---------------+----------------+-----------+---------+---------+------------------+-------------------+----+
|artist_id|artist_latitude|artist_location|artist_longitude|artist_name| duration|num_songs|           song_id|              title|year|
+---------+---------------+---------------+----------------+-----------+---------+---------+------------------+-------------------+----+
|     null|           null|               |            null|Line Renaud|152.92036|        1|SOUPIRU12A6D4FA1E1|Der Kleine Dompfaff|   0|
+---------+---------------+---------------+----------------+-----------+---------+---------+------------------+-------------------+----+

## Artists: artist_id, name, location, lattitude, longitude 

In [39]:
artists_df = song_stage_df.selectExpr("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude") \
                .distinct()

VBox()

In [40]:
artists_df.show(1)

VBox()

+---------+-----------+---------------+---------------+----------------+
|artist_id|artist_name|artist_location|artist_latitude|artist_longitude|
+---------+-----------+---------------+---------------+----------------+
|     null|Line Renaud|               |           null|            null|
+---------+-----------+---------------+---------------+----------------+

# Log staging Table

In [3]:
logPath = "s3://udacity-dend/log_data/2018/11/2018-11-12-events.json"
#logPath = "s3://udacity-dend/log_data/*/*/*.json"
log_df = spark.read.json(logPath) \
            .withColumn("ts_str", col("ts").cast("string")) \
            
       
    

VBox()

In [38]:
log_df.printSchema()

VBox()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts_str: string (nullable = true)

## Users : user_id, first_name, last_name, gender, level


In [42]:
users_df = log_df.selectExpr("cast(userId as int) as userId", "firstName", "lastName", "gender", "level")  \
                .distinct()

VBox()

In [43]:
users_df.show(1)

VBox()

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    47|   Kimber|  Norris|     F| free|
+------+---------+--------+------+-----+
only showing top 1 row

## Time:  start_time, hour, day, week, month, year, weekday

In [25]:
from pyspark.sql.functions import *
time_df = log_df \
        .withColumn('dateColumn', F.to_timestamp((log_df['ts_str']/1000).cast('timestamp'), "yyyy-MM-dd hh:mm:ss")) \
        .select('dateColumn', date_format('dateColumn', 'h:m:s a').alias('dt_starttime'), \
        hour('dateColumn').alias('dt_hour'), \
        dayofyear('dateColumn').alias('dt_day'), \
        dayofweek('dateColumn').alias('dt_dayofweek'), \
        month('dateColumn').alias('dt_month'), year('dateColumn').alias('dt_year'))   
         
       

VBox()

In [26]:
time_df.show(1)

VBox()

+-------------------+------------+-------+------+------------+--------+-------+
|         dateColumn|dt_starttime|dt_hour|dt_day|dt_dayofweek|dt_month|dt_year|
+-------------------+------------+-------+------+------------+--------+-------+
|2018-11-12 02:36:57|  2:36:57 AM|      2|   316|           2|      11|   2018|
+-------------------+------------+-------+------+------------+--------+-------+
only showing top 1 row

In [27]:
time_df.printSchema()

VBox()

root
 |-- dateColumn: timestamp (nullable = true)
 |-- dt_starttime: string (nullable = true)
 |-- dt_hour: integer (nullable = true)
 |-- dt_day: integer (nullable = true)
 |-- dt_dayofweek: integer (nullable = true)
 |-- dt_month: integer (nullable = true)
 |-- dt_year: integer (nullable = true)

In [30]:
# UDF

from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

get_timestamp = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))

time_udf_df = log_df.withColumn("date_time", get_timestamp(log_df.ts))

VBox()

In [31]:
time_udf_df .printSchema()

VBox()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts_str: string (nullable = true)
 |-- date_time: string (nullable = true)

In [22]:
# 3rd option

time3_df = log_df.withColumn('dateColumn', F.to_timestamp((log_df['ts']/1000).cast('timestamp'), "yyyy-MM-dd hh:mm:ss"))

VBox()

In [23]:
time3_df.printSchema()

VBox()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts_str: string (nullable = true)
 |-- dateColumn: timestamp (nullable = true)

## Song Table: song_id, title, artist_id, year, duration

In [34]:
songs_df = song_stage_df.selectExpr("song_id", "title", "artist_id", "cast(year as int) as year", "cast(duration as int) as duration") \
                .distinct()

VBox()

In [36]:
songs_df.count()

VBox()

1

## Songplays: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent


In [45]:
df_songplays = song_stage_df.join(log_df, (log_df.artist == song_stage_df.artist_name) & (log_df.song == song_stage_df.title) )  \
            .select(log_df["ts"], log_df["userId"], log_df["level"], song_stage_df["song_id"], song_stage_df["artist_id"], log_df["sessionid"] \
                , log_df["location"] , log_df["userAgent"] )


VBox()

In [46]:
df_songplays.count()

VBox()

0

In [None]:
df_songplays.printSchema()

In [None]:
df_songplays.show()