In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [2]:
spark = create_spark_session()

from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Dat, TimestampType
input_data = "s3a://udacity-dend/"
song_data = input_data + 'song_data/*/*/*/*.json'

songSchema = R([
    Fld("artist_id",Str()),
    Fld("artist_latitude",Dbl()),
    Fld("artist_location",Str()),
    Fld("artist_longitude",Dbl()),
    Fld("artist_name",Str()),
    Fld("duration",Dbl()),
    Fld("num_songs",Int()),
    Fld("title",Str()),
    Fld("year",Int()),
])

df = spark.read.json(song_data, schema=songSchema)

In [32]:
# Getting the paths of the files for parallel ingestion
import boto3

s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket='udacity-dend', Prefix="song_data")

bucket = "udacity-dend"
files = ["s3a://{}/{}".format(bucket, item["Key"]) for item in response["Contents"]]
del files[0] #remove the directory itself
files[:5:]

In [55]:
# Get from local filesystem
import glob
files = glob.glob("data/song_data/**/*.json", recursive=True)
#del files[0] #remove the directory itself
files[:5:]

['data/song_data/A/A/A/TRAAARJ128F9320760.json',
 'data/song_data/A/A/A/TRAAAFD128F92F423A.json',
 'data/song_data/A/A/A/TRAAAMO128F1481E7F.json',
 'data/song_data/A/A/A/TRAAAAW128F429D538.json',
 'data/song_data/A/A/A/TRAAAEF128F4273421.json']

In [56]:
# get filepath to song data file
#song_data = "s3a://udacity-dend/song_data/*/*/*/*.json"
#song_data = "data/song_data/A/A/A/TRAAAAW128F429D538.json"
song_data = "s3a://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json"

from pyspark.sql.types import *
'''{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null,
"artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", 
"duration": 152.92036, "year": 0}
'''
'''
song_shema = StructType()\
    .add("num_songs", IntegerType())\
    .add("artist_id", StringType())\
    .add("artist_latitude", DoubleType())\
    .add("artist_longitude", DoubleType())\
    .add("artist_location", StringType())\
    .add("artist_name", StringType())\
    .add("song_id", StringType())\
    .add("title", StringType())\
    .add("duration", DoubleType())\
    .add("year", IntegerType())
# read song data file
df = spark.read.schema(song_shema).json(song_data)
'''
df = spark.read.json(files)
df.printSchema()
df.limit(10).toPandas()

# extract columns to create songs table
#songs_table = 

# write songs table to parquet files partitioned by year and artist
#songs_table

# extract columns to create artists table
#artists_table = 

# write artists table to parquet files
#artists_table

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005
5,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
6,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
7,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
8,ARI2JSK1187FB496EF,51.50632,"London, England",-0.12714,Nick Ingman;Gavyn Wright,111.62077,1,SODUJBS12A8C132150,Wessex Loses a Bride,0
9,AROUOZZ1187B9ABE51,40.79195,"New York, NY [Spanish Harlem]",-73.94512,Willie Bobo,168.25424,1,SOBZBAZ12A6D4F8742,Spanish Grease,1997


In [5]:
print("lol")

lol


In [76]:
# extract columns to create songs table
#songs_table = 

# write songs table to parquet files partitioned by year and artist
#songs_table

#song_id, title, artist_id, year, duration

songs_table = df.select(["song_id", "title", "artist_id", "year", "duration"])
songs_table.write.partitionBy("year", "artist_id").parquet("data/songs_table.parquet", mode="overwrite")

In [77]:
# extract columns to create artists table
#artists_table = 

# write artists table to parquet files
#artists_table

#artist_id, name, location, lattitude, longitude

artists_table = df.selectExpr(["artist_id", "artist_name as name", "artist_location as location",\
                               "artist_latitude as latitude", "artist_longitude as longitude"])
artists_table.write.parquet("data/artists_table.parquet", mode="overwrite")

## Processing log_data

In [82]:
'''# get filepath to log data file
log_data =

# read log data file
df = 

# filter by actions for song plays
df = 

# extract columns for users table    
artists_table = 

# write users table to parquet files
artists_table

# create timestamp column from original timestamp column
get_timestamp = udf()
df = 

# create datetime column from original timestamp column
get_datetime = udf()
df = 

# extract columns to create time table
time_table = 

# write time table to parquet files partitioned by year and month
time_table

# read in song data to use for songplays table
song_df = 

# extract columns from joined song and log datasets to create songplays table 
songplays_table = 

# write songplays table to parquet files partitioned by year and month
songplays_table
'''

'# get filepath to log data file\nlog_data =\n\n# read log data file\ndf = \n\n# filter by actions for song plays\ndf = \n\n# extract columns for users table    \nartists_table = \n\n# write users table to parquet files\nartists_table\n\n# create timestamp column from original timestamp column\nget_timestamp = udf()\ndf = \n\n# create datetime column from original timestamp column\nget_datetime = udf()\ndf = \n\n# extract columns to create time table\ntime_table = \n\n# write time table to parquet files partitioned by year and month\ntime_table\n\n# read in song data to use for songplays table\nsong_df = \n\n# extract columns from joined song and log datasets to create songplays table \nsongplays_table = \n\n# write songplays table to parquet files partitioned by year and month\nsongplays_table\n'

In [85]:
import glob
files = glob.glob("data/log-data/*.json", recursive=True)
#del files[0] #remove the directory itself
files[:5:]

['data/log-data/2018-11-11-events.json',
 'data/log-data/2018-11-22-events.json',
 'data/log-data/2018-11-18-events.json',
 'data/log-data/2018-11-16-events.json',
 'data/log-data/2018-11-02-events.json']

In [91]:
# read log data file
df = spark.read.json(files)
df.printSchema()
df.limit(5).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [92]:
# Get only play music event
df = df.filter("page == 'NextSong'")
df.limit(5).toPandas()
# filter by actions for song plays
#df = 

# extract columns for users table    
#artists_table = 

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [97]:
# user_id, first_name, last_name, gender, level
users_table = df.selectExpr(["userId as user_id", "firstName as first_name", "lastName as last_name",\
                            "gender", "level"])
users_table.write.parquet("users_table.parquet", mode="overwrite")

In [117]:
from pyspark.sql.types import TimestampType
df = df.withColumn("ts_timestamp", (df.ts/1000).cast(TimestampType()).cast(Date))
df.printSchema()
df.select("ts","ts_timestamp").show(10) #TODO: check if the conversion is rigth
# create timestamp column from original timestamp column
#get_timestamp = udf()
#df = 

# create datetime column from original timestamp column
#get_datetime = udf()
#df = 

# extract columns to create time table
#time_table = 

# write time table to parquet files partitioned by year and month
#time_table
#df.withColumn("ts", )


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts_timestamp: date (nullable = true)

+-------------+------------+
|           ts|ts_timestamp|
+-------------+------------+
|1542241826796|  2018-11-14|
|1542242481796|  2018-11-14|
|1542242741796|  2018-11-14|
|1542253449796|  2018-11-15|
|1542260935796|  2018-11-15|
|1542261224796|  2018-11-15|
