In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructField, TimestampType
from pyspark.sql.window import Window

work/inputs/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv


In [2]:
# create a SparkSession
spark = SparkSession.builder \
    .appName("LastFM Top Songs Challenge") \
    .getOrCreate()

In [3]:
# userid \t timestamp \t musicbrainz-artist-id \t artist-name \t musicbrainz-track-id \t track-name
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("timestamp", TimestampType(), True),
    StructField("musicbrainz_trackid", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("musicbrainz-track-id", StringType(), True),
    StructField("track_name", StringType(), True)
])

# read the data into a DataFrame, with schema option set to the custom schema
data = spark.read.format("csv") \
    .option("header", "false") \
    .option("delimiter", "\t") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .load("work/inputs/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv")

data.show(5)

+-----------+-------------------+--------------------+-----------+--------------------+--------------------+
|    user_id|          timestamp| musicbrainz_trackid|artist_name|musicbrainz-track-id|          track_name|
+-----------+-------------------+--------------------+-----------+--------------------+--------------------+
|user_000001|2009-05-05 01:08:57|f1b1cf71-bd35-4e9...|  Deep Dish|                null|Fuck Me Im Famous...|
|user_000001|2009-05-04 15:54:10|a7f7df4a-77d8-4f1...|   坂本龍一|                null|Composition 0919 ...|
|user_000001|2009-05-04 15:52:04|a7f7df4a-77d8-4f1...|   坂本龍一|                null|Mc2 (Live_2009_4_15)|
|user_000001|2009-05-04 15:42:52|a7f7df4a-77d8-4f1...|   坂本龍一|                null|Hibari (Live_2009...|
|user_000001|2009-05-04 15:42:11|a7f7df4a-77d8-4f1...|   坂本龍一|                null|Mc1 (Live_2009_4_15)|
+-----------+-------------------+--------------------+-----------+--------------------+--------------------+
only showing top 5 rows



In [5]:
# convert timestamp to datetime format
data = data.withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'"))

# create a session id for each song played by a user
session_window = Window.partitionBy("user").orderBy("timestamp")
data = data.withColumn("prev_timestamp", lag("timestamp", 1).over(session_window))
data = data.withColumn("session_id", sum(when(col("timestamp").cast("long") - col("prev_timestamp").cast("long") > 20*60, 1).otherwise(0)).over(session_window))

# filter out sessions with less than 2 songs
session_count_window = Window.partitionBy("user", "session_id").orderBy("timestamp")
data = data.withColumn("song_count", count("*").over(session_count_window))
data = data.filter(col("song_count") > 1)

# get the top 50 longest sessions by tracks count
session_window = Window.partitionBy("user", "session_id").orderBy("timestamp")
data = data.withColumn("track_count", count("*").over(session_window))
session_window = Window.partitionBy("user").orderBy(desc("track_count"), desc("timestamp"))
top_50_sessions = data.withColumn("rank", dense_rank().over(session_window)).filter(col("rank") <= 50)

# get the top 10 songs played in the top 50 longest sessions by tracks count
top_songs = top_50_sessions.groupBy("artist_name", "track_name").agg(count("*").alias("play_count")).orderBy(desc("play_count"), "artist_name", "track_name").limit(10)

top_songs.show(10, False)

+----------------+------------------------------------------------------+----------+
|artist_name     |track_name                                            |play_count|
+----------------+------------------------------------------------------+----------+
|Def Leppard     |Me & My Wine (Remix)                                  |2         |
|Vanessa Williams|You Are Everything (Junior Vasquez Mirrorball Clubmix)|2         |
|311             |Large In The Margin                                   |1         |
|Cypress Hill    |(Rock) Superstar                                      |1         |
|Cypress Hill    |A Man                                                 |1         |
|Cypress Hill    |Can'T Get The Best Of Me                              |1         |
|Cypress Hill    |Dust                                                  |1         |
|David Guetta    |The World Is Mine (Radio Edit)                        |1         |
|David Guetta    |When Love Takes Over (Feat. Kelly Rowland)     