In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
input_data = "s3a://udacity-dend/"


output_data = "output/"
input_data ="data/"


song_data_path = "song_data/*/*/*/*.json"
log_data_path = "log_data/*.json"

s3 = 's3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json'


In [9]:
#song_data = spark.read.json("s3a://udacity-dend/song_data/*/*/*")

## Process song_data

In [5]:
song_data = spark.read.json(input_data+song_data_path)

In [6]:
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [12]:
song_data.describe()

DataFrame[summary: string, artist_id: string, artist_latitude: string, artist_location: string, artist_longitude: string, artist_name: string, duration: string, num_songs: string, song_id: string, title: string, year: string]

In [13]:
song_data.show(n=1)

+------------------+---------------+---------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|               |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
+------------------+---------------+---------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
only showing top 1 row



In [18]:
print(song_data.count())

71


## Create songs table

In [15]:
song_data.createOrReplaceTempView("song_data_view")

In [16]:
spark.sql("""select
    song_id, title, artist_id, year, duration
    from song_data_view
""").show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00771|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|
|SODUJBS12A8C132150|Wessex Loses a Bride|ARI2JSK1187FB496EF|   0|111.62077|
|SOZHPGD12A8C1394FE|     Baby Come To Me|AR9AWNF1187B9AB0B4|   0|236.93016|
|SOGXHEG12AB

In [17]:
spark.sql("""select
    count(*)
    from song_data_view
""").show()

+--------+
|count(1)|
+--------+
|      71|
+--------+



In [18]:
spark.sql("""select
    count(distinct(song_id))
    from song_data_view
""").show()

+-----------------------+
|count(DISTINCT song_id)|
+-----------------------+
|                     71|
+-----------------------+



In [19]:
songs = spark.sql("""select
    song_id, title, artist_id, year, duration
    from song_data_view
""")

In [20]:
songs.write.parquet(output_data+"songs.parquet",mode='overwrite',partitionBy=("year", "artist_id"))

## Create artists table

In [21]:
artists = spark.sql("""select
    artist_id,
    artist_name as name,
    artist_location as location,
    artist_latitude as latitude,
    artist_longitude as longitude
    from song_data_view
""")

In [22]:
artists.show()

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|      null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|    null|      null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624| -74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455| -74.00712|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086| -73.96644|
|ARDNS031187B9924F0|          Tim Wilson|             Georgia|32.67828| -83.22295|
|ARLTWXK1187FB5A3F8|         King Curtis|      Fort Worth, TX|32.74863| -97.32925|
|ARPFHN61187FB575F6|         Lupe Fiasco|         Chicago, IL|41.88415| -87.63241|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632|  -0.12714|
|AR9

In [23]:
artists.write.parquet(output_data+"artists.parquet",mode='overwrite')

## Process log_data

In [7]:
log_data = spark.read.json(input_data+log_data_path)

In [8]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [26]:
log_data.count()

8056

In [9]:
log_data = log_data.filter(log_data.page == "NextSong")
log_data.count()

6820

## Create time table

In [7]:
get_time = udf(lambda x: datetime.fromtimestamp(x / 1000.0).strftime('%Y-%m-%d %H:%M:%S'))

In [8]:
log_data = log_data.withColumn("start_time", get_time(log_data.ts))

In [9]:
log_data.createOrReplaceTempView("log_data_view")
time = spark.sql("""
    select start_time, 
           hour(start_time) as hour, 
           day(start_time)  as day, 
           weekofyear(start_time) as week,
           month(start_time) as month,
           year(start_time) as year,
           dayofweek(start_time) as weekday
    from log_data_view
""")

In [11]:
time.show(10)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:26|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:41:21|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:45:41|   0| 15|  46|   11|2018|      5|
|2018-11-15 03:44:09|   3| 15|  46|   11|2018|      5|
|2018-11-15 05:48:55|   5| 15|  46|   11|2018|      5|
|2018-11-15 05:53:44|   5| 15|  46|   11|2018|      5|
|2018-11-15 05:55:56|   5| 15|  46|   11|2018|      5|
|2018-11-15 06:01:02|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:07:37|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:10:33|   6| 15|  46|   11|2018|      5|
+-------------------+----+---+----+-----+----+-------+
only showing top 10 rows



In [12]:
time.write.parquet(output_data+"time.parquet",mode='overwrite',partitionBy=("year", "month"))

## Create users table

In [33]:
log_data.createOrReplaceTempView("log_data_view")

users = spark.sql("""
    select distinct(userId) as user_id,
    firstName as first_name,
    lastName as last_name,
    gender,
    level
    from log_data_view order by user_id
""")

In [34]:
users.count()

104

In [35]:
users.createOrReplaceTempView("users_view")
spark.sql("""
    select count(user_id)
    from users_view
""").show(200)

+--------------+
|count(user_id)|
+--------------+
|           104|
+--------------+



In [36]:
temp = users.toPandas()

In [37]:
temp

Unnamed: 0,user_id,first_name,last_name,gender,level
0,10,Sylvie,Cruz,F,free
1,100,Adler,Barrera,M,free
2,101,Jayden,Fox,M,free
3,11,Christian,Porter,F,free
4,12,Austin,Rosales,M,free
5,13,Ava,Robinson,F,free
6,14,Theodore,Harris,M,free
7,15,Lily,Koch,F,paid
8,15,Lily,Koch,F,free
9,16,Rylan,George,M,paid


In [38]:
users.write.parquet(output_data+"users.parquet",mode='overwrite')

## Create songplays table

In [19]:
log_data.createOrReplaceTempView("log_data_view")
song_data.createOrReplaceTempView("song_data_view")

songplays = spark.sql("""
    select 
    
    monotonically_increasing_id() as songplay_id,
    from_unixtime(ts/1000) as start_time,
    month(from_unixtime(ts/1000) ) as month,
    year(from_unixtime(ts/1000) ) as year,
    userId as user_id,
    level,
    a.song_id,
    a.artist_id,
    sessionId as session_id,
    location,
    userAgent as user_agent
    
    from song_data_view a right join log_data_view b
    on a.artist_name=b.artist and a.title=b.song and a.duration=b.length
    
""")

In [11]:
song_data.printSchema()
log_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = 

In [12]:
songplays.createOrReplaceTempView("songplays_view")
spark.sql("""
    select 
    
    count(*)
    
    from songplays_view where song_id is not null
    
""").show(200)

+--------+
|count(1)|
+--------+
|       1|
+--------+



In [51]:
songplays.count()

6820

In [17]:
partition = spark.sql("""
    select distinct
    
    month(start_time) as month,
    year(start_time) as year
    
    from songplays_view order by year, month
    
""").toPandas()

In [20]:
songplays.write.parquet(output_data+"songplays.parquet",mode='overwrite',partitionBy=("year", "month"))