### Import Needed Tools

In [87]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType, DateType, IntegerType

### Read Configrats

In [5]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']= config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

## Def create spark session Function
Returns a spark session ready 

In [7]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


In [8]:
spark = create_spark_session()

In [9]:
input_data = "data/"
output_data = ""

In [61]:
log_df = spark.read.json("data/logs/*.json")

In [62]:
log_df.count()

8056

In [102]:
song_df = spark.read.json("data/song_data/*/*/*/*.json" ) 

In [103]:
song_df.count()

71

## Wrangling log file

In [44]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [63]:
log_df = log_df.filter(log_df.page == 'NextSong')

In [65]:
log_df.limit(10).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
5,Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Call Me If You Need Me,200,1542261224796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
6,Edward Sharpe & The Magnetic Zeros,Logged In,Tegan,F,4,Levine,306.31138,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
7,Usher featuring will.i.am,Logged In,Tegan,F,5,Levine,395.72853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
8,Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
9,Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [66]:
log_df.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


## User Table Data Prep

In [82]:
uniqueUser = log_df.dropDuplicates((['userId'])).selectExpr('userId as user_id' , 'firstName as first_name' , 'lastName as last_name' , 'gender', 'level')

In [83]:
uniqueUser.limit(10).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,51,Maia,Burke,F,free
1,7,Adelyn,Jordan,F,free
2,15,Lily,Koch,F,paid
3,54,Kaleb,Cook,M,free
4,101,Jayden,Fox,M,free
5,11,Christian,Porter,F,free
6,29,Jacqueline,Lynch,F,paid
7,69,Anabelle,Simpson,F,free
8,42,Harper,Barrett,M,paid
9,73,Jacob,Klein,M,paid


## Time Table Data Prep

In [88]:
import  pyspark.sql.functions as F


get_timestamp = F.udf(lambda ts: datetime.fromtimestamp(ts/1000).isoformat())

log_df = log_df.withColumn('Start_Time', get_timestamp('ts').cast(TimestampType()))
log_df.limit(2).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,Start_Time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796


In [96]:
# time_table = log_df.select('start_time')
# time_table = time_table.withColumn('hour', F.hour('start_time'))
# time_table = time_table.withColumn('day', F.dayofmonth('start_time'))
# time_table = time_table.withColumn('week', F.weekofyear('start_time'))
# time_table = time_table.withColumn('month', F.month('start_time'))
# time_table = time_table.withColumn('year', F.year('start_time'))
# time_table = time_table.withColumn('weekday', F.dayofweek('start_time'))


In [93]:
log_df.createOrReplaceTempView("log_view")

timeData =spark.sql("""
SELECT  DISTINCT start_time,
            EXTRACT(hour FROM start_time)    AS hour,
            EXTRACT(day FROM start_time)     AS day,
            EXTRACT(week FROM start_time)    AS week,
            EXTRACT(month FROM start_time)   AS month,
            EXTRACT(year FROM start_time)    AS year,
            EXTRACT(week FROM start_time)    AS weekday
FROM    log_view
""")



In [94]:
timeData.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 15:46:12.796,15,15,46,11,2018,46
1,2018-11-15 16:54:20.796,16,15,46,11,2018,46
2,2018-11-15 18:30:33.796,18,15,46,11,2018,46
3,2018-11-21 03:17:55.796,3,21,47,11,2018,47
4,2018-11-21 18:56:50.796,18,21,47,11,2018,47


In [95]:
timeData.write.parquet(os.path.join('', 'time'))

## Song DATA PREP

In [104]:
song_df.createOrReplaceTempView("song_view")

In [None]:
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [106]:
songplays_table = spark.sql("""
        SELECT DISTINCT   se.start_time,
                          se.userId  as user_id , 
                          se.level  as level,
                          ss.song_id  as song_id,
                          ss.artist_id  as artist_id,
                          se.sessionId  as session_id,
                          se.location  as location,
                          se.userAgent  as user_agent
        FROM  log_view se JOIN song_view ss 
                                      ON  (se.song = ss.title) AND (se.artist = ss.artist_name) AND (ss.duration = se.length)
        """)

In [115]:
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
songplays_table2 = songplays_table.withColumn("row_num", row_number().over(w))



### songplays_table2.select('row_num','start_time','user_id','level','song_id','artist_id','session_id','location','user_agent').limit(1).toPandas()


In [None]:
song_id, title, artist_id, year, duration

In [109]:
songtable =song_df.select('song_id' , 'title' , 'artist_id' , 'year' , 'duration')

In [110]:
songtable.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
3,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281
4,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,2005,186.48771


In [111]:
song_df.limit(1).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0


In [None]:
artist_id, name, location, lattitude, longitude

In [113]:
artisttable =song_df.selectExpr('artist_id' , 'artist_name as name' , 'artist_location as location' , 'artist_latitude as lattitude' , 'artist_longitude as longitude')

In [114]:
artisttable.limit(10).toPandas()

Unnamed: 0,artist_id,name,location,lattitude,longitude
0,ARDR4AC1187FB371A1,Montserrat Caballé;Placido Domingo;Vicente Sar...,,,
1,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
2,ARMAC4T1187FB3FA4C,The Dillinger Escape Plan,"Morris Plains, NJ",40.82624,-74.47995
3,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
4,ARDNS031187B9924F0,Tim Wilson,Georgia,32.67828,-83.22295
5,ARNF6401187FB57032,Sophie B. Hawkins,"New York, NY [Manhattan]",40.79086,-73.96644
6,ARLTWXK1187FB5A3F8,King Curtis,"Fort Worth, TX",32.74863,-97.32925
7,ARPFHN61187FB575F6,Lupe Fiasco,"Chicago, IL",41.88415,-87.63241
8,ARI2JSK1187FB496EF,Nick Ingman;Gavyn Wright,"London, England",51.50632,-0.12714
9,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158
