# Implementacion del SAR | Spark.

Armo matrices de Similaridad Item-Item y Afinidad User-Item. Los scores son el resultado de multiplicar ambas.

* [Diagrama](https://app.mural.co/invitation/mural/cabj9579/1687786596047?sender=uf8c4081d0c866c8eb2174164&key=2164d5d5-7bc5-478e-a090-da611bb616c2)

<img src="img/diagrama-sar.png">

Este notebook tiene:

* 1. Lectura de datos

* 2. Implementacion del SAR (#BUILD):
   
   > [a] Armado de indices.<br>
   > [b] Armado de diccionario auxiliar para C.<br>
   > [c] Armado de matriz de co-ocurrencia.<br>  
   > [d] Armado de matriz de similaridad.<br>   
   > [e] Armado de matriz de afinidad.<br>
   > [f] Predicciones.<br>
   
* 3. Recomendaciones:
    > [a] Items populares.<br> 
    > [b] Items dado un usuario.<br>    
    > [c] Items dado un item.<br>

* 4. Testing

## [0] Importo librerias necesarias

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, DataFrame
from pyspark.sql.functions import lit
from pyspark.sql.functions import split, explode, monotonically_increasing_id

import pandas as pd
import numpy as np
from numpy import linalg as LA
from scipy.sparse import csr_matrix

import json
import datetime
import requests
from tqdm.notebook import trange, tqdm

## [1] Lectura de datos

* Leo el df (pyspark.DataFrame) y armo un dataset como subconjunto fijo de ese df.

In [2]:
# leo el dataframe
load_options = {"table": "view_events", "keyspace": "clarovideo"}
df = spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load()

In [3]:
# muestra
df.head(5)

                                                                                

[Row(user_id=39407448, event='view', event_time=datetime.datetime(2023, 5, 14, 2, 55, 56), group_id=1039138),
 Row(user_id=77488189, event='view', event_time=datetime.datetime(2023, 5, 15, 18, 44, 18), group_id=967246),
 Row(user_id=39796339, event='view', event_time=datetime.datetime(2023, 5, 20, 17, 2, 12), group_id=929874),
 Row(user_id=80962217, event='view', event_time=datetime.datetime(2023, 5, 18, 17, 46, 7), group_id=1079184),
 Row(user_id=51166585, event='view', event_time=datetime.datetime(2023, 5, 23, 2, 46, 8), group_id=1115324)]

In [4]:
# tamaño del dataset
df.count()

                                                                                

2927260

In [5]:
# armo un dataset, como subconjunto del df 
test, train = df.randomSplit([0.20, 0.80],seed=1)

In [6]:
dataset = train
dataset.count()

                                                                                

2342413

## [2.a] Armado de índices:

* Necesito asociar group_ids y user_ids a posiciones en las matrices.
* Ademas, aprovecho para leer la metadata de los group_ids.

### Levanto los indices preexistentes


In [7]:
def getItemIndexes() -> dict:
    file = open('tmp/item_indexes.json' , mode='r' , encoding='utf8')
    content_key_str:dict = json.load(file)
    return {int(k):v for k,v in content_key_str.items()}

def getUserIndexes() -> dict:
    file = open('tmp/user_indexes.json' , mode='r' , encoding='utf8')
    content_key_str:dict = json.load(file)
    return {int(k):v for k,v in content_key_str.items()}

In [8]:
itemIndexes:dict = getItemIndexes()
userIndexes:dict = getUserIndexes()

### Agrego los usuarios e items del dataset que no estan registrados


In [9]:
def decode_interaction(interaction:pyspark.sql.types.Row) -> tuple:
    # Dada una interaccion, devuelve la tripla (user_id, group_id, event_time)    
    d = interaction.asDict()
    return d["user_id"], d["group_id"], d["event_time"]

In [14]:
def getMetadata(group_id:int) -> dict():
    
    # Dado un group id, devuelve la metadata necesaria
    # para filtrar y potenciar contenido
    
    # Elastic con la metadata
    elastic_host:str = "host_donde_esta_la_metadata"     
    url:str = f"{elastic_host}/metadata/_doc/{group_id}"

    metadata:dict = dict() # respuesta    

    try:
        resp:dict = requests.get(url=url).json()
        if not resp['found']:
            # print(group_id, "not found")
            # mandar a popular
            pass
        else:
            filtros:list = resp["_source"]["infoFiltros"] 
            metadata.update({"filtros": filtros})
    
    except Exception:
        pass
    
    return metadata

### (Version 1) Usuarios e item en el mismo recorrido

In [15]:
def read_items_and_users(dataset:DataFrame, existing_users:dict, existing_items:dict):
    
    info:dict = { "missing_metadata": set(), "ignored_users": set() }
    dataset_ls = dataset.collect()
    
    for j in trange(len(dataset_ls)):
        
        interaction = dataset_ls[j]
        user_id, group_id, event_time = decode_interaction(interaction)
        
        # Agrego nuevo usuario
        if user_id not in existing_users.keys():            
            u_index = len(existing_users) # index del nuevo usuario
            existing_users.update({user_id:u_index})
        
        else:
            info["ignored_users"].add(user_id)
            
        # Agrego nuevo item
        if group_id not in existing_items.keys():
            i_index = len(existing_items) # index del nuevo item           
            metadata = getMetadata(group_id)
            
            if metadata == dict():
                info["missing_metadata"].add(group_id)
        
            else:
                existing_items.update({
                    group_id:{
                        "index": i_index,
                        "metadata": metadata
                    }
                })
                
        pass
    
    return info

In [19]:
# info = read_items_and_users(dataset, userIndexes, itemIndexes)
# [9:20:36<254:33:38, 2.44it/s] !! (9hs para 106319 datos)

### (Version 2) Usuarios e items por separado

### Items

In [20]:
def readNewItems(dataset:DataFrame,
                 existing_items:dict) -> dict():
    
    # Dado un dataset y un diccionario con los indices de item preexistentes,
    # agrega al indice los items del dataset que no estaban previamente
    
    # informacion de los datos perdidos
    info:dict = { "missing_metadata": [] }

    set_items = set([i.group_id for i in dataset.select("group_id").collect()])
    items_lss = list(set_items)
    
    for j in trange(len(set_items)):
        itemid = items_lss[j] 
        
        if itemid not in existing_items.keys():
            
            # el index es por aparicion
            index = len(existing_items)
            
            # leo la metadata segun el group_id
            metadata = getMetadata(itemid)
            
            if metadata == dict():
                # skip
                info["missing_metadata"].append(itemid)
        
            else:
                existing_items.update({
                    itemid:{
                        "index": index,
                        "metadata": metadata
                    }
                })
        
    return info

In [27]:
# info = readNewItems(dataset, itemIndexes)

In [29]:
print("* N° de items únicos: ", len(itemIndexes.keys()))
print("* Items omitidos: ", len(info["missing_metadata"]))

* N° de items únicos:  9783
* Items omitidos:  19172




### Usuarios

In [30]:
def readNewUsers(dataset:DataFrame,
                 existing_users:dict) -> tuple:
    
    # Dado un dataset y una lista de indices de usuarios existente,
    # Agrega a esta todos los usuarios de dataset que no estan en el dict
        
    set_usuarios:set = set([u.user_id for u in dataset.select("user_id").collect()])
    usuarios_lss:list = list(set_usuarios)
    
    contador:int = 0 # cantidad de user_ids ignorados
    
    for i in trange(len(set_usuarios)):
        userid = usuarios_lss[i]
        if userid not in existing_users.keys():
            index = len(existing_users)
            existing_users.update({userid:index})
        else:
            contador += 1
    
    return set_usuarios, contador


                                                                                

In [31]:
set_usuarios, ignored_users = readNewUsers(dataset, userIndexes) 

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
                                                                                

  0%|          | 0/496442 [00:00<?, ?it/s]

### Guardo los indices

In [12]:
def actualizarDocsIndices(itemIndexes, userIndexes):
    # actualizo item_indexes
    with open("tmp/item_indexes.json", "w") as file:
        json.dump(itemIndexes, file, indent=2)
    
    # actualizo user_indexes
    with open("tmp/user_indexes.json", "w") as file:
        json.dump(userIndexes, file, indent=2)
    pass

In [33]:
actualizarDocsIndices(itemIndexes, userIndexes)

## [2.b] Armo un diccionario auxiliar para armar la Co-ocurrence Matrix ($C$):


### C como matriz CSR ~ Item viewers

> {item_id: [user_id, user_id, ...]}

In [10]:
def build_item_viewers(dataset:DataFrame) -> dict():
    
    # Dado un dataset, y una lista de item_ids presentes en él
    # devuelve un diccionario con { item_id: [user_id, ---, user_id] }
    # siendo esta lista los usuarios que vieron item_id en ese dataset
    
    items_viewers:dict = dict()
    dataset_ls:list = dataset.collect()
    
    for i in trange(len(dataset_ls)):
        
        interaction = dataset_ls[i]
        user_id, group_id, event_time = decode_interaction(interaction)
        
        if group_id in itemIndexes.keys():
            if group_id in items_viewers.keys():
                items_viewers[group_id].add(user_id)
            else:
                if group_id in itemIndexes.keys(): # si es un group_id valido    
                    items_viewers.update({group_id:{user_id}})
            
    return items_viewers

In [11]:
items_viewers = build_item_viewers(dataset)

                                                                                

  0%|          | 0/2342154 [00:00<?, ?it/s]

In [14]:
with open("tmp/items_viewers.json", "w") as file:
    _items_viewers = dict()
    for k, v in items_viewers.items():
        _items_viewers.update({k:list(v)})
    json.dump(_items_viewers, file, indent=2)
pass

### Como dataframe de spark ~ Users Histories

> {user_id: [item_id, item_id, ...]}

In [168]:
def build_users_histories(dataset:DataFrame) -> dict():
       
    users_histories:dict = dict()
    dataset_ls:list = dataset.collect()
    
    for i in trange(len(dataset_ls)):
        
        interaction = dataset_ls[i]
        user_id, group_id, event_time = decode_interaction(interaction)
        
        if user_id not in users_histories.keys():
            users_histories.update({user_id: [group_id]})
        else:
            user_movies:set = set(users_histories[user_id])
            user_movies.add(group_id)
            users_histories[user_id] = list(user_movies)
    
    return users_histories

In [169]:
users_histories:dict = build_users_histories(dataset)


[Stage 445:>                                                        (0 + 1) / 1]

                                                                                

  0%|          | 0/100 [00:00<?, ?it/s]

## [2.c] Co-ocurrence Matrix ($C$):

* $S$ una matriz $C \in \Re^{MxM}$, en la que $C_{ij}$="# usuarios que vieron items i y j"

In [15]:
def buildCoocurrenceMatrix(itemIndexes:dict, items_viewers:dict) -> csr_matrix:
    
    # Dado el diccionario auxiliar users_per_item y 
    # el diccionario que asocia group_id con su indice en la matriz,
    # devuelve la matriz de Coocurrencia C
    
    M:int = len(itemIndexes.keys()) # cantidad de items en el dataset y registrados
    C:csr_matrix = csr_matrix((M,M)).tolil()
        
    print("* Armando matriz C...")
    
    for i, item_i in enumerate(items_viewers.keys()):
        
        index_i:int = itemIndexes[item_i]["index"] # index del item i
        item_i_viewers:set = items_viewers[item_i] # usuarios que vieron el item i

        for j, item_j in enumerate(items_viewers.keys()):
            
            index_j:int = itemIndexes[item_j]["index"] # index del item j
            item_j_viewers:set = items_viewers[item_j] # usuarios que vieron el item j
            
            C[index_j, index_i] = len(item_j_viewers.intersection(item_i_viewers))
           
    print("* Matriz creada ✔")
    return C

In [16]:
C = buildCoocurrenceMatrix(itemIndexes, items_viewers)

* Armando matriz C...
* Matriz creada ✔


In [233]:
# Test exhaustivo de C

ikeys = list(items_viewers.keys())
for a in ikeys:
    i = itemIndexes[a]["index"]    
    for b in ikeys:
        j = itemIndexes[b]["index"]
        viewers_a = items_viewers[a]
        viewers_b = items_viewers[b]
        viewers_ab = len(viewers_a.intersection(viewers_b))        
        if int(C[i,j]) != viewers_ab:
            print(i,j)

In [172]:
def buildCoocurrenceMatrix_df(users_histories:dict) -> DataFrame:
    
    # Dado un dataset y un listado de las peliculas que vio cada usuario,  
    # devuelve la matriz de Coocurrencia C como DataFrame
    
    # No respeta los indices puestos en itemIndexes
    
    ls_users_histories:list = list()
    for user, movies in users_histories.items():
        txt = str(movies)[1:-1]
        ls_users_histories.append(txt)
        
    ddf = spark.createDataFrame(
            ls_users_histories,
            "string"
          ).toDF("C")

    long = (ddf
            .withColumn("id", monotonically_increasing_id())
            .select("id", explode(split("C", ","))))

    C = long.withColumnRenamed("col", "col_").join(long, ["id"]).stat.crosstab("col_", "col")
    
    return C

In [173]:
C_df:DataFrame = buildCoocurrenceMatrix_df(users_histories)

                                                                                

In [174]:
C_df.select('1113875').show()

+-------+
|1113875|
+-------+
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      0|
|      1|
|      0|
|      0|
|      0|
|      0|
+-------+
only showing top 20 rows



## [2.d] Item-Similarity Matrix ($S$):

* $S \in \Re^{MxM}$  es el resultado de aplicar una métrica a la matriz de co-ocurrencia C.
* Obs: Una metrica aceptada es dejar exactamente igual a C.

In [17]:
def buildSimilarityMatrix(C:csr_matrix, metric="counts") -> csr_matrix:
    
    if metric == "counts":
        return C
    
    elif metric == "jaccard":
        raise Exception("Metric not implemented")
        
    elif metric == "lift":
        raise Exception("Metric not implemented")
    
    else:
        raise Exception("Metric not implemented")
        
S = buildSimilarityMatrix(C)

In [176]:
def buildSimilarityMatrix_df(C:DataFrame, metric="counts") -> DataFrame:
    
    # Aplico una metrica para C
    
    if metric=="counts":
        return C 
    
    elif metric=="jaccard":
        raise Exception("Metric not implemented")
        
    elif metric=="lift":
        raise Exception("Metric not implemented")
    
    else:
        raise Exception("Metric not implemented")
        
S_df:DataFrame = buildSimilarityMatrix(C)

## [2.e] User-Item Affinity Matrix ($A$):

* $A_{ij} = \sum_{k \in dataset} W_k 2^{-(\frac{t_0-t_k}{T})}$,

Donde: 

- $k$ es un evento del dataset que involucra al usuario i y al item j, es decir, una visualizacion de i a j. 
- $t_0$ es el tiempo actual en una determinada unidad.
- $t_k$ es el tiempo de la interaccion / evento k.

Con parametros: 
- $T$ es un parametro en las mismas unidades de  $t_0$ y $t_k$. Los eventos T unidades antes de t_0 tienen la mitad del peso.
- $W_k$ es un parametro que indica el peso del tipo de evento de k, en nuestro caso solo tenemos "view". $W_k = 1$

Es decir, para nuestro caso puede quedar:

* $A_{ij} = \sum_{k} 2^{-(\frac{t_0-t_k}{T})}$

OBS: $A \in \Re^{N x M}$

In [18]:
def actualTimestamp() -> float:
    return datetime.datetime.now().timestamp()

In [19]:
def scoring_function(event_time:datetime.datetime, T=(17/3)*10**8, W=1) -> float:

    # Devuelve el scoring de un evento visto en la fecha pasada como argumento
    # Teniendo como parametro a la constante T 
    
    t_k = event_time.timestamp()
    t_0 = actualTimestamp()
    exp = - (t_0 - t_k) / T
    
    return W * 2 ** exp

In [20]:
def compute_affinity_scores(dataset:DataFrame,
                            verb=False) -> np.ndarray:
    
    # Dado un dataset,
    # Recorre cada evento del dataset y
    # suma su scoring en el respectivo indice de A
    
    global itemIndexes, userIndexes
    
    M = len(itemIndexes.keys())   
    N = len(userIndexes.keys())
    
    A:csr_matrix = csr_matrix((N, M)).tolil()  
    ignored_counter:int = 0
    
    for interaction in dataset.collect():
        
        user_id, group_id, event_time = decode_interaction(interaction)
        
        if group_id in itemIndexes.keys() and user_id in userIndexes.keys():
            
            index_item = itemIndexes[group_id]["index"]
            index_user = userIndexes[user_id]

            if verb:
                print(index_user, index_item, event_time, scoring_function(event_time))

            A[index_user, index_item] += scoring_function(event_time)
            # pre_existing_value = readA(index_user, index_item)
            # assign_A(scoring_function(event_time) + pre_existing_value)

        else:
            ignored_counter += 1
    
    pass
        
    return ignored_counter, A        

In [21]:
ignored_interactions, A = compute_affinity_scores(dataset)

                                                                                

## [2.f] Predicciones:

In [22]:
def predictions(A,S) -> np.ndarray:
    return A @ S

def rating(user_id:int,item_id:int) -> float:
    
    # Dado un user_id y un group_id
    # devuelve el scoring que le da el user a ese contenido
    # segun el modelo 
    
    global itemIndexes, userIndexes
    global preds 
        
    user_index = userIndexes[user_id]
    item_index = itemIndexes[item_id]["index"]
    
    return preds[user_index,item_index]

In [240]:
preds = predictions(A,S)

In [37]:
rating(user_id=2784814, item_id=770988)

46.768611047741935

## [3.a] Recomendar items populares

In [23]:
invertedItemIndexes = {v["index"]: k for k, v in itemIndexes.items()}

In [24]:
def getBestTuple(item2item:list) -> tuple:
    if len(item2item) == 0:
        raise Exception("La lista de tuplas debe tener al menos un elemento")
    else:
        _max, _index = item2item[0]
        for n_views, index in item2item[1:]:
            if n_views > _max:
                (_max, _index) = (n_views, index)
        return _max, _index

In [25]:
def getTopLMovies(preds:np.ndarray, L:int, exclude:list=[]) -> list:
    
    # devuelve las L peliculas/items mas populares exluyendo los group_ids en la lista exclude dada
    # out = [(key1, "popular"), ..., (keyL, "popular")]
    
    global itemIndexes
    global invertedItemIndexes
    
    if preds.shape[1] < L:
        raise Exception("Y tiene menos peliculas de las necesarias")
    
    item2item:list = []
    for i in trange(preds.shape[1]):       
        valoraciones = preds[:, i].toarray()
        suma_valoraciones = sum(valoraciones.flatten())
        item2item.append((suma_valoraciones, i))
        
    out:list = []
           
    while (len(out) < L):
        n_views, index = getBestTuple(item2item)
        group_id = invertedItemIndexes[index]
        
        if group_id not in exclude:
            ## aca irian los filtros por metadata de que contenido puede ser                
            out.append((group_id, "popular"))
        
        item2item.remove((n_views,index))
        
    return out

In [26]:
# getTopLMovies(preds,10, exclude=[]) # 1hs dura!!

## [3.b] Recomendar items dado un usuario

In [93]:
def recommend_k_items_to_user(preds:np.ndarray,
                              user_id:int,
                              k:int,
                              include=[],
                              enhance=[],
                              exclude=[],
                              bias=10
                              )->np.ndarray:

    global itemsIndexes, userIndexes
    
    user_index:int = userIndexes[user_id]
    ratings_user:list = list(preds[0].toarray()[0])
    
    ## filtro y enhance
    """
    for group_id in itemIndexes.keys():
    
        item:dict = itemIndexes[group_id]
        metadata:dict = item["metadata"]
        ixd:int = item["index"]
         
        if metadata == dict():
            # si no pude leer los filtros
            # asumo que es contenido invalido
            ratings_user[idx] = -1
        
        else:
            # filtrar
            if metadata["filtros"] not in include:
                ratings_user[idx] = -1

            # potenciar
            if metadata["filtros"] in enhance:
                ratings_user[idx] += bias
    
        pass
    """

    # sort ratings
    ratings_user_sorted:list = ratings_user.copy()
    ratings_user_sorted = ratings_user    
    ratings_user_sorted.sort()
    ratings_user_sorted = ratings_user_sorted[::-1]
    
    top_k_recommendations:list = ratings_user_sorted[:k]
    
    # filter items with rating 0
    filtered = list(filter(lambda x: x!=0, top_k_recommendations))
    
    clean_top_k:list = []
    
    for rating in filtered:        
        idx = ratings_user.index(rating)
        item_id = list(itemIndexes.keys())[idx]
        clean_top_k.append((item_id,rating))
        ratings_user[idx] = -1
            
    if len(clean_top_k) < k:
        ## add popular items to top_k
        l:int = k - len(clean_top_k) # number of items to add
        
        # get popular L items, excluding items in clean_top_k
        _exclude:list = exclude.extend([k for k, score in clean_top_k]) 
        top_l_movies = getTopLMovies(preds, l, exclude=_exclude)
        clean_top_k.extend(top_l_movies)
        
    return clean_top_k

In [94]:
# recommend_k_items_to_user(preds,770988,10)

## [3.c] Recomendar items dado otro item

Utiliza la matriz de co-ocurrencia $C$ o de similaridad $S$ para elegir los índices con mayor similitud de la fila/columna del item dado.

In [27]:
def recommend_similar_k_items(group_id:int,
                              k:int,
                              include=[],
                              exclude=[],
                              enhance=[],
                             )->np.ndarray:
    
    global itemIndexes
    global invertedItemIndexes
    global S
    global preds      
    
    itemIndex = itemIndexes[group_id]["index"]
    
    # item2item = [(similarity, index_in_S), ..., ]
    # lo uso asi porque despues elimino elementos y necesito recuperar el index original
    item2item:list = [
        (similarity, i) for i, similarity in enumerate(list(S[itemIndex].toarray()[0]))
    ]    
        
    # eliminamos al elemento del que se buscan los similares
    item2item.pop(itemIndex)
    
    ## filtro y enhance
    """
    bias = 10
    for s, i in item2item:
        item = invertedItemIndexes[i]
        metadata = item["metadata"]
        
        # filtrar
        if metadata["filtros"] not in include:
            item2item[i] = -1
        
        # potenciar
        if metadata["filtros"] in enhance:
            item2item[i] += bias
        
        pass
    """
    
    # agarro items similares
    recommendations:list = []
    MAX:int = -1    
    while (MAX != 0) and (len(recommendations) < k):
        # agarro la tupla más vista
        tuple_most_similar = getBestTuple(item2item)
        MAX, indexC = tuple_most_similar      
        
        # la agrego a las recomendaciones
        # si no tiene scoring 0 y no esta en las que hay que excluir
        g_id:int = invertedItemIndexes[tuple_most_similar[1]]
        
        if MAX != 0 and (g_id not in exclude): 
            recommendations.append(tuple_most_similar)
            item2item.remove(tuple_most_similar) 
            
        if g_id in exclude:
            item2item.remove(tuple_most_similar)
    
    # los dejo en el formato valido 
    clean_top_k:list = []
    for n_views, index in recommendations:
        rec = (invertedItemIndexes[index], n_views)
        clean_top_k.append(rec)
    
    # si le faltan elementos, agrego los restantes con los mas populares
    if len(clean_top_k) < k:
        
        print("Agregando populares")

        # add popular items to top_k
        l:int = k - len(clean_top_k) # number of items to add
 
        # get popular L items, excluding items in clean_top_k
        _exclude:list = exclude.extend([k for k, score in clean_top_k])
        top_l_movies = getTopLMovies(preds, l, exclude=_exclude)

        clean_top_k.extend(top_l_movies)
    
    return clean_top_k

In [188]:
recommend_similar_k_items(group_id=526833, k=10)

[(834946, 27.0),
 (690537, 25.0),
 (835071, 24.0),
 (690812, 23.0),
 (907268, 23.0),
 (591740, 21.0),
 (790961, 20.0),
 (728118, 17.0),
 (835152, 17.0),
 (790938, 16.0)]

### Auxiliares para entender los datos

In [189]:
def item_mas_popular(C:np.ndarray) -> int:
    
    global itemIndexes
    
    M:int = C.shape[1] # numero de items en C
    views_per_item = [C[i].sum() for i in range (M)]
    n_views_most_watched = max(views_per_item)
    most_watched_index = views_per_item.index(n_views_most_watched)
    
    itemKey = list(itemIndexes.keys())[most_watched_index]
    
    return itemKey

## 4. Testing:

### Interpretar recomendaciones

In [29]:
def elegirItemAlAzar() -> int:
    global itemIndexes
    N = len(itemIndexes)
    return list(itemIndexes.keys())[np.random.randint(0,N)]

In [30]:
def describirItem(group_id:int) -> tuple:

    # Dado un group_id,
    # Describe que sus caracteristicas principales
    
    url = leerMetadata(group_id)
    
    resp = requests.get(url).json()
    proveedor = resp["_source"]["NOMBRE_PROVEEDOR"]
    nombre = resp["_source"]["TITULO_ESP"]
    genero = resp["_source"]["INFO_GENERO"][0]["GENERO_ESP"]
    descripcion = resp["_source"]["DESCRIPCION_ESP"]
 
    return group_id, nombre, genero, proveedor, descripcion

In [31]:
def interpretar_recomendaciones(group_id:int, recommendations:list) -> pd.DataFrame:
    
    print(f"item modelo:")
    print(describirItem(group_id))
    
    recs:list = []
    
    for g_id, score in recommendations:    
        values = describirItem(g_id)
        recs.append(values)
        
    descr = pd.DataFrame(recs, columns=["id", "nombre", "genero", "proveedor", "descripcion"])
    
    return descr

In [49]:
g_id = elegirItemAlAzar()
recs = recommend_similar_k_items(group_id=g_id, k=10, exclude=[])
res = interpretar_recomendaciones(g_id, recs)
res.head(10)

item modelo:
(905117, 'Ugly Americans', 'comedia', 'PARAMOUNT', 'Mark ayuda a diferentes criaturas a que se adapten a la vida de Nueva York.')


Unnamed: 0,id,nombre,genero,proveedor,descripcion
0,909791,La casa de los dibujos,comedia,PARAMOUNT,Los ocho personajes llegan y se conocen. La pr...
1,724938,Angry Birds: La película,Animación,AMCO,¿Por qué están tan enojados estos pajaritos? C...
2,919826,Bob Esponja,Infantil,PARAMOUNT,Bob Esponja construye un puesto para hacer bur...
3,617635,Son como niños,Comedia,AMCO,"Aunque hayan crecido, estos cinco amigos siemp..."
4,659627,Naruto,Series,AMCO,"Conoce a Naruto Uzumaki, un travieso niño que ..."
5,921692,Los Pingüinos de Madagascar,Infantil,PARAMOUNT,El doctor del zoológico está vacunando a los p...
6,888314,Victorious,comedia,PARAMOUNT,Tori descubre que sus amigos han estado yendo ...
7,573860,Niñas mal,Comedia,AMCO,Comedia mexicana donde una adolescente rebelde...
8,888262,Victorious,comedia,PARAMOUNT,Tori acepta ayudar a Jade a buscar un teatro. ...
9,883694,Grease,Drama,PARAMOUNT,John Travolta consolidó su prestigio como el a...


In [34]:
def cantidad_interacciones_usadas(items_viewers:dict) -> int:
    contador:int = 0
    for k,v in items_viewers.items():
        contador += len(v)
    return contador

cantidad_interacciones_usadas(items_viewers)

668312

In [205]:
a = 921352
b = 660870

viewers_A = items_viewers[a]
viewers_B = items_viewers[b]

viewers_AB = viewers_A.intersection(viewers_B)

print("# usuarios que vieron A:", len(viewers_A))
print("# usuarios que vieron B: ", len(viewers_B))
print("# usuarios que vieron A y B: ", len(viewers_AB))

index_A = itemIndexes[a]["index"] # indice en C
index_B = itemIndexes[b]["index"] # indice en C

C[index_A, index_B]

# usuarios que vieron A: 252
# usuarios que vieron B:  128
# usuarios que vieron A y B:  0


25.0

### ¿Recomienda algun contenido no visto?

In [185]:
users:set = set()
for key in items_viewers.keys():
    users = users.union(items_viewers[key])
    
stop = False
for item_id in items_viewers.keys():
    users_viewed_item:set = set(items_viewers[item_id])
    for us in users-users_viewed_item:
        if rating(us, item_id) != 0:
            print("Recomendo un item no visto! :)", us, item_id)
            stop = True
            break
    if stop:
        break

Recomendo un item no visto! :) 82313217 770988


### Testing por ECM:

La idea es que recorro todas las interacciones (usuario, contenido) del DataFrame que queda como test. 

Se considera que al usuario ($j$) se le recomienda el item ($i$) en el caso de test cuando el estimador del rating de ese contenido para el usuario ($\hat{Y_{(ij)}}$) es mayor a la media de los ratings de las demas peliculas para el usuario $j$: $\mu_{\hat{Y_{j}}} = \sum_{k=1}^{M} k \hat{Y_{(kj)}}$. 

Luego el $ECM$ queda: 

$ECM(\hat{Y}, Y) = \sum_{i,j \in TEST | \hat{Y_{(ij)}} < \mu_{\hat{Y_{j}}}} (1-\hat{Y_{(ij)}})$

In [None]:
def ECM(preds:np.ndarray, test_y:DataFrame) -> float:
    
    # args:
    #  preds: matriz N x M con las predicciones de rating  
    #  test_y: dataframe con las interacciones no incluidas en el training   
    
    global itemIndexes
    global userIndexes
    
    invertedItemIndexes:dict = {v: k for k, v in itemIndexes.items()}
    error:float = 0
    
    for interaction in test_y.collect():
        user_id, group_id, event_time = decode_interaction(interaction)
        
        item_index = itemIndexes[group_id]
        user_index = userIndexes[user_id]
        
        MEAN_RATINGS_USER = preds[user_index:].mean()
        MAX_RATING_USER = preds[user_index:].max()
        rating = preds[user_index, item_index]
        
        if rating < MEAN_RATINGS_USER:
            scala_pred = rating / MAX_RATING_USER
            error += (1 - scala_pred)**2
        else:
            ## se presupone que podría ser recomendado
            # print(rating, MEAN_MOVIE)
            pass
        
    return error 

In [None]:
ECM(preds, test)