In [1]:
import sys
import pandas as pd
from pyspark.sql.functions import round as spark_round

sys.path.append('..')
from etl import create_spark_session
from schemas import song_data_schema, log_data_schema, user_table_schema, song_table_schema, artist_table_schema, songplay_table_schema, time_table_schema
from basic_pipeline import create_basic_pipeline

In [2]:
# Build the Spark session through the project helper (called with local=True),
# set the Spark log level to ERROR, and show the session as the cell output.
spark = create_spark_session(local=True)
spark.sparkContext.setLogLevel("ERROR")
spark

In [3]:
# Read both raw JSON datasets with explicit schemas. recursiveFileLookup makes
# Spark pick up files from nested directories under each prefix.
df_log_data = (
    spark.read
    .format('json')
    .schema(log_data_schema)
    .option("recursiveFileLookup", True)
    .load('s3a://udacity-dend/log_data')
)
df_song_data = (
    spark.read
    .format('json')
    .schema(song_data_schema)
    .option("recursiveFileLookup", True)
    .load('s3a://udacity-dend/song_data')
)

# Left join keeps every log row. A song row matches when the titles are equal
# and the log length equals the song duration after rounding both to 4 decimals.
df_joined = df_log_data.join(
    df_song_data,
    on=(
        (df_log_data.song == df_song_data.title)
        & (spark_round(df_log_data.length, 4) == spark_round(df_song_data.duration, 4))
    ),
    how='LEFT',
)

In [4]:
# Column renames handed to create_basic_pipeline.
# NOTE(review): keys look like the output column names and values like the
# source column names - confirm against create_basic_pipeline.
rename_transformations = dict(
    start_time='ts',
    user_id='userId',
    first_name='firstName',
    last_name='lastName',
    name='artist_name',
    location='artist_location',
    latitude='artist_latitude',
    longitude='artist_longitude',
    session_id='sessionId',
    user_agent='userAgent',
)

# SQL expressions keyed by the column each one aliases itself to:
# start_time is divided by 1000 before to_timestamp, user_id goes through INT().
cast_transformations = dict(
    start_time='to_timestamp(start_time / 1000) as start_time',
    user_id='INT(user_id) as user_id',
)

# One "<function>(start_time) as <column>" expression per derived time column.
time_transformations = {
    column: f"{sql_function}(start_time) as {column}"
    for column, sql_function in (
        ("hour", "hour"),
        ("day", "dayofmonth"),
        ("week", "weekofyear"),
        ("month", "month"),
        ("year", "year"),
        ("weekday", "dayofweek"),
    )
}

# General pipeline: configured with both the rename and the cast mappings.
basic_pipeline = create_basic_pipeline(
    rename_transformations=rename_transformations,
    cast_transformations=cast_transformations,
)

# Time-table pipeline: only the time expressions, passed in the cast slot.
time_basic_pipeline = create_basic_pipeline(
    cast_transformations=time_transformations,
)

In [21]:
# Derive the five output tables. Each pipeline call receives a
# (source DataFrame, target table schema) tuple.
# NOTE(review): presumably the pipeline applies the rename/cast mappings and
# keeps the columns named by the schema - confirm in basic_pipeline.
df_users = basic_pipeline((df_log_data, user_table_schema)).distinct()
# NOTE(review): no .distinct() on songs, unlike users and artists - confirm intended.
df_songs = basic_pipeline((df_song_data, song_table_schema))
df_artists = basic_pipeline((df_song_data, artist_table_schema)).distinct()
# Songplays come from the log/song join rather than a single raw dataset.
df_songplays = basic_pipeline((df_joined, songplay_table_schema))
# The time table is built from the songplays output with the time-only pipeline.
df_times = time_basic_pipeline((df_songplays, time_table_schema)).distinct()

In [19]:
# Dynamic mode: an overwrite replaces only the partitions present in the data
# being written instead of clearing the whole target path first.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Write the users table as Parquet, one partition directory per user_id.
# NOTE(review): 'schema' is not a documented writer option and looks like a
# no-op here; partitioning by user_id may also produce many small files - confirm.
(
    df_users.write
    .partitionBy('user_id')
    .option('schema', user_table_schema)
    .format('parquet')
    .mode('overwrite')
    .save('s3a://dutrajardim/udacity-dl-project/users.parquet')
)



In [29]:
# Write the songs table as Parquet, partitioned by year and then artist_id.
# NOTE(review): 'schema' is not a documented writer option and looks like a
# no-op here - confirm before relying on it.
(
    df_songs.write
    .partitionBy(['year', 'artist_id'])
    .option('schema', song_table_schema)
    .format('parquet')
    .mode('overwrite')
    .save('s3a://dutrajardim/udacity-dl-project/songs.parquet')
)
    



In [24]:
# Write the artists table as Parquet, one partition directory per artist_id.
# NOTE(review): 'schema' is not a documented writer option and looks like a
# no-op here; partitioning by artist_id may also produce many small files - confirm.
(
    df_artists.write
    .partitionBy(['artist_id'])
    .option('schema', artist_table_schema)
    .format('parquet')
    .mode('overwrite')
    .save('s3a://dutrajardim/udacity-dl-project/artists.parquet')
)



In [None]:
from pyspark.sql.functions import year, month

# Write the songplays table as Parquet, partitioned by the year and month
# derived from each row's start_time.
#
# The chain is wrapped in parentheses instead of using backslash continuations:
# the original was missing the backslash after partitionBy(...), which ended
# the statement early and made the following line a syntax error.
# .format('parquet') is given explicitly to match the other table writes.
# NOTE(review): 'schema' is not a documented writer option and looks like a
# no-op here - confirm before relying on it.
(
    df_songplays
    .withColumn('year', year('start_time'))
    .withColumn('month', month('start_time'))
    .write
    .partitionBy(['year', 'month'])
    .option('schema', songplay_table_schema)
    .format('parquet')
    .mode('overwrite')
    .save('s3a://dutrajardim/udacity-dl-project/songplays.parquet')
)

In [7]:
# Stop the Spark session at the end of the notebook.
spark.stop()