In [3]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, from_unixtime, dayofweek
from datetime import datetime as dt

# Load the AWS credentials from the local config file and export them as
# environment variables.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Check that list so a missing file fails with a
# clear message instead of a NoSectionError on the first lookup below.
if not config.read('dl_prod.cfg'):
    raise FileNotFoundError("Could not read AWS credentials file 'dl_prod.cfg'")

os.environ['AWS_ACCESS_KEY_ID'] = config.get("CREDENTIALS", 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get("CREDENTIALS", 'AWS_SECRET_ACCESS_KEY')

def create_spark_session():
    """Return a SparkSession configured with the hadoop-aws package.

    Uses ``getOrCreate()``, so a session that already exists is returned
    instead of building a second one.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [7]:
def read_song_data(input_data, spark_session=None):
    """Read every song JSON file below ``input_data`` into a DataFrame.

    Parameters
    ----------
    input_data : str
        Root location of the dataset (local folder or bucket URL). A trailing
        ``/`` is tolerated and does not produce a ``//`` in the resulting path.
    spark_session : optional
        Session used for reading. When omitted, the notebook-level ``spark``
        variable is used, which keeps existing one-argument calls working.

    Returns
    -------
    Whatever ``<session>.read.json(path)`` returns for the glob
    ``<input_data>/song_data/*/*/*/*.json``.
    """
    # Fall back to the notebook-global session only when none was passed in.
    session = spark if spark_session is None else spark_session

    # get filepath to song data file (normalise the root so that
    # "bucket/" and "bucket" yield the same path)
    root = input_data.rstrip('/')
    song_data = f'{root}/song_data/*/*/*/*.json'

    # read song data file
    return session.read.json(song_data)

In [4]:
spark = create_spark_session()

# Root of the input dataset, given as a path relative to the notebook.
# NOTE(review): this cell previously also assigned the bucket URLs
# "s3a://udacity-dend/" and "s3://udacity-dend/" before overwriting them;
# only the last assignment ever took effect, so only that one is kept.
input_data = 'data'

# Folder the result tables are written to, relative to the notebook.
# (An absolute, machine-specific path was assigned here and immediately
# overwritten; it has been dropped.)
output_data = "./Results/"

### Songs and artists tables from song_data

In [5]:
# get filepath to song data file
song_data = f'{input_data}/song_data/*/*/*/*.json'

# read song data file
df = spark.read.json(song_data)

# extract columns to create songs table (one row per distinct song record)
songs_table = df.select(["song_id", "title", "artist_id", "year", "duration"]).drop_duplicates()

# write songs table to parquet files partitioned by year and artist
# NOTE(review): the write was already disabled in this cell and stays disabled;
# the commented line now refers to songs_table (it used an undefined name).
# songs_table.write.parquet(output_data + 'songs.parquet', partitionBy=('year', 'artist_id'), mode='overwrite')

# extract columns to create artists table.
# The source has one row per song, so an artist with several songs would be
# repeated; de-duplicate the same way as songs_table.
artists_table = df.selectExpr(
    ["artist_id",
     "artist_name as name",
     "artist_location as location",
     "artist_latitude as latitude",
     "artist_longitude as longitude"]
).drop_duplicates()

# write artists table to parquet files
# NOTE(review): left disabled, as before.
# artists_table.write.parquet(output_data + 'artists.parquet', partitionBy=("artist_id"), mode='overwrite')

### Create users, time and songplays tables from log_data

In [6]:
# Glob matching the JSON log files directly under <input_data>/log_data
log_data = f'{input_data}/log_data/*.json'

# Load the log events and keep only the rows whose page is "NextSong"
df = spark.read.json(log_data).filter(col("page") == "NextSong")

# Preview the first five remaining rows
df.limit(5).show()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [8]:
# extract columns for users table.
# The log has one row per event, so de-duplicate to avoid repeating a user
# once for every song they played.
user_table = df.selectExpr(
    ["userId as user_id",
     "firstName as first_name",
     "lastName as last_name",
     "gender",
     "level"]
).drop_duplicates()

# TODO: write users table to parquet files

# create start_time from the original ts column. ts is divided by 1000
# because from_unixtime() expects seconds; the result is a formatted
# datetime string column.
# (The earlier identity UDF that copied ts into start_time was removed: its
# result was overwritten by this statement before ever being used.)
df = df.withColumn('start_time', from_unixtime(col('ts') / 1000))

# time table: one row per distinct start_time with its calendar parts
time_table = (
    df.select('start_time')
    .drop_duplicates()
    .withColumn("hour", hour("start_time"))
    .withColumn("day", dayofmonth("start_time"))
    .withColumn("month", month("start_time"))
    .withColumn("week", weekofyear("start_time"))
    .withColumn("year", year("start_time"))
    .withColumn("weekday", dayofweek("start_time"))
)

# TODO: write time table to parquet files partitioned by year and month

# read in song data to use for songplays table
song_df = read_song_data(input_data)

# register both DataFrames as temp views so they can be joined with Spark SQL
song_df.createOrReplaceTempView("song_table")
df.createOrReplaceTempView("log_table")

# TODO: write songplays table to parquet files partitioned by year and month

In [14]:
# Register (or refresh) the temp views used by the Spark SQL queries:
# the filtered log events as "log_table", the song data as "song_table".
df.createOrReplaceTempView("log_table")
song_df.createOrReplaceTempView("song_table")

In [13]:
# Preview the song data; n=20 and truncate=True are show()'s defaults, spelled out.
song_df.show(n=20, truncate=True)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [15]:
# Bare expression: the notebook displays the DataFrame repr (column names and types).
df.limit(num=1)

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, start_time: string]

In [18]:
# Build the songplays table: one row per distinct song-play event.
#
# The log events drive the join (log LEFT JOIN song), so a play whose song is
# missing from the song data is kept with NULL song_id / artist_id. The previous
# direction (song LEFT JOIN log) produced one row per song instead and dropped
# every play without a matching song.
#
# start_time is the derived log_table.start_time column (the value time_table
# is built from) rather than the raw ts value.
songplays = spark.sql("""
    SELECT DISTINCT
        log.start_time,
        log.userId    AS user_id,
        log.level,
        song.song_id,
        song.artist_id,
        log.sessionId AS session_id,
        log.location,
        log.userAgent AS user_agent
    FROM log_table AS log
    LEFT JOIN song_table AS song
        ON  log.artist = song.artist_name
        AND log.song   = song.title
        AND log.length = song.duration
""")
        

In [9]:
# Preview the song data; n=20 and truncate=True are show()'s defaults, spelled out.
song_df.show(n=20, truncate=True)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [1]:
# Import the `main` entry point from the project's `etl` module (not shown in
# this notebook) and call it with no arguments.
from etl import main
main()