In [6]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import expr

In [7]:
import boto3
import configparser
import matplotlib.pyplot as plt
import pandas as pd
from time import time

In [8]:
config = configparser.ConfigParser()
config.read('dl.cfg')

KEY=config.get('AWS','AWS_ACCESS_KEY_ID')
SECRET=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [9]:
s3 =  boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )


In [10]:
udacityBucket =  s3.Bucket("udacity-dend")
for obj in udacityBucket.objects.filter(Prefix="song_data/A/A/A/TRAAAA"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')


In [11]:
for obj in udacityBucket.objects.filter(Prefix="log_data/2018/11/2018-11-01"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')


In [12]:
for obj in udacityBucket.objects.filter(Prefix="log_json_"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_json_path.json')


In [13]:
#song_data_path = os.path.join("s3a://udacity-dend", "song_data/*/*/*/*.json")
#log_data_path = os.path.join("s3a://udacity-dend", "log_data/*/*/*.json")
song_data_path = "data/song_data/*/*/*.json"
log_data_path = "data/log_data/*.json"

In [14]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [15]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [16]:
log_df = spark.read.json(log_data_path)

In [17]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [18]:
log_df.count()

8056

In [19]:
log_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [20]:
# filter by actions for song plays
log_df = log_df.filter(log_df.page == "NextSong")

In [21]:
log_df.count()

6820

#### Create User Table

In [22]:
# extract columns for users table    
users_table = log_df.selectExpr(["userId as user_id","firstName as first_name", "lastName as last_name", "gender", "level"]).drop_duplicates()

In [23]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [24]:
users_table = users_table.withColumn("user_id", expr("cast(user_id as int)"))

In [24]:
users_table.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [25]:
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free


In [26]:
users_table.count()

104

Write users_table parquet files

In [27]:
#users_table.write.mode("overwrite").parquet("data/output/users_table")

#### Create Time table

Get timestamp from original milliseconds column. The only code works.

In [28]:
log_df = log_df.withColumn("start_time", F.from_unixtime(F.col("ts")/1000))

In [29]:
time_df = log_df.select("start_time").drop_duplicates()

This one doesn't work

In [30]:
#get_timestamp = udf(lambda x: F.from_unixtime(int(x)/1000.0))
#time_df = time_df.withColumn("start_time", get_timestamp(time_df.ts))

This one doesn't work

In [31]:
#get_timestamp = udf(lambda x: pd.to_datetime(x/1000.0, unit="ms"))
#df = df.withColumn("start_time", get_timestamp(F.col("ts")))

This one doesn't work

In [32]:
#get_timestamp = udf(lambda x: datetime.to_timestamp(x/1000.0))
#df = df.withColumn("start_time", get_timestamp(df.ts))

In [33]:
time_df.printSchema()

root
 |-- start_time: string (nullable = true)



In [34]:
time_df.limit(5).toPandas()

Unnamed: 0,start_time
0,2018-11-15 07:56:18
1,2018-11-15 16:51:56
2,2018-11-15 18:31:38
3,2018-11-14 00:41:15
4,2018-11-14 00:53:43


In [35]:
# extract columns to create time table
time_table = time_df.selectExpr(["start_time as start_time", "hour(start_time) as hour", "dayofmonth(start_time) as day", "weekofyear(start_time) as week", "month(start_time) as month", "year(start_time) as year", "dayofweek(start_time) as weekday"])

In [36]:
time_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [37]:
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 07:56:18,7,15,46,11,2018,5
1,2018-11-15 16:51:56,16,15,46,11,2018,5
2,2018-11-15 18:31:38,18,15,46,11,2018,5
3,2018-11-14 00:41:15,0,14,46,11,2018,4
4,2018-11-14 00:53:43,0,14,46,11,2018,4


In [38]:
#time_table.write.partitionBy("year","month").mode("overwrite").parquet("data/output/time_table")

#### Read song data

In [39]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
songSchema = R([
    Fld("num_songs",Int()),
    Fld("artist_id",Str()),
    Fld("artist_latitude",Dbl()),
    Fld("artist_longitude",Dbl()),
    Fld("artist_location",Str()),
    Fld("artist_name",Str()),
    Fld("song_id",Str()),
    Fld("title",Str()),
    Fld("duration",Dbl()),
    Fld("year",Str()),
])

In [40]:
song_df = spark.read.json(song_data_path)

In [41]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [42]:
song_df.count()

71

In [43]:
song_df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


In [44]:
# extract columns to create songs table
songs_table = song_df.select("song_id","title","artist_id","year","duration")

In [56]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [45]:
# write songs table to parquet files partitioned by year and artist
#songs_table.write.partitionBy("year","artist_id").mode("overwrite").parquet("data/output/songs_table")

In [46]:
# extract columns to create artists table
artists_table = song_df.selectExpr("artist_id","artist_name as name", "artist_location as location","artist_latitude as latitude","artist_longitude as longitude")

In [55]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [47]:
#artists_table.write.mode("overwrite").parquet("data/output/artists_table")

#### Load songplays table

In [48]:
# create temp view of song_df and df
songs_table.createOrReplaceTempView("songs")
artists_table.createOrReplaceTempView("artists")
log_df.createOrReplaceTempView("logs")

In [49]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: string (nullable = true)



In [50]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = spark.sql("""
    select
            row_number() over (order by start_time) as songplay_id,
            start_time,
            userId as user_id,
            level,
            s.song_id,
            a.artist_id,
            sessionId as session_id, 
            l.location, 
            userAgent as user_agent,
            year(start_time) as year,
            month(start_time) as month
        from 
            logs l
        join
            songs s
        on
            l.song=s.title
            and
            l.length=s.duration
        join
            artists a
        on
            l.artist=name

""")

In [51]:
songplays_table.printSchema()

root
 |-- songplay_id: integer (nullable = true)
 |-- start_time: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [52]:
songplays_table.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,1,2018-11-21 21:56:47,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11


In [53]:
songplays_table.count()

1

In [54]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year","month").mode("overwrite").parquet("data/output/songplays_table")

In [2]:
def return_tuple():
    return (1,2)

In [3]:
a,b = return_tuple()

In [4]:
a

1

In [5]:
b

2