In [1]:
!pip install pyspark==3.3.2


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m23.2.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master("local[*]").appName("SparkMusicHabits").config("spark.driver.memory", "6g").getOrCreate()
# I had to tune the driver (which is also the executor) memory to avoid OOM errors, since I ran on local machine

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/02 13:26:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

tsv_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("artist_id", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("track_id", StringType(), True),
    StructField("track_name", StringType(), True)
])

In [5]:
df = spark.read.load("userid-timestamp-artid-artname-traid-traname.tsv",
                     format="csv", sep="\t", schema=tsv_schema, header="false")
df.show(30)  # Sanity check for our data

+-----------+-------------------+--------------------+---------------+--------------------+--------------------+
|    user_id|          timestamp|           artist_id|    artist_name|            track_id|          track_name|
+-----------+-------------------+--------------------+---------------+--------------------+--------------------+
|user_000001|2009-05-05 02:08:57|f1b1cf71-bd35-4e9...|      Deep Dish|                null|Fuck Me Im Famous...|
|user_000001|2009-05-04 16:54:10|a7f7df4a-77d8-4f1...|       坂本龍一|                null|Composition 0919 ...|
|user_000001|2009-05-04 16:52:04|a7f7df4a-77d8-4f1...|       坂本龍一|                null|Mc2 (Live_2009_4_15)|
|user_000001|2009-05-04 16:42:52|a7f7df4a-77d8-4f1...|       坂本龍一|                null|Hibari (Live_2009...|
|user_000001|2009-05-04 16:42:11|a7f7df4a-77d8-4f1...|       坂本龍一|                null|Mc1 (Live_2009_4_15)|
|user_000001|2009-05-04 16:38:31|a7f7df4a-77d8-4f1...|       坂本龍一|                null|To Stanford (Live...|
|us

In [6]:
from pyspark.sql.functions import col, sum

# fancy way to aggregate every col, and count the nulls, this is just EDA
null_counts = df.agg(*[sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()



+-------+---------+---------+-----------+--------+----------+
|user_id|timestamp|artist_id|artist_name|track_id|track_name|
+-------+---------+---------+-----------+--------+----------+
|      0|        0|   602166|          0| 2168588|         1|
+-------+---------+---------+-----------+--------+----------+



                                                                                

# Ok so we can NOT count on artist_id or track_id, but the rest seems pretty good. Great!
Note that fallback to rely on track_name only has drawbacks - say we have two songs with same title (which is quite common: https://www.thetoptens.com/music/different-songs-with-same-title/). More robust approach would be to rely on the artist_name & track_name combination

In [7]:
from pyspark.sql.functions import concat

# let's create robust id, add new col: song_fullname
df = df.withColumn('song_fullname', concat(col('artist_name'),col('track_name')))

In [8]:
df.show(10)

+-----------+-------------------+--------------------+-----------+--------+--------------------+------------------------+
|    user_id|          timestamp|           artist_id|artist_name|track_id|          track_name|           song_fullname|
+-----------+-------------------+--------------------+-----------+--------+--------------------+------------------------+
|user_000001|2009-05-05 02:08:57|f1b1cf71-bd35-4e9...|  Deep Dish|    null|Fuck Me Im Famous...|    Deep DishFuck Me ...|
|user_000001|2009-05-04 16:54:10|a7f7df4a-77d8-4f1...|   坂本龍一|    null|Composition 0919 ...|坂本龍一Composition 0...|
|user_000001|2009-05-04 16:52:04|a7f7df4a-77d8-4f1...|   坂本龍一|    null|Mc2 (Live_2009_4_15)|坂本龍一Mc2 (Live_200...|
|user_000001|2009-05-04 16:42:52|a7f7df4a-77d8-4f1...|   坂本龍一|    null|Hibari (Live_2009...|坂本龍一Hibari (Live_...|
|user_000001|2009-05-04 16:42:11|a7f7df4a-77d8-4f1...|   坂本龍一|    null|Mc1 (Live_2009_4_15)|坂本龍一Mc1 (Live_200...|
|user_000001|2009-05-04 16:38:31|a7f7df4a-77d8-4f1...|  

# Part A: Create a list of user IDs, along with the number of distinct songs each user has played.

# Approach - If it was pure SQL - than Select user id, count(DISTINCT(song_fullname)) Group by user id

In [9]:
from pyspark.sql.functions import countDistinct

distinct_songs_per_user = df.groupBy('user_id').agg(countDistinct('song_fullname').alias('distinct_songs_count'))

distinct_songs_per_user.show(10)

# you many want to write the df to a sink to get the full results, or to convert it to pandas df (if small enough) if you want to use it in python code
distinct_songs_per_user.write \
    .format("csv") \
    .option("header", "true") \
    .option("delimiter", "\t") \
    .mode("overwrite") \
    .save("distinct_songs_user_played.tsv")

                                                                                

+-----------+--------------------+
|    user_id|distinct_songs_count|
+-----------+--------------------+
|user_000066|                 666|
|user_000113|                2133|
|user_000098|                 254|
|user_000372|                4789|
|user_000424|                2004|
|user_000577|               18227|
|user_000708|                4743|
|user_000289|                 994|
|user_000319|                6294|
|user_000445|                3718|
+-----------+--------------------+
only showing top 10 rows



                                                                                

# Part B: Create a list of the 100 most popular songs (artist and title) in the dataset, with the number of times each was played.

Approach: SQL solution would be like: Select artist_name, track_name, count(song_fullname) as times_played GROUP BY song_fullname order by times_played LIMIT 100

In [10]:
df.createOrReplaceTempView("songs_table")  # temporary view for Spark SQL

top_100_songs = spark.sql("""
    SELECT artist_name, track_name, COUNT(*) as times_played
    FROM songs_table
    GROUP BY artist_name, track_name
    ORDER BY times_played DESC
    LIMIT 100
""")

top_100_songs.show()



+-------------------+--------------------+------------+
|        artist_name|          track_name|times_played|
+-------------------+--------------------+------------+
| The Postal Service|  Such Great Heights|        3992|
|       Boy Division|Love Will Tear Us...|        3663|
|          Radiohead|        Karma Police|        3534|
|               Muse|Supermassive Blac...|        3483|
|Death Cab For Cutie|     Soul Meets Body|        3479|
|          The Knife|          Heartbeats|        3156|
|               Muse|           Starlight|        3060|
|        Arcade Fire|    Rebellion (Lies)|        3048|
|     Britney Spears|          Gimme More|        3004|
|        The Killers| When You Were Young|        2998|
|           Interpol|                Evil|        2989|
|         Kanye West|       Love Lockdown|        2950|
|     Massive Attack|            Teardrop|        2948|
|Death Cab For Cutie|I Will Follow You...|        2947|
|               Muse| Time Is Running Out|      

                                                                                

# If you prefer the DataFrame API then:

In [11]:
from pyspark.sql.functions import desc
songs_popularity = df.groupBy('artist_name', 'track_name').count()
top_100_songs_dataframe_api = songs_popularity.orderBy(desc('count')).limit(100)
top_100_songs_dataframe_api.show()

top_100_songs_dataframe_api.write \
    .format("csv") \
    .option("header", "true") \
    .option("delimiter", "\t") \
    .mode("overwrite") \
    .save("100_most_popular_songs.tsv")

                                                                                

+-------------------+--------------------+-----+
|        artist_name|          track_name|count|
+-------------------+--------------------+-----+
| The Postal Service|  Such Great Heights| 3992|
|       Boy Division|Love Will Tear Us...| 3663|
|          Radiohead|        Karma Police| 3534|
|               Muse|Supermassive Blac...| 3483|
|Death Cab For Cutie|     Soul Meets Body| 3479|
|          The Knife|          Heartbeats| 3156|
|               Muse|           Starlight| 3060|
|        Arcade Fire|    Rebellion (Lies)| 3048|
|     Britney Spears|          Gimme More| 3004|
|        The Killers| When You Were Young| 2998|
|           Interpol|                Evil| 2989|
|         Kanye West|       Love Lockdown| 2950|
|     Massive Attack|            Teardrop| 2948|
|Death Cab For Cutie|I Will Follow You...| 2947|
|               Muse| Time Is Running Out| 2945|
|         Bloc Party|             Banquet| 2906|
|        Arcade Fire|Neighborhood #1 (...| 2826|
|          Radiohead

                                                                                

# Part C:
Say we define a user’s “session” of Last.fm usage to be composed of one or more
songs played by that user, where each song is started within 20 minutes of the previous
song’s start time.
Create a list of the top 10 longest sessions, with the following information about each
session:
● userid
● timestamp of first and last songs in the session
● List of songs played in the session (in order of play)

Approach: First say we talk on specific user (GROUP BY/Partition BY user), If I have for every song the time the previous song (ORDER BY time) played, and it's lower than 20 min than they both part of the same session, to do this we can use LAG with offset of 1 to create "previous_song_playtime". using "previous_song_playtime" it's very easy to create "time_from_prev_song". then the challenge is to "number" the session, it's easy to detect each new session -> if time from prev_song>20 min, for each new session we will create a flag col "new session", if we count those flags until each record (window function sum, similar to the way rank works, but with sum) we will get session_number col. Now in order find the longest one - we need to find the start/end time - smallest and highest timestamps of the session, calculate the diff -> session_length, we already have the user_id, session number and now the session_length, so all that remains is to fetch all the songs in this list. then get the top 10 of the session length

In [12]:
from pyspark.sql.functions import lit
from pyspark.sql.window import Window
from pyspark.sql import functions as F


time_considered_new_session = 20 * 60  # 20 minutes in seconds

# Calculate time_from_prev_song
user_window = Window.partitionBy('user_id').orderBy('timestamp')
df = df.withColumn('sec_from_prev_song', (F.col('timestamp') - F.lag('timestamp', 1).over(user_window)).cast('long'))
# print(df.columns)
# df.select("user_id", "timestamp", "sec_from_prev_song").show(10)  # health check


# Number the sessions, Logic: each time we find more than 'session_window' diff we raise a flag,
# session_num is the counter of all raised flags so far == current session num
session_flag = F.when(col('sec_from_prev_song').isNull() | (col('sec_from_prev_song') >= time_considered_new_session), lit(1)).otherwise(lit(0))
df = df.withColumn('session_num', F.sum(session_flag).over(user_window))

# print(df.columns)
# df.select("user_id", "timestamp", "sec_from_prev_song", "user_session_num").show(200)  # health check

# For each session - calculations in order to extract session_duration_sec, and songs_in_session
sessions = df.groupBy('user_id', 'session_num').agg(
    F.min('timestamp').alias('start_time'),
    F.max('timestamp').alias('end_time'),
    F.sort_array(F.collect_list(F.struct('timestamp', 'artist_name', 'track_name'))).alias('songs_in_session')  # sort_array to maintain order
)

# spark doesn't support creating session_duration_sec in the above declaration, therefore added here
sessions = sessions.withColumn('session_duration_sec', F.col('end_time').cast('long') - F.col('start_time').cast('long'))
# Note: if the fact that songs_in_session starts with timestamp bother us we can use: transform(songs_in_session, x -> struct(x.artist_name, x.track_name))
# print(sessions.columns)
# sessions.show(40, truncate=False) # health check

# Eventually get top10 by session_duration_sec
top_10_sessions = sessions.orderBy(F.desc('session_duration_sec')).limit(10)
top_10_sessions.select('user_id', 'start_time', 'end_time', 'session_duration_sec', 'songs_in_session').show(10)  # health check

# Write to file
top_10_sessions_to_dump = top_10_sessions.withColumn('songs_in_session', F.col('songs_in_session').cast("string"))  # you can't write list to csv, convert to string
top_10_sessions_to_dump.write \
    .format("csv") \
    .option("header", "true") \
    .option("delimiter", "\t") \
    .mode("overwrite") \
    .save("top_10_longest_sessions.tsv")

                                                                                

+-----------+-------------------+-------------------+--------------------+--------------------+
|    user_id|         start_time|           end_time|session_duration_sec|    songs_in_session|
+-----------+-------------------+-------------------+--------------------+--------------------+
|user_000949|2006-02-12 19:49:31|2006-02-27 13:29:37|             1273206|[{2006-02-12 19:4...|
|user_000997|2007-04-26 03:36:02|2007-05-10 20:55:03|             1271941|[{2007-04-26 03:3...|
|user_000949|2007-05-01 05:41:15|2007-05-14 03:05:52|             1113877|[{2007-05-01 05:4...|
|user_000544|2007-02-12 15:03:52|2007-02-23 02:51:08|              906436|[{2007-02-12 15:0...|
|user_000949|2005-12-09 10:26:38|2005-12-18 06:40:04|              764006|[{2005-12-09 10:2...|
|user_000949|2005-11-11 05:30:37|2005-11-19 00:50:07|              674370|[{2005-11-11 05:3...|
|user_000949|2006-03-19 01:04:14|2006-03-26 20:13:45|              673771|[{2006-03-19 01:0...|
|user_000544|2007-01-06 03:07:04|2007-01

                                                                                

In [13]:
spark.stop()

In [14]:
print("Done!")

Done!
