# Process Log data

In [1]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, col, monotonically_increasing_id, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import *

In [2]:
# Load AWS credentials from the local dl.cfg file ([AWS] section) and export them
# as environment variables for the S3 clients used later in this notebook
# (boto3 reads these variables; NOTE(review): confirm the s3a connector of this
# hadoop-aws version also picks them up).
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed (the original leaked it).
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
%%time
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

CPU times: user 75.2 ms, sys: 34 ms, total: 109 ms
Wall time: 15.4 s


# S3 output location

In [4]:
# Root S3 location where this notebook writes its parquet tables (and reads the songs table back from).
output_data = "s3a://salas-bucket/temp-save/"

# Log data

## Check log data folder structure

In [5]:
import boto3

# Collect every object key under the `log-data/` prefix of the udacity-dend bucket
# to inspect how the log files are laid out (year/month/<date>-events.json).
# NOTE(review): this lists `log-data/` (hyphen) while the Spark read below uses
# `log_data/` (underscore) -- confirm both prefixes hold the same files.
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('udacity-dend')
log_data_list = [summary.key for summary in my_bucket.objects.filter(Prefix="log-data/")]

In [6]:
# Show the listed keys (the first entry is the prefix "directory" itself).
log_data_list

['log-data/',
 'log-data/2018/11/2018-11-01-events.json',
 'log-data/2018/11/2018-11-02-events.json',
 'log-data/2018/11/2018-11-03-events.json',
 'log-data/2018/11/2018-11-04-events.json',
 'log-data/2018/11/2018-11-05-events.json',
 'log-data/2018/11/2018-11-06-events.json',
 'log-data/2018/11/2018-11-07-events.json',
 'log-data/2018/11/2018-11-08-events.json',
 'log-data/2018/11/2018-11-09-events.json',
 'log-data/2018/11/2018-11-10-events.json',
 'log-data/2018/11/2018-11-11-events.json',
 'log-data/2018/11/2018-11-12-events.json',
 'log-data/2018/11/2018-11-13-events.json',
 'log-data/2018/11/2018-11-14-events.json',
 'log-data/2018/11/2018-11-15-events.json',
 'log-data/2018/11/2018-11-16-events.json',
 'log-data/2018/11/2018-11-17-events.json',
 'log-data/2018/11/2018-11-18-events.json',
 'log-data/2018/11/2018-11-19-events.json',
 'log-data/2018/11/2018-11-20-events.json',
 'log-data/2018/11/2018-11-21-events.json',
 'log-data/2018/11/2018-11-22-events.json',
 'log-data/2018/11

## log data paths

In [8]:
# get filepath to log data file
input_data = "s3a://udacity-dend/"
log_data = os.path.join(input_data, 'log_data/*/*/*.json')
log_data

's3a://udacity-dend/log_data/*/*/*.json'

## create dataframe

In [9]:
%%time
log_df = spark.read.json(log_data)

CPU times: user 4.09 ms, sys: 4.06 ms, total: 8.15 ms
Wall time: 24.9 s


In [None]:
# Inspect the schema Spark inferred for the raw log data.
log_df.printSchema()

In [None]:
# Preview the raw log rows (rendered through eager evaluation).
log_df

## Filter to song plays (`page == 'NextSong'`)

In [10]:
# filter by actions for song plays
log_df = log_df.filter(log_df.page == 'NextSong')
log_df

artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11...",26
The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11...",26
Train,Logged In,Ryan,M,2,Smith,205.45261,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11...",26
Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,Houston-The Woodl...,PUT,NextSong,1540492941796.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Mac...",61
Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Best Of Both Worl...,200,1542260935796,"""Mozilla/5.0 (Mac...",80
Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Call Me If You Ne...,200,1542261224796,"""Mozilla/5.0 (Mac...",80
Edward Sharpe & T...,Logged In,Tegan,F,4,Levine,306.31138,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Mac...",80
Usher featuring w...,Logged In,Tegan,F,5,Levine,395.72853,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Mac...",80
Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Mac...",80
Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Mac...",80


---
# Create `users_table`
- rename columns to snake_case (`userId` → `user_id`, …)
- create `users_table`

In [None]:
# Build the users dimension with exactly one row per user.
# A user's `level` may differ between events (e.g. free vs. paid). Deduplicating
# on all selected columns would then emit the same user_id more than once.
# Instead, keep each user's most recent event (highest `ts`) and take its attributes.
users_fields = ["userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]
latest_event_first = Window.partitionBy("userId").orderBy(col("ts").desc())
users_table = (
    log_df
    .withColumn("_event_rank", row_number().over(latest_event_first))
    .filter(col("_event_rank") == 1)
    .selectExpr(users_fields)
)

In [None]:
# Check the renamed (snake_case) columns of the users table.
users_table.printSchema()

In [None]:
# Preview the users table.
users_table

# Save `users_table`

```bash
CPU times: user 183 ms, sys: 37.1 ms, total: 220 ms
Wall time: 16min 56s
```

In [None]:
%%time
users_table.write.parquet(os.path.join(output_data, "users/") , mode="overwrite")

---
# Create `time_table`

## Create the `start_time` column
- use the `ts` column (epoch milliseconds)
- convert it to a timestamp

In [11]:
# Convert `ts` (epoch milliseconds, e.g. 1542241826796 -> 2018-11-15 00:30:26) into a
# UTC timestamp column `start_time`.
# A null `ts` now yields a null timestamp. The original called int(None), which
# raised TypeError in the Python worker and failed the whole job.
get_timestamp = udf(
    lambda ms: None if ms is None else datetime.utcfromtimestamp(int(ms) / 1000),
    TimestampType(),
)
log_df = log_df.withColumn("start_time", get_timestamp("ts"))

In [12]:
# Confirm `start_time` was added as a timestamp column.
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [13]:
# Preview the song-play rows together with the new `start_time` column.
log_df

artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11...",26,2018-11-15 00:30:...
The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11...",26,2018-11-15 00:41:...
Train,Logged In,Ryan,M,2,Smith,205.45261,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11...",26,2018-11-15 00:45:...
Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,Houston-The Woodl...,PUT,NextSong,1540492941796.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Mac...",61,2018-11-15 03:44:...
Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Best Of Both Worl...,200,1542260935796,"""Mozilla/5.0 (Mac...",80,2018-11-15 05:48:...
Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Call Me If You Ne...,200,1542261224796,"""Mozilla/5.0 (Mac...",80,2018-11-15 05:53:...
Edward Sharpe & T...,Logged In,Tegan,F,4,Levine,306.31138,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Mac...",80,2018-11-15 05:55:...
Usher featuring w...,Logged In,Tegan,F,5,Levine,395.72853,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Mac...",80,2018-11-15 06:01:...
Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Mac...",80,2018-11-15 06:07:...
Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Mac...",80,2018-11-15 06:10:...


## create `time_table`
- extract data from `start_time` column
    

In [14]:
# Derive the time dimension: one row per distinct `start_time` with its calendar parts.
# `weekday` comes from dayofweek (Spark numbers Sunday as 1).
time_table = log_df.select(
    "start_time",
    hour("start_time").alias("hour"),
    dayofmonth("start_time").alias("day"),
    weekofyear("start_time").alias("week"),
    month("start_time").alias("month"),
    year("start_time").alias("year"),
    dayofweek("start_time").alias("weekday"),
).dropDuplicates()

In [15]:
# Preview the time dimension.
time_table

start_time,hour,day,week,month,year,weekday
2018-11-15 14:09:...,14,15,46,11,2018,5
2018-11-15 15:24:...,15,15,46,11,2018,5
2018-11-15 16:31:...,16,15,46,11,2018,5
2018-11-15 19:22:...,19,15,46,11,2018,5
2018-11-21 17:40:...,17,21,47,11,2018,4
2018-11-14 13:30:...,13,14,46,11,2018,4
2018-11-14 17:53:...,17,14,46,11,2018,4
2018-11-28 14:18:...,14,28,48,11,2018,4
2018-11-28 19:44:...,19,28,48,11,2018,4
2018-11-05 09:55:...,9,5,45,11,2018,2


# Save time table
```bash
CPU times: user 300 ms, sys: 68.8 ms, total: 369 ms
Wall time: 47min 54s
```

In [None]:
%%time
time_table.write.parquet(os.path.join(output_data, "time_table/"), mode='overwrite', partitionBy=["year","month"])

# Load the songs table written by the previous notebook

In [16]:
%%time
song_df = spark.read.format("parquet").option("basePath", os.path.join(output_data, "songs/")).load(os.path.join(output_data, "songs/*/*/"))

CPU times: user 4.57 ms, sys: 0 ns, total: 4.57 ms
Wall time: 18.1 s


In [17]:
# Preview the songs table read back from S3.
song_df

song_id,title,duration,year,artist_id
SOKTJDS12AF72A25E5,Drown In My Own T...,192.522,0,ARA23XO1187B9AF18F
SOEKAZG12AB018837E,I'll Slap Your Fa...,129.85424,2001,ARSVTNL1187B992A91
SOAFBCP12A8C13CC7D,King Of Scurf (20...,301.40036,1972,ARTC1LV1187B9A4858
SORRNOC12AB017F52B,The Last Beat Of ...,337.81506,2004,ARSZ7L31187FB4E610
SOQPWCR12A6D4FB2A3,A Poor Recipe For...,118.07302,2005,AR73AIO1187B9AD57B
SODZYPO12A8C13A91E,Burn My Body (Alb...,177.99791,0,AR1C2IX1187B99BF74
SOBRKGM12A8C139EF6,Welcome to the Pl...,821.05424,1985,ARXQBR11187B98A2CC
SOERIDA12A6D4F8506,I Want You (Album...,192.28689,2006,ARBZIN01187FB362CC
SOAPERH12A58A787DC,The One And Only ...,230.42567,0,ARZ5H0P1187B98A1DD
SOSMJFC12A8C13DE0C,Is That All There...,343.87546,0,AR1KTV21187B9ACD72


In [18]:
# Short alias for the filtered song-play events. This is the same DataFrame object as
# log_df, which already carries the `start_time` column.
df = log_df

# Join the song and log datasets and extract the columns for the songplays table

In [19]:
%%time
songplays_table = df.join(song_df, df.song == song_df.title, how='inner').select(monotonically_increasing_id().alias("songplay_id"),col("start_time"),col("userId").alias("user_id"),"level","song_id","artist_id", col("sessionId").alias("session_id"), "location", col("userAgent").alias("user_agent"))    

CPU times: user 8.75 ms, sys: 76 µs, total: 8.82 ms
Wall time: 104 ms


In [20]:
# Preview the songplays rows produced by the song/log join.
songplays_table

songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-15 16:19:...,97,paid,SOBLFFE12AF72AA5BA,ARJNIUY12298900C91,605,Lansing-East Lans...,"""Mozilla/5.0 (X11..."


In [21]:
%%time
songplays_table = songplays_table.join(time_table, songplays_table.start_time == time_table.start_time, how="inner").select("songplay_id", songplays_table.start_time, "user_id", "level", "song_id", "artist_id", "session_id", "location", "user_agent")

CPU times: user 3.42 ms, sys: 3.4 ms, total: 6.83 ms
Wall time: 180 ms


In [22]:
# Preview songplays after the join with the time table.
songplays_table

songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-15 16:19:...,97,paid,SOBLFFE12AF72AA5BA,ARJNIUY12298900C91,605,Lansing-East Lans...,"""Mozilla/5.0 (X11..."


# write songplays table to parquet files partitioned by year and month

```bash

CPU times: user 55.4 ms, sys: 24 ms, total: 79.4 ms
Wall time: 2min 4s
```

In [None]:
%%time
songplays_table.drop_duplicates().write.parquet(os.path.join(output_data, "songplays/"), mode="overwrite", partitionBy=["year","month"])