## Testing Functions

In [90]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.functions import udf, col, to_timestamp, from_unixtime,monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

In [91]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID']) \
    .config("spark.hadoop.fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY']) \
    .getOrCreate()
    return spark

In [92]:
spark = create_spark_session()

In [93]:
# It retrieve only the data in folder song_data/A/A/A
song_data_read_path = os.path.join("s3a://udacity-dend/", "song_data/A/A/A/*.json")
log_data_read_path = os.path.join("s3a://udacity-dend/", "log-data/*/*/*.json")

In [94]:
# read song data file
song_data = spark.read.json(song_data_read_path)

In [95]:
log_data = spark.read.json(log_data_read_path)

#### Song Schema

In [96]:
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



#### Create Song table

In [97]:
song_data.select('song_id','artist_id','title','year','duration').show(n=5)

+------------------+------------------+--------------------+----+---------+
|           song_id|         artist_id|               title|year| duration|
+------------------+------------------+--------------------+----+---------+
|SOAFBCP12A8C13CC7D|ARTC1LV1187B9A4858|King Of Scurf (20...|1972|301.40036|
|SOKTJDS12AF72A25E5|ARA23XO1187B9AF18F|Drown In My Own T...|   0|  192.522|
|SOEKAZG12AB018837E|ARSVTNL1187B992A91|I'll Slap Your Fa...|2001|129.85424|
|SOQPWCR12A6D4FB2A3|AR73AIO1187B9AD57B|A Poor Recipe For...|2005|118.07302|
|SOBRKGM12A8C139EF6|ARXQBR11187B98A2CC|Welcome to the Pl...|1985|821.05424|
+------------------+------------------+--------------------+----+---------+
only showing top 5 rows



#### Create Artist table

In [98]:
song_data.select('artist_id','artist_name', 'artist_location','artist_latitude', 'artist_longitude').limit(5).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARTC1LV1187B9A4858,The Bonzo Dog Band,"Goldsmith's College, Lewisham, Lo",51.4536,-0.01802
1,ARA23XO1187B9AF18F,The Smithereens,"Carteret, New Jersey",40.57885,-74.21956
2,ARSVTNL1187B992A91,Jonathan King,"London, England",51.50632,-0.12714
3,AR73AIO1187B9AD57B,Western Addiction,"San Francisco, CA",37.77916,-122.42005
4,ARXQBR11187B98A2CC,Frankie Goes To Hollywood,"Liverpool, England",,


#### LOG Schema

In [99]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [100]:
log_data_next_song = log_data.where(log_data['page'] == 'NextSong')

#### Create User Table

In [101]:
log_data_next_song.select('userId','firstName','lastName','gender','level').dropDuplicates().limit(5).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,57,Katherine,Gay,F,free
1,84,Shakira,Hunt,F,free
2,22,Sean,Wilson,F,free
3,52,Theodore,Smith,M,free
4,80,Tegan,Levine,F,paid


#### Create Time Table

In [115]:
get_timestamp = udf(lambda x: x/1000, IntegerType())
test_df = log_data_next_song.withColumn('start_time', get_timestamp(log_data_next_song['ts']))
test_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,


In [116]:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))

In [117]:
log_timestamp = log_data_next_song.withColumn('start_time', get_timestamp(log_data_next_song['ts']))

In [118]:
log_timestamp.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55


In [107]:
time_table = log_timestamp.withColumn('hour',       hour(log_timestamp['start_time'])) \
        .withColumn('day',        dayofmonth(log_timestamp['start_time'])) \
        .withColumn('week',       weekofyear(log_timestamp['start_time'])) \
        .withColumn('month',      month(log_timestamp['start_time'])) \
        .withColumn('year',       year(log_timestamp['start_time'])) \
        .withColumn('weekday',    dayofweek(log_timestamp['start_time'])) \
        .select('start_time', 'hour', 'day','week', 'month', 'year', 'weekday') \
        .dropDuplicates()

In [108]:
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 12:38:03,12,15,46,11,2018,5
1,2018-11-15 22:00:58,22,15,46,11,2018,5
2,2018-11-21 19:00:45,19,21,47,11,2018,4
3,2018-11-21 20:22:17,20,21,47,11,2018,4
4,2018-11-21 22:26:57,22,21,47,11,2018,4


#### Songplays Table

In [112]:
songplays_table = log_timestamp.join(song_data, (log_timestamp.song == song_data.title) & (log_timestamp.artist == song_data.artist_name) & (log_data.length == song_data.duration), 'left_outer')\
        .select(
            monotonically_increasing_id().alias('songplay_id'),
            log_data.ts,
            col("userId").alias('user_id'),
            log_data.level,
            song_data.song_id,
            song_data.artist_id,
            col("sessionId").alias("session_id"),
            log_data.location,
            col("useragent").alias("user_agent"),
            year('start_time').alias('year'),
            month('start_time').alias('month')
        )

In [113]:
songplays_table.limit(5).toPandas()

Unnamed: 0,songplay_id,ts,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,0,1542241826796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
1,1,1542242481796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
2,2,1542242741796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
3,3,1542253449796,61,free,,,597,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
4,4,1542260935796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
