In [1]:
# pip install sparksql-magic

In [1]:
import os
import json
from pyspark.sql import SparkSession
%load_ext sparksql_magic

# Create Spark Session

In [2]:
spark = SparkSession\
    .builder\
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
    .getOrCreate()

In [22]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, TimestampType

# define song schema
song_schema = StructType([
  StructField("artist_id", StringType()),
  StructField("artist_name", StringType()),
  StructField("artist_latitude", DoubleType()),
  StructField("artist_longitude", DoubleType()),
  StructField("artist_location", StringType()),
  StructField("song_id", StringType()),
  StructField("duration", DoubleType()),
  StructField("title", StringType()),
  StructField("year", IntegerType()),
  StructField("num_songs", IntegerType())
])

# read song files
df_song = spark.read.json("data/song_data/A/A/*/*.json", schema=song_schema)
df_song.createOrReplaceTempView("songs_dataset")

# define log schema
log_schema = StructType([
  StructField("artist", StringType()),
  StructField("auth", StringType()),
  StructField("firstName", StringType()),
  StructField("gender", StringType()),
  StructField("itemInSession", IntegerType()),
  StructField("lastName", StringType()),
  StructField("length", DoubleType()),
  StructField("level", StringType()),
  StructField("location", StringType()),
  StructField("method", StringType()),
  StructField("page", StringType()),
  StructField("registration", DoubleType()),
  StructField("sessionId", IntegerType()),
  StructField("song", StringType()),
  StructField("status", IntegerType()),
  StructField("ts", DoubleType()),
  StructField("userAgent", StringType()),
  StructField("userId", StringType())
])

# read log files
df_log = spark.read.json("data/log-data/*.json", schema=log_schema)
df_log.createOrReplaceTempView("logs_dataset")

In [23]:
spark.sql("""
    SELECT DISTINCT
      to_timestamp(l.ts/1000) as start_time,
      YEAR(to_timestamp(l.ts/1000)) as year,
      MONTH(to_timestamp(l.ts/1000)) as month,
      l.userId as user_id,
      l.level as level,
      s.song_id as song_id,
      s.artist_id as artist_id,
      l.sessionId as session_id,
      s.artist_location as artist_location,
      l.userAgent as user_agent
    FROM songs_dataset s
    JOIN logs_dataset l
      ON s.title = l.song 
      AND s.artist_name = l.artist 
      AND s.duration = l.length
    WHERE l.page = 'NextSong'
    AND l.ts IS NOT NULL
    AND l.userId IS NOT NULL
    AND s.song_id IS NOT NULL
  """).toPandas()

Unnamed: 0,start_time,year,month,user_id,level,song_id,artist_id,session_id,artist_location,user_agent
0,2018-11-21 21:56:47.796,2018,11,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,Dubai UAE,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


# Extracting Cubes

In [3]:
df_users = spark.read.parquet("spark-warehouse/users.parquet")
df_users.printSchema()
df_users.createOrReplaceTempView("users")

df_artists = spark.read.parquet("spark-warehouse/artists.parquet")
df_artists.printSchema()
df_artists.createOrReplaceTempView("artists")

df_songs = spark.read.parquet("spark-warehouse/songs.parquet")
df_songs.printSchema()
df_songs.createOrReplaceTempView("songs")

df_times = spark.read.parquet("spark-warehouse/times.parquet")
df_times.printSchema()
df_times.createOrReplaceTempView("times")

df_songplays = spark.read.parquet("spark-warehouse/songplays.parquet")
df_songplays.printSchema()
df_songplays.createOrReplaceTempView("songplays")

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

root
 |-- start_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nu

### Time table

In [6]:
%%sparksql -c results
    SELECT * FROM times LIMIT 10

cache dataframe with lazy load
capture dataframe to local variable `results`


0,1,2,3,4,5,6
start_time,hour,day,week,month,year,weekday
2018-11-15 19:38:52.796000,19,15,46,11,2018,3
2018-11-21 01:16:52.796000,1,21,47,11,2018,2
2018-11-21 05:33:28.796000,5,21,47,11,2018,2
2018-11-21 09:26:12.796000,9,21,47,11,2018,2
2018-11-21 17:41:13.796000,17,21,47,11,2018,2
2018-11-21 19:16:24.796000,19,21,47,11,2018,2
2018-11-14 05:45:26.796000,5,14,46,11,2018,2
2018-11-14 07:00:15.796000,7,14,46,11,2018,2
2018-11-14 08:00:54.796000,8,14,46,11,2018,2


### Users table

In [5]:
%%sparksql -c results
    SELECT * FROM users LIMIT 10

cache dataframe with lazy load
capture dataframe to local variable `results`


0,1,2,3,4
user_id,first_name,last_name,gender,level
88,Mohammad,Rodriguez,M,free
88,Mohammad,Rodriguez,M,paid
75,Joseph,Gutierrez,M,free
69,Anabelle,Simpson,F,free
11,Christian,Porter,F,free
53,Celeste,Williams,F,free
77,Magdalene,Herman,F,free
89,Kynnedi,Sanchez,F,free
61,Samuel,Gonzalez,M,free


### Artists table

In [7]:
%%sparksql -c results
    SELECT * FROM artists LIMIT 10

cache dataframe with lazy load
capture dataframe to local variable `results`


0,1,2,3,4
artist_id,artist_name,location,latitude,longitude
ARMAC4T1187FB3FA4C,The Dillinger Escape Plan,"Morris Plains, NJ",40.82624,-74.47995
AROUOZZ1187B9ABE51,Willie Bobo,"New York, NY [Spanish Harlem]",40.79195,-73.94512
AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
ARD842G1187B997376,Blue Rodeo,"Toronto, Ontario, Canada",43.64856,-79.38533
AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158
ARIG6O41187B988BDD,Richard Souther,United States,37.16793,-95.84502
AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,
ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955
ARAJPHH1187FB5566A,The Shangri-Las,"Queens, NY",40.7038,-73.83168


### Songs table

In [8]:
%%sparksql -c results
    SELECT * FROM songs LIMIT 10

cache dataframe with lazy load
capture dataframe to local variable `results`


0,1,2,3,4
song_id,title,artist_id,year,duration
SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco Hell),ARDNS031187B9924F0,2005,186.48771
SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024
SOUDSGM12AC9618304,Insatiable (Instrumental Version),ARNTLGG11E2835DDB9,0,266.39628
SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
SOBLGCN12AB0183212,James (Hold The Ladder Steady),AR47JEX1187B995D81,1985,124.86485
SOGNCJP12A58A80271,Do You Finally Need A Friend,ARB29H41187B98F0EF,1972,342.56934
SOGXHEG12AB018653E,It Makes No Difference Now,AR0RCMP1187FB3F427,1992,133.32853
SOBONFF12A6D4F84D8,Tonight Will Be Alright,ARIK43K1187B9AE54C,1986,307.3824


### Songplays table

In [9]:
%%sparksql -c results
    SELECT * FROM songplays LIMIT 10

cache dataframe with lazy load
capture dataframe to local variable `results`


0,1,2,3,4,5,6,7,8,9
start_time,year,month,user_id,level,song_id,artist_id,session_id,artist_location,user_agent
2018-11-21 21:56:47.796000,2018,11,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,Dubai UAE,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"""


### Users by level

In [23]:
%%sparksql -c results
    SELECT 
        DISTINCT level,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM users
    GROUP BY level

cache dataframe with lazy load
capture dataframe to local variable `results`


0,1
level,percentage
free,78.85
paid,21.15


### Users by gender

In [24]:
%%sparksql -c results
    SELECT 
        DISTINCT gender,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM users
    GROUP BY gender

cache dataframe with lazy load
capture dataframe to local variable `results`


0,1
gender,percentage
F,57.69
M,42.31


### Songs by artists

In [25]:
%%sparksql -c results
    SELECT 
        DISTINCT a.artist_name as artist_name,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM songs s
    JOIN artists a ON a.artist_id = s.artist_id
    GROUP BY artist_name

cache dataframe with lazy load
capture dataframe to local variable `results`
only showing top 20 row(s)


0,1
artist_name,percentage
Clp,5.56
The Box Tops,2.78
Faye Adams,2.78
Richard Souther,2.78
Five Bolt Main,2.78
Mike Jones (Featuring CJ_ Mello & Lil' Bran),2.78
Terry Callier,2.78
Billie Jo Spears,2.78
Faiz Ali Faiz,2.78


### Most executed songs

In [29]:
%%sparksql -c -l 100 results
    SELECT 
        COUNT(sp.*) as executions,
        s.song_id,
        s.title as song_title,
        a.artist_name,
        s.year as song_year
    FROM songplays sp
    JOIN artists a ON a.artist_id = sp.artist_id
    JOIN songs s ON s.song_id = sp.song_id
    GROUP BY s.song_id, song_title, artist_name, song_year
    ORDER BY executions DESC;

cache dataframe with lazy load
capture dataframe to local variable `results`


0,1,2,3,4
executions,song_id,song_title,artist_name,song_year
1,SOZCTXZ12AB0182364,Setanta matins,Elena,0
