In [13]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark=SparkSession.builder.appName('demo').getOrCreate()
spark

### read the user_id data

In [6]:
df_user = spark.read.csv('userid-profile.tsv',sep='\t', header=True)
df_user.show()

+-----------+------+----+------------------+------------+
|        #id|gender| age|           country|  registered|
+-----------+------+----+------------------+------------+
|user_000001|     m|null|             Japan|Aug 13, 2006|
|user_000002|     f|null|              Peru|Feb 24, 2006|
|user_000003|     m|  22|     United States|Oct 30, 2005|
|user_000004|     f|null|              null|Apr 26, 2006|
|user_000005|     m|null|          Bulgaria|Jun 29, 2006|
|user_000006|  null|  24|Russian Federation|May 18, 2006|
|user_000007|     f|null|     United States|Jan 22, 2006|
|user_000008|     m|  23|          Slovakia|Sep 28, 2006|
|user_000009|     f|  19|     United States|Jan 13, 2007|
|user_000010|     m|  19|            Poland| May 4, 2006|
|user_000011|     m|  21|           Finland| Sep 8, 2005|
|user_000012|     f|  28|     United States|Mar 30, 2005|
|user_000013|     f|  25|           Romania|Sep 25, 2006|
|user_000014|  null|null|              null|Jan 27, 2006|
|user_000015| 

### read user_song track data

In [4]:
df_track = spark.read.csv('userid-timestamp-artid-artname-traid-traname.tsv',sep='\t',header=None)
df_track.show()

+-----------+--------------------+--------------------+---------------+--------------------+--------------------+
|        _c0|                 _c1|                 _c2|            _c3|                 _c4|                 _c5|
+-----------+--------------------+--------------------+---------------+--------------------+--------------------+
|user_000001|2009-05-04T23:08:57Z|f1b1cf71-bd35-4e9...|      Deep Dish|                null|Fuck Me Im Famous...|
|user_000001|2009-05-04T13:54:10Z|a7f7df4a-77d8-4f1...|       坂本龍一|                null|Composition 0919 ...|
|user_000001|2009-05-04T13:52:04Z|a7f7df4a-77d8-4f1...|       坂本龍一|                null|Mc2 (Live_2009_4_15)|
|user_000001|2009-05-04T13:42:52Z|a7f7df4a-77d8-4f1...|       坂本龍一|                null|Hibari (Live_2009...|
|user_000001|2009-05-04T13:42:11Z|a7f7df4a-77d8-4f1...|       坂本龍一|                null|Mc1 (Live_2009_4_15)|
|user_000001|2009-05-04T13:38:31Z|a7f7df4a-77d8-4f1...|       坂本龍一|                null|To Stanford (Liv

### check the count and schema

In [10]:
# the numner of users
print(f"The number of users: {df_user.distinct().count()}")

The number of users: 992


In [11]:
# the row numbers in userid-timestamp-artid-artname-traid-traname.tsv
print(f"There are {df_track.count()} rows in the track.")

There are 19150868 rows in the track.


In [12]:
# schema
df_track.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



### find top 50 longest sessions, where song interval < 20 minutes

#### 1. convert timestamp from string to timestamp

In [17]:
df_track= df_track.select('_c0', to_timestamp(df_track._c1).alias('_c1'), '_c2','_c3','_c4','_c5').sort("_c0", "_c1")

#### 2. check the time_interval between sequencial track, groupby user_id

In [19]:
df_track = df_track.withColumn(
        "time_interval_sec", df_track._c1.cast("bigint") - lag(df_track._c1, 1).over(Window.partitionBy("_c0").orderBy(col("_c1"))).cast("bigint")  )

In [20]:
df_track.show()

+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|        _c0|                _c1|                 _c2|                 _c3|                 _c4|                 _c5|time_interval_sec|
+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|user_000034|2005-09-16 00:01:31|2feb192c-2363-46d...|    The Afghan Whigs|                null|How Soon Is Now (...|             null|
|user_000034|2005-09-16 00:05:03|fe3503fb-146f-4d6...|    Afrika Bambaataa|8e025002-e43c-41d...|Zulu Nation Throw...|              212|
|user_000034|2005-09-16 00:18:12|fe3503fb-146f-4d6...|    Afrika Bambaataa|d081e710-a4cf-43d...|Jazzy Sensation (...|              789|
|user_000034|2005-09-16 00:27:51|fe3503fb-146f-4d6...|    Afrika Bambaataa|19f25ddb-77f7-448...|         Planet Rock|              579|
|user_000034|2005-09-16 16:30:04|db612997-f11e-4

In [21]:
# check nulls in 'time_interval_sec', agree with the number of users
# since the time_interval_sec of first record of each user is always null.
df_track.filter((df_track['time_interval_sec'].isNull())).count()

992

#### 3. define "session" where time_interval_sec < 20min (1200 seconds)
##### sessions 0: within the same session of previous song
##### sessions 1: start a new session, > 20 mins

In [23]:
df_track = df_track.withColumn("sessions", when( (df_track['time_interval_sec']>1200) | (df_track['time_interval_sec']==None) , 1).otherwise(0))

#### 3.1 "session_id": cumulative sum of session
##### e.g., 1,2,3,4...n


In [25]:
df_track = df_track.withColumn('session_id', sum(df_track.sessions).over(Window.partitionBy('_c0').orderBy().rowsBetween(-sys.maxsize, 0)))

In [26]:
df_track.show()

+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------+----------+
|        _c0|                _c1|                 _c2|                 _c3|                 _c4|                 _c5|time_interval_sec|sessions|session_id|
+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------+----------+
|user_000034|2005-09-16 00:01:31|2feb192c-2363-46d...|    The Afghan Whigs|                null|How Soon Is Now (...|             null|       0|         0|
|user_000034|2005-09-16 00:05:03|fe3503fb-146f-4d6...|    Afrika Bambaataa|8e025002-e43c-41d...|Zulu Nation Throw...|              212|       0|         0|
|user_000034|2005-09-16 00:18:12|fe3503fb-146f-4d6...|    Afrika Bambaataa|d081e710-a4cf-43d...|Jazzy Sensation (...|              789|       0|         0|
|user_000034|2005-09-16 00:27:51|fe3503fb-146f-4d6...|    Afrika

#### 4. Count the frequency of each session_id, groupby by user_id, find top 50 longest sessions
##### the freqency presents number of songs in each session_id

In [29]:
df_track_count = df_track.groupBy('_c0','session_id').count().orderBy('count',ascending=False)
df_track_count50 = df_track_count.limit(50)
df_track_count50.show()

+-----------+----------+-----+
|        _c0|session_id|count|
+-----------+----------+-----+
|user_000949|       150| 5360|
|user_000544|        74| 5350|
|user_000949|       138| 4956|
|user_000949|       558| 4705|
|user_000997|        17| 4357|
|user_000544|        55| 3809|
|user_000544|        54| 3651|
|user_000949|       124| 3077|
|user_000262|      1119| 2862|
|user_000949|       188| 2834|
|user_000554|       545| 2701|
|user_000949|       151| 2652|
|user_000949|       147| 2643|
|user_000250|      1284| 2600|
|user_000949|       148| 2541|
|user_000544|        69| 2541|
|user_000949|       707| 2436|
|user_000008|       235| 2435|
|user_000974|         5| 2423|
|user_000008|       116| 2394|
+-----------+----------+-----+
only showing top 20 rows



#### 5. filter df_track, such that it only contain rows that are in top 50 longest session: df_track_count50

In [31]:
df_join = df_track.join(df_track_count50, ['_c0', 'session_id'], 'leftsemi')

##### test does the join make sense:

In [35]:
print("test 1: when user_000949 and session_id==150, it should contain 5360 rows")
print(df_join.filter(( (df_join['_c0'] == 'user_000949') & (df_join['session_id'] == 150 ))).count())
print("test 2: the length of df_join should equal to df_track_count50.count.sum()")
print(f"test 2: length of df_join: {df_join.count()}; df_track_count50.count.sum(): {df_track_count50.groupBy().sum('count').collect()[0][0]}")

test 1: when user_000949 and session_id==150, it should contain 5360 rows
5360
test 2: the length of df_join should equal to df_track_count50.count.sum()
test 2: length of df_join: 129813; df_track_count50.count.sum(): 129813


#### 6. find the top 10 songs that in the filtered (top 50 longest session) df_track

In [36]:
df_join_count = df_join.groupBy('_c4','_c5').count().orderBy('count',ascending=False)
df_join_count.show()

+--------------------+--------------------+-----+
|                 _c4|                 _c5|count|
+--------------------+--------------------+-----+
|60f0bfa4-8da9-484...|              Jolene| 1214|
|db4c9220-df76-4b4...|          Heartbeats|  868|
|c2b14074-15d6-40c...|How Long Will It ...|  726|
|91951530-d978-464...|Anthems For A Sev...|  659|
|45c773c3-c8f0-41a...|     St. Ides Heaven|  646|
|                null|         Bonus Track|  644|
|b649b4ba-4912-4ad...|Starin' Through M...|  617|
|55956d3e-c5d4-433...|     Beast Of Burden|  613|
|9ad11ca6-c5b9-4a2...|           The Swing|  604|
|82558949-cd98-4c5...|See You In My Nig...|  536|
|03bd74d1-e7f9-471...|        Say You Will|  535|
|986077d0-162b-43b...|           Heartless|  532|
|20327ab0-f6c6-440...|Amazing (Feat. Yo...|  530|
|4c498872-70d2-4b0...|      Coldest Winter|  530|
|153d8ca0-dc23-454...|       Love Lockdown|  529|
|cc086205-4e55-474...|Welcome To Heartb...|  526|
|b08d2a9c-17ec-4d3...|Paranoid (Feat. M...|  526|


#### save the top 10 songs into csv file

In [37]:
df_join_count.limit(10).toPandas().to_csv('top10_songs.csv')