In [259]:
import configparser
from datetime import datetime
import os
import json
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from datetime import datetime

In [213]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [214]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [215]:
spark = create_spark_session()
spark

In [275]:
input_data = "s3a://udacity-dend/"
output_data = ""

In [140]:
log_rdd = sc.textFile(input_data + 'log_data/*/*').map(lambda x: json.loads(x))
log_df = log_rdd.map(lambda items: Row(**items)).toDF()
log_df.printSchema()
log_df.limit(3).toPandas()
log_df.cache()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [10]:
# songSchema = R([
#     Fld('num_songs', Int()),
#     Fld('artist_id', Str()),
#     Fld('artist_latitude', Dbl()),
#     Fld('artist_longitude', Dbl()),
#     Fld('artist_location', Str()),
#     Fld('artist_name', Str()),
#     Fld('song_id', Str()),
#     Fld('title', Str()),
#     Fld('duration', Dbl()),
#     Fld('year', Int())
# ])

In [72]:
# subdir = [x[0] for x in os.walk(song_data)]
# song_df = spark.read.json(subdir, schema=songSchema)
# song_df.printSchema()
# song_df.limit(5).toPandas()

In [94]:
sc = spark.sparkContext
rdd = sc.textFile(input_data + 'song_data/A/A/*').map(lambda x: json.loads(x))
song_df = rdd.map(lambda items: Row(**items)).toDF()
song_df.printSchema()
song_df.limit(3).toPandas()
song_df.cache()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARJNIUY12298900C91,,,,Adelitas Way,213.9424,1,SOBLFFE12AF72AA5BA,Scream,2009
1,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005
2,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969


In [220]:
songs_table = song_df.select('song_id', 'title', 'artist_id', 'year', 'duration')
artists_table = song_df.select('artist_id', 
                               col('artist_name').alias('name'), 
                               col('artist_location').alias('location'), 
                               col('artist_latitude').alias('latitude'), 
                               col('artist_longitude').alias('longitude')).dropDuplicates()

In [221]:
songs_table.show(3)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBLFFE12AF72AA5BA|              Scream|ARJNIUY12298900C91|2009| 213.9424|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|AR73AIO1187B9AD57B|2005|118.07302|
|SOCIWDW12A8C13D406|           Soul Deep|ARMJAGH1187FB546F3|1969|148.03546|
+------------------+--------------------+------------------+----+---------+
only showing top 3 rows



In [156]:
artists_table.show(3)

In [142]:
log_df = log_df.filter(col('page') == 'NextSong')

In [138]:
log_df.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
1,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Tamba Trio,Logged In,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Quem Quiser Encontrar O Amor,200,1541106496796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [145]:
users_table = log_df.select(col('userId').alias('user_id'), 
                            col('firstName').alias('first_name'),
                            col('lastName').alias('last_name'),
                            'gender',
                            'level')

In [191]:
get_timestamp = udf(lambda ts: str(int(ts)//1000))
log_df = log_df.withColumn('timestamp', get_timestamp('ts'))

In [197]:
get_datetime = udf(lambda timestamp: str(datetime.fromtimestamp(int(timestamp))))
log_df = log_df.withColumn('datetime', get_datetime('timestamp'))

In [202]:
log_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,sessionId,song,status,ts,userAgent,userId,timestampe,timestamp,datetime,hour
0,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,...,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,1541106106,1541106106,2018-11-01 14:01:46,14
1,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,...,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,1541106352,1541106352,2018-11-01 14:05:52,14
2,Tamba Trio,Logged In,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,...,139,Quem Quiser Encontrar O Amor,200,1541106496796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,1541106496,1541106496,2018-11-01 14:08:16,14
3,The Mars Volta,Logged In,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,...,139,Eriatarka,200,1541106673796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,1541106673,1541106673,2018-11-01 14:11:13,14
4,Infected Mushroom,Logged In,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,...,139,Becoming Insane,200,1541107053796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,1541107053,1541107053,2018-11-01 14:17:33,14


In [209]:
time_table = log_df.select(col('ts').alias('start_time'), 'datetime')\
    .withColumn('hour', hour('datetime'))\
    .withColumn('day', dayofmonth('datetime'))\
    .withColumn('week', weekofyear('datetime'))\
    .withColumn('month', month('datetime'))\
    .withColumn('year', year('datetime'))\
    .withColumn('weekday', dayofweek('datetime'))\
    .drop('datetime')

In [217]:
time_table.show(5)

+-------------+----+---+----+-----+----+-------+
|   start_time|hour|day|week|month|year|weekday|
+-------------+----+---+----+-----+----+-------+
|1541106106796|  14|  1|  44|   11|2018|      5|
|1541106352796|  14|  1|  44|   11|2018|      5|
|1541106496796|  14|  1|  44|   11|2018|      5|
|1541106673796|  14|  1|  44|   11|2018|      5|
|1541107053796|  14|  1|  44|   11|2018|      5|
+-------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [247]:
join_df = log_df.join(song_df, col('artist') == col('artist_name'))

In [261]:
# join_df.select("*")\
#     .withColumn("songplay_id", row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)\
#     .limit(10).toPandas()

In [272]:
songplays_table = join_df.withColumn("songplay_id", monotonically_increasing_id())\
    .select('songplay_id',
            col('datetime').alias('start_time'),
            col('userId').alias('user_id'),
            col('level').alias('level'),
            col('song_id').alias('song_id'),
            col('artist_id').alias('artist_id'),
            col('sessionId').alias('session_id'),
            col('location').alias('location'),
            col('userAgent').alias('user_agent'),
            year('datetime').alias('year'),
            month('datetime').alias('month'))

### Find songs with longest durations

In [283]:
songs_table.createOrReplaceTempView('songs')
qry = """
select title, duration
from songs
order by duration desc
limit 10
"""
spark.sql(qry).toPandas()

Unnamed: 0,title,duration
0,Oro Caldo,1108.24444
1,Exodus: Part I: Moses and Pharaoh,1047.71873
2,Welcome to the Pleasuredome,821.05424
3,Treflon,768.44363
4,Further,683.72853
5,My Friend the Moon,661.18485
6,The Czar: Usurper/Escape/Martyr/Spiral (Album ...,654.28853
7,Scrapple From The Apple,581.38077
8,Game & Watch,580.54485
9,Shark,576.07791


### User who are using our app and order by number of uses in locations

In [288]:
songplays_table.createOrReplaceTempView('songplays')
qry = """
select user_id, location, count(location) as cnt
from songplays
group by location, user_id
order by count(location) desc
limit 10
"""
spark.sql(qry).toPandas()

Unnamed: 0,user_id,location,cnt
0,49,"San Francisco-Oakland-Hayward, CA",47
1,80,"Portland-South Portland, ME",46
2,44,"Waterloo-Cedar Falls, IA",31
3,97,"Lansing-East Lansing, MI",30
4,29,"Atlanta-Sandy Springs-Roswell, GA",27
5,15,"Chicago-Naperville-Elgin, IL-IN-WI",24
6,24,"Lake Havasu City-Kingman, AZ",24
7,36,"Janesville-Beloit, WI",20
8,42,"New York-Newark-Jersey City, NY-NJ-PA",19
9,95,"Winston-Salem, NC",14
