In [1]:
from pyspark.sql import SparkSession
import logging

In [2]:
# (Re)load the Kedro IPython extension. Later cells use `catalog` and `session`
# without defining them, so they presumably come from this magic.
%reload_kedro

In [4]:
# Check that the Spark hook set up the session. getOrCreate() alone would
# silently create a fresh default session if the hook had not run, so assert on
# the expected app name rather than only on the call succeeding.
spark = SparkSession.builder.getOrCreate()
assert spark.conf.get("spark.app.name") == 'music-listening-behaviour'
spark

In [3]:
# Grab the live Spark session and confirm it carries the project's app name.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
assert 'music-listening-behaviour' == spark.conf.get("spark.app.name")

In [15]:
# Load the listening log through the Kedro catalog (dataset 'track-sessions').
tracksess = catalog.load('track-sessions')

In [5]:
# Inspect the column names of the loaded listening log.
tracksess.columns

In [6]:
# Print the first 100 rows; Spark truncates long cell values by default ("...").
tracksess.show(100)

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     userid|           timestamp|            artistid|          artistname|               traid|             traname|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|user_000001|2009-05-04T23:08:57Z|f1b1cf71-bd35-4e9...|           Deep Dish|                null|Fuck Me Im Famous...|
|user_000001|2009-05-04T13:54:10Z|a7f7df4a-77d8-4f1...|            坂本龍一|                null|Composition 0919 ...|
|user_000001|2009-05-04T13:52:04Z|a7f7df4a-77d8-4f1...|            坂本龍一|                null|Mc2 (Live_2009_4_15)|
|user_000001|2009-05-04T13:42:52Z|a7f7df4a-77d8-4f1...|            坂本龍一|                null|Hibari (Live_2009...|
|user_000001|2009-05-04T13:42:11Z|a7f7df4a-77d8-4f1...|            坂本龍一|                null|Mc1 (Live_2009_4_15)|
|user_000001|2009-05-04T13:38:31Z|a7f7df4a-77d8-4f1...|         

In [3]:
from pyspark.sql import SparkSession
# from pyspark.sql.functions import col  # Import the col function
import logging
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import unix_timestamp, lag, col
import pandas as pd
# Logger under the "kedro" name; presumably picks up whatever handlers Kedro's
# logging configuration attaches to that logger — verify.
log = logging.getLogger("kedro")

In [25]:




# Sessionize the Last.fm 1K listening log and measure how many users have at
# least one "long" listening session.
#
# - A session is a run of consecutive plays by the same user in which each track
#   starts at most `session_maxidletime` seconds after the previous one.
# - Sessions are numbered per user (`session_id` = 1, 2, ...) and also get a
#   `unique_session_id` that is unique across all users.
# - A session is "long" when its duration is at or above the
#   `long_session_quantile` quantile of all session durations.
#
# Window functions sort within their own partitions, so no global orderBy is
# needed between steps; add one only when displaying results.

# Reuse the session set up by the Spark hook; fail fast if it is not ours.
spark = SparkSession.builder.getOrCreate()
assert spark.conf.get("spark.app.name") == 'music-listening-behaviour'

# Raw listening log. It is read without a header row, so Spark names the columns
# _c0.._c5 and they are renamed below.
# TODO: the same data is available as catalog.load('track-sessions'); prefer that
# over this machine-specific absolute path.
TRACKS_TSV_PATH = 'C:/GP/music-listening-behaviour/data/01_raw/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv'
tracksess = (
    spark.read.csv(TRACKS_TSV_PATH, sep='\t')
    .withColumnRenamed("_c0", "userid")
    .withColumnRenamed("_c1", "timestamp")
    .withColumnRenamed("_c2", "artistid")
    .withColumnRenamed("_c3", "artistname")
    .withColumnRenamed("_c4", "traid")
    .withColumnRenamed("_c5", "traname")
)

# Session parameters. The Kedro pipeline is meant to read these from
# conf/base/parameters.yml; keep these exploratory copies in sync with it.
session_maxidletime = 900  # seconds (15 minutes)
long_session_quantile = 0.9
log.info('starting pipeline with session_maxidletime %s long_session_quantile %s',
         session_maxidletime, long_session_quantile)

# Basic sanity check on the input: the dataset is expected to hold 992 users.
nb_distinct_users = tracksess.select("userid").distinct().count()
assert nb_distinct_users == 992, "Unexpected number of users"

# Parse ISO-8601 UTC strings such as 2009-05-04T23:08:57Z. Pattern letter X reads
# the trailing 'Z' as a zero UTC offset; quoting it as a literal ('Z') would make
# Spark interpret the wall-clock time in the session time zone instead, which
# skews time differences around that zone's DST transitions.
ts = tracksess.withColumn(
    "timestamp",
    unix_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ssX").cast("timestamp"),
)
nb_unparsed_timestamps = ts.filter(F.col("timestamp").isNull()).count()
if nb_unparsed_timestamps:
    # Rows with a null timestamp would each open their own session (with a null
    # duration) below, so surface them instead of failing silently.
    log.warning('%s rows have a missing or unparsable timestamp', nb_unparsed_timestamps)

# Per-user chronological window used by lag() and the running session count.
user_window = Window.partitionBy("userid").orderBy("timestamp")

# Seconds since the same user's previous play; null on each user's first play.
ts = ts.withColumn(
    "time_diff",
    F.unix_timestamp("timestamp") - F.unix_timestamp(F.lag("timestamp").over(user_window)),
)

# A play opens a new session when it is the user's first play (null time_diff)
# or when more than session_maxidletime seconds passed since the previous play.
ts = ts.withColumn(
    "is_new_session",
    F.when((F.col("time_diff") > session_maxidletime) | F.col("time_diff").isNull(), 1).otherwise(0),
)

# Running total of session starts = per-user session number, starting at 1.
ts = ts.withColumn("session_id", F.sum("is_new_session").over(user_window))

# One row per (user, session): first and last play and the time between them in
# seconds. Durations span track *starts*, so a single-play session lasts 0 s.
session_duration = ts.groupBy("userid", "session_id").agg(
    F.min("timestamp").alias("start_time"),
    F.max("timestamp").alias("end_time"),
).withColumn("session_duration", F.unix_timestamp("end_time") - F.unix_timestamp("start_time"))

# Consecutive session id unique across users. The window has no partitioning,
# so Spark evaluates it in a single partition.
session_duration = session_duration.withColumn(
    "unique_session_id",
    F.row_number().over(Window.orderBy("userid", "session_id")),
)

# Long-session threshold at the configured quantile (previously hard-coded as
# 0.9). relativeError=0 requests the exact quantile; increase it for speed.
long_session_threshold = session_duration.approxQuantile(
    "session_duration", [long_session_quantile], 0
)[0]
log.info('long session threshold %s', long_session_threshold)

# Users with at least one session at or above the threshold.
users_with_long_sessions = (
    session_duration
    .filter(F.col("session_duration") >= long_session_threshold)
    .select("userid")
    .distinct()
)

# Check that the threshold sits at the requested quantile: the share of sessions
# strictly below it should be ~long_session_quantile. Many sessions tied at the
# threshold value would push this share below the tolerance.
quantile_check = session_duration.agg(
    F.count(F.when(F.col("session_duration") < long_session_threshold, True)).alias("short_sessions_count"),
    F.count(F.when(F.col("session_duration") >= long_session_threshold, True)).alias("long_sessions_count"),
)
qcheck = quantile_check.toPandas()
nb_checked_sessions = qcheck["short_sessions_count"] + qcheck["long_sessions_count"]
qcheck['short_share'] = qcheck["short_sessions_count"] / nb_checked_sessions
qcheck['long_share'] = qcheck["long_sessions_count"] / nb_checked_sessions
assert abs(qcheck.short_share[0] - long_session_quantile) < 0.001, "threshold long session quantiles were not correctly defined"

# Proportion of users with at least one long session. Every row of `ts` comes
# from `tracksess` unfiltered, so its distinct-user count is nb_distinct_users.
total_users = nb_distinct_users
users_with_long_session_count = users_with_long_sessions.count()
proportion_with_long_sessions = users_with_long_session_count / total_users
log.info("Proportion of users with at least one long session:%s", proportion_with_long_sessions)


In [13]:
# Load the user profile table (keyed by the '#id' column) and list its columns.
# The later profile-loading cell in this notebook successfully loads the
# 'userid_profiles' entry, so use that name here too.
# NOTE(review): confirm against conf/base/catalog.yml.
uids = catalog.load('userid_profiles')
uids.columns

In [24]:
# The listening log and the profile table should cover the same number of
# distinct users.
nb_distinct_users = tracksess.select("userid").distinct().count()
assert len(uids['#id'].drop_duplicates()) == nb_distinct_users
log.info('nb_distinct_users %s', nb_distinct_users)

In [28]:
# Convert timestamp column to timestamp datatype
ts = tracksess.withColumn("timestamp", unix_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'").cast("timestamp"))
ts.dtypes

In [33]:
# Exploration leftover: lists the attributes of the Kedro `session` object.
# Safe to delete once no longer needed.
dir(session)

In [52]:
# Reload the Kedro IPython extension so `catalog` and `session` used below are
# presumably refreshed from the current project configuration.
%reload_kedro

In [5]:
# Run the project through the Kedro session; with no arguments this presumably
# runs the default pipeline — verify.
session.run()

In [7]:
# Load the user profile table; the rendered output below suggests it comes back
# as a pandas DataFrame with an '#id' column.
profiles = catalog.load('userid_profiles')

In [8]:
# Display the profile table (rendered output follows).
profiles

Unnamed: 0,#id,gender,age,country,registered
0,user_000001,m,,Japan,"Aug 13, 2006"
1,user_000002,f,,Peru,"Feb 24, 2006"
2,user_000003,m,22.0,United States,"Oct 30, 2005"
3,user_000004,f,,,"Apr 26, 2006"
4,user_000005,m,,Bulgaria,"Jun 29, 2006"
...,...,...,...,...,...
987,user_000996,f,,United States,"Jul 17, 2006"
988,user_000997,m,,United States,"Jan 5, 2007"
989,user_000998,m,,United Kingdom,"Sep 28, 2005"
990,user_000999,f,,Poland,"Jul 24, 2007"


In [None]:
# Number of distinct users with at least one listening event.
nb_distinct_users = tracksess.select("userid").distinct().count()

In [None]:
# `profiles` renders as a pandas DataFrame, which has no Spark-style .select();
# count distinct profile ids with pandas instead. This counts the ids the same
# way the other profile checks in this notebook do (drop_duplicates + len).
nb_distinct_profiles = len(profiles['#id'].drop_duplicates())

In [None]:
# Both sources should describe the same population of users.
assert nb_distinct_profiles == nb_distinct_users

In [None]:
# Distinct listeners in the log must match distinct profile ids in `profiles`
# (the frame loaded from the 'userid_profiles' catalog entry); `userid_profiles`
# itself is not a variable in this notebook. Expected: 992 users.
nb_distinct_users = tracksess.select("userid").distinct().count()
nb_distinct_profile_ids = len(profiles['#id'].drop_duplicates())
assert nb_distinct_users == nb_distinct_profile_ids, (
    f"{nb_distinct_users} distinct users in the listening log vs "
    f"{nb_distinct_profile_ids} distinct profile ids"
)

In [9]:
# Second session.run() on the same session: there is no %reload_kedro between
# the earlier run cell and this one.
# NOTE(review): newer Kedro versions may reject repeated runs on one session;
# run %reload_kedro first if this raises.
session.run()