In [1]:
import configparser
from datetime import datetime
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format,from_unixtime
from pyspark.sql.types import *


In [2]:
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 500)

In [3]:
#spark = SparkSession.builder.appName("dev").master("local[*]").getOrCreate()
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [6]:
spark.sparkContext.getConf().getAll()

[('spark.submit.pyFiles',
  '/root/.ivy2/jars/org.apache.hadoop_hadoop-aws-2.7.0.jar,/root/.ivy2/jars/org.apache.hadoop_hadoop-common-2.7.0.jar,/root/.ivy2/jars/com.fasterxml.jackson.core_jackson-databind-2.2.3.jar,/root/.ivy2/jars/com.fasterxml.jackson.core_jackson-annotations-2.2.3.jar,/root/.ivy2/jars/com.amazonaws_aws-java-sdk-1.7.4.jar,/root/.ivy2/jars/org.apache.hadoop_hadoop-annotations-2.7.0.jar,/root/.ivy2/jars/com.google.guava_guava-11.0.2.jar,/root/.ivy2/jars/commons-cli_commons-cli-1.2.jar,/root/.ivy2/jars/org.apache.commons_commons-math3-3.1.1.jar,/root/.ivy2/jars/xmlenc_xmlenc-0.52.jar,/root/.ivy2/jars/commons-httpclient_commons-httpclient-3.1.jar,/root/.ivy2/jars/commons-codec_commons-codec-1.4.jar,/root/.ivy2/jars/commons-io_commons-io-2.4.jar,/root/.ivy2/jars/commons-net_commons-net-3.1.jar,/root/.ivy2/jars/commons-collections_commons-collections-3.2.1.jar,/root/.ivy2/jars/javax.servlet_servlet-api-2.5.jar,/root/.ivy2/jars/org.mortbay.jetty_jetty-6.1.26.jar,/root/.ivy2

In [5]:
df_song_data = spark.read.json("data/song_data/*/*/*/*.json")
df_log_data = spark.read.json("data/log_data/*.json")

In [44]:
df_song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [45]:
df_song_data.createOrReplaceTempView('songs_dataset')

## Table schemas
- Fact Table
   songplays: records in log data associated with song plays i.e. records with page NextSong
   - songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

### Dimension Tables
- users - users in the app
   - user_id, first_name, last_name, gender, level
- songs - songs in music database
   - song_id, title, artist_id, year, duration
- artists - artists in music database
  - artist_id, name, location, lattitude, longitude
- time - timestamps of records in songplays broken down into specific units
  - start_time, hour, day, week, month, year, weekday

In [56]:
## creating song table

dim_songs = spark.sql("""
    SELECT
        Distinct
        song_id,
        title,
        artist_id, 
        year,
        duration
        
    FROM songs_dataset 
    
    where song_id IS NOT NULL

""")


In [58]:
dim_songs.drop_duplicates().count()

71

In [59]:
# creating artists table
dim_artists = spark.sql("""
                            SELECT DISTINCT 
                            artist_id, 
                            artist_name,
                            artist_location,
                            artist_latitude,
                            artist_longitude
                            FROM songs_dataset
                            WHERE artist_id IS NOT NULL
                        """)

In [64]:
dim_artists.count()

69

In [37]:
df_log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [65]:
df_log_filter = df_log_data.filter(df_log_data.page == 'NextSong')

In [67]:
df_log_data.count()


8056

In [68]:
df_log_filter.count()

6820

In [69]:
df_log_filter.createOrReplaceTempView("log_dataset")

In [71]:
  # extract columns for users table    
dim_users = spark.sql("""
                        SELECT DISTINCT 
                        userId as user_id, 
                        firstName as first_name,
                        lastName as last_name,
                        gender as gender,
                        level as level
                        FROM log_dataset 
                        WHERE userId IS NOT NULL
                        
                    """)


In [72]:
dim_users.count()

104

In [75]:
# extract columns to create time table

dim_time = spark.sql("""


                with A as (
                SELECT to_timestamp(ts/1000) as ts
                                        FROM log_dataset 
                                        WHERE ts IS NOT NULL

                )

                SELECT 
                    A.ts as start_time,
                    hour(A.ts) as hour,
                    dayofmonth(A.ts) as day,
                    weekofyear(A.ts) as week,
                    month(A.ts) as month,
                    year(A.ts) as year,
                    dayofweek(A.ts) as weekday

                FROM A

""")

In [76]:
dim_time.limit(10).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 00:30:26.796,0,15,46,11,2018,5
1,2018-11-15 00:41:21.796,0,15,46,11,2018,5
2,2018-11-15 00:45:41.796,0,15,46,11,2018,5
3,2018-11-15 03:44:09.796,3,15,46,11,2018,5
4,2018-11-15 05:48:55.796,5,15,46,11,2018,5
5,2018-11-15 05:53:44.796,5,15,46,11,2018,5
6,2018-11-15 05:55:56.796,5,15,46,11,2018,5
7,2018-11-15 06:01:02.796,6,15,46,11,2018,5
8,2018-11-15 06:07:37.796,6,15,46,11,2018,5
9,2018-11-15 06:10:33.796,6,15,46,11,2018,5


In [77]:
dim_time.count()

6820

In [78]:
# creating Fact table
songplays_table = spark.sql("""
                                SELECT 
                                monotonically_increasing_id() as songplay_id,
                                to_timestamp(logD.ts/1000) as start_time,
                                month(to_timestamp(logD.ts/1000)) as month,
                                year(to_timestamp(logD.ts/1000)) as year,
                                logD.userId as user_id,
                                logD.level as level,
                                songD.song_id as song_id,
                                songD.artist_id as artist_id,
                                logD.sessionId as session_id,
                                logD.location as location,
                                logD.userAgent as user_agent
                                FROM log_dataset logD
                                JOIN songs_dataset songD
                                on 
                                logD.artist = songD.artist_name
                                                and logD.song = songD.title
                            """)

In [79]:
songplays_table.toPandas()

Unnamed: 0,songplay_id,start_time,month,year,user_id,level,song_id,artist_id,session_id,location,user_agent
0,0,2018-11-21 21:56:47.796,11,2018,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [16]:
test = df_log_data.select(df_log_data.ts).limit(10)


In [52]:
d =test.withColumn('timeStamp', from_unixtime(test.ts/1000))

In [53]:
d.printSchema()

root
 |-- ts: long (nullable = true)
 |-- timeStamp: string (nullable = true)



In [2]:
import configparser
import os
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [20]:
df = spark.read.json("s3a://udacity-dend/log_data/*/*/*.json")

In [21]:
df.count()

8056

In [22]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [17]:
!aws s3 ls udacity-dend/log_data/2018/11/


2019-04-17 12:03:13       7151 2018-11-01-events.json
2019-04-17 12:03:13      83585 2018-11-02-events.json
2019-04-17 12:03:13      54084 2018-11-03-events.json
2019-04-17 12:03:13      85671 2018-11-04-events.json
2019-04-17 12:03:13     189295 2018-11-05-events.json
2019-04-17 12:03:13      85373 2018-11-06-events.json
2019-04-17 12:03:13      97519 2018-11-07-events.json
2019-04-17 12:03:13     102218 2018-11-08-events.json
2019-04-17 12:03:13     134804 2018-11-09-events.json
2019-04-17 12:03:13      44076 2018-11-10-events.json
2019-04-17 12:03:13      43711 2018-11-11-events.json
2019-04-17 12:03:13      99854 2018-11-12-events.json
2019-04-17 12:03:13     186826 2018-11-13-events.json
2019-04-17 12:03:13     217264 2018-11-14-events.json
2019-04-17 12:03:13     243143 2018-11-15-events.json
2019-04-17 12:03:13     175491 2018-11-16-events.json
2019-04-17 12:03:13      66164 2018-11-17-events.json
2019-04-17 12:03:13      75763 2018-11-18-events.json
2019-04-17 12:03:13     1507

2019-04-17 03:20:41        225 TRAAAAK128F9318786.json
2019-04-17 03:20:41        284 TRAAAAV128F421A322.json
2019-04-17 03:20:41        249 TRAAABD128F429CF47.json
2019-04-17 03:20:41        243 TRAAACN128F9355673.json
2019-04-17 03:20:41        289 TRAAAEA128F935A30D.json
2019-04-17 03:20:41        231 TRAAAED128E0783FAB.json
2019-04-17 03:20:41        228 TRAAAEM128F93347B9.json
2019-04-17 03:20:41        248 TRAAAEW128F42930C0.json
2019-04-17 03:20:41        225 TRAAAFD128F92F423A.json
2019-04-17 03:20:41        250 TRAAAGR128F425B14B.json
2019-04-17 03:20:41        256 TRAAAHD128F42635A5.json
2019-04-17 03:20:41        271 TRAAAHJ128F931194C.json
2019-04-17 03:20:41        263 TRAAAHZ128E0799171.json
2019-04-17 03:20:41        254 TRAAAIR128F1480971.json
2019-04-17 03:20:41        278 TRAAAJN128F428E437.json
2019-04-17 03:20:41        223 TRAAAND12903CD1F1B.json
2019-04-17 03:20:41        259 TRAAANK128F428B515.json
2019-04-17 03:20:41        303 TRAAAOF128F429C156.json
2019-04-17

### Reading Data from DW

In [4]:
artist  = spark.read.parquet('./data/dw/artists/*.parquet')
artist.limit(10).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARMAC4T1187FB3FA4C,The Dillinger Escape Plan,"Morris Plains, NJ",40.82624,-74.47995
1,AROUOZZ1187B9ABE51,Willie Bobo,"New York, NY [Spanish Harlem]",40.79195,-73.94512
2,ARI2JSK1187FB496EF,Nick Ingman;Gavyn Wright,"London, England",51.50632,-0.12714
3,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
4,ARD842G1187B997376,Blue Rodeo,"Toronto, Ontario, Canada",43.64856,-79.38533
5,ARDR4AC1187FB371A1,Montserrat Caballé;Placido Domingo;Vicente Sar...,,,
6,AR47JEX1187B995D81,SUE THOMPSON,"Nevada, MO",37.83721,-94.35868
7,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158
8,ARIG6O41187B988BDD,Richard Souther,United States,37.16793,-95.84502
9,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955


In [8]:
songplays  = spark.read.parquet('./data/dw/songplays_table/year=*/month=*/*.parquet')
songplays.limit(10).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,0,2018-11-21 21:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [10]:
songs_table  = spark.read.parquet('./data/dw/songs_table/year=*/artist_id=*/*.parquet')
songs_table.limit(10).toPandas()

Unnamed: 0,song_id,title,duration
0,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,43.36281
1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,186.48771
2,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),326.00771
3,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),267.7024
4,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),279.97995
5,SOUDSGM12AC9618304,Insatiable (Instrumental Version),266.39628
6,SOPEGZN12AB0181B3D,Get Your Head Stuck On Your Neck,45.66159
7,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,207.77751
8,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,511.16363
9,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),173.66159


In [14]:
time_table = spark.read.parquet("./data/dw/time_table/year=*/month=*/*.parquet")
time_table.limit(10).toPandas()

Unnamed: 0,start_time,hour,day,week,weekday
0,2018-11-15 00:30:26.796,0,15,46,5
1,2018-11-15 00:41:21.796,0,15,46,5
2,2018-11-15 00:45:41.796,0,15,46,5
3,2018-11-15 03:44:09.796,3,15,46,5
4,2018-11-15 05:48:55.796,5,15,46,5
5,2018-11-15 05:53:44.796,5,15,46,5
6,2018-11-15 05:55:56.796,5,15,46,5
7,2018-11-15 06:01:02.796,6,15,46,5
8,2018-11-15 06:07:37.796,6,15,46,5
9,2018-11-15 06:10:33.796,6,15,46,5


In [15]:
users_table = spark.read.parquet("./data/dw/users_table/*.parquet")
users_table.limit(10).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,88,Mohammad,Rodriguez,M,free
1,88,Mohammad,Rodriguez,M,paid
2,53,Celeste,Williams,F,free
3,11,Christian,Porter,F,free
4,69,Anabelle,Simpson,F,free
5,77,Magdalene,Herman,F,free
6,75,Joseph,Gutierrez,M,free
7,61,Samuel,Gonzalez,M,free
8,89,Kynnedi,Sanchez,F,free
9,45,Dominick,Norris,M,free


In [16]:
users_table.count()

104