In [None]:
import pandas as pd
import numpy as np
import tqdm
import json
from pyspark.sql import SparkSession
import pyspark.sql.functions as spf
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import pairwise_distances

In [None]:
# Create (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("dnikanorova")
    .getOrCreate()
)

In [None]:
# `sc` is only predefined in the pyspark shell; on a plain kernel it raises NameError,
# so take the SparkContext from the session created above.
spark.sparkContext

## User-based Collaborative Filtering

#### Основная идея: 
Рекомендовать пользователю треки, которые понравились похожим на него пользователям

$$\hat r_{ui} = h^{-1} \left( \frac{\sum_{v \in N_i(u)} w_{uv} h(r_{vi})}{\sum_{v \in N_i(u)} w_{uv}} \right)$$

$N_i(u)$ - соседи пользователя $u$, которые оценили айтем $i$, 
$w_{uv}$ - веса соседей (косинусная близость пользователей; в item-based варианте аналогичную роль играют $w_{ij}$), 
$h$ - функция нормализации



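**Нормализация**: в качестве функции нормализации $h$ вычитаем из времени прослушивания среднее время прослушивания пользователя, $h(r_{vi}) = r_{vi} - \bar r_v$; обратное преобразование $h^{-1}$ прибавляет среднее $\bar r_u$ целевого пользователя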

**Веса**: Похожих пользователей будем искать по *cosine similarity*

**Отсутствующие данные** заполним средним временем прослушивания по пользователю (после вычитания среднего это просто нули)


In [None]:
# Local directory where the recommendations file is written (string, joined with "+" below).
DATA_DIR = "../data/"

In [None]:
# Raw interaction log (JSON) read through Spark; print its schema for inspection.
INPUT_PATH = "/user/dnikanorova/data/top_pop_50k/"
data = spark.read.format("json").load(INPUT_PATH)

data.printSchema()

In [None]:
# Keep rows whose experiments.TOP_POP value is "T3", cast the recommended track id to int,
# drop rows without a track and keep one row per (user, track) pair, then collect to pandas.
# (The former `rating` column was computed and immediately discarded by `select`, so it is gone.)
# NOTE(review): dropDuplicates keeps an arbitrary row per pair, so the `time` kept for
# repeated (user, track) listens may differ between runs.
df = (
    data
    .filter(spf.col("experiments.TOP_POP") == "T3")
    .select(
        spf.col("user"),
        spf.col("time"),
        spf.col("recommendation").cast("int").alias("track"),
    )
    .filter(spf.col("track").isNotNull())
    .dropDuplicates(["user", "track"])
    .toPandas()
)
df.head()

### Подготовка данных
На этом этапе соберем несколько вспомогательных датасетов:

1) ***norm*** - датасет с нормализованными данными

2) ***interactions_raw*** - матрица взаимодействий user-item 

3) ***interactions*** - матрица взаимодействий user-item с заполненными значениями

4) ***user_similarity_cosine*** - матрица похожести пользователей

5) ***sim_user_30_u*** - топ-30 ближайших соседей для каждого пользователя

6) ***tracks_by_user*** - треки, прослушанные пользователями

In [None]:
# Adjust time by subtracting the per-group mean value
def normalize(df, group_col, value_col):
    """Mean-centre `value_col` within each `group_col` group.

    Returns a NEW DataFrame equal to `df` plus two columns:
      * ``avg``      - mean of `value_col` within the row's group;
      * ``time_adj`` - `value_col` minus that mean (the name is fixed
        regardless of `value_col`, matching ``create_interactions``' default).

    The input frame is left untouched (the previous version added the columns
    to the caller's frame in place).
    """
    avg = df.groupby(group_col)[value_col].transform('mean')
    return df.assign(avg=avg, time_adj=df[value_col] - avg)


# Build the user x item interaction matrix; pairs with no interaction stay NaN
def create_interactions(
    df,
    value_col='time_adj',
    user_col="user",
    item_col="track"
):
    """Pivot long-format interactions into a wide user-item matrix.

    Rows are `user_col` values, columns are `item_col` values and each cell holds
    `value_col` (averaged by ``pivot_table`` if a pair occurs more than once).
    Missing pairs are NOT filled here. Prints the matrix dimensions.
    """
    matrix = df.pivot_table(index=user_col, columns=item_col, values=value_col)
    n_users, n_items = matrix.shape
    print(f"Interaction matrix consists of {n_users} users and {n_items} items")
    return matrix
  

def fill_na(interactions, user_based=False):
    """Return a copy of `interactions` with every missing cell set to 0.

    When the matrix holds per-user mean-centred values (e.g. ``time_adj``), 0 is
    the user's own average. `user_based` is accepted for API symmetry but ignored.
    """
    filled = interactions.fillna(value=0)
    return filled


# Calculate cosine similarity
def calculate_cosine_similarity(interactions, user_based=False):
    """Pairwise cosine similarity between the rows (users) or columns (items).

    With ``user_based=True`` the rows of `interactions` are compared, otherwise
    its columns. The diagonal (self-similarity) is zeroed. Returns a square
    DataFrame labelled on both axes by the compared entities.
    """
    entities = interactions.copy() if user_based else interactions.T
    sim = cosine_similarity(entities)
    # Zero self-similarity so an entity does not trivially rank itself first.
    np.fill_diagonal(sim, 0)
    labels = entities.index
    result = pd.DataFrame(sim, index=labels)
    result.columns = labels
    return result

# find most similar users
def find_n_neighbours(df, n):
    """For every row of a similarity matrix, list the labels of its `n` most similar columns.

    Parameters
    ----------
    df : DataFrame of similarities; columns are the candidate neighbours.
    n : number of neighbours to keep; clipped to the number of columns
        (the previous row-wise ``apply`` raised when `n` exceeded it).

    Returns
    -------
    DataFrame indexed like `df` with columns ``top1`` .. ``topk`` holding column
    labels in descending order of similarity (equal scores keep column order,
    NaN similarities rank last).
    """
    k = min(n, df.shape[1])
    # Negate for a descending sort; a stable sort makes tie-breaking deterministic.
    order = np.argsort(-df.to_numpy(), axis=1, kind="stable")[:, :k]
    return pd.DataFrame(
        df.columns.to_numpy()[order],
        index=df.index,
        columns=['top{}'.format(i) for i in range(1, k + 1)],
    )

# check interests: tracks both users have listened to
def get_user_similar_movies(user1, user2, listens=None):
    """Inner-join the rows of two users on ``track``.

    Parameters
    ----------
    user1, user2 : user ids to compare.
    listens : DataFrame with ``user`` and ``track`` columns. Defaults to the
        notebook-level ``df`` (the original behaviour); pass it explicitly to
        avoid depending on global state.

    Returns
    -------
    DataFrame with one row per shared track; other overlapping columns get
    pandas' default ``_x`` / ``_y`` merge suffixes.
    """
    source = df if listens is None else listens
    return source[source["user"] == user1].merge(
        source[source["user"] == user2],
        on="track",
        how="inner",
    )

# collect tracks listened by users
def collect_tracks_by_user(df):
    """Map each user to a comma-separated string of the track ids in their rows.

    Returns a Series indexed by ``user``; the input frame is not modified.
    """
    track_ids = df["track"].astype(str)
    return track_ids.groupby(df["user"]).agg(','.join)
    

In [None]:
# Per-user mean-centred listening time (adds `avg` and `time_adj`).
norm = normalize(df, group_col="user", value_col="time")

In [None]:
# User x track matrix of `time_adj`; NaN where a user has no row for a track.
interactions_raw = create_interactions(norm)
# Same matrix with the gaps set to 0 (fill_na ignores its `user_based` flag).
interactions = fill_na(interactions_raw)

In [None]:
%%time
user_similarity_cosine = calculate_cosine_similarity(interactions, user_based=True)
sim_user_30_u = find_n_neighbours(user_similarity_cosine,30)

In [None]:
# user -> "track1,track2,..." lookup used when collecting neighbours' tracks.
tracks_by_user = collect_tracks_by_user(df=df)

### Построение рекомендаций
На этом этапе рассчитаем скоры айтемов по формуле 

$$\hat r_{ui} = h^{-1} \left( \frac{\sum_{v \in N_i(u)} w_{uv} h(r_{vi})}{\sum_{v \in N_i(u)} w_{uv}} \right)$$

$N_i(u)$ - соседи пользователя $u$, которые оценили айтем $i$, 
$w_{uv}$ - веса соседей (косинусная близость пользователей; в item-based варианте аналогичную роль играют $w_{ij}$), 
$h$ - функция нормализации

In [None]:
# score tracks
def _user_average(norm, user):
    """Return the ``avg`` value of the first row of `norm` belonging to `user`."""
    return norm.loc[norm["user"] == user, "avg"].iloc[0]


def _weighted_score(item, interactions_raw, similar_users, user_weights, avg_user):
    """Predict the target user's value for `item` from their neighbours.

    Computes ``avg_user + sum(w_v * r_v) / sum(w_v)`` over the users in
    `similar_users` that have a non-NaN value for `item` in `interactions_raw`.
    `user_weights` is the target user's row of the similarity matrix (indexed by
    user id). If the weights sum to 0 the neighbours carry no usable signal, so
    the baseline `avg_user` is returned instead of a NaN/inf from 0/0.
    """
    ratings = interactions_raw.loc[similar_users, item].dropna()
    weights = user_weights.loc[ratings.index]
    denominator = weights.sum()
    if denominator == 0:
        return avg_user
    return avg_user + (ratings * weights).sum() / denominator


def score_tracks(
    user,
    tracks_by_user,
    interactions_raw,
    interactions,
    similar_users_by_user,
    norm,
    user_similarity=None,
    top_n=100,
):
    """Rank tracks the user has not listened to, using user-based CF.

    Candidates are the tracks of the user's neighbours that have no value for
    the user in `interactions_raw`; each is scored by ``_weighted_score``.

    Parameters
    ----------
    user : target user id.
    tracks_by_user : Series user id -> comma-separated track ids.
    interactions_raw : user x track matrix, NaN where there is no interaction.
    interactions : unused; kept for signature compatibility.
    similar_users_by_user : DataFrame user id -> neighbour ids (one per column).
    norm : frame with ``user`` and ``avg`` columns (per-user baseline).
    user_similarity : user x user similarity matrix; defaults to the
        notebook-level ``user_similarity_cosine`` (previously read implicitly).
    top_n : number of tracks to return (100, as before).

    Returns
    -------
    int64 numpy array of at most `top_n` track ids, BEST score first.
    """
    if user_similarity is None:
        user_similarity = user_similarity_cosine

    listened = set(interactions_raw.loc[user].dropna().index)
    similar_users = similar_users_by_user.loc[user].tolist()

    neighbour_rows = tracks_by_user[tracks_by_user.index.isin(similar_users)]
    # Skip empty fragments so users without tracks do not crash int('').
    neighbour_tracks = {int(t) for joined in neighbour_rows for t in joined.split(",") if t}

    # Sorted (not raw set order) so equal scores are broken deterministically.
    candidates = sorted(neighbour_tracks - listened)
    if not candidates:
        return np.array([], dtype=np.int64)

    # Loop-invariant lookups hoisted out of the per-track scoring.
    avg_user = _user_average(norm, user)
    user_weights = user_similarity.loc[user]
    scores = np.array(
        [_weighted_score(item, interactions_raw, similar_users, user_weights, avg_user)
         for item in candidates],
        dtype=float,
    )
    # np.argsort orders NaN as the largest value; demote NaNs so they never reach the top.
    scores = np.where(np.isnan(scores), -np.inf, scores)
    order = np.argsort(-scores, kind="stable")[:top_n]
    return np.array(candidates, dtype=np.int64)[order]


def score_track(item, interactions_raw, similar_users, norm, target_user=None, user_similarity=None):
    """Score one `item` for a user (kept for backward compatibility).

    The original signature took no user and silently read the notebook globals
    ``user`` and ``user_similarity_cosine``; those remain the defaults when
    `target_user` / `user_similarity` are not passed.
    """
    target = user if target_user is None else target_user
    similarity = user_similarity_cosine if user_similarity is None else user_similarity
    return _weighted_score(
        item, interactions_raw, similar_users, similarity.loc[target], _user_average(norm, target)
    )
    

In [None]:
users = df['user'].unique()

# `tracks_by_user` was already built by collect_tracks_by_user(df) in an earlier cell,
# so it is reused here instead of being recomputed.
# NOTE(review): keep the loop variable named `user` - `score_track` may read the
# notebook-global `user`.
with open(DATA_DIR + "recommendations_5k.json", "w") as rf:
    for user in tqdm.tqdm(users):
        top = score_tracks(user, tracks_by_user, interactions_raw, interactions, sim_user_30_u, norm)
        # One JSON object per line (JSON Lines).
        recommendation = {"user": int(user), "tracks": top.tolist()}
        rf.write(json.dumps(recommendation) + "\n")
