In [4]:

import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
import time
import glob
import pandas as pd
from schema import SONG_DATA_SCHEMA, LOG_DATA_SCHEMA

In [5]:
def create_spark_session():
    """
    Obtain a SparkSession, reusing the active one when it exists.

    :return: the SparkSession produced by a builder whose app name is
        "Spark SQL Quiz"
    """
    builder = SparkSession.builder.appName("Spark SQL Quiz")
    return builder.getOrCreate()

In [6]:
# Shared session used by every read/write cell below.
spark = create_spark_session()

In [4]:
# Display the session object to confirm it was created.
spark

In [7]:
def get_files(filepath):
    """
    Recursively collect the absolute paths of every ``*.json`` entry under a directory.

    :param filepath: root directory to walk; a directory that does not exist
        simply yields an empty list
    :return: sorted list of absolute paths matching ``*.json`` at any depth
        below ``filepath``
    """
    all_files = []
    for root, _dirs, _names in os.walk(filepath):
        # Escape the directory part so glob metacharacters in a folder name
        # (e.g. "run[1]") are matched literally rather than as a pattern.
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(match) for match in glob.glob(pattern))

    # Walk order depends on the filesystem; sort so repeated runs agree.
    all_files.sort()
    return all_files

In [31]:
# Paths of every song metadata JSON file under data/song_data.
song_data = get_files('data/song_data')

In [32]:
# Read the song files with the explicit schema rather than letting Spark infer one.
df = spark.read.schema(SONG_DATA_SCHEMA).json(song_data)

In [33]:
# Peek at one song record to check the parsed columns.
df.head(1)

[Row(song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', duration=511.16363, year=0, num_songs=1, artist_id='ARDR4AC1187FB371A1', artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', artist_location='', artist_latitude=None, artist_longitude=None)]

In [34]:
# Songs dimension. artist_name is carried along because the songplays join
# further down matches log rows on it.
songs_table = (
    df
    .select(
        'song_id',
        'title',
        'artist_id',
        'artist_name',
        'year',
        'duration',
    )
    .distinct()
)

In [35]:
# Peek at one row of the songs table.
songs_table.head(1)

[Row(song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', artist_id='ARDR4AC1187FB371A1', artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', year=0, duration=511.16363)]

In [36]:
# Report where the notebook is running and where the songs table will land.
currentDirectory = os.getcwd()
print(currentDirectory)
# Absolute form of the output directory; the write below uses the relative path.
output_directory = currentDirectory + "/data/output_data"
songs_output_file = "data/output_data/" + "songs_table.parquet"
print(songs_output_file)

# Persist the songs table partitioned by year and artist, replacing any previous run.
songs_table.write.parquet(
    songs_output_file,
    mode="overwrite",
    partitionBy=["year", "artist_id"],
)

c:\Users\drggf\Desktop\Data_Engineering_Nanodegree\Data-Lakes-with-Spark
data/output_data/songs_table.parquet


In [16]:
# extract columns to create artists table
# Artists dimension: project the artist attributes out of the song data and
# keep one row per distinct combination.
# `df` must still hold the song data here; it is rebound to the log data later on.
artists_table = (
    df
    .select(
        col("artist_id"),
        col("artist_name").alias("name"),
        col("artist_location").alias("location"),
        col("artist_latitude").alias("latitude"),
        col("artist_longitude").alias("longitude"),
    )
    .dropDuplicates()
)

In [61]:
# Peek at one row of the artists table.
artists_table.head(1)

[Row(artist_id='ARPBNLO1187FB3D52F', name='Tiny Tim', location='New York, NY', latitude=40.71455, longitude=-74.00712)]

In [62]:
# Persist the artists table (unpartitioned), replacing any previous run.
artists_table.write.mode("overwrite").parquet(
    "data/output_data/" + "artists_table.parquet"
)

In [3]:
from schema import SONG_DATA_SCHEMA, LOG_DATA_SCHEMA

In [37]:
# Paths of every event-log JSON file under data/log_data.
log_data = get_files('data/log_data')

In [38]:
# From here on `df` holds the event logs, replacing the song data frame.
# NOTE(review): the recorded output shows userId=None for a logged-in user, which
# suggests the userId type in LOG_DATA_SCHEMA does not match the raw JSON — confirm.
df = spark.read.schema(LOG_DATA_SCHEMA).json(log_data)

In [39]:
# Keep only the events whose page is "NextSong".
df = df.where(col("page") == "NextSong")

In [40]:
# Peek at one filtered log event.
df.head(1)

[Row(auth='Logged In', userId=None, registration=1541016707796.0, level='free', firstName='Ryan', lastName='Smith', gender='M', location='San Jose-Sunnyvale-Santa Clara, CA', sessionId=583, ts=1542241826796, page='NextSong', method='PUT', status=200, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', itemInSession=0, artist='Harmonia', song='Sehr kosmisch', length=655.77751)]

In [41]:
# UDF: epoch milliseconds -> timestamp built from the UTC wall-clock time.
# Null cells reach a Python UDF as None, so pass them through instead of letting
# `None / 1000` raise a TypeError and abort the whole job.
get_datetime = udf(
        lambda x: None if x is None else datetime.utcfromtimestamp(x / 1000),
        TimestampType()
    )

In [42]:
# UDF: epoch milliseconds -> "YYYY-MM-DD HH:MM:SS" string in UTC.
# Null cells reach a Python UDF as None, so pass them through instead of letting
# `None / 1000.0` raise a TypeError and abort the whole job.
get_timestamp = udf(
        lambda ts: None if ts is None
        else datetime.utcfromtimestamp(ts / 1000.0).strftime("%Y-%m-%d %H:%M:%S"),
        StringType()
    )

In [49]:
# df = df.withColumn('timestamp', get_timestamp("ts"))

In [43]:
# Add start_time by converting the epoch-millisecond `ts` column with get_datetime.
df = df.withColumn("start_time", get_datetime("ts"))

In [44]:
# Confirm the new start_time column on one row.
df.head(1)

[Row(auth='Logged In', userId=None, registration=1541016707796.0, level='free', firstName='Ryan', lastName='Smith', gender='M', location='San Jose-Sunnyvale-Santa Clara, CA', sessionId=583, ts=1542241826796, page='NextSong', method='PUT', status=200, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', itemInSession=0, artist='Harmonia', song='Sehr kosmisch', length=655.77751, start_time=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000))]

In [66]:
# Time dimension: one row per distinct play timestamp plus its calendar parts.
time_table = (
        df
        .select("start_time")
        # Deduplicate on the key itself. Deduplicating on (year, month, day, hour)
        # kept only one arbitrary timestamp per hour, so most songplay start_time
        # values had no matching row in this table.
        .drop_duplicates(["start_time"])
        .withColumn("hour", hour("start_time"))
        .withColumn("day", dayofmonth("start_time"))
        .withColumn("week", weekofyear("start_time"))
        .withColumn("month", month("start_time"))
        .withColumn("year", year("start_time"))
        # dayofweek numbers days from 1 = Sunday through 7 = Saturday.
        .withColumn("weekday", dayofweek("start_time"))
    )

In [67]:
# Peek at one row of the time table.
time_table.head(1)

[Row(start_time=datetime.datetime(2018, 11, 21, 4, 0, 49, 796000), hour=4, day=21, week=47, month=11, year=2018, weekday=4)]

In [57]:
# Base location for the time table; the write cell appends one more path component to it.
time_output_file = "data/output_data/" + "time.parquet"

In [59]:
# Persist the time table partitioned by year and month, replacing any previous run.
# NOTE(review): time_output_file already ends in "time.parquet", so the data lands in
# data/output_data/time.parquet/time_table.parquet — one level deeper than the
# songs and artists outputs. Confirm this nesting is intended.
(
    time_table.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet(os.path.join(time_output_file, "time_table.parquet"))
)

In [16]:
output_data = "data/output_data"

# Load the artists and songs tables back from the parquet output written earlier.
artists_df = spark.read.parquet(os.path.join(output_data, "artists_table.parquet"))
songs_df = spark.read.parquet(os.path.join(output_data, "songs_table.parquet"))

In [56]:
# Songs lookup for the songplays join; reads the same parquet dataset as songs_df.
songs = spark.read.parquet(os.path.join(output_data, "songs_table.parquet"))

In [57]:
# Peek at one row of the songs lookup as read back from parquet.
songs.head(1)

[Row(song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, year=0, artist_id='ARDR4AC1187FB371A1')]

In [58]:
# Re-check one log row before joining it against the songs lookup.
df.head(1)

[Row(auth='Logged In', userId=None, registration=1541016707796.0, level='free', firstName='Ryan', lastName='Smith', gender='M', location='San Jose-Sunnyvale-Santa Clara, CA', sessionId=583, ts=1542241826796, page='NextSong', method='PUT', status=200, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', itemInSession=0, artist='Harmonia', song='Sehr kosmisch', length=655.77751, start_time=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000))]

In [59]:
# Attach song metadata to each play. A log row matches a song only when title,
# artist name and duration all agree; the left join keeps unmatched plays.
songplays_table = df.join(
    songs,
    on=(
        (df.song == songs.title)
        & (df.artist == songs.artist_name)
        & (df.length == songs.duration)
    ),
    how="left",
)

In [62]:
# Shape the joined rows into the songplays fact table.
# This cell rebinds songplays_table, so re-run the join cell first before
# re-running it: the renamed source columns no longer exist afterwards.
songplays_table = (
        songplays_table
        .select(
            # Generated ids are unique and increasing but not guaranteed consecutive.
            monotonically_increasing_id().alias("songplay_id"),
            "start_time",
            # Use the column's real spelling ("userId"); "userID" only resolved
            # because Spark's analyzer is case-insensitive by default.
            col("userId").alias("user_id"),
            "level",
            "song_id",
            "artist_id",
            col("sessionId").alias("session_id"),
            "location",
            col("userAgent").alias("user_agent"),
            month("start_time").alias("month"),
            year("start_time").alias("year")
        )
    )

In [63]:
# Peek at one row of the songplays table.
songplays_table.head(1)

[Row(songplay_id=0, start_time=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000), user_id=None, level='free', song_id=None, artist_id=None, session_id=583, location='San Jose-Sunnyvale-Santa Clara, CA', user_agent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', month=11, year=2018)]