### Test notebook for prototyping

**Name: Darren Foley**

**Email: darren.foley@ucdconnect.ie**

In [1]:
import configparser
import os
from datetime import datetime

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import row_number
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.window import Window

In [2]:
!pip install findspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2


In [3]:
# Initialise findspark for this kernel.
# NOTE(review): pyspark is already imported in the first cell, before this
# cell runs — confirm findspark.init() is still required at this point, or
# move it ahead of the pyspark imports.
import findspark
findspark.init()

In [4]:
# Load the AWS credentials from the local config file and expose them both as
# environment variables and as notebook-level constants used by later cells.
CONFIG_PATH = 'dl.cfg'

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means the file is missing.
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(
        "Could not read '{}': expected an [AWS] section with "
        "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY".format(CONFIG_PATH)
    )

AWS_ACCESS_KEY = config.get('AWS', 'AWS_ACCESS_KEY_ID')
AWS_SECRET_KEY = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY
os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_KEY

# Do not echo the credentials here: cell outputs are saved with the notebook.

In [5]:

# Build the Spark configuration with the hadoop-aws connector on the classpath.
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0')
# getOrCreate() reuses an already-running context, so re-running this cell does
# not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

# Add AWS credentials to the Hadoop configuration.
hadoop_conf = sc._jsc.hadoopConfiguration()
# Legacy s3n:// keys, kept for backward compatibility.
hadoop_conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)
# Every path in this notebook uses s3a://, which reads these keys instead.
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# SQL entry point used by the rest of the notebook.
sqlContext = SQLContext(sc)

In [6]:
# Explicit schema for the song metadata JSON: (column name, Spark type) pairs,
# every column nullable.
_song_fields = [
    ("artist_id", StringType()),
    ("artist_latitude", DoubleType()),
    ("artist_location", StringType()),
    ("artist_longitude", DoubleType()),
    ("artist_name", StringType()),
    ("duration", DoubleType()),
    ("num_songs", LongType()),
    ("song_id", StringType()),
    ("title", StringType()),
    ("year", LongType()),
]
song_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _song_fields]
)

In [7]:
# Read the song metadata JSON files into a DataFrame. Passing the declared
# schema avoids Spark's extra pass over the S3 files to infer one.
# Prototype on the A/A/[A-Z] subset only; the full dataset is at
# s3a://udacity-dend/song_data/*/*/*/*.json
song_data = sqlContext.read.json("s3a://udacity-dend/song_data/A/A/[A-Z]/*.json", schema=song_schema)

In [8]:
# Preview the first five rows of the song data as a pandas DataFrame.
song_data.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARSUVLW12454A4C8B8,35.83073,Tennessee,-85.97874,Royal Philharmonic Orchestra/Sir Thomas Beecham,94.56281,1,SOBTCUI12A8AE48B70,Faust: Ballet Music (1959 Digital Remaster): V...,0
1,ARXQC081187FB4AD42,54.31407,UK,-2.23001,William Shatner_ David Itkin_ The Arkansas Sym...,1047.71873,1,SOXRPUH12AB017F769,Exodus: Part I: Moses and Pharaoh,0
2,ARWUNH81187FB4A3E0,,"Miami , Florida",,Trick Daddy,227.10812,1,SOVNKJI12A8C13CB0D,Take It To Da House (Featuring The Slip N' Sli...,2001
3,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
4,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0


In [9]:
# Number of rows loaded from the song data sample.
song_data.count()

604

In [10]:
# Show the column names and types of the loaded song data.
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



#### Extract to songs table

In [13]:
# Songs table: one distinct row per (song_id, title, artist_id, year, duration).
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs = song_data.select(song_columns).distinct()

In [15]:
# Uniqueness check: number of distinct song_id values in the songs table.
songs.select(["song_id"]).distinct().count()

604

In [16]:
# Total number of rows in the songs table, to compare against the distinct song_id count.
songs.select(["song_id"]).count()

604

In [22]:
# Number of distinct artist_id values referenced by the songs table.
songs.select(["artist_id"]).distinct().count()

587

#### Extract to artist table

In [20]:
# Artists table: artist_id, name, location, latitude, longitude.
artist_columns = ["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]

# Number the rows of each artist_id by ascending duration so that a single
# row per artist can be kept.
artist_window = Window.partitionBy("artist_id").orderBy("duration")
song_data_with_window = song_data.withColumn("row_number_1", row_number().over(artist_window))

# Keep only the first-ranked row of every artist_id.
artist = (
    song_data_with_window
    .filter(col("row_number_1") == 1)
    .select(artist_columns)
    .distinct()
)

In [21]:
# Preview up to 20 artists, ordered by artist_id.
artist.orderBy("artist_id").limit(20).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR06EB01187FB40150,NOFX,"Berkeley, CA",,
1,AR06XSY1187B9B279E,Little River Band,"Melbourne, Australia",,
2,AR08LXJ1187B9995A4,Tungtvann,,,
3,AR08VNE1187FB45C2F,Dance With A Stranger,,,
4,AR0IT221187B999C4D,The Weathermen,BELGIUM,50.50101,4.47684
5,AR0L04E1187B9AE90C,The Verve,"Wigan, Lancashire, England",,
6,AR0MWD61187B9B2B12,The (International) Noise Conspiracy,,,
7,AR0TKGM1187B98B40E,Stereolab,London,51.50632,-0.12714
8,AR0WQ0N1187FB3AAB9,The Accüsed,,,
9,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,


In [23]:
# Row count of the artist table, to compare against the distinct artist_id count from the songs table.
artist.select(["artist_id"]).count()

587

#### Window Function to remove duplicates

#### Partitioning

Songs table files are partitioned by year and then artist. - Songs(year,Artist)

Time table files are partitioned by year and month. - Time(year, month)

Songplays table files are partitioned by year and month. - songPlays(year, month)

In [15]:
# Write each table to its own directory. With mode("overwrite"), writing both
# tables to the same path makes the artist write replace the songs output.
songs.write.format("parquet").partitionBy("year", "artist_id").mode("overwrite").save("tmp/parquet_test/songs.parquet")
# NOTE(review): partitioning by artist_name presumably yields one directory per
# artist — confirm this layout is intended for the artist table.
artist.write.format("parquet").partitionBy("artist_name").mode("overwrite").save("tmp/parquet_test/artist.parquet")

#### Writing parquet to S3 bucket in us-west-2

In [19]:
# Write the artist table to the S3 test location as parquet, partitioned by
# artist_name and replacing any previous output; .parquet() selects the format.
artist_writer = artist.write.partitionBy("artist_name").mode("overwrite")
artist_writer.parquet("s3a://sparkify-data-lake-df/test/artist")

### Processing log data

s3a://udacity-dend/log_data/


Sample path: s3a://udacity-dend/log_data/2018/11/2018-11-13-events.json


In [24]:
# Prototype against a single day of event logs; the full log set is at
# s3a://udacity-dend/log_data/*/*/*.json
log_data_path = "s3a://udacity-dend/log_data/2018/11/2018-11-13-events.json"
log_data = sqlContext.read.json(log_data_path)

In [25]:
# Preview the first five log events as a pandas DataFrame.
log_data.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Kevin,M,0,Arellano,,free,"Harrisburg-Carlisle, PA",GET,Home,1540007000000.0,514,,200,1542069417796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
1,Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,514,Ja I Ty,200,1542069637796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
2,,Logged In,Maia,F,0,Burke,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540677000000.0,510,,200,1542071524796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51
3,All Time Low,Logged In,Maia,F,1,Burke,177.84118,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540677000000.0,510,A Party Song (The Walk of Shame),200,1542071549796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51
4,Nik & Jay,Logged In,Wyatt,M,0,Scott,196.51873,free,"Eureka-Arcata-Fortuna, CA",PUT,NextSong,1540872000000.0,379,Pop-Pop!,200,1542079142796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9


In [26]:
# Total number of (userId, level) rows in the log sample, duplicates included.
log_data.select(["userId","level"]).count()

394

In [27]:
# Number of distinct (userId, level) combinations in the log sample.
log_data.select(["userId", "level"]).distinct().count()

29

In [8]:
# Show the inferred column names and types of the log data.
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



#### SongPlay: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#### Users: user_id, first_name, last_name, gender, level
#### Time: start_time, hour, day, week, month, year, weekday

In [30]:
# Users table: user_id, first_name, last_name, gender, level.
user_columns = ["userId", "firstName", "lastName", "gender", "level"]

# Number each user's events from newest to oldest (by ts).
user_window = Window.partitionBy("userId").orderBy(col("ts").desc())
log_data_with_window = log_data.withColumn("row_number_1", row_number().over(user_window))

# Keep the most recent event of every user and drop events with an empty userId.
users = (
    log_data_with_window
    .filter((col("row_number_1") == 1) & (col("userId") != ""))
    .select(user_columns)
    .distinct()
)

In [31]:
# Preview up to 20 users, ordered by userId.
users.orderBy("userId").limit(20).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,14,Theodore,Harris,M,free
1,15,Lily,Koch,F,paid
2,25,Jayden,Graves,M,paid
3,26,Ryan,Smith,M,free
4,29,Jacqueline,Lynch,F,paid
5,32,Lily,Burns,F,free
6,37,Jordan,Hicks,F,free
7,40,Tucker,Garrison,M,free
8,44,Aleena,Kirby,F,paid
9,49,Chloe,Cuevas,F,free


In [32]:
# Total number of rows in the users table.
users.select(["userId"]).count()

28

In [33]:
# Number of distinct userId values; equal to the row count when each user appears once.
users.select(["userId"]).distinct().count()

28

In [12]:
# Time table helpers: start_time, hour, day, week, month, year, weekday.
#
# udf() defaults to a StringType return type. Without an explicit type,
# to_datetime would hand back a stringified object rather than a timestamp,
# and the datetime-based helpers below would fail on it. Every UDF therefore
# declares the type it actually returns.

def _null_safe(fn):
    """Wrap fn so that a NULL (None) input yields NULL instead of raising."""
    return lambda value: None if value is None else fn(value)


get_hour = udf(_null_safe(lambda x: x.hour), IntegerType())
get_day = udf(_null_safe(lambda x: x.day), IntegerType())
# isocalendar() returns (ISO year, ISO week number, ISO weekday).
get_week = udf(_null_safe(lambda x: x.isocalendar()[1]), IntegerType())
get_month = udf(_null_safe(lambda x: x.month), IntegerType())
get_year = udf(_null_safe(lambda x: x.year), IntegerType())
get_weekday = udf(_null_safe(lambda x: x.isoweekday()), IntegerType())
# Round instead of truncating: the float product can land just below the
# integer millisecond value and int() alone would then be off by one.
# NOTE(review): timestamp() on a naive datetime uses the local time zone —
# confirm the driver/executors run in UTC so this round-trips the original ts.
to_timestamp = udf(_null_safe(lambda x: int(round(x.timestamp()*1000))), LongType())
# ts is an epoch value in milliseconds; convert it to a UTC datetime.
to_datetime = udf(_null_safe(lambda x: datetime.utcfromtimestamp(x/1000.0)), TimestampType())

time_df = log_data.withColumn("ts_m", to_datetime("ts"))

In [13]:
# Time table: derive every calendar field from the ts_m column, then keep one
# distinct row per timestamp.
ts_m = col("ts_m")
time_columns = ["ts", "hour", "day", "week", "month", "year", "weekday"]

time = (
    time_df.select(["ts_m"])
    .withColumn("ts", to_timestamp(ts_m))
    .withColumn("hour", get_hour(ts_m))
    .withColumn("day", get_day(ts_m))
    .withColumn("week", get_week(ts_m))
    .withColumn("month", get_month(ts_m))
    .withColumn("year", get_year(ts_m))
    .withColumn("weekday", get_weekday(ts_m))
    .select(time_columns)
    .distinct()
)

In [14]:
# SongPlay source columns (target: songplay_id, start_time, user_id, level,
# song_id, artist_id, session_id, location, user_agent).
songplay_columns = ["ts", "userId", "level", "song", "artist", "sessionId", "location", "userAgent"]
songplay = log_data.select(songplay_columns)

# Preview the first five rows of the selection.
songplay.limit(5).toPandas()

Unnamed: 0,ts,userId,level,song,artist,sessionId,location,userAgent
0,1542069417796,66,free,,,514,"Harrisburg-Carlisle, PA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
1,1542069637796,66,free,Ja I Ty,Fu,514,"Harrisburg-Carlisle, PA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
2,1542071524796,51,free,,,510,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK..."
3,1542071549796,51,free,A Party Song (The Walk of Shame),All Time Low,510,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK..."
4,1542079142796,9,free,Pop-Pop!,Nik & Jay,379,"Eureka-Arcata-Fortuna, CA",Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....


In [23]:
# Lookup tables used to resolve the free-text artist and song names in the
# log events to their ids.
artist_only = artist.select(["artist_id", "artist_name"]).distinct()
# The song lookup also carries the song's artist id, so that a play is only
# matched to a song by the same artist. Matching title and artist name
# independently would pair a play with any same-titled song by another artist.
# The column is aliased so it does not clash with artist_only.artist_id.
songs_only = songs.select(
    col("song_id"),
    col("title"),
    col("artist_id").alias("song_artist_id"),
).distinct()

# Inner joins drop events whose artist or song is not in the lookup tables.
result = (
    songplay
    .join(artist_only, artist_only.artist_name == songplay.artist, 'inner')
    .join(
        songs_only,
        (songs_only.title == songplay.song)
        & (songs_only.song_artist_id == artist_only.artist_id),
        'inner',
    )
    .select(
        songplay.ts,
        songplay.userId,
        songplay.level,
        songs_only.song_id,
        artist_only.artist_id,
        songplay.sessionId,
        songplay.location,
        songplay.userAgent,
    )
)


In [24]:
# Preview the first five resolved song plays.
result.limit(5).toPandas()

Unnamed: 0,ts,userId,level,song_id,artist_id,sessionId,location,userAgent
0,1542125570796,37,free,SOJRCDU12AB0189D27,ARKZJ301187FB521B2,547,"Salinas, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
1,1542148779796,55,free,SOXQYSC12A6310E908,AR0L04E1187B9AE90C,415,"Minneapolis-St. Paul-Bloomington, MN-WI","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."


#### Sample Query to prove everything has worked correctly

Who are the top ten users of the platform, ranked by their number of distinct listening sessions?

In [25]:
# Register the in-memory DataFrames as temporary views so they can be queried with SQL.
result.createOrReplaceTempView("songplay")
users.createOrReplaceTempView("users")

In [28]:
# Top ten users ranked by the number of distinct sessions that contain at
# least one resolved song play.
query_result = sqlContext.sql("""
    SELECT
        u.firstName,
        u.lastName,
        sp.userId,
        COUNT(DISTINCT sp.sessionId) as session_count
    FROM songplay sp
    JOIN users u
    ON u.userId = sp.userId
    GROUP BY 
        u.firstName,
        u.lastName,
        sp.userId
    ORDER BY session_count DESC
    LIMIT 10
""")

In [29]:
# Materialise the query result (the SQL already limits it to ten rows).
query_result.limit(10).toPandas()

Unnamed: 0,firstName,lastName,userId,session_count
0,Jordan,Hicks,37,1
1,Martin,Johnson,55,1


#### Reading from parquet files into spark

Then run the same sample query against the tables stored in S3.

In [5]:

# Build the Spark configuration with the hadoop-aws connector on the classpath.
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0')
# getOrCreate() reuses a context that is already running in this kernel;
# constructing a second SparkContext directly raises an error.
sc = SparkContext.getOrCreate(conf=conf)

# Add AWS credentials to the Hadoop configuration.
hadoop_conf = sc._jsc.hadoopConfiguration()
# Legacy s3n:// keys, kept for backward compatibility.
hadoop_conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)
# The parquet tables are read through s3a://, which reads these keys instead.
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# SQL entry point used by the cells below.
sqlContext = SQLContext(sc)

In [8]:
# Load the user dimension and the song-play fact table from the parquet files in the data-lake bucket.
user_df = sqlContext.read.parquet("s3a://sparkify-data-lake-df/dim_user/")
songPlay_df = sqlContext.read.parquet("s3a://sparkify-data-lake-df/fact_songPlay/")

In [9]:
# Preview the stored user dimension.
# NOTE(review): the output below shows the same user_id with both "paid" and
# "free" levels, so this table is not unique per user — confirm that is intended.
user_df.limit(5).toPandas()

Unnamed: 0,first_name,last_name,gender,level,user_id
0,Jacqueline,Lynch,F,paid,29
1,Jacqueline,Lynch,F,free,29
2,Kinsley,Young,F,paid,85
3,Kinsley,Young,F,free,85
4,Chloe,Cuevas,F,free,49


In [10]:
# Preview the first five rows of the stored song-play fact table.
songPlay_df.limit(5).toPandas()

Unnamed: 0,ts,user_id,level,song_id,artist_id,session_id,location,userAgent,year,month
0,1541674446796,80,paid,SOXPSNU12AC468744F,ARUJ5A41187FB3F5F1,342,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
1,1542787870796,88,paid,SONQEYS12AF72AABC9,ARLY7P81187B9ACF4D,744,"Sacramento--Roseville--Arden-Arcade, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
2,1543588921796,85,paid,SOPPSDA12AF72A3D97,ARDAF601187FB4CD05,977,"Red Bluff, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_...",2018,11
3,1543350042796,88,paid,SOOVUZR12A8AE46B5B,AR2JB471187FB470A2,969,"Sacramento--Roseville--Arden-Arcade, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
4,1542180107796,80,paid,SOHPWUL12A6D4F93B1,ARLMSOV1187B9AE5D4,548,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11


In [11]:
# Register the parquet-backed DataFrames as temporary views. This replaces any
# existing "songplay" and "users" views with the same names.
songPlay_df.createOrReplaceTempView("songplay")
user_df.createOrReplaceTempView("users")

In [14]:
# Top ten users ranked by number of distinct sessions, using the stored tables.
# The users view can hold several rows per user_id (one per level), which
# multiplies the joined rows; COUNT(DISTINCT ...) keeps session_count unaffected.
query_result = sqlContext.sql("""
    SELECT
        u.first_name,
        u.last_name,
        sp.user_id,
        COUNT(DISTINCT sp.session_id) as session_count
    FROM songplay sp
    JOIN users u
    ON u.user_id = sp.user_id
    GROUP BY 
        u.first_name,
        u.last_name,
        sp.user_id
    ORDER BY session_count DESC
    LIMIT 10
""")

In [15]:
# Materialise the query result (the SQL already limits it to ten rows).
query_result.limit(10).toPandas()

Unnamed: 0,first_name,last_name,user_id,session_count
0,Chloe,Cuevas,49,28
1,Tegan,Levine,80,22
2,Kate,Harrell,97,13
3,Lily,Koch,15,11
4,Aleena,Kirby,44,10
5,Ava,Robinson,50,8
6,Mohammad,Rodriguez,88,8
7,Matthew,Jones,36,8
8,Layla,Griffin,24,7
9,Jacqueline,Lynch,29,7
