In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import from_unixtime, monotonically_increasing_id



In [2]:
config = configparser.ConfigParser()
config.read('cred.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS CREDS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS CREDS']['AWS_SECRET_ACCESS_KEY']



In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.5")\
                     .getOrCreate()


In [4]:
#Create S3 bucket and enter here
output_data = "s3a://'bucketname'/"

In [5]:
#song_data = "s3a://udacity-dend/song_data/*/*/*/*.json"
song_data= 's3a://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json'

In [6]:
df_songs = spark.read.json(song_data)

In [7]:
df_songs.printSchema()
df_songs.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+---------------+----------------+------------+--------+---------+------------------+------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude| artist_name|duration|num_songs|           song_id| title|year|
+------------------+---------------+---------------+----------------+------------+--------+---------+------------------+------+----+
|ARJNIUY12298900C91|           null|               |            null|Adelitas Way|213.9424|        1|SOBLFFE12AF72AA5BA|Scream|2009|
+------------------+---------------+-----------

In [8]:
# extract columns to create songs table
# song_id, title, artist_id, year, duration
songs_table = df_songs.select("song_id", "title", "artist_id", "year", "duration")
songs_table.show()

+------------------+------+------------------+----+--------+
|           song_id| title|         artist_id|year|duration|
+------------------+------+------------------+----+--------+
|SOBLFFE12AF72AA5BA|Scream|ARJNIUY12298900C91|2009|213.9424|
+------------------+------+------------------+----+--------+



In [9]:
# write songs table to parquet files partitioned by year and artist
#songs_table.write.partitionBy("year","artist_id") .parquet("song_data.parquet")
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet(output_data + "songs")

In [10]:
# extract columns to create artists table
artists_table = df_songs.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")
artists_table.show()

+------------------+------------+---------------+---------------+----------------+
|         artist_id| artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+------------+---------------+---------------+----------------+
|ARJNIUY12298900C91|Adelitas Way|               |           null|            null|
+------------------+------------+---------------+---------------+----------------+



In [11]:
# write artists table to parquet files
artists_table.write.mode('overwrite').parquet(output_data + "artists")

In [12]:
#log_data = "s3a://udacity-dend/log_data/*/*/*/*.json"
log_data= 's3a://udacity-dend/log_data/2018/11/2018-11-12-events.json'

In [13]:
df_log = spark.read.json(log_data)

In [14]:
df_log.printSchema()
df_log.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+--------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth| firstName|gender|itemInSession|lastName|   le

In [15]:
# filter by actions for song plays
df_log = df_log.filter(df_log['page'] == 'NextSong')
df_log.show(5)

+--------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth| firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Pavement|Logged In|    Sylvie|     F|            0|    Cruz| 99.16036| free|Washington-Arling...|   PUT|NextSong|1.540266185796E12|      345|Mercy:The Laundromat|   200|1541990258796|"Mozilla/5.0 (Mac...|    10|
|Barry Tuckwell/Ac...|Logged In|   Celeste|     F|            1|Williams|277.15873| free|   Klamath 

In [16]:
# extract columns for users table ex. user_id, first_name, last_name, gender, level 
users_table = df_log.select("userId", "firstName", "lastName", "gender", "level")
users_table.show(5)

+------+----------+--------+------+-----+
|userId| firstName|lastName|gender|level|
+------+----------+--------+------+-----+
|    10|    Sylvie|    Cruz|     F| free|
|    53|   Celeste|Williams|     F| free|
|    53|   Celeste|Williams|     F| free|
|    53|   Celeste|Williams|     F| free|
|    29|Jacqueline|   Lynch|     F| paid|
+------+----------+--------+------+-----+
only showing top 5 rows



In [17]:
# write users table to parquet files
users_table.write.mode('overwrite').parquet(output_data + "users")

In [18]:
# create timestamp column from original timestamp column
get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
df_log = df_log.withColumn("timestamp", get_timestamp(df_log.ts))


In [19]:
df_log.printSchema()
df_log.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+--------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|            

In [20]:
df_log.select('timestamp').show(truncate=False)



+-----------------------+
|timestamp              |
+-----------------------+
|2018-11-12 02:37:38.796|
|2018-11-12 02:37:44.796|
|2018-11-12 02:42:21.796|
|2018-11-12 02:45:52.796|
|2018-11-12 02:47:22.796|
|2018-11-12 02:50:21.796|
|2018-11-12 02:54:26.796|
|2018-11-12 02:57:12.796|
|2018-11-12 03:00:48.796|
|2018-11-12 03:03:24.796|
|2018-11-12 03:36:28.796|
|2018-11-12 06:13:27.796|
|2018-11-12 08:50:44.796|
|2018-11-12 08:54:58.796|
|2018-11-12 08:58:41.796|
|2018-11-12 09:02:12.796|
|2018-11-12 09:03:03.796|
|2018-11-12 09:07:43.796|
|2018-11-12 10:18:43.796|
|2018-11-12 10:21:13.796|
+-----------------------+
only showing top 20 rows



In [21]:
# create datetime column from original timestamp column
df_log = df_log.withColumn('date_time', from_unixtime(df_log.ts/1000).cast(dataType=T.TimestampType()))




In [22]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- date_time: timestamp (nullable = true)



In [23]:
# extract columns to create time table 
#start_time, hour, day, week, month, year, weekday
time_table = df_log.select(col("date_time").alias("start_time"),
                           year(col('date_time')).alias('year'),
                           month(col('date_time')).alias('month'),
                           dayofmonth(col('date_time')).alias('day'),
                           hour(col('date_time')).alias('hour'),
                           weekofyear(col('date_time')).alias('week')
                          )

In [24]:
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- week: integer (nullable = true)



In [25]:
time_table.show(5)

+-------------------+----+-----+---+----+----+
|         start_time|year|month|day|hour|week|
+-------------------+----+-----+---+----+----+
|2018-11-12 02:37:38|2018|   11| 12|   2|  46|
|2018-11-12 02:37:44|2018|   11| 12|   2|  46|
|2018-11-12 02:42:21|2018|   11| 12|   2|  46|
|2018-11-12 02:45:52|2018|   11| 12|   2|  46|
|2018-11-12 02:47:22|2018|   11| 12|   2|  46|
+-------------------+----+-----+---+----+----+
only showing top 5 rows



In [26]:
time_table.write.partitionBy("year","month").parquet(output_data+"time")

AnalysisException: 'path s3a://adnans3bucket/time already exists.;'

In [None]:
#This works
"""
# create datetime column from original timestamp column
time_format = 'yy-MM-dd HH:mm:ss z'
seconds = df_log.ts /1000
x = date_format(seconds.cast(dataType = T.TimestampType()), time_format)
time_table = df_log.withColumn('ts',to_timestamp(x,time_format))
"""

In [None]:
"""
time_table.select('ts').show(truncate=False)
"""

In [None]:
"""
# extract columns to create time table 
#start_time, hour, day, week, month, year, weekday
time_table = time_table.select(col("ts").alias("start_time"),
                           year(col('ts')).alias('year'),
                           month(col('ts')).alias('month'),
                           dayofmonth(col('ts')).alias('day'),
                           hour(col('ts')).alias('hour'),
                           weekofyear(col('ts')).alias('week')
                          )

"""

In [None]:
# read in song data to use for songplays table
#song_id, title, artist_id, year, duration
song_df = spark.read.json(song_data)

In [27]:
# extract columns from joined song and log datasets to create songplays table 
#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#records with page NextSong

df_log.createOrReplaceTempView("log_df_table")
df_songs.createOrReplaceTempView("song_df_table")
time_table.createOrReplaceTempView("time_table_table")
songplays_table = spark.sql("""
                SELECT DISTINCT log_df_table.userId, log_df_table.level, log_df_table.location, log_df_table.userAgent, log_df_table.sessionId, log_df_table.date_time, song_df_table.artist_id, song_df_table.song_id, time_table_table.year, time_table_table.month 
                FROM log_df_table
                JOIN song_df_table
                ON song_df_table.artist_name = log_df_table.artist
                JOIN time_table_table
                ON time_table_table.start_time = log_df_table.date_time
                """
                )
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())


In [28]:
#write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year","month").parquet(output_data+"songplays")

AnalysisException: 'path s3a://adnans3bucket/songplays already exists.;'

In [29]:
songplays_table.printSchema()

root
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- songplay_id: long (nullable = false)



In [31]:
songplays_table.show()

+------+-----+--------+---------+---------+---------+---------+-------+----+-----+-----------+
|userId|level|location|userAgent|sessionId|date_time|artist_id|song_id|year|month|songplay_id|
+------+-----+--------+---------+---------+---------+---------+-------+----+-----+-----------+
+------+-----+--------+---------+---------+---------+---------+-------+----+-----+-----------+

