In [1]:
import configparser
from datetime import datetime
import os
import boto3
import pandas as pd

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id 

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, StringType, IntegerType, DateType, LongType, TimestampType

In [4]:
config = configparser.ConfigParser()
config.read_file(open('./dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']


In [5]:
# Create spark session with hadoop-aws package
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [58]:
type(spark)

pyspark.sql.session.SparkSession

In [6]:
# Create aws session 
session = boto3.session.Session()
s3_client = session.client('s3') # a raw, low-level calls made by service clients.
s3_resource = session.resource('s3') # a higher-level abstraction

# Process Song Data

## What does the song data look like?

In [7]:
# Bucket =  s3_resource.Bucket("udacity-dend")
# for obj in Bucket.objects.filter(Prefix="song_data"):
#     print(obj)

In [8]:
song_json =s3_client.get_object(Bucket='udacity-dend', Key='song_data/A/A/A/TRAAAUC128F428716F.json')
song_text = song_json['Body'].read().decode('utf-8')
print(song_text)
# song_df = pd.read_json(song_json['Body'], lines=True)
# song_df.head(3)

{"artist_id":"ARA23XO1187B9AF18F","artist_latitude":40.57885,"artist_location":"Carteret, New Jersey","artist_longitude":-74.21956,"artist_name":"The Smithereens","duration":192.522,"num_songs":1,"song_id":"SOKTJDS12AF72A25E5","title":"Drown In My Own Tears (24-Bit Digitally Remastered 04)","year":0}


## Read raw song data from S3

In [45]:
RawSongSchema = StructType([
    StructField('artist_id', StringType()),
    StructField('artist_latitude', DoubleType()),
    StructField('artist_location', StringType()),
    StructField('artist_longitude', DoubleType()),
    StructField('artist_name', StringType()),
    StructField('duration', DoubleType()),
    StructField('num_songs', IntegerType()),
    StructField('song_id', StringType()),
    StructField('title', StringType()),
    StructField('year', IntegerType())
])

In [46]:
song_df = spark.read.json("s3a://udacity-dend/song_data/A/A/A",  schema=RawSongSchema)

song_df.printSchema()
song_df.show(3)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|       artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|The Bonzo Dog Band|301

## Queries on raw song data

In [47]:
song_df.createOrReplaceTempView("song_raw")
spark.sql("SELECT song_id, title, artist_id, year, duration FROM song_raw limit 3").show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|  192.522|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
+------------------+--------------------+------------------+----+---------+



In [12]:
spark.sql("""
SELECT artist_id, artist_name,artist_location, artist_latitude, artist_longitude 
FROM song_raw limit 3
""").show()

+------------------+------------------+--------------------+---------------+----------------+
|         artist_id|       artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+------------------+--------------------+---------------+----------------+
|ARTC1LV1187B9A4858|The Bonzo Dog Band|Goldsmith's Colle...|        51.4536|        -0.01802|
|ARA23XO1187B9AF18F|   The Smithereens|Carteret, New Jersey|       40.57885|       -74.21956|
|ARSVTNL1187B992A91|     Jonathan King|     London, England|       51.50632|        -0.12714|
+------------------+------------------+--------------------+---------------+----------------+



## Write songs table to parquet files partitioned by year and artist

In [13]:
song_df = spark.sql("""
SELECT song_id, title, artist_id, year, duration FROM song_raw
WHERE song_id is not null
""")
song_df.write.parquet("s3a://mysparkify/data/song", mode="overwrite", partitionBy=['year','artist_id'])

## Write artists table to parquet files

In [14]:
spark.sql("""
SELECT artist_id, artist_name,artist_location, artist_latitude, artist_longitude 
FROM song_raw 
WHERE artist_id is not null
""").write.parquet("s3a://mysparkify/data/artists", mode="overwrite")

## Checking the songs, artists table stored in S3

In [56]:
spark.sql("SELECT * FROM parquet.`s3a://mysparkify/data/song` where year = 2004").show()

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SORRNOC12AB017F52B|The Last Beat Of ...|337.81506|2004|ARSZ7L31187FB4E610|
|SOIGICF12A8C141BC5|        Game & Watch|580.54485|2004|AREWD471187FB49873|
|SOIGHOD12A8C13B5A1|        Indian Angel|171.57179|2004|ARY589G1187B9A9F4E|
+------------------+--------------------+---------+----+------------------+



In [57]:
spark.sql("SELECT * FROM parquet.`s3a://mysparkify/data/artists`").show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|        51.4536|        -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|       40.57885|       -74.21956|
|ARSVTNL1187B992A91|       Jonathan King|     London, England|       51.50632|        -0.12714|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|       37.77916|      -122.42005|
|ARXQBR11187B98A2CC|Frankie Goes To H...|  Liverpool, England|           null|            null|
|ARSZ7L31187FB4E610|           Devotchka|          Denver, CO|       39.74001|      -104.99226|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|           null|            null|
|ARZ5H0P1187B98A1DD|          Snoop Dogg

# Process Log Data

## What does the log data look like?

In [17]:
# Bucket =  s3_resource.Bucket("udacity-dend")
# for obj in Bucket.objects.filter(Prefix="log_data"):
#     print(obj)

In [18]:
log_file_object =s3_client.get_object(Bucket='udacity-dend', Key='log_data/2018/11/2018-11-01-events.json')
file_content = log_file_object['Body'].read().decode('utf-8')
file_content[:200]

'{"artist":null,"auth":"Logged In","firstName":"Walter","gender":"M","itemInSession":0,"lastName":"Frye","length":null,"level":"free","location":"San Francisco-Oakland-Hayward, CA","method":"GET","page'

In [48]:
# result = s3_client.get_object(Bucket='udacity-dend', Key='log_json_path.json')
# text = result['Body'].read().decode('utf-8')
# print(text)

## Read raw song data from S3

In [20]:
RawLogSchema = StructType([
    StructField('artist', StringType()),
    StructField('auth', StringType()),
    StructField('firstName', StringType()),
    StructField('gender', StringType()),
    StructField('itemInSession', IntegerType()),
    StructField('lastName', StringType()),
    StructField('length', DoubleType()),
    StructField('level', StringType()),
    StructField('location', StringType()),
    StructField('method', StringType()),    
    StructField('page', StringType()),
    StructField('registration', StringType()),
    StructField('sessionId', IntegerType()),
    StructField('song', StringType()),
    StructField('status', IntegerType()),
    StructField('ts', LongType()),
    StructField('userAgent', StringType()),
    StructField('userId', StringType())
])
# registration, userId dtype 

In [21]:
log_df = spark.read.json("s3a://udacity-dend/log_data/2018/11/2018-11-01-events.json",  schema=RawLogSchema)

In [52]:
log_df = log_df.select("*").where("page = 'NextSong'")

## Add songplay_id

In [22]:
log_df = log_df.select("*").withColumn("songplay_id", monotonically_increasing_id())

In [23]:
log_df.select("songplay_id", "artist", "firstName").show(3)

+-----------+-------+---------+
|songplay_id| artist|firstName|
+-----------+-------+---------+
|          0|   null|   Walter|
|          1|   null|   Kaylee|
|          2|Des'ree|   Kaylee|
+-----------+-------+---------+
only showing top 3 rows



## Convert ts to timestamps

In [24]:
log_df.select("ts").show(3)

+-------------+
|           ts|
+-------------+
|1541105830796|
|1541106106796|
|1541106106796|
+-------------+
only showing top 3 rows



In [25]:
@udf(TimestampType())
def convert_unix_time_udf(x):
    if x:
        return pd.to_datetime(x,unit='ms')
    else:
        return None

In [26]:
log_df= log_df.withColumn("start_time", convert_unix_time_udf("ts"))

In [27]:
log_df.select("ts", "start_time").show(3)

+-------------+--------------------+
|           ts|          start_time|
+-------------+--------------------+
|1541105830796|2018-11-01 20:57:...|
|1541106106796|2018-11-01 21:01:...|
|1541106106796|2018-11-01 21:01:...|
+-------------+--------------------+
only showing top 3 rows



In [28]:
# pdf = log_df.toPandas()  

## users table

In [29]:
log_df.createOrReplaceTempView("log_raw")
spark.sql("""
SELECT userId as user_id,
firstName as first_name,
lastName as last_name,
gender,
level
FROM log_raw
limit 3
""").show(3)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     39|    Walter|     Frye|     M| free|
|      8|    Kaylee|  Summers|     F| free|
|      8|    Kaylee|  Summers|     F| free|
+-------+----------+---------+------+-----+



In [40]:
spark.sql("""
select user_id, first_name, last_name, gender, level
from
(SELECT userId as user_id, firstName as first_name, lastName as last_name, gender, level
FROM log_raw
WHERE userId is not null)
group by 1,2,3,4,5
""").write.parquet("s3a://mysparkify/data/users", mode="overwrite")

## time table

In [31]:
log_df.select("ts", "start_time", \
                  year("start_time").alias("year"),\
                  month("start_time").alias("month"),\
                  weekofyear("start_time").alias("week"),\
                  dayofmonth("start_time").alias("day"),\
                  hour("start_time").alias("hour"),\
                  date_format("start_time",'E').alias("weekday")).show(3)

+-------------+--------------------+----+-----+----+---+----+-------+
|           ts|          start_time|year|month|week|day|hour|weekday|
+-------------+--------------------+----+-----+----+---+----+-------+
|1541105830796|2018-11-01 20:57:...|2018|   11|  44|  1|  20|    Thu|
|1541106106796|2018-11-01 21:01:...|2018|   11|  44|  1|  21|    Thu|
|1541106106796|2018-11-01 21:01:...|2018|   11|  44|  1|  21|    Thu|
+-------------+--------------------+----+-----+----+---+----+-------+
only showing top 3 rows



In [33]:
spark.sql("""
select start_time, hour, day, week, month, year, weekday
from
(SELECT start_time, hour(start_time) as hour, dayofmonth(start_time) as day,
weekofyear(start_time) as week, month(start_time) as month, 
year(start_time) as year, date_format(start_time,'E') as weekday
FROM log_raw
WHERE start_time is not null)
group by 1,2,3,4,5,6,7
""").show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 21:55:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:08:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 22:23:...|  22|  1|  44|   11|2018|    Thu|
|2018-11-01 21:42:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:05:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:01:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:52:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:11:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 20:57:...|  20|  1|  44|   11|2018|    Thu|
|2018-11-01 21:50:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:02:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:24:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:17:...|  21|  1|  44|   11|2018|    Thu|
|2018-11-01 21:28:...|  21|  1|  44|   11|2018|    Thu|
+--------------------+----+---+----+-----+----+-

In [34]:
# log_df.createOrReplaceTempView("log_raw")
spark.sql("""
select start_time, hour, day, week, month, year, weekday
from
(SELECT start_time, hour(start_time) as hour, dayofmonth(start_time) as day,
weekofyear(start_time) as week, month(start_time) as month, 
year(start_time) as year, date_format(start_time,'E') as weekday
FROM log_raw
WHERE start_time is not null)
group by 1,2,3,4,5,6,7
""").write.parquet("s3a://mysparkify/data/time", mode="overwrite", partitionBy='year')

## songplays

In [35]:
song_df = spark.read.parquet("s3a://mysparkify/data/song")
artists_df = spark.read.parquet("s3a://mysparkify/data/artists")

song_df.createOrReplaceTempView("songs")
artists_df.createOrReplaceTempView("artists")

In [36]:
log_df.toPandas().columns

Index(['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
       'length', 'level', 'location', 'method', 'page', 'registration',
       'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId',
       'songplay_id', 'start_time'],
      dtype='object')

In [38]:
spark.sql("""
SELECT songplay_id, start_time, b.song_id as song_id, c.artist_id as artist_id,
userId as user_id,
sessionId as session_id,
location,
level,
userAgent as user_agent
FROM log_raw a
left join songs b
on a.song = b.title
left join artists c
on a.artist = c.artist_name
""").write.parquet("s3a://mysparkify/data/songplays", mode="overwrite")

## Checking the users, time, songplays table stored in S3

In [53]:
spark.sql("SELECT * FROM parquet.`s3a://mysparkify/data/users`").show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     88|  Mohammad|Rodriguez|     M| free|
|     88|  Mohammad|Rodriguez|     M| paid|
|     75|    Joseph|Gutierrez|     M| free|
|     69|  Anabelle|  Simpson|     F| free|
|     11| Christian|   Porter|     F| free|
|     53|   Celeste| Williams|     F| free|
|     77| Magdalene|   Herman|     F| free|
|     89|   Kynnedi|  Sanchez|     F| free|
|     61|    Samuel| Gonzalez|     M| free|
|     45|  Dominick|   Norris|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     16|     Rylan|   George|     M| free|
|     90|    Andrea|   Butler|     F| free|
|     36|   Matthew|    Jones|     M| free|
|     72|    Hayden|    Brock|     F| paid|
|     40|    Tucker| Garrison|     M| free|
|     28|  Brantley|     West|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     64|    Hannah|  Calhoun|     F| free|
|     52|  Theodore|    Smith|  

In [54]:
spark.sql("SELECT * FROM parquet.`s3a://mysparkify/data/time`").show()

+--------------------+----+---+----+-------+----+-----+
|          start_time|hour|day|week|weekday|year|month|
+--------------------+----+---+----+-------+----+-----+
|2018-11-15 11:40:...|  16| 15|  46|    Thu|2018|   11|
|2018-11-15 12:03:...|  17| 15|  46|    Thu|2018|   11|
|2018-11-15 12:25:...|  17| 15|  46|    Thu|2018|   11|
|2018-11-13 22:26:...|   3| 14|  46|    Wed|2018|   11|
|2018-11-14 07:26:...|  12| 14|  46|    Wed|2018|   11|
|2018-11-14 08:12:...|  13| 14|  46|    Wed|2018|   11|
|2018-11-28 11:32:...|  16| 28|  48|    Wed|2018|   11|
|2018-11-28 16:20:...|  21| 28|  48|    Wed|2018|   11|
|2018-11-28 18:57:...|  23| 28|  48|    Wed|2018|   11|
|2018-11-04 23:40:...|   4|  5|  45|    Mon|2018|   11|
|2018-11-05 10:31:...|  15|  5|  45|    Mon|2018|   11|
|2018-11-13 05:17:...|  10| 13|  46|    Tue|2018|   11|
|2018-11-13 10:54:...|  15| 13|  46|    Tue|2018|   11|
|2018-11-13 15:00:...|  20| 13|  46|    Tue|2018|   11|
|2018-11-13 16:12:...|  21| 13|  46|    Tue|2018

In [55]:
spark.sql("SELECT * FROM parquet.`s3a://mysparkify/data/songplays`").show(3)

+-----------+--------------------+-------+---------+-------+----------+--------------------+-----+--------------------+
|songplay_id|          start_time|song_id|artist_id|user_id|session_id|            location|level|          user_agent|
+-----------+--------------------+-------+---------+-------+----------+--------------------+-----+--------------------+
|          0|2018-11-14 19:30:...|   null|     null|     26|       583|San Jose-Sunnyval...| free|"Mozilla/5.0 (X11...|
|          1|2018-11-14 19:41:...|   null|     null|     26|       583|San Jose-Sunnyval...| free|"Mozilla/5.0 (X11...|
|          2|2018-11-14 19:45:...|   null|     null|     26|       583|San Jose-Sunnyval...| free|"Mozilla/5.0 (X11...|
+-----------+--------------------+-------+---------+-------+----------+--------------------+-----+--------------------+
only showing top 3 rows

