# Set up

In [1]:
!git clone https://github.com/aliswh/lastfm
!cd lastfm; pip install -r requirements.txt

Cloning into 'lastfm'...
remote: Enumerating objects: 430, done.[K
remote: Counting objects: 100% (430/430), done.[K
remote: Compressing objects: 100% (291/291), done.[K
remote: Total 430 (delta 274), reused 279 (delta 136), pack-reused 0[K
Receiving objects: 100% (430/430), 3.11 MiB | 9.97 MiB/s, done.
Resolving deltas: 100% (274/274), done.
Collecting pylast
  Downloading pylast-4.5.0-py3-none-any.whl (25 kB)
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 64.8 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=85b262d84b52a880aaa3e7868c02bbb800e5634c76b995d9f59c15ca02375eb8
  Stored in directory: /root/.cache/pip/w

In [2]:
!wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar
!cp gcs-connector-hadoop3-latest.jar /usr/local/lib/python3.7/dist-packages/pyspark/jars

--2022-03-01 10:19:30--  https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.13.240, 172.217.15.80, 142.250.81.208, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.13.240|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 31607894 (30M) [application/java-archive]
Saving to: ‘gcs-connector-hadoop3-latest.jar’


2022-03-01 10:19:30 (78.5 MB/s) - ‘gcs-connector-hadoop3-latest.jar’ saved [31607894/31607894]



In [3]:
import json
import datetime

import pyspark
from pyspark.sql import SparkSession
from py4j.java_gateway import UserHelpAutoCompletion
from lastfm.src.ingestion_layer.googlestorage import *
from lastfm.src.ingestion_layer.pylastsource import *
from lastfm.src.ingestion_layer.config import *
from lastfm.src.ingestion_layer.batchwriter import *
from lastfm.src.ingestion_layer.pysparkreader import *

# PySpark Context onto Google Cloud Storage

In [4]:
spark = SparkSession.builder.appName('A4BD Project').getOrCreate()
sc = pyspark.SparkContext.getOrCreate()
reader = PySparkReader(sc)
source = PyLastSource(AUTH_DATA) # for tests

In [5]:
storage = GoogleStorageJSON('./lastfm/src/ingestion_layer/creds.json',BUCKET_NAME)
writer = BatchWriter(source, storage)
# writer.write('test', SEED_USER, 3, 30, debug=True)

# Read from storage

In [6]:
recent_tracks_rdd = reader.read('test/recent_tracks',dir=True)
recent_tracks_rdd.count()

3

In [7]:
tracks_rdd = reader.read('test/tracks',dir=True)
tracks_rdd.count()

85

In [8]:
artists_rdd = reader.read('test/artists',dir=True)
artists_rdd.count()

60

# User features

1. Extract listening sessions from users:
  * list of concatenated songs with a play events +-10 sec after the actual song completed
2. Know statistics about these sessions:
  * average number of tracks
  * average session per user
  * session lengths

## Extract listening sessions

Preprocessing

In [9]:
def to_datetime(x):
  timestamp_date = lambda ts: datetime.fromtimestamp(int(ts))
  date_date = lambda ts: datetime.strptime(ts, '%d %b %Y, %H:%M')
  x[1]['date'] = date_date(x[1]['date'])
  x[1]['timestamp'] = timestamp_date(x[1]['timestamp'])
  return x

sessions = recent_tracks_rdd.map(lambda x: (x['user'], x['recent_tracks'])) \
                            .flatMap(lambda x: map(lambda val: (x[0], val), x[1])) \
                            .map(lambda x: to_datetime(x))
                            
sessions.take(1)

[('kurtphyre',
  {'album': "She Ain't Here: A Tribute to R.L. Burnside",
   'artist': 'The Juke Joint Highball',
   'date': datetime.datetime(2022, 2, 21, 18, 20),
   'id': '7605616a40ba65675bc43e4c3eb6954eaecfc420',
   'timestamp': datetime.datetime(2022, 2, 21, 18, 20, 35),
   'title': "Goin' Down South"})]

We keep only the `timestamp` because it gives more information w.r.t. `date`, which doesn't include the seconds.

Compute the elapsed time between each song on the sessions as the difference between timestamps (in milliseconds). Keep only the minimal value and the title and artist of the first song: this way, we get the track and the *time it passed from its starting time till the next track*.



In [10]:
def timestamp_to_delta(ts1, ts2):
  return int((ts2 - ts1).total_seconds() * 1000)

sessions = sessions.join(sessions) \
                   .filter(lambda x: x[1][0]['date'] < x[1][1]['date']) \
                   .map(lambda x: ((x[0], x[1][0]['title'], x[1][0]['artist'], x[1][0]['timestamp']), timestamp_to_delta(x[1][0]['timestamp'], x[1][1]['timestamp']))) \
                   .reduceByKey(lambda x,y: min(x,y))

sessions.take(1)  

[(('kurtphyre',
   "Goin' Down South",
   'North Mississippi Allstars',
   datetime.datetime(2022, 2, 21, 18, 19, 57)),
  6000)]

In [11]:
sessions = sessions.map(lambda x: ((x[0][1], x[0][2]), (x[0][0], x[0][3], x[1])))
sessions.take(1)

[(("Goin' Down South", 'North Mississippi Allstars'),
  ('kurtphyre', datetime.datetime(2022, 2, 21, 18, 19, 57), 6000))]

Now take the duration of each track to get, in the end, the difference between the elapsed time between two tracks and the duration of the first: 
* if the difference is `0`, it means the track has been played and another track started immediately after
* if the difference is `< 0`, it means the track has not been played in full
* if the difference is `> 0`, it means that some time passed after the track has been played in full, before getting to the next song. *This is what we are looking for.* If this value is bigger than some threshold, we consider the session finished. 

In [12]:
tr_rdd = tracks_rdd.map(lambda x: ((x['title'],x['artist']),x['duration']))
tr_rdd.take(10)

[(('Folsom Prison Blues  - January 1968)', 'Johnny Cash'), 0),
 (("La Chucha / Goin' Down South", 'Tres Radio Express Service'), 404000),
 (("Goin' Down South", 'North Mississippi Allstars'), 370000),
 (('Love Is Stronger', 'Ifi Ude'), 184000),
 (('This Ole House  - January 1968)', 'Johnny Cash'), 0),
 (('Tańczyć', 'Rozen'), 237000),
 (("Goin' Down South (feat. Russ Schulz)", 'Above & Beyond'), 0),
 (('Najpiękniejszy Koniec Świata', 'szymonmówi'), 184000),
 (('Requiem: Pie Jesu', 'Jeno Jando'), 0),
 (('Do Przodu', 'KASHELL'), 209000)]

In [13]:
sessions_tracks = sessions.join(tr_rdd)\
                          .mapValues(lambda x: (x[0][0], x[0][1], x[1], x[0][2]))
sessions_tracks.take(10) # sx from listening session, dx from track

[(("Goin' Down South", 'North Mississippi Allstars'),
  ('kurtphyre', datetime.datetime(2022, 2, 21, 18, 19, 57), 370000, 6000)),
 (('WSZYSTKO OK?', 'Rubens'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 22, 32, 58), 0, 250000)),
 (('Go All The Way', 'The Raspberries'),
  ('abernes', datetime.datetime(2021, 8, 20, 17, 2, 9), 199000, 707199000)),
 (('Turandot: Nessun dorma', 'Thomas Harper'),
  ('abernes', datetime.datetime(2021, 2, 22, 23, 8, 48), 0, 135000)),
 (('Blizny', 'Swiernalis'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 22, 17, 57), 210000, 211000)),
 (('Leaving Earth', 'Clint Mansell'),
  ('abernes', datetime.datetime(2021, 6, 1, 1, 22, 25), 122000, 160000)),
 (('The Old Spinning Wheel  - January 1968)', 'Johnny Cash'),
  ('kurtphyre', datetime.datetime(2022, 2, 21, 16, 28, 54), 0, 88000)),
 (('Before I died', 'Bass Astral'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 23, 37, 12), 374000, 375000)),
 (('Blue Suede Shoes  - January 1968)', 'Johnny Cash'),
  ('kur

In [14]:
tr_rdd.filter(lambda x: x[1] == 0).take(3)

[(('Folsom Prison Blues  - January 1968)', 'Johnny Cash'), 0),
 (('This Ole House  - January 1968)', 'Johnny Cash'), 0),
 (("Goin' Down South (feat. Russ Schulz)", 'Above & Beyond'), 0)]

**Problem**: missing duration for some tracks

**Fix**: take max duration from the tracks by that artist in the listening session to fill missing values.

This could still produce some imprecision, but we need to put a patch.

In [15]:
artist_max = sessions_tracks.map(lambda x: (x[0][1], x[1][2])) \
                            .reduceByKey(lambda x,y: max(x,y))
artist_max.take(10)

[('Clint Mansell & Sam Hulick', 0),
 ('Los Telez', 275000),
 ('Thomas Harper', 0),
 ('Rubens', 0),
 ('Bass Astral', 374000),
 ('North Mississippi Allstars', 370000),
 ('Russian State Symphony Orchestra', 105000),
 ('The Fruitcakes', 202000),
 ('Bobby Hutcherson', 425000),
 ('Rozen', 237000)]

Some artist still don't have a duration value, but if they don't have it in the recent tracks, maybe they have in their top tracks! 

This could still not work, so we put a default value, like the average of all values for the tracks listened by that user in the recent tracks, because we presume that a user usually listens to songs of similar length.

In [16]:
artist_avg = artist_max.filter(lambda x: x[1] != 0).map(lambda x: (x[1],1)).reduce(lambda x,y: ((x[0]+y[0]),(x[1]+y[1])) )
artist_avg = artist_avg[0] // artist_avg[1]
artist_avg

253421

In [17]:
artist_max = artists_rdd.map(lambda x: (x['artist'], max([int(track['duration']) for track in x['top_tracks'] ]) )) \
                        .join(artist_max) \
                        .mapValues(lambda x: x[1] if x[1] != 0 else artist_avg)
artist_max.take(10)

[('Thomas Harper', 253421),
 ('Maja Koman', 222000),
 ('Blue Swede', 174000),
 ('Ladaco', 253421),
 ('Sean Rowe', 189000),
 ('Jeno Jando', 253421),
 ('Bass Astral', 374000),
 ('Budapest Haydn Quartet', 253421),
 ('JuCho', 243000),
 ('Russian State Symphony Orchestra', 105000)]

Now merge duration per track in `tr_rdd`.

In [18]:
fixed_tr_rdd = tr_rdd.map(lambda x: (x[0][1], (x[0][0], x[1]))) \
                     .join(artist_max) \
                     .map(lambda x: ((x[1][0][0],x[0]), (x[1][0][1], x[1][1]))) 
fixed_tr_rdd.take(10)

[(('Spirit in the Sky', 'Norman Greenbaum'), (240000, 240000)),
 (('Nunca Es Suficiente', 'Los Ángeles azules, Natalia Lafourcade'),
  (0, 253421)),
 (('Do Przodu', 'KASHELL'), (209000, 209000)),
 (("Goin' Down South", 'Bobby Hutcherson'), (425000, 425000)),
 (("La Chucha / Goin' Down South", 'Tres Radio Express Service'),
  (404000, 404000)),
 (('Agathy', 'Runforrest'), (282000, 282000)),
 (('Felicidad', 'Los Socios Del Ritmo'), (197000, 218000)),
 (('Llorar', 'Los Socios Del Ritmo'), (218000, 218000)),
 (('Profet', 'Nanga'), (0, 253421)),
 (('Todo Me Gusta De Ti', 'Aarón Y Su Grupo Ilusión'), (243000, 243000))]

In [19]:
fixed_tr_rdd = fixed_tr_rdd.mapValues(lambda x: x[0] if x[0] != 0 else x[1])
fixed_tr_rdd.take(10)

[(('Spirit in the Sky', 'Norman Greenbaum'), 240000),
 (('Nunca Es Suficiente', 'Los Ángeles azules, Natalia Lafourcade'), 253421),
 (('Do Przodu', 'KASHELL'), 209000),
 (("Goin' Down South", 'Bobby Hutcherson'), 425000),
 (("La Chucha / Goin' Down South", 'Tres Radio Express Service'), 404000),
 (('Agathy', 'Runforrest'), 282000),
 (('Felicidad', 'Los Socios Del Ritmo'), 197000),
 (('Llorar', 'Los Socios Del Ritmo'), 218000),
 (('Profet', 'Nanga'), 253421),
 (('Todo Me Gusta De Ti', 'Aarón Y Su Grupo Ilusión'), 243000)]

Finally compute the listening sessions.

In [20]:
sessions_tracks = sessions.join(fixed_tr_rdd)\
                          .mapValues(lambda x: (x[0][0], x[0][1], x[1], x[0][2]))
sessions_tracks.take(10) # sx from listening session, dx from track

[(('WSZYSTKO OK?', 'Rubens'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 22, 32, 58), 253421, 250000)),
 (('Jackson  - January 1968)', 'Johnny Cash'),
  ('kurtphyre', datetime.datetime(2022, 2, 21, 16, 37, 17), 253421, 192000)),
 (('Muszę Iść Tam Sam', 'Kamil Hussein'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 22, 43, 57), 219000, 221000)),
 (('Nie wyjadę', 'Rozen'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 23, 33, 31), 219000, 221000)),
 (('Tańczyć', 'Rozen'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 22, 58, 41), 237000, 239000)),
 (('Love Is Stronger', 'Ifi Ude'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 22, 29, 52), 184000, 186000)),
 (('Love Is Stronger', 'Ifi Ude'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 23, 57, 2), 184000, 186000)),
 (('Agathy', 'Runforrest'),
  ('Gunthar666', datetime.datetime(2022, 2, 20, 23, 18, 25), 282000, 284000)),
 (("Goin' Down South", 'Bobby Hutcherson'),
  ('kurtphyre', datetime.datetime(2022, 2, 21, 18, 10, 31)

In [21]:
to_min = lambda x: divmod(x//1000, 60)
to_date_string = lambda x: x.strftime("%Y-%m-%d, %H:%M:%S")

sessions_tracks = sessions_tracks.mapValues(lambda x: (x[0], (x[2]-x[3])//1000, to_min(x[2]), to_date_string(x[1]))) \
                                 .groupBy(lambda x: x[1][0]) \
                                 .map(lambda x : (x[0], list(x[1])))
sessions_tracks.take(1)

[('abernes',
  [(('Hooked on a Feeling', 'Blue Swede'),
    ('abernes', -21, (2, 54), '2021-08-20, 16:58:54')),
   (('Hooked on a Feeling', 'Blue Swede'),
    ('abernes', 65, (2, 54), '2021-06-01, 01:31:48')),
   (('Yo Soy Tu Maestro', 'Los Telez'),
    ('abernes', 28, (4, 35), '2021-03-02, 02:16:47')),
   (('Turandot: Nessun dorma', 'Thomas Harper'),
    ('abernes', 118, (4, 13), '2021-02-22, 23:08:48')),
   (('Nunca Es Suficiente', 'Los Ángeles azules, Natalia Lafourcade'),
    ('abernes', 56, (4, 13), '2021-03-02, 02:10:15')),
   (('Requiem: Pie Jesu', 'Jeno Jando'),
    ('abernes', -23, (4, 13), '2021-02-22, 22:22:55')),
   (('III. Notturno: Andante', 'Budapest Haydn Quartet'),
    ('abernes', -614423, (4, 13), '2021-02-22, 23:21:13')),
   (('Llorar', 'Los Socios Del Ritmo'),
    ('abernes', 25, (3, 38), '2021-03-02, 02:33:31')),
   (('Jalwa', 'Daler Mehndi'),
    ('abernes', -6966875, (4, 50), '2021-06-01, 01:39:29')),
   (('Enigma Variations: Nimrod', 'Bournemouth Symphony Orches

if the difference is > 0, it means that some time passed after the track has been played in full, before getting to the next song. This is what we are looking for. If this value is bigger than some threshold, we consider the session finished.

If the song has a value `> 0 + threshold` (let's say 15 seconds), it means that it is the last song of the listening session, so from the next song we create a new listening session.

In [22]:
threshold = 15 # seconds

def get_sessions_per_user(x):
  #all_tracks = sorted(x[1], key=lambda x: x[1][3])
  all_tracks = x[1]
  sessions = []
  temp = []
  for track in all_tracks:
    temp.append(track)
    if track[1][1] > threshold:
      sessions.append(temp)
      temp = []
  return sessions

user_sessions = sessions_tracks.map(lambda x: (x[0],get_sessions_per_user(x)))
user_sessions.take(20)

[('abernes',
  [[(('Hooked on a Feeling', 'Blue Swede'),
     ('abernes', -21, (2, 54), '2021-08-20, 16:58:54')),
    (('Hooked on a Feeling', 'Blue Swede'),
     ('abernes', 65, (2, 54), '2021-06-01, 01:31:48'))],
   [(('Yo Soy Tu Maestro', 'Los Telez'),
     ('abernes', 28, (4, 35), '2021-03-02, 02:16:47'))],
   [(('Turandot: Nessun dorma', 'Thomas Harper'),
     ('abernes', 118, (4, 13), '2021-02-22, 23:08:48'))],
   [(('Nunca Es Suficiente', 'Los Ángeles azules, Natalia Lafourcade'),
     ('abernes', 56, (4, 13), '2021-03-02, 02:10:15'))],
   [(('Requiem: Pie Jesu', 'Jeno Jando'),
     ('abernes', -23, (4, 13), '2021-02-22, 22:22:55')),
    (('III. Notturno: Andante', 'Budapest Haydn Quartet'),
     ('abernes', -614423, (4, 13), '2021-02-22, 23:21:13')),
    (('Llorar', 'Los Socios Del Ritmo'),
     ('abernes', 25, (3, 38), '2021-03-02, 02:33:31'))],
   [(('Jalwa', 'Daler Mehndi'),
     ('abernes', -6966875, (4, 50), '2021-06-01, 01:39:29')),
    (('Enigma Variations: Nimrod', 'Bou

## Statistics about listening sessions

average number of tracks

average session per user

session lengths

In [23]:
def avg_tracks(x):
  r,c = 0, 1
  for y in x:
    r += len(y)
    c += 1
  return round(r/c, 3)

user_avg_tracks = user_sessions.map(lambda x: (x[0], avg_tracks(x[1])))
user_avg_tracks.take(10)

[('abernes', 1.833), ('kurtphyre', 1.471), ('Gunthar666', 4.0)]

In [24]:
def avg_session(x):
  r,c = 0, 1
  for y in x:
    r += len(x)
    c += 1
  return round(r/c, 3)

user_avg_session = user_sessions.map(lambda x: (x[0], avg_session(x[1])))
user_avg_session.take(10)

[('abernes', 10.083), ('kurtphyre', 15.059), ('Gunthar666', 2.25)]

In [25]:
to_sec = lambda x: x[0]*60+x[1]

def avg_session_len(x):
  r,c = 0, 1
  for y in x:
    r += to_sec(y[0][1][2])
    c += 1
  return round(r/c, 3)

user_avg_session_len = user_sessions.map(lambda x: (x[0], avg_session_len(x[1])))
user_avg_session_len.take(10)

[('abernes', 217.833), ('kurtphyre', 243.471), ('Gunthar666', 179.75)]

# Save results to Data Lake

In [26]:
user_sessions_collect = user_sessions.collect()

user_sessions_stats = {
    'user_avg_tracks': user_avg_tracks.collect(),
    'user_avg_session': user_avg_session.collect(),
    'user_avg_session_len' : user_avg_session_len.collect()
    }

In [28]:
dest_path = 'listening_sessions' + '/'
ls_path = dest_path + 'listening_sessions' + '/'
ls_stats_path = dest_path + 'listening_sessions_statistics' + '/'

for key,value in user_sessions_stats.items():
  storage.write(value, ls_stats_path+f"listening_sessions_{key}")

for user in user_sessions_collect:
  storage.write(user, ls_path+f"listening_sessions_{user[0]}")