In [15]:
from pyspark.sql import SparkSession
import pandas as pd
from zipfile import ZipFile
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, TimestampType
import datetime
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
import os

# with ZipFile('data/song-data.zip','r') as songFileRead:
    #songFileRead.extractall('data/readSongs')
    
# with ZipFile('data/log-data.zip','r') as logFileRead:
    #logFileRead.extractall('data/readLogs')

In [None]:
# Base location of the input datasets (note the trailing slash).
input_data = 's3://blak/'

# Location of the log data beneath the base; echoed so the resulting
# path can be checked by eye.
joined = os.path.join(
    input_data,
    'log_data',
)
print(joined)

In [16]:
# Create (or reuse) the Spark session for this notebook: application name
# "P4", with the Spark UI port configured to 3000.
spark = (
    SparkSession.builder
    .config('spark.ui.port', 3000)
    .appName("P4")
    .getOrCreate()
)

In [None]:
# Show the active SparkSession object as this cell's output.
spark

In [17]:
    # get filepath to song data file
# Load the raw JSON inputs into DataFrames:
#   song_data - the song files under the A/A/A directory only
#   log_data  - everything under data/readLogs
song_data = spark.read.json(path="data/readSongs/song_data/A/A/A/*.json")
log_data = spark.read.json(path="data/readLogs")

In [18]:
# Keep only the log rows whose page column equals "NextSong".
log_data = log_data.where(log_data["page"] == "NextSong")

In [34]:
# Show the schema of both raw DataFrames.
# printSchema() writes the schema tree to stdout itself and returns None, so it
# is called directly; wrapping it in print() emits a stray "None" line after
# each schema.
song_data.printSchema()
log_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

None
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullab

In [34]:
# Songs table: de-duplicate song_data on song_id, then keep only the
# song-level columns.
songs_table = (
    song_data
    .dropDuplicates(['song_id'])
    .select('song_id', 'title', 'artist_id', 'year', 'duration')
)
songs_table.printSchema()

# Parquet export is currently disabled:
# songsParquet = songs_table.write.partitionBy('year','duration').parquet('songs.parquet')


root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [32]:
# Artists table: one row per artist_id with the artist-level columns.
# Each song_data row carries a song_id, so an artist with several songs shows
# up on several rows; de-duplicating on artist_id (as the songs and users
# tables do on their own keys) keeps a single row per artist.
artists_table = (
    song_data
    .dropDuplicates(['artist_id'])
    .select(
        'artist_id',
        'artist_name',
        'artist_location',
        'artist_latitude',
        'artist_longitude',
    )
)
# TODO: create quality check for null and blank
artists_table.printSchema()
# artistsParquet = artists_table.write.parquet('artist.parquet')

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



In [None]:
# Read the 'artist.parquet' output back and display its leading rows.
# NOTE(review): the only write of 'artist.parquet' visible in this notebook is
# commented out, so this read presumably relies on a file left by an earlier
# run — confirm before relying on it in a fresh kernel.
spark.read.parquet('artist.parquet').show()

In [23]:
   
# Users table: de-duplicate the log rows on userId, then expose the id as
# user_id together with the name, gender and level columns.
users_table = (
    log_data
    .dropDuplicates(['userId'])
    .select(
        col('userId').alias('user_id'),
        'firstName',
        'lastName',
        'gender',
        'level',
    )
)
users_table.printSchema()
# usersParquest = users_table.write.parquet('users.parquet')

root
 |-- user_id: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [19]:
def _ms_to_datetime(epoch_ms):
    """Convert an epoch value in milliseconds to a datetime.

    Returns None for a missing (None) input instead of raising, so the UDF
    built from this function can be applied to columns that contain nulls.
    """
    if epoch_ms is None:
        return None
    return datetime.datetime.fromtimestamp(epoch_ms / 1000)


# Sanity check of the conversion on one sample ts value.
y = 1542241826796
print(_ms_to_datetime(y))

# Spark UDF turning a millisecond epoch column into a timestamp column.
# It is null-safe because it is also applied to the joined song_df, where
# ts can be null for rows that have no log event.
get_timestamp = udf(_ms_to_datetime, TimestampType())

df = log_data.ts
hours_added = log_data.withColumn("hours", get_timestamp(df))

# Time table: the event timestamp plus the calendar parts derived from it.
onlyHours = hours_added.select(
    col('hours').alias('Start_Time'),
    date_format('hours', 'E').alias('Date'),
    hour('hours').alias('Hour'),
    dayofmonth('hours').alias('Day'),
    weekofyear('hours').alias("Week"),
    month('hours').alias('Month'),
    year('hours').alias('Year'),
    dayofweek('hours').alias('Day_of_Week'),
)
onlyHours.printSchema()

2018-11-15 00:30:26.796000
root
 |-- Start_Time: timestamp (nullable = true)
 |-- Date: string (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Week: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Day_of_Week: integer (nullable = true)



In [None]:
# Persist the time table as Parquet, partitioned by Year and Month.
# mode("overwrite") replaces output left by a previous run; with the default
# save mode the write raises when "time.parquet" already exists, so the cell
# could not be re-run. parquet() returns None, so timeParquet is always None;
# the name is kept only for backward compatibility.
timeParquet = (
    onlyHours.write
    .mode("overwrite")
    .partitionBy("Year", "Month")
    .parquet("time.parquet")
)

In [14]:
# Attach song metadata to each log event by matching artist name, song
# duration and song title.
# A left join keeps every log event (events without a matching song get null
# song columns), which avoids the row loss of an inner join. A full outer join
# would additionally emit song-only rows whose ts/userId/sessionId are null,
# i.e. "plays" that never happened.
song_df = log_data.join(
    song_data,
    (log_data.artist == song_data.artist_name)
    & (log_data.length == song_data.duration)
    & (log_data.song == song_data.title),
    'left',
)

# show() prints the preview itself and returns None, so it is not wrapped in
# print() (that would add a stray "None" line).
song_df.show()

+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+---------+---------------+---------------+----------------+-----------+--------+---------+-------+-----+----+
|              artist|     auth| firstName|gender|itemInSession| lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|artist_id|artist_latitude|artist_location|artist_longitude|artist_name|duration|num_songs|song_id|title|year|
+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+---------+---------------+---------------+----------------+-----------+--------+---------+-------+-----+-

In [30]:
# Add a start_time column computed from the ts column via get_timestamp,
# then display the resulting schema.
song_df = song_df.withColumn(
    "start_time",
    get_timestamp(song_df["ts"]),
)
song_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)

In [31]:
# Songplays table: the play timestamp, the user/session columns (renamed to
# snake_case), the matched song and artist ids, and the year/month of
# start_time.
songplays_table = song_df.select(
    song_df["start_time"],
    col("userId").alias("user_id"),
    'level',
    'song_id',
    'artist_id',
    col("sessionId").alias("session_id"),
    'location',
    col("userAgent").alias("user_agent"),
    year(song_df["start_time"]).alias('year'),
    month(song_df["start_time"]).alias('month'),
)
songplays_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [None]:
# Quick look at the level and sessionId columns (first 5 rows).
# show() prints the rows and returns None, so songplays_table111 ends up None.
songplays_table111 = song_df.select('level', 'sessionId').show(n=5)

In [None]:
# Persist the songplays table as Parquet, partitioned by year and month.
# mode("overwrite") replaces output left by a previous run; with the default
# save mode the write raises when "songplays.parquet" already exists, so the
# cell could not be re-run. parquet() returns None, so songplaysParquet is
# always None; the name is kept only for backward compatibility.
songplaysParquet = (
    songplays_table.write
    .mode("overwrite")
    .partitionBy('year', 'month')
    .parquet('songplays.parquet')
)

In [None]:
# Re-read the A/A/A song JSON files and display their leading rows.
spark.read.json(path="data/readSongs/song_data/A/A/A/*.json").show()