In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import os
import configparser

In [2]:
config = configparser.ConfigParser()
config.read("dl.cfg")

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [7]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [None]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
        .config("spark.sql.files.ignoreCorruptFiles", "true")\
        .getOrCreate()

In [6]:
spark.version

NameError: name 'spark' is not defined

In [7]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://hasitha-data-lake/sparkify/"

In [8]:
song_data_path = os.path.join(input_data, "song-data/A/A/*")
song_data_path

's3a://udacity-dend/song-data/A/A/A'

In [7]:

#song_df = spark.read.json('s3a://udacity-dend/song-data/A/A/A/TRAAAAK128F9318786.json')

song_df = spark.read.option("mode", "DROPMALFORMED").json(song_data_path)

    

In [8]:
song_df.count()

24

In [9]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
# Chage column type
song_df = song_df.withColumn('artist_latitude', F.col('artist_latitude').cast(T.DoubleType())).withColumn('artist_longitude', F.col('artist_longitude').cast(T.DoubleType()))

In [11]:
# Read log data
log_df= spark.read.option("inferschema", "true").json('s3a://udacity-dend/log-data/2018/11/2018-11-01-events.json')

In [12]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [13]:
# Chage column names
new_names = ['artist', 'auth', 'first_name', 'gender', 'items_in_session', 'last_name', 'length', 'level', 'location', 'method','page','registration', 'session_id', 'song', 'status', 'ts', 'user_agent', 'user_id']
log_df = log_df.toDF(*new_names)

In [14]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- items_in_session: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- session_id: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- user_id: string (nullable = true)



In [15]:
# In JSON user_id is defiend as string. Chage that to integer
log_df = log_df.withColumn('user_id', F.col('user_id').cast(T.LongType()) )

# Add new timestamp column by converting 'ts' column. Assumed 'PST' timezone
log_df = log_df.withColumn("time_stamp", F.to_utc_timestamp(F.from_unixtime(F.col("ts")/1000,'yyyy-MM-dd HH:mm:ss'),'PST'))

In [16]:

log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- items_in_session: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- session_id: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- time_stamp: timestamp (nullable = true)



In [17]:
log_df.limit(5).toPandas()

Unnamed: 0,artist,auth,first_name,gender,items_in_session,last_name,length,level,location,method,page,registration,session_id,song,status,ts,user_agent,user_id,time_stamp
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919000000.0,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39,2018-11-02 03:57:10
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540345000000.0,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,2018-11-02 04:01:46
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,2018-11-02 04:01:46
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540345000000.0,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,2018-11-02 04:02:12
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,2018-11-02 04:05:52


In [34]:
# extract columns to create songs table
songs_table = song_df.select('song_id','title', 'artist_id', 'year', 'duration').dropDuplicates(['song_id'])


#songs_table = songs_table.dropDuplicates("song_id")
songs_table.toPandas()

# write songs table to parquet files partitioned by year and artist
#songs_table.write.mode('overwrite').partitionBy("year","artist_id").parquet("./test1/songs.parquet")

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBLFFE12AF72AA5BA,Scream,ARJNIUY12298900C91,2009,213.9424


In [20]:
songs_table.show()

+------------------+------+------------------+----+--------+
|           song_id| title|         artist_id|year|duration|
+------------------+------+------------------+----+--------+
|SOBLFFE12AF72AA5BA|Scream|ARJNIUY12298900C91|2009|213.9424|
+------------------+------+------------------+----+--------+



In [30]:
# extract columns to create artists table
artists_table = song_df.selectExpr('artist_id', 'artist_name as name', 'artist_location as location', 'artist_latitude as latitude', 'artist_longitude as longitude').dropDuplicates(['artist_id'])


In [50]:
# Write artist table to parquet file
#artists_table.write.mode('overwrite').parquet("./test2/artists.parquet")

In [36]:
# extract Columns to create time table
time_table = log_df.filter(log_df.page=='NextSong').selectExpr('time_stamp', 'hour(time_stamp) as hour', 'day(time_stamp) as day', 'weekofyear(time_stamp) as week', 'month(time_stamp) as month', 'year(time_stamp) as year', 'weekday(time_stamp) as weekday').dropDuplicates(['time_stamp'])
time_table.limit(5).show()

+-------------------+----+---+----+-----+----+-------+
|         time_stamp|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-02 05:23:14|   5|  2|  44|   11|2018|      4|
|2018-11-02 04:24:53|   4|  2|  44|   11|2018|      4|
|2018-11-02 04:28:54|   4|  2|  44|   11|2018|      4|
|2018-11-02 04:05:52|   4|  2|  44|   11|2018|      4|
|2018-11-02 04:42:00|   4|  2|  44|   11|2018|      4|
+-------------------+----+---+----+-----+----+-------+



In [35]:
# extract columns for users table    
users_table = log_df.filter(log_df.page=='NextSong').select('user_id', 'first_name', 'last_name', 'gender', 'level').dropDuplicates(['user_id'])
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     10|    Sylvie|     Cruz|     F| free|
|    101|    Jayden|      Fox|     M| free|
|      8|    Kaylee|  Summers|     F| free|
+-------+----------+---------+------+-----+



In [74]:
l = log_df.filter(log_df.page=='NextSong').alias("l")
a = artists_table.alias("a")
s = songs_table.alias("s")

songplays_table = l.join(a, F.col("l.artist")==F.col("a.name"), "left")\
                    .join(s, [(F.col("l.song")==F.col("s.title")) & (F.col("l.length")==F.col("s.duration"))], "left")\
                    .select(F.monotonically_increasing_id().alias('songplay_id'), 'l.time_stamp', 'l.user_id', 'l.level', 's.song_id', 'a.artist_id', 'l.session_id', 'a.location', 'l.user_agent')


songplays_table.limit(5).show()


+-----------+-------------------+-------+-----+-------+---------+----------+--------+--------------------+
|songplay_id|         time_stamp|user_id|level|song_id|artist_id|session_id|location|          user_agent|
+-----------+-------------------+-------+-----+-------+---------+----------+--------+--------------------+
|          0|2018-11-02 04:01:46|      8| free|   null|     null|       139|    null|"Mozilla/5.0 (Win...|
|          1|2018-11-02 04:05:52|      8| free|   null|     null|       139|    null|"Mozilla/5.0 (Win...|
|          2|2018-11-02 04:08:16|      8| free|   null|     null|       139|    null|"Mozilla/5.0 (Win...|
|          3|2018-11-02 04:11:13|      8| free|   null|     null|       139|    null|"Mozilla/5.0 (Win...|
|          4|2018-11-02 04:17:33|      8| free|   null|     null|       139|    null|"Mozilla/5.0 (Win...|
+-----------+-------------------+-------+-----+-------+---------+----------+--------+--------------------+



In [None]:
# Read data from parquet
output_data = "s3a://hasitha-data-lake/sparkify/"

# read in song data to use for songplays table
song_data_path = os.path.join(output_data, "songs.parquet")
songs_df = spark.read.json(song_data_path)

In [None]:
# read in song data to use for songplays table
artist_data_path = os.path.join(output_data, "artists.parquet")
artists_df = spark.read.json(artist_data_path)
