In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, monotonically_increasing_id
from pyspark.sql.types import DateType

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [81]:
def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    song_data = input_data + 'song_data/*/*/*/*.json'
    
    # read song data file
    df = spark.read.json(song_data).dropDuplicates()

    # extract columns to create songs table
    songs_table = df.select("song_id", "title", "artist_id", "year", "duration")
    
    # write songs table to parquet files partitioned by year and artist
    (
       songs_table
           .write.mode("overwrite")
           .partitionBy("year","artist_id")
           .parquet(output_data + "songs")
    )

    # extract columns to create artists table
    artists_table = (
        df
        .select(
            "artist_id",
            col("artist_name").alias("name"),
            col("artist_location").alias("location"),
            col("artist_latitude").alias("latitude"),
            col("artist_longitude").alias("longitude"))
        .dropDuplicates()
    )

    # write artists table to parquet files
    (
        artists_table
            .write.mode("overwrite")
            .parquet(output_data + "artists")
    )
    return ( songs_table.show(n=10), artists_table.show(n=10) )

In [75]:
def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data = input_data + "log-data/*/*/*.json"

    # read log data file
    df = spark.read.json(log_data)
    
    # filter by actions for song plays
    df = (
        df
            .filter(col("page")=='NextSong')
            .filter(df.userId.isNotNull())
    )
    
    # extract columns for users table    
    users_table = (
        df
        .select(
            col("userId").alias("user_id"), 
            col("firstName").alias("first_name"), 
            col("lastName").alias("last_name"), 
            "gender", 
            "level")
        .dropDuplicates()
    )
    
    # write users table to parquet files
    (
        users_table
            .write.mode("overwrite")
            .parquet(output_data+"users")
    )
    
    # create timestamp column from original timestamp column
    get_timestamp = udf(lambda x: str(int(int(x) / 1000)))
    df = df.withColumn("timestamp", get_timestamp(col("ts")))
    
    # create datetime column from original timestamp column
    get_datetime = udf(lambda ts: datetime.fromtimestamp(ts // 1000), DateType())
    df = df.withColumn("datetime", get_datetime(col("ts")))

    # extract columns to create time table
    time_table = df.select(
        col('datetime').alias('start_time'),
        hour('datetime').alias('hour'),
        dayofmonth('datetime').alias('day'),
        weekofyear('datetime').alias('week'),
        month('datetime').alias('month'),
        year('datetime').alias('year'),
        date_format('datetime', 'F').alias('weekday')
    )
    
    # write time table to parquet files partitioned by year and month
    (
        time_table
            .write.mode("overwrite")
            .partitionBy("year","month")
            .parquet(output_data+"time")
    )
    
    # read in song data to use for songplays table
    song_data = input_data + "song_data/*/*/*/*.json"
    song_df = spark.read.json(song_data)
    
    # extract columns from joined song and log datasets to create songplays table 
    df = df['datetime', 'userId', 'level', 'song', 'artist', 'sessionId', 'location', 'userAgent']

    songplays_table = (
        song_df
        .join(df, (song_df.artist_name==df.artist) & (song_df.title == df.song) )
        .select(
            monotonically_increasing_id().alias('songplay_id'),
            col('datetime').alias('start_time'),
            col("userId").alias("user_id"),
            "level",
            "song_id",
            "artist_id",
            col("sessionId").alias("session_id"),
            col("artist_location").alias("location"),
            "userAgent",
            # Month and year are included to partition the parquet by these variables
            month(col("datetime")).alias("month"),
            year(col("datetime")).alias("year")
        )
    )
    
    # write songplays table to parquet files partitioned by year and month
    (
        songplays_table
            .write.mode("overwrite")
            .partitionBy("year","month")
            .parquet(output_data+"songplays")
    )
    
    return( (users_table.show(n=10), time_table.show(n=10), songplays_table.show(n=10)))

In [76]:
spark = create_spark_session()

In [77]:
spark

In [78]:
input_data = "data/"
output_data = "data/"
#process_song_data(spark, input_data, output_data)   

In [82]:
process_song_data(spark, input_data, output_data)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOWQTQZ12A58A7B63E|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|
|SOLLHMX12AB01846DC|   The Emperor Falls|AR1Y2PT1187FB5B9CE|   0|484.62322|
|SONHOTT12A8C13493C|     Something Girls|AR7G5I41187FB4CE6C|1982|233.40363|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOUDSGM12AC9618304|Insatiable (Instr...|ARNTLGG11E2835DDB9|   0|266.39628|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SOMVWWT12A58A7AE05|Knocked Out Of Th...|ARQ9BO41187FB5CF1F|   0|183.17016|
|SOYTPEP12AB0180E7B|     Twist and Shout|ARAJPHH1187FB5566A|1964|164.80608|
+-----------

(None, None)

In [80]:
process_log_data(spark, input_data, output_data)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     80|     Tegan|   Levine|     F| paid|
|     34|    Evelin|    Ayala|     F| free|
|     97|      Kate|  Harrell|     F| paid|
|     73|     Jacob|    Klein|     M| paid|
|     32|      Lily|    Burns|     F| free|
|     58|     Emily|   Benson|     F| paid|
|     78|     Chloe|     Roth|     F| free|
|    100|     Adler|  Barrera|     M| free|
|     33|   Bronson|   Harris|     M| free|
|     29|Jacqueline|    Lynch|     F| paid|
|     49|     Chloe|   Cuevas|     F| free|
|     51|      Maia|    Burke|     F| free|
|     61|    Samuel| Gonzalez|     M| free|
|     94|      Noah|   Chavez|     M| free|
|     12|    Austin|  Rosales|     M| free|
|     86|     Aiden|     Hess|     M| free|
|     14|  Theodore|   Harris|     M| free|
|     10|    Sylvie|     Cruz|     F| free|
|     37|    Jordan|    Hicks|  

(None, None, None)

In [62]:
log_data = input_data + "log_data/*.json"

df = spark.read.json(log_data)
df = (
    df
        .filter(col("page")=='NextSong')
        .filter(df.userId.isNotNull())
)

# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: str(int(int(x) / 1000)))
df = df.withColumn("timestamp", get_timestamp(col("ts")))

# create datetime column from original timestamp column
get_datetime = udf(lambda ts: datetime.fromtimestamp(ts // 1000), DateType())
df = df.withColumn("datetime", get_datetime(col("ts")))
    
df.toPandas().head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542241826,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542242481,2018-11-15
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542242741,2018-11-15
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,1542253449,2018-11-15
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,1542260935,2018-11-15


In [63]:
song_data = input_data + "song_data/*/*/*/*.json"
song_df = spark.read.json(song_data)

song_df.toPandas().head(10)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005
5,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
6,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
7,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
8,ARI2JSK1187FB496EF,51.50632,"London, England",-0.12714,Nick Ingman;Gavyn Wright,111.62077,1,SODUJBS12A8C132150,Wessex Loses a Bride,0
9,AR0RCMP1187FB3F427,30.08615,"Beaumont, TX",-94.10158,Billie Jo Spears,133.32853,1,SOGXHEG12AB018653E,It Makes No Difference Now,1992


In [67]:
songplays_table = (
    song_df
    .join(df, (song_df.artist_name==df.artist) & (song_df.title == df.song) )
    .select(
        monotonically_increasing_id().alias('songplay_id'),
        col('datetime').alias('start_time'),
        col("userId").alias("user_id"),
        "level",
        "song_id",
        "artist_id",
        col("sessionId").alias("session_id"),
        col("artist_location").alias("location"),
        "userAgent",
        # Month and year are included to partition the parquet by these variables
        month(col("datetime")).alias("month"),
        year(col("datetime")).alias("year")
    )
)
songplays_table.toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,userAgent,month,year
0,0,2018-11-21,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,Dubai UAE,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018
