In [1]:
import configparser
import logging
from datetime import datetime
import os
import boto3
from botocore.exceptions import ClientError

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, IntegerType
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

## Start a Spark context

In [45]:
spark = SparkSession.builder\
        .appName("fullDataset")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
        .config("spark.dynamicAllocation.enabled", "true")\
        .config("spark.hadoop.parquet.enable.summary-metadata", "false")\
        .config("spark.sql.parquet.mergeSchema", "false")\
        .config("spark.sql.parquet.filterPushdown", "true")\
        .config("spark.sql.hive.metastorePartitionPruning", "true")\
        .config("spark.sql.autoBroadcastJoinThreshold", "-1")\
        .getOrCreate()
    
spark.conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

In [4]:
sc = spark.sparkContext
sc

### Create S3 bucket to write partitioned parquet files
- Run if an ouput S3 bucket hasn't been created yet

In [5]:
bucket_name = 'sparkify-analytics-cj'

s3 = boto3.client('s3')
s3.create_bucket(Bucket=bucket_name)

{'ResponseMetadata': {'RequestId': 'DSSPVN0MY2WG6M8K',
  'HostId': 'fNjE1N1YPovmWfKw9ULuq8DS0byb8jj4RIt4YAX1ktdrSUvHttkUjQifgBli842WW6K882xye0Q=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'fNjE1N1YPovmWfKw9ULuq8DS0byb8jj4RIt4YAX1ktdrSUvHttkUjQifgBli842WW6K882xye0Q=',
   'x-amz-request-id': 'DSSPVN0MY2WG6M8K',
   'date': 'Mon, 02 Aug 2021 12:36:16 GMT',
   'location': '/sparkify-analytics-cj',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': '/sparkify-analytics-cj'}

In [7]:
schema = StructType(
    [StructField("artist_id", StringType(), True),
    StructField("artist_latitude", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_longitude", DoubleType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("num_songs", LongType(), True),
    StructField("song_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("year", LongType(), True)]
)

### Load song data logs from S3 bucket

In [8]:
song_data = "s3a://udacity-dend/song_data/A/A/*/*.json"
df = spark.read.json(song_data, schema=schema)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
df.rdd.getNumPartitions()

19

### Create songs_table and write partitioned parquet file in songs directory to S3 bucket

In [62]:
songs_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table = df.select([col for col in songs_columns])
songs_table.printSchema()
songs_table.createOrReplaceTempView("songs_table")

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [12]:
songs_analytics_folder = "songs_tables"

songs_table.write.mode("overwrite").option("compression", "gzip").partitionBy("year", "artist_id").parquet("s3a://{}/songs_analytics_tables.parquet".format(bucket_name))

### Create artists table and write partitioned parquet files in artists directory to S3 bucket

In [13]:
artists_columns = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
artists_table = df.select([col for col in artists_columns])
artists_table.show(5)
artists_table.createOrReplaceTempView("artists_table")

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARSUVLW12454A4C8B8|Royal Philharmoni...|           Tennessee|       35.83073|       -85.97874|
|ARXQC081187FB4AD42|William Shatner_ ...|                  UK|       54.31407|        -2.23001|
|ARWUNH81187FB4A3E0|         Trick Daddy|     Miami , Florida|           null|            null|
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|        51.4536|        -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|       40.57885|       -74.21956|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [14]:
artist_analytics_folder = "artists_tables"

artists_table.write.mode("overwrite").parquet("s3a://{}/artists_analytics_tables.parquet".format(bucket_name))

### Load log data into Spark dataframe

In [15]:
schema = StructType(
    [StructField("artist", StringType(), True),
     StructField("auth", StringType(), True),
     StructField("firstName", StringType(), True),
     StructField("gender", StringType(), True),
     StructField("itemInSession", LongType(), True),
     StructField("lastName", StringType(), True),
     StructField("length", DoubleType(), True),
     StructField("level", StringType(), True),
     StructField("location", StringType(), True),
     StructField("method", StringType(), True),
     StructField("page", StringType(), True),
     StructField("registration", DoubleType(), True),
     StructField("sessionId", LongType(), True),
     StructField("song", StringType(), True),
     StructField("status", IntegerType(), True),
     StructField("ts", LongType(), True),
     StructField("userAgent", StringType(), True),
     StructField("userId", StringType(), True)]
)

In [16]:
log_data = "s3a://udacity-dend/log_data/*/*/*.json"
df_logs = spark.read.json(log_data, schema)
df_logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [17]:
df_logs_filtered = df_logs.filter(df_logs.page == 'NextSong')
df_logs_filtered.show(5)
df_logs_filtered.printSchema()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

### Create users table and write partitioned parquet files to S3 bucket

In [18]:
df_logs_filtered.createOrReplaceTempView("users_table")
users_table_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
users_table = df_logs_filtered.select([col for col in users_table_columns])

users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    61|   Samuel|Gonzalez|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [19]:
users_analytics_folder = "users_tables"

users_table.write.mode("overwrite").parquet("s3a://{}/users_analytics_tables.parquet".format(bucket_name))

### Convert ts to timestamp type

In [20]:
import pyspark.sql.functions as F

df_ts = df_logs_filtered.withColumn("timestamp", F.to_utc_timestamp(F.from_unixtime(F.col('ts')/1000, 'yyyy-MM-dd HH:mm:ss'), 'EST'))
df_ts.printSchema()
df_ts.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     aut

### Convert ts to datetime

In [21]:
df_ts_dt = df_ts.withColumn("datetime", F.to_date(F.from_unixtime(F.col('ts')/1000)))
df_ts_dt.show(4)
df_ts_dt.printSchema()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+-------------------+----------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|          timestamp|  datetime|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+-------------------+----------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 05:30:26|2018-11-15|
|The Prodigy|Logged In|     Ryan|   

### Create time_table and write partitioned parquet files to S3 bucket

In [22]:
df_ts_dt.createOrReplaceTempView("time_table")

time_table = spark.sql("""
    SELECT DISTINCT datetime as start_time,
        hour(timestamp) as hour,
        day(timestamp) as day,
        weekofyear(timestamp) as week,
        month(timestamp) as month,
        year(timestamp) as year,
        dayofweek(timestamp) as weekday
    FROM time_table
    ORDER BY start_time
""")

time_table.printSchema()
time_table.show(5)

time_table.createOrReplaceTempView("time_tables")

root
 |-- start_time: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|2018-11-01|   3|  2|  44|   11|2018|      6|
|2018-11-01|   2|  2|  44|   11|2018|      6|
|2018-11-02|   0|  3|  44|   11|2018|      7|
|2018-11-02|  17|  2|  44|   11|2018|      6|
|2018-11-02|  14|  2|  44|   11|2018|      6|
+----------+----+---+----+-----+----+-------+
only showing top 5 rows



In [23]:
time_table_analytics_folder = "time_tables"

time_table.write.mode("overwrite").partitionBy("year", "month").parquet("s3a://{}/time__analytics_tables.parquet".format(bucket_name))

In [24]:
# song_data is already created above so use here to join with log data for songplays table

df.printSchema()
df_ts_dt.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable

## Join artists and songs tables in order to create songplays dataframe

In [25]:
join_cond = [df_ts_dt.artist == df.artist_name, df_ts_dt.song == df.title]

df_songplays = df_ts_dt.join(df, join_cond)
join_sp_t = [df_songplays.datetime == time_table.start_time]
df_songplays_t = df_songplays.join(time_table, join_sp_t)

df_songplays_t.printSchema()
df_songplays_t.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: date (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable =

In [26]:
songplays_analytics_folder = "songplays_tables"
df_songplays_t = df_songplays_t.withColumn("songplay_id", monotonically_increasing_id())

df_songplays_t.createOrReplaceTempView("songplays_table")

songplays_table = spark.sql("""
    SELECT songplay_id as songplay_id,
        t.start_time as start_time,
        sp.userId as user_id,
        sp.level as level,
        s.song_id as song_id,
        s.artist_id as artist_id,
        sp.sessionId as sessionId,
        sp.location as location,
        sp.userAgent as user_agent,
        t.year,
        t.month
    FROM songplays_table sp
    JOIN songs_table s ON sp.song = s.title AND sp.length = s.duration
    JOIN artists_table a ON sp.artist = a.artist_name AND a.artist_id = s.artist_id
    JOIN time_tables t ON sp.datetime = t.start_time
""")

In [27]:
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: date (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [28]:
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet("s3a://{}/songplays_analytics_tables.parquet".format(bucket_name))

In [69]:
query = """
SELECT DISTINCT a.artist_name AS artist_name, SUM(s.duration) total_song_duration
FROM songplays_table sp
JOIN songs_table s ON sp.song_id = s.song_id
JOIN artists_table a ON sp.artist_id = a.artist_id
GROUP BY a.artist_name
ORDER BY total_song_duration DESC
"""

sql_query = """
SELECT DISTINCT a.artist_name AS artist_name, s.title AS song_title, s.duration as song_length
FROM songplays_table sp
JOIN songs_table s ON sp.song_id = s.song_id
JOIN artists_table a ON sp.artist_id = a.artist_id
WHERE a.artist_name='The Verve'
"""

In [70]:
test_query_df = spark.sql(sql_query).toPandas()
test_query_df.head()

Unnamed: 0,artist_name,song_title,song_length
0,The Verve,Bitter Sweet Symphony,360.25424


## Close Spark and delete S3 bucket
- only run when done with all processing

In [29]:
spark.stop()

In [30]:
s3_bucket = boto3.resource('s3').Bucket(bucket_name)
bucket_versioning = boto3.resource('s3').BucketVersioning(bucket_name)
if bucket_versioning.status == 'Enabled':
    s3_bucket.object_versions.delete()
else:
    s3_bucket.objects.all().delete()
s3.delete_bucket(Bucket=bucket_name)

{'ResponseMetadata': {'RequestId': '3C0DPBN0YDME8GSR',
  'HostId': '15GdSMAUR9wRip27dbtxdIjmshFp+l5fVyGuUpIoLcIY2uoiQGrt/izC3vRVCLNz4Icy8C6i7fo=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': '15GdSMAUR9wRip27dbtxdIjmshFp+l5fVyGuUpIoLcIY2uoiQGrt/izC3vRVCLNz4Icy8C6i7fo=',
   'x-amz-request-id': '3C0DPBN0YDME8GSR',
   'date': 'Sat, 31 Jul 2021 18:10:21 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}