## Notebook de modelacion

Este libro de jupyter se utiliza para crear la matriz de coocurrencia, crear el modelo y generar las recomendaciones para cada usuario

In [None]:
# Lista los paquetes instalados en pyspark
sc.list_packages()

In [None]:
# Se instalan librerias necesarias
sc.install_pypi_package("pandas")
sc.install_pypi_package("fsspec")
sc.install_pypi_package("s3fs ")
sc.install_pypi_package("boto3 ")

In [None]:
# Importamos las librerias 
import pandas as pd
import boto3
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, StructType,StructField, StringType, DoubleType
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

In [None]:
# Se definie Path en s3 donde se encuentra la informacion
triplets_file = 's3://mineria-msd/small_data/x_train.csv'

In [None]:
# Capturamos los datos del bucket de s3
listened_songs_df = spark.read.format("csv").option("header", "true").load(triplets_file)

In [None]:
# Se renombran algunos campos necesarios
listened_songs_df = listened_songs_df.withColumnRenamed('play_count','listen_count')
listened_songs_df = listened_songs_df.withColumnRenamed('user','user_id')
listened_songs_df = listened_songs_df.withColumnRenamed('song','song_id')

In [None]:
# Top 50 de canciones mas escuchadas 
# Estas se usaran para los usuarios nuevos

Top50 =listened_songs_df.groupby("song_id").count().sort(desc("count")).limit(50)

In [None]:
Top50.show() # presentar en pantalla

In [None]:
## Realizamos un filtro de canciones que tengas reproducciones mayores a 1

listened_songs_df = listened_songs_df.filter(listened_songs_df.listen_count > 1)

In [None]:
# Se realiza un group by por cancion y se cuenta cuantas veces esta la cancion
# en el dataframe
listened_songs_df.groupby("song_id").count().sort(desc("count")).count()

In [None]:
# Se crea un nuevo dataset que utiliza las 5000 canciones mas escuchadas
Top = listened_songs_df.groupby("song_id").count().sort(desc("count")).limit(5000)

In [None]:
# Realizamos un group by por el id del song sumando las reproducciones

from pyspark.sql.types import IntegerType

listened_songs_df = listened_songs_df.withColumn("listen_count", listened_songs_df["listen_count"].cast(IntegerType()))
listened_songs_df = listened_songs_df.join(Top,"song_id").drop('count')
total_songs = listened_songs_df.groupby("song_id").sum("listen_count")

In [None]:
# Renombramos columna
total_songs = total_songs.withColumnRenamed('sum(listen_count)','count')
#total_songs.show(5)

In [None]:
# Inner join entre listen song y total songs

innerJoinDf = listened_songs_df.join(total_songs,"song_id")
#innerJoinDf.show(5)

In [None]:
## Realizamos una seleccion de la cancion y el usuario del dataframe anterior

xUsuario = innerJoinDf.select('song_id','user_id')
#xUsuario.show(5)

In [None]:
## Creo una lista de todos los usuarios por una cancion

users_songs_df = innerJoinDf.groupby("song_id").agg(collect_set("user_id").alias('Set_users'))
users_songs_df.show(5)

In [None]:
# Agregamos una columna de 1 al dataset de usuarios por cancion
users_songs_df = users_songs_df.withColumn('new_column', lit(1))

In [None]:
# Renombramos columnas 
users_songs_df_2 = users_songs_df.withColumnRenamed('song_id','song_id_2')
users_songs_df_2 = users_songs_df_2.withColumnRenamed('Set_users','Set_users_2')

In [None]:
# Configuracion para habilitar crossJoin en spark
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [None]:
# Datos que contiene cancion por cancion
data = users_songs_df.join(users_songs_df_2,on='new_column')

In [None]:
# Ahora definimos la funcion de similaridad de Jaccard
def jaccard_similarity(list1, list2): 
  set1 = set(list1) 
  set2 = set(list2) 
  intersection = len(set.intersection(set1, set2)) 
  union = len(set.union(set1, set2)) 
  return intersection/union

In [None]:
# Se define la función de Jaccard y se calcula la matriz de coocurrencia
jaccard = udf(lambda x: jaccard_similarity(x[0],x[1]), FloatType())
coocurrence=data.withColumn('jaccard_similarity', jaccard(array('Set_users', 'Set_users_2')))
coocurrence2 = coocurrence.drop('new_column','Set_users','Set_users_2')

In [None]:
listened_songs_pd = listened_songs_df.select("*").toPandas()
usuarios = pd.DataFrame(set(listened_songs_pd['user_id']))

In [None]:
# Claves de acceso AWS 
myaws_access_key_id='ASIATJCFBBZFZFAXQO5E'
myaws_secret_access_key='xzlNEW1TqJrMTy8U2o9pEak6uY4SqBmpEqJSJt3X'
myaws_session_token= 'FwoGZXIvYXdzEDQaDCK+ilwRBfxmRjIufyLDAaH5H8UH0daSj030009VtQkDPQlKn46G2flJhOJMz6chPCKpsi07HIsDtQd3obs8GD+PXDC078uMfTq9PmKKWNTd74kqwvOvq2HF1KwWdStAoxqI6nA41YUIXlmQXtotLnOvC2rsjGFaUhwxWdWmR3+fuStfLC2jcr68KMzZ/kS6WcSRBPVWi3rc3ggJ9TwFelVZGFkBV62z0HNleQO47XBM5709kkzlrtshDNyiH/4aOoaYjGFDrcD/v8lBBrk0SjmoFSjL1pX7BTItAg5TVcjHyA7Mz4YJQRkoh1YRJQYuqnMguk+KPsShKCJPEA8UxevVYaqzhyG6'
# Nombre del bucket
mybucket_name = 'resultados-mineria'

In [None]:
# Se coencta un cliente a S3.
s3client = boto3.client('s3',
    aws_access_key_id=myaws_access_key_id,
    aws_secret_access_key=myaws_secret_access_key,
    aws_session_token=myaws_session_token
)

s3resource = boto3.resource('s3',
    aws_access_key_id=myaws_access_key_id,
    aws_secret_access_key=myaws_secret_access_key,
    aws_session_token=myaws_session_token
)

In [None]:
# Se crean las funciones para acceder a S3.
def test_s3object(prefix):
    mybucket = s3resource.Bucket(mybucket_name) # just Bucket name
    obj = list(mybucket.objects.filter(Prefix=prefix))
    if len(obj) > 0:
        return True
    else:
        return False
    
def create_s3folder(folder_name):
    s3client.put_object(Bucket=mybucket_name, Key=(folder_name+'/'))
    
def list_s3dir(prefix):
    continuation_token = None
    Temp=[]
    while True:
        kwargs = {'Bucket': mybucket_name, 'Prefix': prefix}
        list_kwargs = dict(MaxKeys=1000, **kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        response = s3client.list_objects_v2(**list_kwargs)
        temp1 = []
        temp1 = list(response.get('Contents', []))
        for i in temp1:
            Temp.append(i['Key'])
        #yield from response.get('Contents', [])
        if not response.get('IsTruncated'):  # At the end of the list?
            break
        continuation_token = response.get('NextContinuationToken')
    return Temp


def write_s3file(fname,fcontent):
    file = s3resource.Object(mybucket_name,fname)
    file.put(Body=fcontent)
    
def upload_s3file(file_name, object_name=None):
    """Upload a file to an S3 bucket

    :param bucket: Bucket to upload to
    :param file_name: File to upload
    :param object_name: S3 object name. If not specified then file_name is used
    :return: True if file was uploaded, else False
    """

    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name

    # Upload the file
    try:
        response = s3client.upload_file(file_name, mybucket_name, object_name)
    except ClientError as e:
        logging.error(e)
        return False
    return True

def read_s3file(fname):
    data = s3client.get_object(Bucket = mybucket_name, Key = fname)
    return data['Body']


In [None]:
# Se crea carpeta para almacenar resultados
CONTENT_PATH = 'Resultados/'
if not test_s3object(CONTENT_PATH):
    create_s3folder(CONTENT_PATH)

In [None]:
 #Se define un esquema para la creación de un daaframe que almacene los resultados
schema = StructType([
  StructField('song_id', StringType(), True),
  StructField('jaccard_similarity', DoubleType(), True),
  StructField('new_column', StringType(), True),
  StructField('id_user', StringType(), True)])

In [None]:
# Se crea las recomendaciones para cada uno de los usuarios
ini=2250
fin=3000
for x in range(ini,fin,50):
    # Se intancia el dataFrame vacio
    Resultados = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema)
    for user in usuarios[x:x+50][0]:
        # Se escoge un usuario y se filtran sus canciones
        CancionesUsuario = listened_songs_df.filter(listened_songs_df['user_id'] == user).select(listened_songs_df['song_id'])
        # Se filtran las canciones de ese usuario de la matrix de coocurrencia en song_id (en la columna song_id_2 siguen estando todas las canciones)
        CancionesDeCanciones = coocurrence2.join(CancionesUsuario,"song_id")
        # Se calcula la suma de jaccard por columna
        Rank=CancionesDeCanciones.groupBy("song_id_2").sum("jaccard_similarity").withColumnRenamed('sum(jaccard_similarity)','jaccard_similarity')
        A  = Rank.orderBy(desc("jaccard_similarity")).limit(100)
        A = A.withColumnRenamed('song_id_2','song_id')
        # Se eliminan las canciones que ya el usuario ha escuchado
        CancionesRecomendadas = A.join(CancionesUsuario.withColumn('new_column',lit(1)), "song_id",'left')
        # Se escogen las 30 canciones con mejor suma de jaccard
        CancionesRecomendadas = CancionesRecomendadas.filter("new_column is null").limit(30)
        CancionesRecomendadas = CancionesRecomendadas.withColumn('id_user',lit(user))
        Resultados = Resultados.union(CancionesRecomendadas)
    ResultadosCollect = Resultados.groupby("id_user").agg(collect_set("song_id").alias('Recomendados'))
    # Se tranforma el DataFrame de Spark a uno en Python guardar los resultados en S3
    Results2 = ResultadosCollect.select("*").toPandas()
    for index,data in Results2.iterrows():
        # Se guardan los resultados en s3 donde el usuario es el nombre del archivo txt y las recomendaciones estan guardadas en una lista dentro de cada uno de los archivos.
        cname = CONTENT_PATH + data['id_user'] + '.txt'
        write_s3file(cname,str(data['Recomendados']))