In [1]:
from typing import TypedDict, Type, Any, Callable

import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)

import pandas as pd
import numpy as np
from matplotlib import pyplot as plt

from spark.config import views
from spark.create_session import create_session

from IPython.display import display

from fitter import Fitter, get_common_distributions

## 1. Pobranie danych

In [2]:
# Map of temp-view name -> JSON source for the "stolen" data set, plus a Spark session.
VIEWS = views("stolen")
spark = create_session()

# Load each JSON source and register it under its view name so that the
# `spark.sql(...)` queries below can reference it.
for view_name, json_path in VIEWS.items():
    spark.read.json(json_path).createOrReplaceTempView(view_name)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/27 23:36:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/27 23:36:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/12/27 23:36:13 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

Podgląd:

In [3]:
# Distinct (user, track) pairs for which the `sessions` view holds a LIKE event.
# Plain string literals: the queries interpolate nothing, so the f-prefix was misleading.
sessions = spark.sql(
    "SELECT DISTINCT user_id, track_id FROM sessions WHERE event_type like 'LIKE' order by user_id, track_id"
).toPandas()

# Track catalogue: id, artist id and the numeric audio attributes; only the year is kept
# from `release_date`. The literal is split across lines for readability; the
# concatenated SQL text is unchanged.
tracks = spark.sql(
    "SELECT DISTINCT id, id_artist, acousticness, danceability, duration_ms, energy, "
    "instrumentalness, key, liveness, loudness, popularity, "
    "EXTRACT(year from `release_date`) as release_year, speechiness, tempo, valence "
    "FROM tracks "
).toPandas()

                                                                                

In [4]:
# Rich preview of both frames loaded above (pandas truncates long frames on display).
display(sessions, tracks)

Unnamed: 0,user_id,track_id
0,101,08dATSKGXhGASauLBtCoO8
1,101,09jtIFItoNKnC86zlzBZ29
2,101,0FDjpGYB8iVHXZiWY7E4AM
3,101,0KAaslGdPc5I6WxmKe3whe
4,101,0NWPxcsf5vdjdiFUI8NgkP
...,...,...
1402213,20100,7r1i1TZUGZQDxR5QHX4Mmx
1402214,20100,7svwP4tC0UYJbCkiCo6Itz
1402215,20100,7tVQg3ov9G0CnXTzqmZVsZ
1402216,20100,7yC5SaMeZJfvFL6lICCulP


Unnamed: 0,id,id_artist,acousticness,danceability,duration_ms,energy,instrumentalness,key,liveness,loudness,popularity,release_year,speechiness,tempo,valence
0,1d3KXNYriNnjSdBcTBeam8,3TVifQ5FPcIzzcYSUuJkp9,0.648000,0.301,408000,0.268,0.000000,2,0.0638,-17.810,13,1977,0.0638,177.468,0.275
1,79pNwy5Q95QfeGUyGNXx11,3BCJyAgxvYyeIjQyoBU8XL,0.237000,0.462,336000,0.343,0.021400,9,0.0935,-15.477,10,1983,0.0287,103.597,0.342
2,6OoGZwmtpHuG6FIVkGjfKW,25uGmqV7NJt81bSYlEMKB0,0.092400,0.456,213467,0.932,0.004540,2,0.3220,-7.088,15,1979,0.0804,185.954,0.603
3,6aLmvz0CPeCNHCXK2H5QIC,3vbKDsSS70ZX9D2OcvbZmS,0.303000,0.611,343360,0.597,0.008200,0,0.0605,-7.864,56,1999,0.0764,147.507,0.385
4,63tInNaRRc44t4vxCQ65JA,5ujwfg1AKpM7CGPZhHxs22,0.798000,0.331,247987,0.293,0.000058,2,0.1030,-14.062,12,1980,0.0288,105.270,0.231
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
129643,7bwPQVVelqsawx9pA5gAwF,1owt6WYWjy94FlqNcj1x4U,0.907000,0.303,224160,0.292,0.841000,7,0.1760,-10.707,18,2004,0.0308,64.724,0.277
129644,3Ad4Vd2MLBBpApduPsydXk,32vWCbZh0xZ4o9gkz4PsEU,0.845000,0.536,165533,0.274,0.000089,0,0.1180,-10.758,40,1974,0.0281,86.915,0.462
129645,4z5eRXrNKYAhjtuJA49REb,5vngPClqofybhPERIqQMYd,0.423000,0.793,164133,0.544,0.000210,6,0.0830,-11.928,49,1992,0.0265,105.537,0.942
129646,0JiY190vktuhSGN6aqJdrt,1KCSPY1glIKqW2TotWuXOR,0.000329,0.534,215160,0.870,0.000000,11,0.2410,-3.078,78,2008,0.0425,126.019,0.462


Sesje scalone z atrybutami utworów

In [5]:
# One row per (user, liked track): the inner sub-query gives every LIKE event a weight of 1,
# the outer one sums those weights per pair, and the result is inner-joined with the
# track attributes and ordered by user and track id.
liked_tracks_query = """
    SELECT s.user_id, s.track_id, s.weight, acousticness, danceability, duration_ms, energy, instrumentalness, key, liveness, loudness, popularity, EXTRACT(year from `release_date`) as release_year, speechiness, tempo, valence
    FROM (
        select user_id, track_id, sum(event_weight) as weight
        from (
            SELECT user_id, track_id, 1 as event_weight
            FROM sessions
            WHERE event_type like 'LIKE'
            ) 
        group by user_id, track_id
    ) s
    inner join tracks t on s.track_id = t.id
    order by s.user_id, t.id
    """
d = spark.sql(liked_tracks_query).toPandas()
d

                                                                                

Unnamed: 0,user_id,track_id,weight,acousticness,danceability,duration_ms,energy,instrumentalness,key,liveness,loudness,popularity,release_year,speechiness,tempo,valence
0,101,08dATSKGXhGASauLBtCoO8,1,0.882000,0.799,166693,0.559,0.381000,0,0.0996,-4.033,1,1944,0.0353,106.396,0.696
1,101,09jtIFItoNKnC86zlzBZ29,1,0.000480,0.509,254840,0.796,0.460000,2,0.1610,-11.728,34,1983,0.0359,143.664,0.652
2,101,0FDjpGYB8iVHXZiWY7E4AM,1,0.011000,0.503,214107,0.864,0.003820,5,0.5990,-6.829,34,1978,0.1620,154.979,0.551
3,101,0KAaslGdPc5I6WxmKe3whe,1,0.000101,0.169,314560,0.949,0.618000,7,0.2970,-10.596,41,1990,0.1140,159.678,0.147
4,101,0NWPxcsf5vdjdiFUI8NgkP,1,0.006030,0.346,210160,0.768,0.380000,9,0.0244,-5.695,72,1967,0.0377,169.492,0.532
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1402213,20100,7r1i1TZUGZQDxR5QHX4Mmx,1,0.002940,0.570,329979,0.777,0.181000,7,0.0477,-6.537,47,1987,0.0270,107.982,0.487
1402214,20100,7svwP4tC0UYJbCkiCo6Itz,1,0.062300,0.457,213000,0.915,0.000003,3,0.0778,-5.876,19,1976,0.0808,126.695,0.446
1402215,20100,7tVQg3ov9G0CnXTzqmZVsZ,1,0.242000,0.247,671760,0.742,0.370000,2,0.9680,-12.678,24,1978,0.0571,143.533,0.466
1402216,20100,7yC5SaMeZJfvFL6lICCulP,1,0.150000,0.417,176000,0.667,0.000000,4,0.2170,-6.404,21,1978,0.0367,131.796,0.556


## 2. Model dla pojedynczego użytkownika

Przygotowanie danych dla modelu

In [6]:
from scipy import stats

# IDs only: the user and track columns of the merged likes frame `d`.
users = d['user_id']
items = d['track_id']

# Every remaining column of `d` (identifiers and weight removed) is used as a feature name.
feature_names = d.drop(columns=['user_id', 'track_id', 'weight']).columns  # TODO: normalize

In [7]:
import importlib

from lightfm.cross_validation import random_train_test_split
import model.recommender

# Reload the submodule that defines Recommender *before* importing the class from it.
# Reloading only the `model` package after `from model.recommender import Recommender`
# (as this cell used to do) leaves the previously imported, stale class in use.
importlib.reload(model.recommender)
from model.recommender import Recommender

# Fixed seed so the train/test split, and therefore the metrics reported below, is reproducible.
SPLIT_SEED = 42

# NOTE: from here on the global `model` is the Recommender instance and shadows the
# `model` package imported above.
model = Recommender(
    users=users,
    items=tracks['id'],
    item_features=tracks.drop('id', axis=1),
    item_feature_names=feature_names
)

# One (user_id, track_id) tuple per liked track. `itertuples` yields the same pairs as the
# former row-wise `apply(tuple, axis=1)` without a Python-level call per row.
liked_pairs = list(d[['user_id', 'track_id']].itertuples(index=False, name=None))
(interactions, weights) = model._dataset.build_interactions(liked_pairs)
(train, test) = random_train_test_split(interactions, random_state=np.random.RandomState(SPLIT_SEED))
model.set_interactions(train)

num_users, num_items = model._dataset.interactions_shape()
print(f'Num users: {num_users}, num_items {num_items}.')

Num users: 20000, num_items 129648.


Trenowanie modelu

In [8]:
# Hyper-parameters for the run on the training split only.
split_fit_params = dict(
    interactions=train,
    loss='warp',
    learning_rate=0.003,
    no_components=30,
    epochs=5,
    num_threads=12,
)
model.fit(**split_fit_params)

Epoch: 100%|██████████| 5/5 [00:36<00:00,  7.32s/it]


Ewaluacja modelu

In [9]:
# Evaluate the fitted model, passing both the held-out `test` split and the `train` split.
model.evaluate_submodel(test_set=test, train_set=train)

AUC: train 0.982875, test 0.982170.
Precision: train 0.197030, test 0.083316.
Recall: train 0.051661, test 0.083676.
Reciprocal rank: train 0.309651, test 0.178934.


## 3. Model dla wielu użytkowników

Uwaga: ponieważ nie mamy jeszcze embeddingów dla użytkowników, musimy wytrenować model od nowa na całym zbiorze interakcji (bez podziału na część treningową i testową). Źródła:
- https://stats.stackexchange.com/questions/258559/interpreting-results-of-lightfm-factorization-machines-for-collaborative-filter
- https://stackoverflow.com/questions/45451161/evaluating-the-lightfm-recommendation-model

In [39]:
from lightfm.cross_validation import random_train_test_split
from model.recommender import Recommender
# Deliberately no `import model.recommender` here: that statement re-binds the global
# name `model` to the package and would clobber the Recommender instance created for the
# train/test experiment, breaking any later `model.fit(...)` / `model.evaluate_submodel(...)`.

# Second recommender with the same inputs, this time given the complete interaction
# matrix instead of the training split.
model1 = Recommender(
    users=users,
    items=tracks['id'],
    item_features=tracks.drop('id', axis=1),
    item_feature_names=feature_names
)
model1.set_interactions(interactions)

In [40]:
# Full-data run: learning rate 0.1, 20 epochs.
model1.fit(
    interactions=interactions,
    no_components=30,
    loss='warp',
    epochs=20,
    learning_rate=0.1,
    num_threads=12,
)

Epoch: 100%|██████████| 20/20 [01:25<00:00,  4.26s/it]


Generowanie rekomendacji dla trzech użytkowników:

In [41]:
def _recommend_multiple(users, number, num_threads=1):
    """Look up the items predicted jointly for several users.

    Forwards ``users``, ``number`` and ``num_threads`` to
    ``model1._predict_multiple_indices`` and uses the returned indices as labels
    into ``model1._in_items`` (via ``.loc``).

    NOTE(review): reads the notebook-global ``model1`` rather than taking the model
    as an argument; label-based ``.loc`` presumably relies on ``_in_items`` having a
    default integer index - confirm against the Recommender class.
    """
    predicted = model1._predict_multiple_indices(users, number, num_threads)
    catalogue = model1._in_items
    return catalogue.loc[predicted]


# Attach the helper to the instance so it can be called like a method.
model1.recommend_multiple = _recommend_multiple
model1.recommend_multiple(users=[123, 124, 125], number=10000, num_threads=12)  # todo magic numbers

98304     1vqU9vERXHl25q2w7wikzu
40961     78JmElAFmrPNhLjovDR9Jm
122882    59On35IawOw3kdsUv6wiu6
32775     6exfsW3tiYeKR8tV5AUPlb
65544     1fSdAkbofT03q7ddO2jdss
                   ...          
49140     7oNg2UTfQIs0IafmyzRc0g
81910     2SrVGpjv0mqCR6j9EZPSm2
40951     56Zbt8YXJgkeoVMdQLpfaW
90106     5u99eyhU43QoLeXXG6caoL
49150     5HDvQApGjUbbIMqRBeRm2e
Name: id, Length: 4649, dtype: object

Ewaluacja - porównanie klasteryzacji

In [42]:
def _recommend_single(user: int, number: int, num_threads=1):
    """Look up the items predicted for one user.

    Forwards the arguments to ``model1._predict_single_indices`` and uses the
    returned indices as labels into ``model1._in_items`` (via ``.loc``).

    NOTE(review): depends on the notebook-global ``model1``.
    """
    predicted = model1._predict_single_indices(user, number, num_threads=num_threads)
    catalogue = model1._in_items
    return catalogue.loc[predicted]


# Attach the helper to the instance so it can be called like a method.
model1.recommend_single = _recommend_single

In [43]:
# Ten recommendations for user 123, computed with 12 threads.
model1.recommend_single(user=123, number=10, num_threads=12)

12185    1daDRI9ahBonbWD8YcxOIB
72186    3ZCTVFBt2Brf31RLEnCkWJ
56607    6ocbgoVGwYJhOv1GgI9NsF
50640    127QTOFJsJQp5LbJbu3A1y
70793    2b8fOow8UzyDFAE27YhOZM
75007    7qEHsqek33rTcFNT9PFqLf
64185    7lPN2DXiMsVn7XUKtOW1CS
39347    5NU40QTlXrDUJzDBdv79bg
93000    73nAK3HgQK8dak83Y2WQ8F
78621    3JvKfv6T31zO0ini8iNItO
Name: id, dtype: object

In [44]:
# Ten recommendations for user 124, computed with 12 threads.
model1.recommend_single(user=124, number=10, num_threads=12)


93000     73nAK3HgQK8dak83Y2WQ8F
12185     1daDRI9ahBonbWD8YcxOIB
60449     6i0V12jOa3mr6uu4WYhUBr
119206    2iUXsYOEPhVqEBwsqP70rE
50640     127QTOFJsJQp5LbJbu3A1y
75007     7qEHsqek33rTcFNT9PFqLf
75831     5E30LdtzQTGqRvNd7l6kG5
56607     6ocbgoVGwYJhOv1GgI9NsF
39347     5NU40QTlXrDUJzDBdv79bg
129342    158ukl5rih76dOEsp0I91H
Name: id, dtype: object

Dla modelu:

In [45]:
# Clustering measure of the items the model predicts jointly for the three users.
compared_users = [123, 124, 125]
indices = model1._predict_multiple_indices(users=compared_users, number=50, num_threads=12)  # todo magic numbers
print(len(indices))
model1.measure_clustering(indices)

28


0.8429495

Dla losowego próbkowania historii sesji użytkowników:

In [46]:
# Baseline 1: 20 distinct tracks sampled from the liked-track history of the same users.
SAMPLE_SEED = 42  # fixed seed so the baseline value is reproducible between runs
history_users = [123, 124, 125]

filtered = (
    d.loc[d['user_id'].isin(history_users)]  # one mask instead of three `.loc` + concat
    .drop_duplicates(subset=['track_id'])    # a track liked by several users counts once
    .sample(20, random_state=SAMPLE_SEED)
)
# The inner merge keeps only sampled tracks present in `tracks`; validate="1:1" also
# raises if a track id is duplicated on either side.
tracks_extended = pd.merge(tracks, filtered, how='inner', left_on='id', right_on='track_id', validate="1:1")
# Index labels of the sampled tracks within `tracks`, passed to the clustering measure.
sampled_indices = tracks[tracks.id.isin(tracks_extended['track_id'])].index
model1.measure_clustering(sampled_indices)

2.172852

Dla w pełni losowego wyboru:

In [47]:
# Baseline 2: 20 item positions drawn uniformly at random.
# Sampling is seeded (reproducible) and done without replacement, so the 20 positions
# are distinct, matching the 20 distinct tracks used for the history baseline.
RANDOM_BASELINE_SEED = 42
item_count = len(model1._model.get_item_representations()[1])
rng = np.random.default_rng(RANDOM_BASELINE_SEED)
random_indices = rng.choice(item_count, size=20, replace=False)
model1.measure_clustering(random_indices)

1.3292798