In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from zipfile import ZipFile

In [2]:
from datetime import datetime
from pyspark.sql.functions import udf, col, to_date,to_timestamp, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import TimestampType,DateType

In [11]:
with ZipFile('data/log-data.zip','r') as zip_ref:
    zip_ref.extractall('data/log-Testdata')

In [12]:
with ZipFile('data/song-data.zip','r') as zip_ref:
    zip_ref.extractall('data/local-Songdata')

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
df_log = spark.read.json("data/log-Testdata")

In [15]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [5]:
df_song = spark.read.json("data/local-Songdata/song_data/*/*/*/*.json")

In [6]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [18]:
df_song.count()

71

In [19]:
df_log.count()

8056

In [9]:
#spark = create_spark_session()
df = spark.read.json("data/local-Songdata/song_data/*/*/*/*.json")

In [10]:
df.createOrReplaceTempView("song_table_data")

In [11]:
songs_table =  spark.sql(""" SELECT DISTINCT 
                             STD.SONG_ID AS SONG,
                             STD.TITLE,
                             STD.ARTIST_ID, 
                             STD.YEAR, 
                             STD.DURATION
                             FROM song_table_data STD
                             WHERE STD.SONG_ID IS NOT NULL 
                            """)

In [12]:
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|              SONG|               TITLE|         ARTIST_ID|YEAR| DURATION|
+------------------+--------------------+------------------+----+---------+
|SOGNCJP12A58A80271|Do You Finally Ne...|ARB29H41187B98F0EF|1972|342.56934|
|SOOJPRH12A8C141995|   Loaded Like A Gun|ARBGXIG122988F409D|   0|173.19138|
|SOFCHDR12AB01866EF|         Living Hell|AREVWGE1187B9B890A|   0|282.43546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTUKVB12AB0181477|   Blessed Assurance|AR7ZKHQ1187B98DD73|1993|  270.602|
|SOMVWWT12A58A7AE05|Knocked Out Of Th...|ARQ9BO41187FB5CF1F|   0|183.17016|
|SOBEBDG12A58A76D60|        Kassie Jones|ARI3BMM1187FB4255E|   0|220.78649|
|SOILPQQ12AB017E82A|Sohna Nee Sohna Data|AR1ZHYZ1187FB3C717|   0|599.24853|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
|SOBCOSW12A8

In [25]:
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet('songs_table/')

In [128]:
artists_table = spark.sql("""  SELECT DISTINCT 
                               STD.ARTIST_ID, 
                               STD.ARTIST_NAME as NAME,
                               STD.ARTIST_LOCATION as LOCATION,
                               STD.ARTIST_LATITUDE as LATITUDE,
                               STD.ARTIST_LONGITUDE as LONGITUDE
                               FROM song_table_data STD
                               WHERE STD.ARTIST_ID IS NOT NULL
                               
                              """)
    

In [167]:
artists_table.show()

+------------------+--------------------+--------------------+---------------+----------------+
|         ARTIST_ID|         ARTIST_NAME|     ARTIST_LOCATION|ARTIST_LATITUDE|ARTIST_LONGITUDE|
+------------------+--------------------+--------------------+---------------+----------------+
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|       40.71455|       -74.00712|
|ARBEBBY1187B9B43DB|           Tom Petty|     Gainesville, FL|           null|            null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|         8.4177|       -80.11278|
|ARMBR4Y1187B9990EB|        David Martin|     California - SF|       37.77916|      -122.42005|
|ARD0S291187B9B7BF5|             Rated R|                Ohio|           null|            null|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|       30.08615|       -94.10158|
|ARKRRTF1187B9984DA|    Sonora Santanera|                    |           null|            null|
|ARHHO3O1187B989413|           Bob Azzam

In [168]:
artists_table.write.mode('overwrite').parquet('artists_table/')

In [26]:
#spark = create_spark_session()
spark = SparkSession.builder.getOrCreate()
df = spark.read.json("data/log-Testdata")

In [27]:
df = df.filter(df.page == 'NextSong')

In [28]:
df.head(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [29]:
df.createOrReplaceTempView("log_table_data")

In [173]:
users_table = spark.sql("""
                            SELECT DISTINCT LTD.userID as user_id,
                            LTD.firstName as first_name,
                            LTD.lastName as last_name,
                            LTD.gender as gender,
                            LTD.level as level
                            FROM  log_table_data LTD
                            WHERE LTD.userId is NOT NULL
                            """
                            ) 

In [174]:
users_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
|     63|      Ayla|  Johnson|     F| free|
|     37|    Jordan|    Hicks|     F| free|
|      6|   Cecilia|    Owens|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     27|    Carlos|   Carter|     M| free|
|     89|   Kynnedi|  Sanchez|     F| free|
|     57| Katherine|      Gay|     F| free|
|     74|    Braden|   Parker|     M| free|
|     29|Jacqueline|    Lynch|     F| paid|
|     75|    Joseph|Gutierrez|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     88|  Mohammad|Rodriguez|     M| free|
|     64|    Hannah|  Calhoun|     F| free|
|     15|      Lily|     Koch|     F| free|
|     95|      Sara|  Johnson|  

In [175]:
users_table.write.mode('overwrite').parquet('users_table/')

In [30]:
get_timestamp = udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), TimestampType())
df = df.withColumn("start_time", get_timestamp("ts"))

In [33]:
time_table = df.withColumn("hour",hour("start_time"))\
                    .withColumn("day",dayofmonth("start_time"))\
                    .withColumn("week",weekofyear("start_time"))\
                    .withColumn("month",month("start_time"))\
                    .withColumn("year",year("start_time"))\
                    .withColumn("weekday",dayofweek("start_time"))\
                    .select("ts","start_time","hour", "day", "week", "month", "year", "weekday").drop_duplicates()

In [34]:
time_table.show()

+-------------+--------------------+----+---+----+-----+----+-------+
|           ts|          start_time|hour|day|week|month|year|weekday|
+-------------+--------------------+----+---+----+-----+----+-------+
|1542279962796|2018-11-15 11:06:...|  11| 15|  46|   11|2018|      5|
|1542299805796|2018-11-15 16:36:...|  16| 15|  46|   11|2018|      5|
|1542765178796|2018-11-21 01:52:...|   1| 21|  47|   11|2018|      4|
|1542765513796|2018-11-21 01:58:...|   1| 21|  47|   11|2018|      4|
|1542778890796|2018-11-21 05:41:...|   5| 21|  47|   11|2018|      4|
|1542824767796|2018-11-21 18:26:...|  18| 21|  47|   11|2018|      4|
|1542171517796|2018-11-14 04:58:...|   4| 14|  46|   11|2018|      4|
|1543420593796|2018-11-28 15:56:...|  15| 28|  48|   11|2018|      4|
|1543444194796|2018-11-28 22:29:...|  22| 28|  48|   11|2018|      4|
|1543445588796|2018-11-28 22:53:...|  22| 28|  48|   11|2018|      4|
|1542099785796|2018-11-13 09:03:...|   9| 13|  46|   11|2018|      3|
|1543562532796|2018-

In [50]:
df_song = spark.read.json("data/local-Songdata/song_data/*/*/*/*.json")
df_song.createOrReplaceTempView("song_data_table")
df.createOrReplaceTempView("log_data_table")

In [51]:
songplays_table = spark.sql(""" SELECT monotonically_increasing_id() as songplay_id,
                                to_timestamp(logT.ts/1000) as start_time,
                                month(to_timestamp(logT.ts/1000)) as month,
                                year(to_timestamp(logT.ts/1000)) as year,
                                logT.userId as user_id,
                                logT.level as level,
                                songT.song_id as song_id,
                                songT.artist_id as artist_id,
                                logT.sessionId as session_id,
                                logT.location as location,
                                logT.userAgent as user_agent
                                FROM log_data_table logT
                                JOIN song_data_table songT on logT.artist = songT.artist_name and logT.song = songT.title
                            """)

In [52]:
songplays_table.show()

+-----------+--------------------+-----+----+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|month|year|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-----+----+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          0|2018-11-21 21:56:...|   11|2018|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------+--------------------+-----+----+-------+-----+------------------+------------------+----------+--------------------+--------------------+



In [53]:
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet('songplays_table/')