In [78]:
import configparser
from datetime import datetime
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, weekofyear, date_format, from_unixtime
import pandas as pd

In [2]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']["AWS_ACCESS_KEY_ID"]
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']["AWS_SECRET_ACCESS_KEY"]

In [4]:
spark = SparkSession \
.builder \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
.getOrCreate()

In [5]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://misho-udacity-bucket/datalake/"

# Getting data songs

In [32]:
# df = spark.read.json("s3a://{}:{}@udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json".format(os.environ['AWS_ACCESS_KEY_ID'],os.environ['AWS_SECRET_ACCESS_KEY']))

In [6]:
df = spark.read.json("s3a://{}:{}@udacity-dend/song_data/A/A/*/*.json".format(os.environ['AWS_ACCESS_KEY_ID'],os.environ['AWS_SECRET_ACCESS_KEY']))

In [7]:
df.count()

604

In [43]:
df.limit(10).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
1,ARIOZCU1187FB3A3DC,,"Hamlet, NC",,JOHN COLTRANE,220.44689,1,SOCEMJV12A6D4F7667,Giant Steps (Alternate Version_ Take 5_ Altern...,0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
3,AR5S9OB1187B9931E3,34.05349,"Los Angeles, CA",-118.24532,Bullet Boys,156.62975,1,SOMAPYF12A6D4FEC3E,All Day & All Of The Night,0
4,AR5T40Y1187B9996C6,,"Lulea, Sweden",,The Bear Quartet,249.3122,1,SOAPVNX12AB0187625,I Remember Nights Wide Open,1998
5,AR9OEB71187B9A97C6,,"Edmonton, Alberta, Canada",,Faunts,397.16526,1,SOFIUVJ12A8C13C296,Will You Tell Me Then,2005
6,ARBDJHO1252CCFA6FC,,,,The Band of HM Royal Marines,188.73424,1,SOBHXUU12A6D4F5F14,National Emblem (March),0
7,ARAADXM1187FB3ECDB,34.1688,"Woodland Hills, CA",-118.61092,Styles Of Beyond,67.63057,1,SOQFYBD12AB0182188,Intro,1999
8,ARZJDBC1187FB52056,27.94017,"Brandon, Florida",-82.32547,Nasty Savage,327.00036,1,SOYLILV12A8C136650,XXX,1984
9,AROSPS51187B9B481F,,,,Vince Guaraldi Trio,197.95546,1,SOHTCZS12A6D4FC402,The Christmas Song,1965


In [22]:
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
artist_columns = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']

In [23]:
df_song = df.select(*song_columns)
df_artist = df.select(*artist_columns)

In [21]:
df_song.show()

+------------------+-------------------+------------------+----+---------+
|           song_id|              title|         artist_id|year| duration|
+------------------+-------------------+------------------+----+---------+
|SOUPIRU12A6D4FA1E1|Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
+------------------+-------------------+------------------+----+---------+



In [24]:
df_artist.show()

+------------------+-----------+---------------+---------------+----------------+
|         artist_id|artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+-----------+---------------+---------------+----------------+
|ARJIE2Y1187B994AB7|Line Renaud|               |           null|            null|
+------------------+-----------+---------------+---------------+----------------+



### Partition

In [38]:
df_song.write.parquet('output/songs', mode='overwrite', partitionBy=['year', 'artist_id'])

# Write data

In [35]:
df_song.write.parquet(output_data+'/songs/')

# Getting Data logs

In [70]:
df2 = spark.read.json("s3a://{}:{}@udacity-dend/log_data/*/*/*.json"\
                      .format(os.environ['AWS_ACCESS_KEY_ID'],os.environ['AWS_SECRET_ACCESS_KEY']))

In [71]:
df2.count()

8056

In [8]:
df2.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [72]:
df2 = df2.filter(df2['page']=='NextSong')

In [10]:
df2.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [11]:
users_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']

In [12]:
df_users = df2.select(*users_columns)

In [13]:
df_users.limit(2).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,26,Ryan,Smith,M,free
1,26,Ryan,Smith,M,free


In [14]:
# timestamp

In [84]:
df3 = df2.withColumn('datetime', from_unixtime(col('ts')/1000))

In [83]:
df_time = df3.select('datetime')

In [86]:
df_time = df_time.withColumnRenamed('datetime', 'start_time')\
                 .orderBy('start_time', ascending=True)\
                 .withColumn('hour', hour(col('start_time')))\
                 .withColumn('day', dayofmonth(col('start_time')))\
                 .withColumn('week', weekofyear(col('start_time')))\
                 .withColumn('month', month(col('start_time')))\
                 .withColumn('year', year(col('start_time')))\
                 .withColumn('weekday', dayofweek(col('start_time')))

In [80]:
df_time.show()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:46|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:05:52|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:16|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:13|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:17:33|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:24:53|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:28:54|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:42:00|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:52:05|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:55:25|  21|  1|  44|   11|2018|      5|
|2018-11-01 22:23:14|  22|  1|  44|   11|2018|      5|
|2018-11-02 01:25:34|   1|  2|  44|   11|2018|      6|
|2018-11-02 01:30:41|   1|  2|  44|   11|2018|      6|
|2018-11-02 01:34:17|   1|  2|  44|   11|2018|      6|
|2018-11-02 02:42:48|   2|  2|  44|   11|2018|      6|
|2018-11-0

In [81]:
df_time.write.parquet('output/time', mode='overwrite', partitionBy=['year', 'month'])