In [1]:
from pyspark import SparkConf

In [2]:
from pyspark.sql import SparkSession

In [None]:
from pyspark.sql.types import IntegerType

In [None]:
from pyspark.sql.functions import udf
import datetime

In [4]:
spark = SparkSession.builder.appName('anewapp').getOrCreate()

In [11]:
in_song = "data/song_data/*/*/*/*.json"

In [9]:
in_log = "data/log-data"

In [54]:
songdata = spark.read.json(in_song)

In [55]:
songdata.head()

Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0)

In [18]:
logs = spark.read.json(in_log)

In [20]:
logs.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')

In [21]:
logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# Songplays

Not using sql style, instead using python style.

In [299]:
songplays = logs.join(songdata, (songdata.artist_name == logs.artist) & (songdata.title == logs.song) & (logs.length ==songdata.duration))

In [300]:
songplays = songplays.filter("page = 'NextSong'")

In [301]:
songplays = songplays.withColumn('start_time', get_time(songplays.ts)).select('start_time', songplays.userId.alias('user_id'), 'level', 'song_id', 'artist_id', songplays.sessionId.cast(IntegerType()).alias('session_id'), 'location', songplays.userAgent.alias('user_agent'))

In [302]:
songplays.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [303]:
songplays.show()

+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+



In [304]:
songplays.select('artist_name', 'artist', 'title', 'song', 'length', 'duration','page').show()

AnalysisException: "cannot resolve '`artist_name`' given input columns: [session_id, user_id, song_id, artist_id, level, location, start_time, user_agent];;\n'Project ['artist_name, 'artist, 'title, 'song, 'length, 'duration, 'page]\n+- Project [start_time#3053, userId#145 AS user_id#3083, level#135, song_id#724, artist_id#717, cast(sessionId#140L as int) AS session_id#3084, location#136, userAgent#144 AS user_agent#3085]\n   +- Project [artist#128, auth#129, firstName#130, gender#131, itemInSession#132L, lastName#133, length#134, level#135, location#136, method#137, page#138, registration#139, sessionId#140L, song#141, status#142L, ts#143L, userAgent#144, userId#145, artist_id#717, artist_latitude#718, artist_location#719, artist_longitude#720, artist_name#721, duration#722, ... 5 more fields]\n      +- Filter (page#138 = NextSong)\n         +- Join Inner, (((artist_name#721 = artist#128) && (title#725 = song#141)) && (length#134 = duration#722))\n            :- Relation[artist#128,auth#129,firstName#130,gender#131,itemInSession#132L,lastName#133,length#134,level#135,location#136,method#137,page#138,registration#139,sessionId#140L,song#141,status#142L,ts#143L,userAgent#144,userId#145] json\n            +- Relation[artist_id#717,artist_latitude#718,artist_location#719,artist_longitude#720,artist_name#721,duration#722,num_songs#723L,song_id#724,title#725,year#726L] json\n"

# time

In [306]:
time = logs.select(logs.ts).dropDuplicates()

In [307]:
time.dtypes

[('ts', 'bigint')]

In [308]:
get_time = udf(lambda x: str(datetime.datetime.fromtimestamp(x / 1000.0)))

In [309]:
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)

In [310]:
get_week = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).isocalendar()[1])

In [311]:
get_year = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).year)


In [312]:
get_month = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).month)

In [313]:
get_weekday = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).isoweekday())

In [314]:
get_day = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).day)

In [315]:
time = time.withColumn('start_time', get_time(time.ts))

In [316]:
time = time.withColumn('hour', get_hour(time.ts))

In [317]:
time = time.withColumn('day', get_day(time.ts))

In [318]:
time = time.withColumn('week', get_week(time.ts))

In [319]:
time = time.withColumn('month', get_month(time.ts))

In [320]:
time = time.withColumn('year', get_year(time.ts))

In [321]:
time = time.withColumn('weekday', get_weekday(time.ts))

In [322]:
time = time.select('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')

In [323]:
time.show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-21 10:13:...|  10| 21|  47|   11|2018|      3|
|2018-11-21 11:01:...|  11| 21|  47|   11|2018|      3|
|2018-11-21 18:34:...|  18| 21|  47|   11|2018|      3|
|2018-11-21 20:44:...|  20| 21|  47|   11|2018|      3|
|2018-11-14 03:37:...|   3| 14|  46|   11|2018|      3|
|2018-11-14 04:13:...|   4| 14|  46|   11|2018|      3|
|2018-11-14 15:15:...|  15| 14|  46|   11|2018|      3|
|2018-11-14 17:13:...|  17| 14|  46|   11|2018|      3|
|2018-11-28 16:10:...|  16| 28|  48|   11|2018|      3|
|2018-11-05 04:40:...|   4|  5|  45|   11|2018|      1|
|2018-11-05 14:39:...|  14|  5|  45|   11|2018|      1|
|2018-11-30 02:19:...|   2| 30|  48|   11|2018|      5|
|2018-11-16 09:57:...|   9| 16|  46|   11|2018|      5|
|2018-11-16 11:35:...|  11| 16|  46|   11|2018|      5|
|2018-11-16 12:42:...|  12| 16|  46|   11|2018| 

# Users

In [324]:
users = logs.select(logs.userId.cast(IntegerType()).alias('user_id'), logs.firstName.alias('first_name'), logs.lastName.alias('last_name'), 'gender', 'level').dropDuplicates().dropna().sort('user_id')

In [325]:
users.dtypes

[('user_id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('level', 'string')]

In [326]:
users.filter(users.level != 'paid').count()

83

In [328]:
users.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|      2|   Jizelle| Benjamin|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|      4|    Alivia|  Terrell|     F| free|
|      5|    Elijah|    Davis|     M| free|
|      6|   Cecilia|    Owens|     F| free|
|      7|    Adelyn|   Jordan|     F| free|
|      8|    Kaylee|  Summers|     F| free|
|      9|     Wyatt|    Scott|     M| free|
|     10|    Sylvie|     Cruz|     F| free|
|     11| Christian|   Porter|     F| free|
|     12|    Austin|  Rosales|     M| free|
|     13|       Ava| Robinson|     F| free|
|     14|  Theodore|   Harris|     M| free|
|     15|      Lily|     Koch|     F| paid|
|     15|      Lily|     Koch|     F| free|
|     16|     Rylan|   George|     M| free|
|     16|     Rylan|   George|     M| paid|
|     17|  Makinley|    Jones|     F| free|
|     18|     Jacob|   Rogers|     M| free|
|     19|   Zachary|   Thomas|  

In [327]:
songdata.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



# songs

In [329]:
songdata.dtypes

[('artist_id', 'string'),
 ('artist_latitude', 'double'),
 ('artist_location', 'string'),
 ('artist_longitude', 'double'),
 ('artist_name', 'string'),
 ('duration', 'double'),
 ('num_songs', 'bigint'),
 ('song_id', 'string'),
 ('title', 'string'),
 ('year', 'bigint')]

In [330]:
songs = songdata.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates(['song_id']).sort('title')

In [331]:
songs.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOFFKZS12AB017F194|A Higher Place (A...|ARBEBBY1187B9B43DB|1994|236.17261|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00771|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SORAMLE12AB017C8B0|      Auguri Cha Cha|ARHHO3O1187B989413|   0|191.84281|
|SOZHPGD12A8C1394FE|     Baby Come To Me|AR9AWNF1187B9AB0B4|   0|236.93016|
|SOTUKVB12AB0181477|   Blessed Assurance|AR7ZKHQ1187B98DD73|1993|  270.602|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOXLBJT12A8C140925|   Caught In A Dream|ARYKCQI1187FB3B18F|2004|290.29832|
|SOINLJW12A8C13314C|       City Slickers|AR8IEZO1187B99055E|2008|149.86404|
|SOQOTLQ12AB01868D0|  Clementina Santafè|ARGCY1Y1187B9A4FA5|   0|153.33832|
|SOVYKGO12AB

# artists

In [332]:
artists = songdata.select('artist_id', songdata.artist_name.alias('name'), songdata.artist_location.alias('location'), songdata.artist_latitude.alias('latitude'), songdata.artist_longitude.alias('longitude')).dropDuplicates(['artist_id']).sort('name')

In [333]:
artists.show()

+------------------+----------------+--------------------+--------+----------+
|         artist_id|            name|            location|latitude| longitude|
+------------------+----------------+--------------------+--------+----------+
|AR558FS1187FB45658|         40 Grit|                    |    null|      null|
|AR7G5I41187FB4CE6C|        Adam Ant|     London, England|    null|      null|
|ARI3BMM1187FB4255E|    Alice Stuart|          Washington| 38.8991|   -77.029|
|ARL7K851187B99ACD2|       Andy Andy|                    |    null|      null|
|AR3JMC51187B9AE49D| Backstreet Boys|         Orlando, FL|28.53823| -81.37739|
|AR0RCMP1187FB3F427|Billie Jo Spears|        Beaumont, TX|30.08615| -94.10158|
|AREVWGE1187B9B890A|      Bitter End|           Noci (BA)| -13.442|  -41.9952|
|ARGSAFR1269FB35070|      Blingtones|                    |    null|      null|
|ARD842G1187B997376|      Blue Rodeo|Toronto, Ontario,...|43.64856| -79.38533|
|ARHHO3O1187B989413|       Bob Azzam|               

In [334]:
artists.count()

69