# Top 10 Songs in Top 50 Longest Sessions

## Problem Statement
What are the top 10 songs played in the top 50 longest sessions by track count?

## Session Definition
A user "session" consists of one or more songs played by a given user, where each song starts within 20 minutes of the previous song's start time.
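The definition above can be illustrated with a minimal plain-Python sketch. It is not the project's `add_sessions_id_columns` (whose implementation is not shown here); the function name `assign_session_ids` is hypothetical, and treating a gap of exactly 20 minutes as the same session is an assumption.

```python
from datetime import datetime, timedelta


def assign_session_ids(plays, gap=timedelta(minutes=20)):
    """Label each (userid, timestamp) play with a per-user session id.

    A new session starts whenever a play begins more than `gap` after the
    same user's previous play. Ids start at 0 for each user.
    Assumption: a gap of exactly `gap` still belongs to the same session.
    """
    labelled = []
    last_seen = {}  # userid -> (previous timestamp, current session id)
    for userid, ts in sorted(plays):  # order by user, then by start time
        if userid not in last_seen:
            session_id = 0
        else:
            prev_ts, session_id = last_seen[userid]
            if ts - prev_ts > gap:
                session_id += 1
        last_seen[userid] = (ts, session_id)
        labelled.append((userid, ts, session_id))
    return labelled


plays = [
    ("user_000011", datetime(2005, 9, 22, 20, 2, 48)),
    ("user_000011", datetime(2005, 9, 22, 20, 26, 14)),  # 23 min 26 s later
    ("user_000011", datetime(2005, 9, 22, 20, 31, 4)),   # 4 min 50 s later
]
sessions = assign_session_ids(plays)
print([sid for _, _, sid in sessions])  # [0, 1, 1]
```

The second play starts more than 20 minutes after the first, so it opens a new session; the third follows within 20 minutes and stays in it.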

In [1]:
import sys
# Make the project root importable when running from the notebook directory
sys.path.append('..')

from src.common.definition import (
    create_spark_session,
    load_track_data,
    add_sessions_id_columns,
)
from src.analysis_tracks_by_sessions import (
    top_tracks_from_longest_sessions,
    compute_session_duration
)

# Configuration
DATA_PATH = "../userid-timestamp-artid-artname-traid-traname.tsv"
SESSION_GAP_SEC = 20 * 60  # 20 minutes
TOP_N_SESSIONS = 50
TOP_N_TRACKS = 10

In [2]:
spark = create_spark_session("exercise_1_top_songs")


In [3]:
track_list = load_track_data(spark, DATA_PATH)
print(f"Total records loaded: {track_list.count():,}")
track_list.printSchema()

Total records loaded: 19,150,868
root
 |-- userid: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- musicbrainz_artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- musicbrainz_track_id: string (nullable = true)
 |-- track_name: string (nullable = true)



                                                                                

In [4]:
df_sessions = add_sessions_id_columns(track_list, SESSION_GAP_SEC)
df_sessions = df_sessions.select("userid", "timestamp", "track_name", "session_id")

print(f"Total sessions created: {df_sessions.select('userid', 'session_id').distinct().count():,}")
df_sessions.show(10)

Total sessions created: 1,041,883


+-----------+-------------------+--------------------+----------+
|     userid|          timestamp|          track_name|session_id|
+-----------+-------------------+--------------------+----------+
|user_000011|2005-09-22 20:02:48|2 Dope 2 Miss Int...|         0|
|user_000011|2005-09-22 20:26:14|           Cowabunga|         1|
|user_000011|2005-09-22 20:31:04|           Radiohead|         1|
|user_000011|2005-09-22 20:35:59|           The Theme|         1|
|user_000011|2005-09-22 20:40:19|   Return Of Da Baby|         1|
|user_000011|2005-09-22 20:48:04|Planet Rock 2K (O...|         1|
|user_000011|2005-09-22 20:49:34|     First Communion|         1|
|user_000011|2005-09-22 20:52:49|       Chevy Ride By|         1|
|user_000011|2005-09-22 20:56:25|       I'M A Gangsta|         1|
+-----------+-------------------+--------------------+----------+
only showing top 10 rows



In [5]:
session_durations = compute_session_duration(df_sessions)
session_durations.orderBy("session_duration_sec", ascending=False).show(20)

print("\nSession Duration Statistics:")
session_durations.select("session_duration_sec").describe().show()

+-----------+----------+--------------------+
|     userid|session_id|session_duration_sec|
+-----------+----------+--------------------+
|user_000949|       150|             1273206|
|user_000997|        17|             1271941|
|user_000949|       558|             1113877|
|user_000544|        74|              906436|
|user_000949|       138|              764006|
|user_000949|       124|              674370|
|user_000949|       188|              673771|
|user_000544|        54|              651041|
|user_000250|      1284|              625578|
|user_000949|       151|              612307|
|user_000949|       147|              608594|
|user_000885|        64|              600514|
|user_000949|       148|              585599|
|user_000949|       117|              562357|
|user_000544|        55|              555344|
|user_000949|       211|              550256|
|user_000974|         5|              536912|
|user_000949|       143|              534994|
|user_000997|        18|          

+-------+--------------------+
|summary|session_duration_sec|
+-------+--------------------+
|  count|             1041883|
|   mean|   4496.944506244943|
| stddev|    9884.01997959857|
|    min|                   0|
|    max|             1273206|
+-------+--------------------+
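A minimum of 0 seconds in the statistics above would be consistent with measuring a session from its first song's start time to its last song's start time, so that a single-song session has zero duration. The sketch below shows that calculation in plain Python under that assumption; `session_durations_sec` is a hypothetical helper, not the project's `compute_session_duration`.

```python
from datetime import datetime


def session_durations_sec(rows):
    """Map (userid, session_id) to seconds between first and last play start.

    Assumption: duration is last start time minus first start time, so the
    length of the final track is not included.
    """
    bounds = {}  # (userid, session_id) -> (earliest start, latest start)
    for userid, session_id, ts in rows:
        key = (userid, session_id)
        first, last = bounds.get(key, (ts, ts))
        bounds[key] = (min(first, ts), max(last, ts))
    return {
        key: int((last - first).total_seconds())
        for key, (first, last) in bounds.items()
    }


rows = [
    ("user_000011", 0, datetime(2005, 9, 22, 20, 2, 48)),
    ("user_000011", 1, datetime(2005, 9, 22, 20, 26, 14)),
    ("user_000011", 1, datetime(2005, 9, 22, 20, 40, 19)),
    ("user_000011", 1, datetime(2005, 9, 22, 20, 56, 25)),
]
durations = session_durations_sec(rows)
print(durations[("user_000011", 0)])  # 0
print(durations[("user_000011", 1)])  # 1811
```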



In [6]:
top_10_tracks = top_tracks_from_longest_sessions(
    df_sessions,
    top_n_sessions=TOP_N_SESSIONS,
    top_n_tracks=TOP_N_TRACKS
)

print(f"\nTop {TOP_N_TRACKS} songs played in the top {TOP_N_SESSIONS} longest sessions:\n")
top_10_tracks.show(truncate=False)



Top 10 songs played in the top 50 longest sessions:



+-------------------------------------+----------+
|track_name                           |play_count|
+-------------------------------------+----------+
|Jolene                               |1215      |
|Heartbeats                           |864       |
|How Long Will It Take                |809       |
|Anthems For A Seventeen Year Old Girl|659       |
|St. Ides Heaven                      |646       |
|Bonus Track                          |644       |
|Starin' Through My Rear View         |616       |
|Beast Of Burden                      |613       |
|The Swing                            |604       |
|When You Were Young                  |520       |
+-------------------------------------+----------+
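The ranking step can be sketched in plain Python as two counting passes: first pick the sessions with the most tracks, then count track plays inside those sessions only. This is an illustration under the problem statement's "by track count" reading; the helper name `top_tracks_in_longest_sessions` is hypothetical, and the internals of the project's `top_tracks_from_longest_sessions` are not shown here.

```python
from collections import Counter


def top_tracks_in_longest_sessions(rows, top_n_sessions, top_n_tracks):
    """Return the most played tracks within the sessions that have the most plays.

    `rows` is a list of (userid, session_id, track_name) tuples.
    """
    # Pass 1: number of plays per (userid, session_id).
    session_sizes = Counter((userid, sid) for userid, sid, _ in rows)
    longest = {key for key, _ in session_sizes.most_common(top_n_sessions)}

    # Pass 2: count tracks, restricted to the selected sessions.
    track_counts = Counter(
        track for userid, sid, track in rows if (userid, sid) in longest
    )
    return track_counts.most_common(top_n_tracks)


rows = [
    ("u1", 0, "A"), ("u1", 0, "B"), ("u1", 0, "A"), ("u1", 0, "B"),
    ("u2", 0, "A"), ("u2", 0, "C"),
    ("u3", 0, "D"),
]
result = top_tracks_in_longest_sessions(rows, top_n_sessions=2, top_n_tracks=2)
print(result)  # [('A', 3), ('B', 2)]
```

Session `u3` is excluded because only the two largest sessions are kept, so track "D" never enters the count.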



                                                                                

## Cleanup

In [7]:
spark.stop()