In [1]:
import configparser
from datetime import datetime
from pyspark.sql.functions import udf, col,monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format,dayofweek
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import  TimestampType

In [2]:
output_data = "s3a://data-lake-pyspark95/"
input_data = "s3a://udacity-dend/"
log_data =  input_data + "log_data/*.json"
song_data=input_data + "song_data/A/A/A/*.json"

In [3]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [5]:
df = spark.read.json(song_data)

In [6]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
df.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|301.40036|        1|SOAFBCP12A8C13CC7D|King Of Scurf (20...|1972|
|ARA23XO1187B9AF18F|       40.57885|Carteret, New Jersey|       -74.21956|     The Smithereens|  192.522|        1|SOKTJDS12AF72A25E5|Drown In My Own T...|   0|
|ARSVTNL1187B992A91|       51.50632|     London, England|        -0.12714|       Jonathan King|129.85424|        1|SOEKAZG12AB018837E|I'll Slap Your Fa...|2001|
|AR73AIO1187B9AD57B|       37.7791

In [9]:
songs_table =df.select('song_id', 'title', 'artist_id','year', 'duration').dropDuplicates()

In [62]:
songs_table.write.partitionBy('year', 'artist_id').parquet(output_data+"songs_table/.parquet", 'overwrite')
                     

In [10]:
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SODZYPO12A8C13A91E,Burn My Body (Album Version),AR1C2IX1187B99BF74,0,177.99791
1,SOIGHOD12A8C13B5A1,Indian Angel,ARY589G1187B9A9F4E,2004,171.57179
2,SOOVHYF12A8C134892,I'll Be Waiting,ARCLYBR1187FB53913,1989,304.56118
3,SOAPERH12A58A787DC,The One And Only (Edited),ARZ5H0P1187B98A1DD,0,230.42567
4,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016


In [11]:
artists_table = df.select("artist_id","artist_name", "artist_location", "artist_latitude", "artist_longitude")\
                    .withColumnRenamed("artist_name","name")\
                     .withColumnRenamed("artist_location","location")\
                      .withColumnRenamed("artist_latitude","lattitude")\
                       .withColumnRenamed("artist_longitude","longitude")\
                        .dropDuplicates()

In [63]:
artists_table.write.parquet(output_data+"artist_table/", mode = "overwrite")

In [12]:
artists_table.show(5)

+------------------+-----------------+--------------------+---------+---------+
|         artist_id|             name|            location|lattitude|longitude|
+------------------+-----------------+--------------------+---------+---------+
|AR5LMPY1187FB573FE|Chaka Khan_ Rufus|         Chicago, IL| 41.88415|-87.63241|
|ARXR32B1187FB57099|              Gob|                    |     null|     null|
|ARA23XO1187B9AF18F|  The Smithereens|Carteret, New Jersey| 40.57885|-74.21956|
|ARZKCQM1257509D107|       Dataphiles|                    |     null|     null|
|ARC1IHZ1187FB4E920|     Jamie Cullum|                    |     null|     null|
+------------------+-----------------+--------------------+---------+---------+
only showing top 5 rows



In [13]:
artists_table.show(5)

AttributeError: 'NoneType' object has no attribute 'show'

In [13]:
 df1=spark.read.json("s3a://udacity-dend/log-data/2018/11/*.json")

In [14]:
 df1.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
page_df= df1.filter(df1.page=="NextSong")

In [17]:
 users_table =page_df.select("userId", "firstName", "lastName", "gender", "level").dropDuplicates()

In [18]:
 users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    57|Katherine|     Gay|     F| free|
|    84|  Shakira|    Hunt|     F| free|
|    22|     Sean|  Wilson|     F| free|
|    52| Theodore|   Smith|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [69]:
users_table.write.parquet(output_data+"users_table/", mode = "overwrite")

In [20]:
get_datetime =udf(lambda x: datetime.fromtimestamp(x / 1000), TimestampType())
page_df= page_df.withColumn("start_time",get_datetime (page_df.ts))

In [21]:
   # extract columns to create time table
time_table =page_df.withColumn("hour",hour("start_time"))\
                .withColumn("day",dayofmonth("start_time"))\
                .withColumn("week",weekofyear("start_time"))\
                .withColumn("month",month("start_time"))\
                .withColumn("year",year("start_time"))\
                 .withColumn("weekday",dayofweek("start_time"))
                 
                   

In [22]:
time_table=time_table.select("start_time","hour","week","month","year","weekday").dropDuplicates()

In [75]:
time_table.write.partitionBy("year","month").parquet(output_data+"time_table/", mode = "overwrite")

In [23]:
time_table.show(10)

+--------------------+----+----+-----+----+-------+
|          start_time|hour|week|month|year|weekday|
+--------------------+----+----+-----+----+-------+
|2018-11-03 16:58:...|  16|  44|   11|2018|      7|
|2018-11-14 05:08:...|   5|  46|   11|2018|      4|
|2018-11-12 06:13:...|   6|  46|   11|2018|      2|
|2018-11-14 12:11:...|  12|  46|   11|2018|      4|
|2018-11-21 19:48:...|  19|  47|   11|2018|      4|
|2018-11-28 21:42:...|  21|  48|   11|2018|      4|
|2018-11-06 06:13:...|   6|  45|   11|2018|      3|
|2018-11-30 17:47:...|  17|  48|   11|2018|      6|
|2018-11-20 20:57:...|  20|  47|   11|2018|      3|
|2018-11-29 21:00:...|  21|  48|   11|2018|      5|
+--------------------+----+----+-----+----+-------+
only showing top 10 rows



In [24]:
songplays_table = page_df.join(df,(page_df.song==df.title))\
                        .select(
                                "start_time",
                                col('userId').alias('user_id'),
                                "level",
                                "song_id",
                                "artist_id",
                                col('sessionId').alias('session_id'),
                                "location",
                                col('userAgent').alias('user_agent'))\
                            .withColumn('songplay_id', monotonically_increasing_id())

In [25]:
songplays_table.show()

+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|songplay_id|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|2018-11-15 16:19:...|     97| paid|SOBLFFE12AF72AA5BA|ARJNIUY12298900C91|       605|Lansing-East Lans...|"Mozilla/5.0 (X11...|          0|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+



In [26]:
songplays_table=songplays_table.join(time_table, songplays_table.start_time == time_table.start_time, how="inner")\
                                .select("songplay_id",
                                        songplays_table.start_time,
                                        "user_id", 
                                        "level", 
                                        "song_id",
                                        "artist_id",
                                        "session_id",
                                        "location",
                                        "user_agent",
                                        "year",
                                        "month")\
                                .dropDuplicates()


In [80]:
songplays_table.write.partitionBy("year","month").parquet(output_data+"songplays_table/", mode = "overwrite")

In [27]:
songplays_table.show(10)

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|          0|2018-11-15 16:19:...|     97| paid|SOBLFFE12AF72AA5BA|ARJNIUY12298900C91|       605|Lansing-East Lans...|"Mozilla/5.0 (X11...|2018|   11|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+

