In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_date
from pyspark.sql import types as T

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

## Create spark dataframe to songs

In [4]:
df_songs = spark.read.json("s3a://udacity-dend/song_data/A/B/C/*.json")

In [5]:
df_songs.show(5)

+------------------+---------------+---------------+----------------+----------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|     artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+----------------+---------+---------+------------------+--------------------+----+
|ARLTWXK1187FB5A3F8|       32.74863| Fort Worth, TX|       -97.32925|     King Curtis|326.00771|        1|SODREIN12A58A7F2E5|A Whiter Shade Of...|   0|
|ARIOZCU1187FB3A3DC|           null|     Hamlet, NC|            null|   JOHN COLTRANE|220.44689|        1|SOCEMJV12A6D4F7667|Giant Steps (Alte...|   0|
|ARPFHN61187FB575F6|       41.88415|    Chicago, IL|       -87.63241|     Lupe Fiasco|279.97995|        1|SOWQTQZ12A58A7B63E|Streets On Fire (...|   0|
|AR5S9OB1187B9931E3|       34.05349|Los Angeles, CA|      -118.24532|     Bullet Boys|15

## Create songs table

In [6]:
songs_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']

In [7]:
songs_table = df_songs[songs_columns].dropDuplicates()

In [8]:
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOQFYBD12AB0182188,Intro,ARAADXM1187FB3ECDB,1999,67.63057
1,SOFIUVJ12A8C13C296,Will You Tell Me Then,AR9OEB71187B9A97C6,2005,397.16526
2,SOGDBUF12A8C140FAA,Intro,AR558FS1187FB45658,2003,75.67628
3,SOBHXUU12A6D4F5F14,National Emblem (March),ARBDJHO1252CCFA6FC,0,188.73424
4,SOIKLJM12A8C136355,Eso Duele,AR7AE0W1187B98E40E,2003,196.25751


## Create Artists table

In [9]:
artists_columns = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']

In [10]:
artists_table = df_songs[artists_columns].dropDuplicates()

In [11]:
artists_table.limit(5).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
1,ARWB3G61187FB49404,Steve Morse,"Hamilton, Ohio",,
2,ARJIE2Y1187B994AB7,Line Renaud,,,
3,ARVBRGZ1187FB4675A,Gwen Stefani,,,
4,ARCKOJF1241B9C75B4,Eddie Sierra,,,


## Create spark dataframe to logs

In [12]:
df_logs = spark.read.json("s3a://udacity-dend/log_data/2018/11/*.json")

In [13]:
df_logs.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


## Create Users table

In [14]:
users_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']

In [15]:
users_table = df_logs[users_columns].dropDuplicates()

In [16]:
users_table.limit(5).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,57,Katherine,Gay,F,free
1,84,Shakira,Hunt,F,free
2,22,Sean,Wilson,F,free
3,52,Theodore,Smith,M,free
4,80,Tegan,Levine,F,paid


## Create Time table

In [17]:
get_timestamp = udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType())
get_hour = udf(lambda x: x.hour, T.IntegerType()) 
get_day = udf(lambda x: x.day, T.IntegerType()) 
get_week = udf(lambda x: x.isocalendar()[1], T.IntegerType()) 
get_month = udf(lambda x: x.month, T.IntegerType()) 
get_year = udf(lambda x: x.year, T.IntegerType()) 
get_weekday = udf(lambda x: x.weekday(), T.IntegerType()) 
    
df_logs = df_logs.withColumn("timestamp", get_timestamp(df_logs.ts))
df_logs = df_logs.withColumn("hour", get_hour(df_logs.timestamp))
df_logs = df_logs.withColumn("day", get_day(df_logs.timestamp))
df_logs = df_logs.withColumn("week", get_week(df_logs.timestamp))
df_logs = df_logs.withColumn("month", get_month(df_logs.timestamp))
df_logs = df_logs.withColumn("year", get_year(df_logs.timestamp))
df_logs = df_logs.withColumn("weekday", get_weekday(df_logs.timestamp))

In [18]:
df_logs.limit(5).show()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+----+---+----+-----+----+-------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|           timestamp|hour|day|week|month|year|weekday|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+----+---+----+-----+----+-------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26

In [19]:
time_columns = ['timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday']

In [20]:
time_table = df_logs[time_columns]

In [21]:
time_table.limit(5).toPandas()

Unnamed: 0,timestamp,hour,day,week,month,year,weekday
0,2018-11-15 00:30:26.796,0,15,46,11,2018,3
1,2018-11-15 00:41:21.796,0,15,46,11,2018,3
2,2018-11-15 00:45:41.796,0,15,46,11,2018,3
3,2018-11-15 01:57:51.796,1,15,46,11,2018,3
4,2018-11-15 03:29:37.796,3,15,46,11,2018,3


## Create Songplays table

In [22]:
df_songplays = df_songs.join(df_logs, (df_songs.title == df_logs.song)).where(df_logs.page == 'NextSong').orderBy(df_logs.timestamp)

In [23]:
songplays_table = df_songplays['timestamp', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent']

In [24]:
songplays_table = songplays_table.withColumn('songplay_id',monotonically_increasing_id())
songplays_table.limit(5).toPandas()

Unnamed: 0,timestamp,userId,level,song_id,artist_id,sessionId,location,userAgent,songplay_id
0,2018-11-14 05:06:03.796,10,free,SOGDBUF12A8C140FAA,AR558FS1187FB45658,484,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",0
1,2018-11-14 05:06:03.796,10,free,SOQFYBD12AB0182188,ARAADXM1187FB3ECDB,484,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1
2,2018-11-19 09:14:20.796,24,paid,SOGDBUF12A8C140FAA,AR558FS1187FB45658,672,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2
3,2018-11-19 09:14:20.796,24,paid,SOQFYBD12AB0182188,ARAADXM1187FB3ECDB,672,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",3
4,2018-11-27 22:35:59.796,80,paid,SOGDBUF12A8C140FAA,AR558FS1187FB45658,992,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",4
