In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
input_data = config['DATALAKE']['INPUT_DATA']
output_data = config['DATALAKE']['OUTPUT_DATA']


In [3]:
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()

In [5]:
dfSourceSong = spark.read.json('./data/song-data/*/*/*/*.json').drop_duplicates()
dfSourceSong.printSchema()
dfSourceSong.show(3)

dfDimSongs = dfSourceSong.select('song_id', 'title', 'artist_id', 'duration').drop_duplicates()
dfDimSongs.printSchema()
dfDimSongs.show(3)

dfDimArtists = dfSourceSong.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').drop_duplicates()
dfDimArtists.printSchema()
dfDimArtists.show(3)

# dfDimUsers = dfSourceSong.select('user_id', 'first_name', 'last_name', 'gender', 'level').drop_duplicates()
# dfDimUsers.show(3)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+--------------------+----+
|ARPFHN61187FB575F6|       41.88415|    Chicago, IL|       -87.63241|Lupe Fiasco|279.97995|        1|SOWQTQZ12A58A7B63E|Streets On Fire (.

In [7]:
dfLog = spark.read.json('./data/log-data/*.json')
dfLog = dfLog.filter(dfLog.page == "NextSong")
dfLog.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [8]:
dfDimUser = dfLog.select('userId', 'firstName', 'lastName', 'gender','level').drop_duplicates();
dfDimUser.printSchema()
dfDimUser.show(10)

root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    57|Katherine|     Gay|     F| free|
|    84|  Shakira|    Hunt|     F| free|
|    22|     Sean|  Wilson|     F| free|
|    52| Theodore|   Smith|     M| free|
|    80|    Tegan|  Levine|     F| paid|
|    15|     Lily|    Koch|     F| paid|
|    37|   Jordan|   Hicks|     F| free|
|    98|   Jordyn|  Powell|     F| free|
|    48|   Marina|  Sutton|     F| free|
|    17| Makinley|   Jones|     F| free|
+------+---------+--------+------+-----+
only showing top 10 rows



In [9]:
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import *

get_timestamp = udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), TimestampType())
dfLog = dfLog.withColumn("parsed_ts", get_timestamp("ts"))
dfLog.show(5)
dfDimTimeTable = dfLog.withColumn("hour", hour("parsed_ts"))\
            .withColumn("day", dayofmonth("parsed_ts"))\
            .withColumn("week", weekofyear("parsed_ts"))\
            .withColumn("month", month("parsed_ts"))\
            .withColumn("year", year("parsed_ts"))\
            .withColumn("weekday", dayofweek("parsed_ts"))\
            .select("ts","hour", "day", "week", "month", "year", "weekday").drop_duplicates()
dfDimTimeTable.printSchema()
dfDimTimeTable.show(3)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|           parsed_ts|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|
|The Prodigy|Logged In|     Ryan|     M|            1|  

In [10]:
from pyspark.sql.functions import monotonically_increasing_id
#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
# join songplay_id, song_id, artist_id
dfFactSongPlays = dfLog.join(dfSourceSong, dfLog.song == dfSourceSong.title, how='inner')\
                    .select(monotonically_increasing_id().alias("songplay_id"),col("ts"),col("userId").alias("user_id"),"level","song_id","artist_id", col("sessionId").alias("session_id"), "location", col("userAgent").alias("user_agent"))

#TODO: join hour|day|week|month|year|weekday
dfFactSongPlays = dfFactSongPlays.join(dfDimTimeTable, dfFactSongPlays.ts == dfDimTimeTable.ts)\
                .select("songplay_id", dfFactSongPlays.ts, "user_id", "song_id", "artist_id", "location", "user_agent", "hour", "day", "week", "month", "year", "weekday")
dfFactSongPlays.printSchema()
dfFactSongPlays.show(10)

root
 |-- songplay_id: long (nullable = false)
 |-- ts: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-----------+-------------+-------+------------------+------------------+--------------------+--------------------+----+---+----+-----+----+-------+
|songplay_id|           ts|user_id|           song_id|         artist_id|            location|          user_agent|hour|day|week|month|year|weekday|
+-----------+-------------+-------+------------------+------------------+--------------------+--------------------+----+---+----+-----+----+-------+
|          0|1542837407796|     15|SOZCTXZ12AB0

In [13]:
dfFactSongPlays.createOrReplaceTempView("songs")
dfDimUser.createOrReplaceTempView("dim_user")
dfDimArtists.createOrReplaceTempView("dim_artist")
spark.sql("""
select a.artist_name, count(a.artist_name) as c from songs as s 
left join dim_user as u on s.user_id = u.userId 
left join dim_artist as a on s.artist_id = a.artist_id 
where u.gender = 'F'
GROUP BY a.artist_name
ORDER BY c desc
""").show(3)

+-----------+---+
|artist_name|  c|
+-----------+---+
|    40 Grit|  4|
|      Elena|  2|
+-----------+---+

