In [6]:
import json
import pandas as pd
import pyspark
from pyspark.sql.types import *
import boto

Retrieving data from the S3 bucket and loading it into a Spark DataFrame

In [7]:
from pyspark.sql import SparkSession

# The original cell relied on a kernel-provided `spark` variable. getOrCreate()
# returns that existing session when there is one, and only builds a new one on
# a plain Python kernel, so this cell also works after Restart & Run All.
spark = SparkSession.builder.getOrCreate()

# S3 prefix holding the raw JSON event logs, read through the s3a connector.
filepath = "s3a://anagi-spark8/input"
df = spark.read.json(filepath)

In [8]:
# Preview the first ten raw events (long cell values are truncated for display).
df.show(n=10, truncate=True)

+-----------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|      auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia| Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy| Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12| 

In [9]:
# Total number of event records loaded from S3; shown as the cell output.
n_events = df.count()
n_events

8056

# First DataFrame - atomic event-log data, one row per event: id, ts, user_id and song

In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Number events chronologically to build a surrogate `id`.
# `ts` alone is not unique: two events share ts=1541106106796 in the output
# below. row_number() numbers tied rows in arbitrary order, so sessionId and
# itemInSession are added as tie-breakers to make `id` reproducible across runs.
# NOTE: a window with no partitionBy moves every row into a single partition.
# That is fine for this ~8k-row sample but will not scale to a large dataset.
w = Window.orderBy("ts", "sessionId", "itemInSession")
songDF = (
    df.withColumn("id", row_number().over(w))
      .selectExpr("id", "ts", "userId AS user_id", "song")
)

In [14]:
# Preview the first ten numbered events (long cell values are truncated for display).
songDF.show(n=10, truncate=True)

+---+-------------+-------+--------------------+
| id|           ts|user_id|                song|
+---+-------------+-------+--------------------+
|  1|1541105830796|     39|                null|
|  2|1541106106796|      8|                null|
|  3|1541106106796|      8|        You Gotta Be|
|  4|1541106132796|      8|                null|
|  5|1541106352796|      8|             Flat 55|
|  6|1541106496796|      8|Quem Quiser Encon...|
|  7|1541106673796|      8|           Eriatarka|
|  8|1541107053796|      8|     Becoming Insane|
|  9|1541107493796|      8|     Congratulations|
| 10|1541107734796|      8|          Once again|
+---+-------------+-------+--------------------+
only showing top 10 rows



# Second DataFrame - user data (firstName, lastName, gender, location, level, userId), one row per distinct user

In [15]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Build one row per distinct user.
# dropDuplicates(["user_id"]) keeps an arbitrary row per key, so any attribute
# that differs between a user's events (presumably `level`, free vs. paid;
# TODO confirm) could change from run to run. Instead, keep each user's most
# recent event. ts ties occur within a user (see user 8 in the songDF output),
# so sessionId/itemInSession break those ties deterministically.
latest_event_first = Window.partitionBy("userId").orderBy(
    F.col("ts").desc(),
    F.col("sessionId").desc(),
    F.col("itemInSession").desc(),
)
userDF = (
    df.withColumn("_event_rank", F.row_number().over(latest_event_first))
      .filter(F.col("_event_rank") == 1)
      .selectExpr(
          "userId as user_id",
          "firstName as first_name",
          "lastName as last_name",
          "gender",
          "location",
          "level",
      )
)

In [16]:
# Preview the de-duplicated users: 20 rows, long values truncated (the show() defaults).
userDF.show(n=20, truncate=True)

+-------+----------+---------+------+--------------------+-----+
|user_id|first_name|last_name|gender|            location|level|
+-------+----------+---------+------+--------------------+-----+
|     51|      Maia|    Burke|     F|Houston-The Woodl...| free|
|      7|    Adelyn|   Jordan|     F|Chicago-Napervill...| free|
|     15|      Lily|     Koch|     F|Chicago-Napervill...| paid|
|     54|     Kaleb|     Cook|     M|       Yuba City, CA| free|
|    101|    Jayden|      Fox|     M|New Orleans-Metai...| free|
|     11| Christian|   Porter|     F|  Elkhart-Goshen, IN| free|
|     29|Jacqueline|    Lynch|     F|Atlanta-Sandy Spr...| paid|
|     69|  Anabelle|  Simpson|     F|Philadelphia-Camd...| free|
|     42|    Harper|  Barrett|     M|New York-Newark-J...| paid|
|     73|     Jacob|    Klein|     M|Tampa-St. Petersb...| paid|
|     87|    Dustin|      Lee|     M|Myrtle Beach-Conw...| free|
|     64|    Hannah|  Calhoun|     F|Los Angeles-Long ...| free|
|      3|     Isaac|   Va

# Third DataFrame - per-artist event counts, identified by a composite key of id and artist.

In [17]:
import pyspark.sql.functions as f

# Count events per artist. Rows whose artist is null form one group of their own.
# The aggregation already yields exactly the columns (artist, count).
temp = (
    df.groupBy("artist")
      .agg(f.count(f.lit(1)).alias("count"))
)

In [18]:
from pyspark.sql.window import Window

# Assign each artist group a surrogate id in ascending artist-name order.
# Ascending sorts put nulls first, so the null-artist group gets id 1.
w = Window.orderBy("artist")
df3 = (
    temp.withColumn("id", row_number().over(w))
        .select("id", "artist", "count")
)

In [19]:
# Preview the artist table: 20 rows, long values truncated (the show() defaults).
df3.show(n=20, truncate=True)

+---+--------------------+-----+
| id|              artist|count|
+---+--------------------+-----+
|  1|                null| 1236|
|  2|                 !!!|    1|
|  3|'N Sync/Phil Collins|    1|
|  4|  + / - {Plus/Minus}|    2|
|  5|                 +44|    1|
|  6|        1 Mile North|    1|
|  7|            10 Years|    1|
|  8|      10_000 Maniacs|    1|
|  9|                10cc|    3|
| 10|           12 Stones|    3|
| 11|           1200 Mics|    1|
| 12|              22-20s|    1|
| 13|           23 Skidoo|    2|
| 14|                2Mex|    1|
| 15|        3 Doors Down|   13|
| 16|                 311|    5|
| 17|          38 Special|    2|
| 18|               3OH!3|    4|
| 19|             4 Skins|    3|
| 20|               49ers|    1|
+---+--------------------+-----+
only showing top 20 rows



# Writing these three DataFrames to S3 in Parquet format

In [23]:
# Persist the three DataFrames as Parquet under one S3 output prefix.
# The default save mode ("errorifexists") fails once these paths exist, so a
# second run of the notebook would crash here. mode("overwrite") makes the cell
# re-runnable by replacing any previous output at the same paths.
output_root = "s3a://anagi-spark8/output"
outputs = {
    "songDF": songDF,
    "userDF": userDF,
    "artistDF": df3,
}
for output_name, frame in outputs.items():
    frame.write.mode("overwrite").parquet(f"{output_root}/{output_name}/")

# Using the artist DataFrame, compute an aggregate to show the top 10 artists

In [24]:
# Top 10 artists by event count, excluding the null-artist group.
# Use isNotNull() rather than `!= 'null'`. That comparison checks against the
# *string* "null": it drops the NULL group only because comparing NULL yields
# NULL, which filter treats as false. It would also wrongly drop an artist
# literally named "null".
# Ties on count are ordered by artist name so the listing is stable across runs.
# df3["count"] (not df3.count) is needed because .count is the DataFrame method.
(
    df3.filter(df3.artist.isNotNull())
       .orderBy(df3["count"].desc(), df3.artist.asc())
       .show(10)
)

+----+--------------------+-----+
|  id|              artist|count|
+----+--------------------+-----+
| 580|            Coldplay|   58|
|1504|       Kings Of Leon|   55|
| 823|       Dwight Yoakam|   38|
|2660|      The Black Keys|   36|
|1003|Florence + The Ma...|   35|
|1263|        Jack Johnson|   35|
|1916|                Muse|   35|
| 310|            BjÃÂ¶rk|   33|
|1350|          John Mayer|   31|
|2739|         The Killers|   31|
+----+--------------------+-----+
only showing top 10 rows

