#Implementación de al menos un algoritmo de recomendación avanzado

#Instalando Dependencias

Instalando Spark

In [1]:
import requests
import subprocess
import os
import re
import socket
import shutil
import time
import sys

def run(cmd):
    # run a shell command
    try:
        # Run the command and capture stdout and stderr
        subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        # Access stdout (stderr redirected to stdout)
        stdout_result = subprocess_output.stdout.strip().splitlines()[-1]
        # Process the results as needed
        print(f'✅ {stdout_result}')
        return stdout_result
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print(f"Command failed with return code {e.returncode}")
        print("stdout:", e.stdout)

def is_java_installed():
    return shutil.which("java")

def install_java():
    # Uncomment and modify the desired version
    # java_version= 'openjdk-11-jre-headless'
    # java_version= 'default-jre'
    # java_version= 'openjdk-17-jre-headless'
    # java_version= 'openjdk-18-jre-headless'
    java_version= 'openjdk-19-jre-headless'
    os.environ['JAVA_HOME'] = ' /usr/lib/jvm/java-19-openjdk-amd64'
    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        cmd = f"apt install -y {java_version}"
        subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        stdout_result = subprocess_output.stdout
        # Process the results as needed
        print(f'✅ Done installing Java {java_version}')
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print(f"Command failed with return code {e.returncode}")
        print("stdout:", e.stdout)

print("\n0️⃣   Install Java if not available")
if is_java_installed():
    print("✅ Java is already installed.")
else:
    install_java()

print("\n1️⃣   Download and install Hadoop and Spark")
# URL for downloading Hadoop and Spark
SPARK_VERSION = "3.5.1"
HADOOP_SPARK_URL = "https://dlcdn.apache.org/spark/spark-" + SPARK_VERSION + \
                   "/spark-" + SPARK_VERSION + "-bin-hadoop3.tgz"
r = requests.head(HADOOP_SPARK_URL)
if r.status_code >= 200 and r.status_code < 400:
    print(f'✅ {HADOOP_SPARK_URL} was found')
else:
    SPARK_CDN = "https://dlcdn.apache.org/spark/"
    print(f'⚠️ {HADOOP_SPARK_URL} was NOT found. \nCheck for available Spark versions in {SPARK_CDN}')

# set some environment variables
os.environ['SPARK_HOME'] = os.path.join(os.getcwd(), os.path.splitext(os.path.basename(HADOOP_SPARK_URL))[0])
os.environ['PATH'] = ':'.join([os.path.join(os.environ['SPARK_HOME'], 'bin'), os.environ['PATH']])
os.environ['PATH'] = ':'.join([os.path.join(os.environ['SPARK_HOME'], 'sbin'), os.environ['PATH']])

# download Spark
# using --no-clobber option will prevent wget from downloading file if already present
# shell command: wget --no-clobber $HADOOP_SPARK_URL
cmd = f"wget --no-clobber {HADOOP_SPARK_URL}"
run(cmd)

# uncompress
try:
    # Run the command and capture stdout and stderr
    cmd = "([ -d $(basename {0}|sed 's/\.[^.]*$//') ] && echo -n 'Folder already exists') || (tar xzf $(basename {0}) && echo 'Uncompressed Spark distribution')"
    subprocess_output = subprocess.run(cmd.format(HADOOP_SPARK_URL), shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    # Access stdout (stderr redirected to stdout)
    stdout_result = subprocess_output.stdout
    # Process the results as needed
    print(f'✅ {stdout_result}')

except subprocess.CalledProcessError as e:
    # Handle the error if the command returns a non-zero exit code
    print(f"Command failed with return code {e.returncode}")
    print("stdout:", e.stdout)


print("\n2️⃣   Start Spark engine")
# start master
# shell command: $SPARK_HOME/sbin/start-master.sh
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-master.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-master.sh')
out = run(cmd)

# start one worker (first stop it in case it's already running)
# shell command: $SPARK_HOME/sbin/start-worker.sh spark://${HOSTNAME}:7077
cmd = [os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-worker.sh')]
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-worker.sh') + ' ' + 'spark://'+socket.gethostname()+':7077'
run(cmd)

print("\n3️⃣   Start Master Web UI")
# get master UI's port number
# the subprocess that's starting the master with start-master.sh
# might still not be ready with assigning the port number at this point
# therefore we check the logfile a few times (attempts=5) to see if the port
# has been assigned. This might take 1-2 seconds.

master_log = out.partition("logging to")[2].strip()
print("Search for port number in log file {}".format(master_log))
attempts = 10
search_pattern = "Successfully started service 'MasterUI' on port (\d+)"
found = False
for i in range(attempts):
  if not found:
   with open(master_log) as log:
      found = re.search(search_pattern, log.read())
      if found:
          webUIport = found.group(1)
          print(f"✅ Master UI is available at localhost:{webUIport} (attempt nr. {i})")
          break
      else:
          time.sleep(2) # need to try until port information is found in the logfile
          i+=1
if not found:
  print("Could not find port for Master Web UI\n")

IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    # serve the Web UI on Colab
    print("Click on the link below to open the Spark Web UI 🚀")
    from google.colab import output
    output.serve_kernel_port_as_window(webUIport)

print("\n4️⃣   Start history server")
# start history server
# shell command: mkdir -p /tmp/spark-events
# shell command: $SPARK_HOME/sbin/start-history-server.sh
spark_events_dir = os.path.join('/tmp', 'spark-events')
if not os.path.exists(spark_events_dir):
    os.mkdir(spark_events_dir)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-history-server.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-history-server.sh')
run(cmd)

if IN_COLAB:
    # serve the History Server
    print("Click on the link below to open the Spark History Server Web UI 🚀")
    output.serve_kernel_port_as_window(18080)


0️⃣   Install Java if not available
✅ Java is already installed.

1️⃣   Download and install Hadoop and Spark
✅ https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz was found
✅ File ‘spark-3.5.1-bin-hadoop3.tgz’ already there; not retrieving.
✅ Folder already exists

2️⃣   Start Spark engine
✅ stopping org.apache.spark.deploy.master.Master
✅ starting org.apache.spark.deploy.master.Master, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.master.Master-1-e851e84495b5.out
✅ stopping org.apache.spark.deploy.worker.Worker
✅ starting org.apache.spark.deploy.worker.Worker, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.worker.Worker-1-e851e84495b5.out

3️⃣   Start Master Web UI
Search for port number in log file /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.master.Master-1-e851e84495b5.out
✅ Master UI is available at localhost:8081 (attempt nr. 4)
Click on the link below to open the Spark

<IPython.core.display.Javascript object>


4️⃣   Start history server
✅ stopping org.apache.spark.deploy.history.HistoryServer
✅ starting org.apache.spark.deploy.history.HistoryServer, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.history.HistoryServer-1-e851e84495b5.out
Click on the link below to open the Spark History Server Web UI 🚀


<IPython.core.display.Javascript object>

In [2]:
pip install opendatasets

Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: opendatasets
Successfully installed opendatasets-0.1.22


In [3]:
pip install pyspark

Collecting pyspark
  Using cached pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=86b6b7b7dd50e003df41dcd8623eeb7f5188efbccb89b68a40577a37a6d5f6f3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
pip install tensorflow



In [5]:
pip install  petastorm

Collecting petastorm
  Downloading petastorm-0.12.1-py2.py3-none-any.whl (284 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m284.0/284.0 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill>=0.2.1 (from petastorm)
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting diskcache>=3.0.0 (from petastorm)
  Downloading diskcache-5.6.3-py3-none-any.whl (45 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: diskcache, dill, petastorm
Successfully installed dill-0.3.8 diskcache-5.6.3 petastorm-0.12.1


In [6]:
#import opendatasets as od
import pandas as pd
import pyspark
import opendatasets as od
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from google.colab import drive
import os

In [7]:
drive.mount('/content/drive')
DIR = "/content/drive/MyDrive/BigData/Spotify_rec_sys"
os.chdir(DIR)

Mounted at /content/drive


In [None]:
od.download(
	"https://www.kaggle.com/datasets/andrewmvd/spotify-playlists")

In [8]:
spark = SparkSession.builder \
    .appName("Rec_Sys") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("petastorm.spark.converter.parentCacheDirUrl", "file:///tmp/petastorm_cache") \
    .getOrCreate()

In [9]:
spark

In [10]:
#Funciones para guardar y cargar información preprocesada
import pickle

def save_dict_keys_to_pickle(dictionary, pickle_filename):
    keys = list(dictionary.keys())
    with open(pickle_filename, 'wb') as f:
        pickle.dump(keys, f)

def load_dict_keys_from_pickle(pickle_filename):
    with open(pickle_filename, 'rb') as f:
        keys = pickle.load(f)
    return keys

def save_dataframes_to_parquet(dictionary, folder_path):
    for key, dataframe in dictionary.items():
        dataframe.write.mode("overwrite").parquet(f"{folder_path}/{key}.parquet")

def save_df_to_parquet(dataframe, name, folder_path):
        dataframe.write.mode("overwrite").parquet(f"{folder_path}/{name}.parquet")

def load_dataframes_from_parquet(keys, folder_path):
    dataframes = {}
    for key in keys:
        dataframe = spark.read.parquet(f"{folder_path}/{key}.parquet")
        dataframes[key] = dataframe
    return dataframes


working_path='parquets_20240526'

#Preprocesamiento

##Limpieza de datos

In [None]:
df = spark.read.option("header", "true").csv("spotify-playlists//spotify_dataset.csv")

df = df.dropna()

df = df.drop_duplicates()

df = df.toDF(*[col.replace(' ', '').replace('"', '') for col in df.columns])


df.head()

Row(user_id='9cc0cfd4d7d7885102480dd99e7a90d6', artistname='Elvis Costello', trackname='(The Angels Wanna Wear My) Red Shoes', playlistname='HARD ROCK 2010')

###Convirtiendo dataset a parquet

In [None]:
save_df_to_parquet(df,'input_df', working_path)

In [None]:
in_df = spark.read.parquet(f"{working_path}/input_df.parquet")
in_df.head()

Row(user_id='07f0fc3be95dcd878966b1f9572ff670', artistname='Miles Davis', trackname='Duke Booty', playlistname='Chill out')

##Creando dimensiones separadas para playlists, artistas, canciones y usuarios, reconstruyendo la matriz de reproducciones con los indices de dichas dimensiones.

In [None]:
dfs={}

def df_dim(df, input_col):
    windowSpec = Window.orderBy(input_col)
    dfs[input_col]=df.select(input_col).distinct().withColumn(f"{input_col}_index", fn.row_number().over(windowSpec))

for col_name in in_df.columns:
    df_dim(df,col_name)

newdf=df

for i in range(0, len(df.columns)):
    col_name = df.columns[i]
    newdf=newdf.join(dfs[col_name].withColumnRenamed(col_name, col_name+'_base'), fn.col(col_name)==fn.col(col_name+'_base')).drop(col_name).drop(col_name+'_base')


dfs['data'] = newdf

In [None]:
#Saving and reloading to work with parquet files
save_dict_keys_to_pickle(dfs, 'keys.pkl')
save_dataframes_to_parquet(dfs,working_path)
loaded_keys = load_dict_keys_from_pickle('keys.pkl')
dfs = load_dataframes_from_parquet(loaded_keys, working_path)

In [None]:
total_reproductions =  dfs['data'].count()

total_reproductions

12868518

###Se mantienen artistas con mas de 10000 reproducciones

In [None]:
artist_reproductions = dfs['data'].groupBy("artistname_index").agg(fn.count("*").alias("total_reproductions"))

In [None]:
artistas_mas_de_10k_reproducciones = artist_reproductions.filter(fn.col("total_reproductions") > 10000)

artistas_indices = artistas_mas_de_10k_reproducciones.select("artistname_index")

rep_top_artists = dfs['data'].join(artistas_indices, "artistname_index", "inner")

In [None]:
rep_top_artists.show()

+----------------+-------------+---------------+------------------+
|artistname_index|user_id_index|trackname_index|playlistname_index|
+----------------+-------------+---------------+------------------+
|          258175|         6734|          45362|             53455|
|          258175|         6734|          57234|             53455|
|          258175|        15667|         196000|             88292|
|          258175|          355|         196000|             76180|
|          258175|        14817|         196000|             38211|
|          258175|         6734|         196000|             53455|
|          258175|         8561|         196000|            140025|
|          258175|        14137|         196000|             13980|
|          258175|         6498|         196003|            158500|
|          258175|         5574|         196003|            117620|
|          258175|        15611|         196003|            119615|
|          258175|         5371|         196003|

In [None]:
rep_top_artists.count()

1881913

In [None]:
top_10_artists = dfs['artistname'].join(artist_reproductions.orderBy(fn.col("total_reproductions").desc()).limit(10),'artistname_index')
print("Top 10 Artists:")
top_10_artists.show()

Top 10 Artists:
+----------------+------------------+-------------------+
|artistname_index|        artistname|total_reproductions|
+----------------+------------------+-------------------+
|           51082|          Coldplay|              35485|
|           59560|         Daft Punk|              36086|
|           63174|       David Bowie|              27802|
|           80896|            Eminem|              28896|
|          116342|             JAY Z|              28928|
|          133453|        Kanye West|              29111|
|          169467|   Michael Jackson|              26336|
|          203157|             Queen|              28079|
|          204527|         Radiohead|              31429|
|          257149|The Rolling Stones|              30832|
+----------------+------------------+-------------------+



In [None]:
dfs['data_top_artists'] = rep_top_artists

In [None]:
dfs['data_top_artists'].select("artistname_index").distinct().count()

117

In [None]:
save_dict_keys_to_pickle(dfs, 'keys.pkl')
save_df_to_parquet(dfs['data_top_artists'],'data_top_artists', working_path)

##Matriz de reporducciones según el artista y normalización de los datos.

In [None]:
counts_df = dfs['data_top_artists'].groupBy("user_id_index", "artistname_index").agg(fn.count("*").alias("reproductions"))

In [None]:
max_reproduction = counts_df.agg({"reproductions": "max"}).collect()[0][0]
min_reproduction = counts_df.agg({"reproductions": "min"}).collect()[0][0]

normalized_pl_counts_df = counts_df.withColumn("normalized_reproduction", (fn.col("reproductions") - min_reproduction) / (max_reproduction - min_reproduction))

dfs['norm_data_top_artists'] = normalized_pl_counts_df

normalized_pl_counts_df.head(10)

[Row(user_id_index=6734, artistname_index=258175, reproductions=128, normalized_reproduction=0.03796711509715994),
 Row(user_id_index=15667, artistname_index=258175, reproductions=70, normalized_reproduction=0.02062780269058296),
 Row(user_id_index=355, artistname_index=258175, reproductions=58, normalized_reproduction=0.017040358744394617),
 Row(user_id_index=14817, artistname_index=258175, reproductions=99, normalized_reproduction=0.02929745889387145),
 Row(user_id_index=8561, artistname_index=258175, reproductions=16, normalized_reproduction=0.004484304932735426),
 Row(user_id_index=14137, artistname_index=258175, reproductions=48, normalized_reproduction=0.014050822122571001),
 Row(user_id_index=6498, artistname_index=258175, reproductions=314, normalized_reproduction=0.09357249626307922),
 Row(user_id_index=5574, artistname_index=258175, reproductions=113, normalized_reproduction=0.03348281016442452),
 Row(user_id_index=15611, artistname_index=258175, reproductions=171, normalized

In [None]:
save_dict_keys_to_pickle(dfs, 'keys.pkl')
save_df_to_parquet(dfs['norm_data_top_artists'], 'norm_data_top_artists', working_path)

#Cargando datos preprocesados

In [11]:
loaded_keys = load_dict_keys_from_pickle('keys.pkl')
dfs = load_dataframes_from_parquet(loaded_keys, working_path)

In [12]:
for k, df in dfs.items():
    print(k, df.head(10))

user_id [Row(user_id='00055176fea33f6e027cd3302289378b', user_id_index=1), Row(user_id='0007f3dd09c91198371454c608d47f22', user_id_index=2), Row(user_id='000b0f32b5739f052b9d40fcc5c41079', user_id_index=3), Row(user_id='000c11a16c89aa4b14b328080f5954ee', user_id_index=4), Row(user_id='00123e0f544dee3ab006aa7f1e5725a7', user_id_index=5), Row(user_id='00139e9cb50fb309549e1561b476226d', user_id_index=6), Row(user_id='00152c870313100559aad7b097d9c1f5', user_id_index=7), Row(user_id='00154ec9dd1acd4ebfb521629dcb3948', user_id_index=8), Row(user_id='001599a07cb8ef5f114a9fcf4e0e2757', user_id_index=9), Row(user_id='0019363a0d57e94d39988c31eeb8d015', user_id_index=10)]
artistname [Row(artistname=' Dolce', artistname_index=1), Row(artistname=' OneVoice', artistname_index=2), Row(artistname='!!!', artistname_index=3), Row(artistname='!!! (Chk Chk Chk)', artistname_index=4), Row(artistname='!!! Chk Chik Chick', artistname_index=5), Row(artistname='!ATTENTION!', artistname_index=6), Row(artistname

In [66]:
#Dividiendo el dataset segun el numero de usuarios distintos

unique_usr_ids =  dfs['norm_data_top_artists'].select("user_id_index").distinct().orderBy("user_id_index")


train_usr_ids, test_usr_ids = unique_usr_ids.randomSplit([0.8, 0.2], seed=42)


train_data = dfs['norm_data_top_artists'].join(train_usr_ids, on="user_id_index", how="inner")
test_data = dfs['norm_data_top_artists'].join(test_usr_ids, on="user_id_index", how="inner")

print("Training DataFrame:")
train_data.show()


Training DataFrame:
+-------------+----------------+-------------+-----------------------+
|user_id_index|artistname_index|reproductions|normalized_reproduction|
+-------------+----------------+-------------+-----------------------+
|         6734|          258175|          128|    0.03796711509715994|
|        15667|          258175|           70|    0.02062780269058296|
|        14817|          258175|           99|    0.02929745889387145|
|         8561|          258175|           16|   0.004484304932735426|
|        14137|          258175|           48|   0.014050822122571001|
|         6498|          258175|          314|    0.09357249626307922|
|         5574|          258175|          113|    0.03348281016442452|
|        15611|          258175|          171|    0.05082212257100149|
|         5371|          258175|          106|    0.03139013452914798|
|         9708|          258175|           36|    0.01046337817638266|
|        15063|          258175|          106|    0.03139

In [None]:
train_data.show()

+-------------+----------------+-------------+-----------------------+
|user_id_index|artistname_index|reproductions|normalized_reproduction|
+-------------+----------------+-------------+-----------------------+
|         5803|          285680|            4|   8.968609865470852E-4|
|         5803|          285075|            5|   0.001195814648729447|
|         5803|          278507|          221|    0.06576980568011959|
|         5803|          277472|           16|   0.004484304932735426|
|         5803|          272596|            2|   2.989536621823617...|
|         5803|          269254|            1|                    0.0|
|         5803|          260511|            2|   2.989536621823617...|
|         5803|          258829|           12|   0.003288490284005979|
|         5803|          257149|            5|   0.001195814648729447|
|         5803|          248402|            3|   5.979073243647235E-4|
|         5803|          246063|          106|    0.03139013452914798|
|     

In [67]:
total_rows = train_data.count()

# Calculate unique counts
n_users = train_data.select("user_id_index").distinct().count()
n_artists = train_data.select("artistname_index").distinct().count()

# Ensure the maximum index is within range
max_user_id = train_data.select("user_id_index").rdd.max()[0]
max_artist_id = train_data.select("artistname_index").rdd.max()[0]

max_id_users = max(n_users, max_user_id + 1)
max_id_artists = max(n_artists, max_artist_id + 1)

print(f"Max ID users: {max_id_users}")
print(f"Max ID artists: {max_id_artists}")

Max ID users: 15915
Max ID artists: 285681


In [68]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, Flatten, Concatenate, Dense
from petastorm.spark import SparkDatasetConverter, make_spark_converter
from petastorm.tf_utils import make_petastorm_dataset
from tensorflow.keras.callbacks import EarlyStopping

In [70]:
# Calculate steps per epoch
batch_size = 2048
steps_per_epoch = total_rows // batch_size
epochs = 10
embedding_size = 100

In [71]:
# Initialize SparkDatasetConverter
converter = make_spark_converter(train_data)


user_input = Input(shape=(1,), name='user_input')
user_embedding = Embedding(max_id_users, embedding_size, input_length=1, name='user_embedding')(user_input)
user_vec = Flatten(name='user_flatten')(user_embedding)

artist_input = Input(shape=(1,), name='artist_input')
artist_embedding = Embedding(max_id_artists, embedding_size, input_length=1, name='artist_embedding')(artist_input)
artist_vec = Flatten(name='artist_flatten')(artist_embedding)

concat = Concatenate()([user_vec, artist_vec])
dense1 = Dense(128, activation='relu')(concat)
dense2 = Dense(64, activation='relu')(dense1)
output = Dense(1)(dense2)

model = Model([user_input, artist_input], output)
model.compile(optimizer='adam', loss='mean_squared_error')


# Training with Petastorm dataset
def transform_row(row):
    return {"user_input": row.user_id_index, "artist_input": row.artistname_index}, row.normalized_reproduction

# Create TensorFlow dataset
with converter.make_tf_dataset(batch_size=batch_size) as dataset:
    dataset = dataset.map(lambda x: transform_row(x))

    # Implement early stopping
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

    model.fit(dataset, epochs=epochs, steps_per_epoch=steps_per_epoch, validation_data=dataset, validation_steps=steps_per_epoch, callbacks=[early_stopping])

  self._filesystem = pyarrow.localfs
  dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10)
  dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10)
  if not dataset.common_metadata:
  self.dataset = pq.ParquetDataset(dataset_path, filesystem=pyarrow_filesystem,
  self.dataset = pq.ParquetDataset(dataset_path, filesystem=pyarrow_filesystem,
  meta = parquet_dataset.pieces[0].get_metadata()
  for partition in (parquet_dataset.partitions or []):
  metadata = dataset.metadata
  common_metadata = dataset.common_metadata
  futures_list = [thread_pool.submit(_split_piece, piece, dataset.fs.open) for piece in dataset.pieces]
  futures_list = [thread_pool.submit(_split_piece, piece, dataset.fs.open) for piece in dataset.pieces]
  return [pq.ParquetDatasetPiece(piece.path, open_file_func=fs_open,
  self._dataset = pq.ParquetDataset(
  parquet_file = ParquetFile(self._dataset.fs.open(piece.path))
  

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [72]:
model_save_path = working_path + f"//model_{epochs}_{batch_size}"

In [73]:
model.save(model_save_path)

In [74]:
loaded_model = tf.keras.models.load_model(model_save_path)

Para evaluar este modelo se utillizaran 3 metricas:
- Presision@K
- Mean Average Precision (MAP)
- Normalized Discounted Cumulative Gain (NDCG)

In [157]:
def recommend_top_k_artists(user_id_index, model, data, k=10):
    artist_ids = data["artistname_index"].unique()
    user_ids = [user_id_index] * len(artist_ids)
    user_ids_array = np.array(user_ids)
    artist_ids_array = np.array(artist_ids)
    predictions = model.predict([user_ids_array, artist_ids_array], verbose=0)
    predicted_scores = list(zip(artist_ids, predictions))
    top_k_artists = sorted(predicted_scores, key=lambda x: x[1], reverse=True)[:k]
    top_k_artist_ids = [artist_id for artist_id, score in top_k_artists]
    return top_k_artist_ids

#Precision@K

def precision_at_k(recommended_items, relevant_items, k):
    relevant_set = set(relevant_items)
    recommended_set = set(recommended_items)
    return len(recommended_set & relevant_set) / k

#MAP

def average_precision(actual, predicted):
    score = 0.0
    num_hits = 0.0

    for i, p in enumerate(predicted):
        if p in actual and p not in predicted[:i]:
            num_hits += 1.0
            score += num_hits / (i + 1.0)

    if not actual:
        return 0.0
    return score / min(len(actual), len(predicted))

def mean_average_precision(actual_items, predicted_items):
    return np.mean([average_precision(actual, predicted) for actual, predicted in zip(actual_items, predicted_items)])

# NDCG

def dcg_at_k(r, k):
    r = np.asfarray(r)[:k]
    if r.size:
        return np.sum(np.divide(np.power(2, r) - 1, np.log2(np.arange(2, r.size + 2))))
    return 0.0

def ndcg_at_k(r, k):
    idcg = dcg_at_k(sorted(r, reverse=True), k)
    if not idcg:
        return 0.0
    return dcg_at_k(r, k) / idcg

def normalized_discounted_cumulative_gain(actual, predicted, k):
    r = [1 if p in actual else 0 for p in predicted]
    return ndcg_at_k(r, k)

In [161]:
def evaluate_model(test_data, model, k=10):
    user_ids = test_data['user_id_index'].unique()
    avg_map = 0.0
    avg_ndcg = 0.0
    precision_scores = []

    for user_id in user_ids:
        relevant_items = test_data[test_data['user_id_index'] == user_id]['artistname_index'].tolist()

        #Se ignoran ususarios que tengan pocos artistas (Arranque en frío)
        if len(relevant_items) < 25:
            continue

        recommended_items = recommend_top_k_artists(user_id, model, test_data, k)


        #Calcular Precision@K
        precision = precision_at_k(recommended_items, relevant_items, k)
        precision_scores.append(precision)

        # Calcular MAP para el usuario actual
        map_score = average_precision(relevant_items, recommended_items[:k])
        avg_map += map_score

        # Calcular NDCG para el usuario actual
        ndcg_score = normalized_discounted_cumulative_gain(relevant_items, recommended_items, k)
        avg_ndcg += ndcg_score

    # Calcular promedio de MAP y NDCG para todos los usuarios
    avg_precisionAtK = np.mean(precision_scores)
    avg_map /= len(user_ids)
    avg_ndcg /= len(user_ids)

    return avg_precisionAtK, avg_map, avg_ndcg

# Evaluar el modelo
avg_precisionAtK, avg_map, avg_ndcg = evaluate_model(test_data.toPandas(), model, k=10)
print(f"Precision@10: {avg_precisionAtK:.4f}")
print(f"Mean Average Precision (MAP)@10: {avg_map:.4f}")
print(f"Normalized Discounted Cumulative Gain (NDCG)@10: {avg_ndcg:.4f}")

Precision@10: 0.2463
Mean Average Precision (MAP)@10: 0.0308
Normalized Discounted Cumulative Gain (NDCG)@10: 0.1447


#Dando Recomendaciones

In [150]:
test_data.show()

+-------------+----------------+-------------+-----------------------+
|user_id_index|artistname_index|reproductions|normalized_reproduction|
+-------------+----------------+-------------+-----------------------+
|          355|          258175|           58|   0.017040358744394617|
|         8608|          258175|           42|   0.012257100149476832|
|        12719|          258175|          138|    0.04095665171898356|
|          550|          258175|          183|    0.05440956651718983|
|         4052|          258175|           89|   0.026307922272047833|
|        11906|          258175|           90|   0.026606875934230195|
|        12659|          258175|          126|    0.03736920777279522|
|         6366|          258175|           35|   0.010164424514200299|
|         3222|          258175|           17|   0.004783258594917788|
|          313|          258175|          108|    0.03198804185351271|
|        13393|          258175|           47|   0.013751868460388639|
|     

In [151]:
user_id_index=261
#13235

usr_reps = test_data.filter(fn.col("user_id_index") == user_id_index).distinct()\
    .join(dfs['artistname'],'artistname_index').orderBy('reproductions', ascending=False).select('artistname','reproductions')

n_artist_usr=usr_reps.count()
usr_reps.show(n=n_artist_usr,truncate=False)

+-----------------------+-------------+
|artistname             |reproductions|
+-----------------------+-------------+
|Grateful Dead          |42           |
|The Smashing Pumpkins  |40           |
|Iron Maiden            |31           |
|Muse                   |16           |
|Blur                   |16           |
|Radiohead              |14           |
|Coldplay               |13           |
|U2                     |13           |
|Arctic Monkeys         |12           |
|Foo Fighters           |12           |
|The Smiths             |12           |
|The Rolling Stones     |12           |
|Kings Of Leon          |12           |
|David Bowie            |12           |
|The Killers            |11           |
|The Clash              |10           |
|The Who                |9            |
|Green Day              |9            |
|Nirvana                |8            |
|The Cure               |7            |
|The Strokes            |7            |
|R.E.M.                 |6            |


In [152]:
#Generando 10 recomendaciones
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


rec_items = recommend_top_k_artists(user_id_index, loaded_model, train_data.toPandas(), 10)

rec_items = [(int(item),) for item in rec_items]

rec_artist_names = spark.createDataFrame(rec_items, StructType([StructField("artistname_index", IntegerType(), True)])).join(dfs['artistname'],'artistname_index')

print(f"Recommended artists for user {user_id_index}")
rec_artist_names.show(n=rec_artist_names.count(),truncate=False)

Recommended artists for user 261
+----------------+----------------------+
|artistname_index|artistname            |
+----------------+----------------------+
|171812          |Miles Davis           |
|168699          |Metallica             |
|274942          |Vitamin String Quartet|
|127024          |John Williams         |
|258175          |The Smiths            |
|100812          |Grateful Dead         |
|269792          |U2                    |
|100085          |Gorillaz              |
|194927          |Pearl Jam             |
|136060          |Kendrick Lamar        |
+----------------+----------------------+



In [153]:
#Similutides:
usr_reps.join(rec_artist_names,'artistname').select('artistname').show(n=n_artist_usr)

+-------------+
|   artistname|
+-------------+
|     Gorillaz|
|Grateful Dead|
|    Pearl Jam|
|   The Smiths|
|           U2|
+-------------+



In [155]:
precision = usr_reps.join(rec_artist_names,'artistname').count()/10

print(f"Precisión de las recomendaciones para el usuario {user_id_index}: {precision}")

Precisión de las recomendaciones para el usuario 261: 0.5
