### Test notebook for prototyping

**Name: Darren Foley**

**Email: darren.foley@ucdconnect.ie**

In [1]:
import os
import configparser
from datetime import datetime
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
!pip install findspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2


In [3]:
# Initialise findspark so the local Spark installation can be used from this kernel.
# NOTE(review): pyspark is already imported in the first cell, and findspark.init()
# is normally run before importing pyspark — confirm whether this cell is still needed.
import findspark
findspark.init()

In [4]:
# Load the AWS credentials from the local dl.cfg file so they never appear in the notebook.
CONFIG_PATH = 'dl.cfg'

config = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files it
# actually parsed, so an empty result means dl.cfg was missing or unreadable.
if not config.read(CONFIG_PATH):
    raise FileNotFoundError("Could not read AWS config file: {!r}".format(CONFIG_PATH))

# Read each value once and reuse it below.
AWS_ACCESS_KEY = config.get('AWS', 'AWS_ACCESS_KEY_ID')
AWS_SECRET_KEY = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

# Also export the credentials through the process environment.
os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY
os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_KEY

In [5]:

# Build the Spark context with the hadoop-aws connector added as a package.
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0')
# getOrCreate() reuses an already-running context, so re-running this cell does not
# fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

# Credentials for the s3a:// connector, which is the scheme used by the S3 paths
# in this notebook. The fs.s3n.* keys alone are not read by S3AFileSystem.
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Original s3n settings, kept so any s3n:// path keeps working.
hadoop_conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)

# SQL entry point used by the read cells below.
sqlContext = SQLContext(sc)

In [6]:
# Load a sample of the song metadata JSON files from S3 into a Spark DataFrame.
# NOTE(review): each {A,Z} is an alternation (directory "A" or "Z"), not an A-to-Z
# range — confirm that this small subset is what is wanted for prototyping.
song_data_path = "s3a://udacity-dend/song_data/A/{A,Z}/{A,Z}/*.json"
song_data = sqlContext.read.json(song_data_path)

In [7]:
# Preview the first five song records as a pandas DataFrame.
song_data.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
1,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0
2,ARM66431187FB4CD0B,54.3274,"Downpatrick, Co. Down, Northern Ireland",-5.70496,Ash,384.96608,1,SOVYBAW12A8C144CD4,Twilight Of The Innocents - (i-Tunes Festival),0
3,ARMB9SV1187B99CFF9,,,,The Last Electro-Acoustic Space Jazz & Percuss...,237.03465,1,SOWXPRB12AB018A4CA,Waltz For Woody (For Woody Shaw),2010
4,ARSVTNL1187B992A91,51.50632,"London, England",-0.12714,Jonathan King,129.85424,1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),2001


In [8]:
# Print the column names, types and nullability of the song DataFrame.
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
# Number of song records loaded from the sampled paths.
song_data.count()

73

#### Extract to songs table

In [12]:
# Songs table: the distinct combinations of the song-level columns.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs = song_data.select(song_columns).distinct()

# Row count after de-duplication.
songs.count()

73

#### Extract to artist table

In [10]:
# Artists table: the distinct combinations of the artist-level columns
# (id, name, location, latitude, longitude).
artist_columns = [
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
]
artist = song_data.select(artist_columns).distinct()

# Row count after de-duplication.
artist.select("artist_name").count()

73

#### Partitioning

Songs table files are partitioned by year and then artist. - Songs(year,Artist)

Time table files are partitioned by year and month. - Time(year, month)

Songplays table files are partitioned by year and month. - songPlays(year, month)

In [15]:
# Write each table to its own local directory. With mode("overwrite"), pointing both
# writes at the same path would make the second write replace the first one's output.
songs_output_path = "tmp/songs.parquet"
artist_output_path = "tmp/artists.parquet"

# Songs are partitioned by year, then artist_id.
songs.write.format("parquet").partitionBy("year", "artist_id").mode("overwrite").save(songs_output_path)
# Artists are partitioned by artist_name.
artist.write.format("parquet").partitionBy("artist_name").mode("overwrite").save(artist_output_path)

#### Writing parquet to S3 bucket in us-west-2

In [19]:
# Persist the artist table to S3 as parquet, partitioned by artist_name, replacing
# whatever already exists under the target prefix.
artist_s3_target = "s3a://sparkify-data-lake-df/test/artist"
(
    artist.write
    .mode("overwrite")
    .partitionBy("artist_name")
    .format("parquet")
    .save(artist_s3_target)
)

### Processing log data

s3a://udacity-dend/log_data/


Sample path: s3a://udacity-dend/log_data/2018/11/2018-11-13-events.json


In [6]:
# Load one day of event logs (13 Nov 2018) from S3 into a Spark DataFrame.
log_data_path = "s3a://udacity-dend/log_data/2018/11/2018-11-13-events.json"
log_data = sqlContext.read.json(log_data_path)

In [7]:
# Preview the first five log events as a pandas DataFrame.
# NOTE(review): the rendered output includes user names and locations — consider
# clearing it before sharing the notebook.
log_data.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Kevin,M,0,Arellano,,free,"Harrisburg-Carlisle, PA",GET,Home,1540007000000.0,514,,200,1542069417796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
1,Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,514,Ja I Ty,200,1542069637796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
2,,Logged In,Maia,F,0,Burke,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540677000000.0,510,,200,1542071524796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51
3,All Time Low,Logged In,Maia,F,1,Burke,177.84118,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540677000000.0,510,A Party Song (The Walk of Shame),200,1542071549796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51
4,Nik & Jay,Logged In,Wyatt,M,0,Scott,196.51873,free,"Eureka-Arcata-Fortuna, CA",PUT,NextSong,1540872000000.0,379,Pop-Pop!,200,1542079142796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9


In [8]:
# Print the column names, types and nullability of the log DataFrame.
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

