In [1]:
import hdf5_getters as GETTERS
import h5py
import glob

In [2]:
# Localiza un archivo de canción
filename = glob.glob(
    r'dataset\A\A\A\TRAAAAW128F429D538.h5'
)[0]

# Abre el archivo
h5 = GETTERS.open_h5_file_read(filename)

# Obtén algunos datos
artist_name = GETTERS.get_artist_name(h5)
song_title = GETTERS.get_title(h5)
year = GETTERS.get_year(h5)
duration = GETTERS.get_duration(h5)

print(
    f"Artist: {artist_name}, Song: {song_title}, Year: {year}, Duration: {duration:.2f}s")

h5.close()

IndexError: list index out of range

#### 1. Crear la sesión con SPARK

In [20]:
#Initialize spark:
from pyspark.sql import SparkSession
# import findspark
# findspark.init()

import pyspark
# print(pyspark.__version__)
# findspark.init("C:\spark\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3")


# spark = SparkSession.builder \
#     .appName("MSD Recommender") \
#     .master("spark://192.168.1.130:7077") \
#     # .config("spark.driver.host", "192.168.1.130") \
#     .config("spark.driver.memory", "4g") \
#     .config("spark.executor.memory", "12g") \
#     .config("spark.executor.cores", "3") \
#     .getOrCreate()


from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("MillionSongProcessing") \
#     .master("spark://spark-master:7077") \
#     .getOrCreate()
spark = SparkSession \
    .builder \
    .appName('XML ETL') \
    .master("local[*]") \
    .config('job.local.dir', 'file:/models/music-recommender-model') \
    .getOrCreate()

df = spark.range(10).toDF("number")
df.show()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+



In [19]:
# Stop Spark session
spark.stop()

In [15]:
df_test = spark.range(10)  # 1 millón de filas
print(df_test.count())  # Debe devolver 1,000,000

10


#### Anañizar el dataset

In [21]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

import numpy as np
import hdf5_getters as GETTERS
import h5py
import glob
import pandas as pd

sc = spark.sparkContext

In [22]:
# Cargar ruta del dataset

dataset_path = "/opt/bitnami/spark/datasets/MillionSongSubset"
file_paths = glob.glob(f"{dataset_path}/**/*.h5", recursive=True)
#Usando solo 3 paths
file_paths = file_paths[:20]
print(file_paths)

['/opt/bitnami/spark/datasets/MillionSongSubset/A/E/E/TRAEEOQ128F9309BC0.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/E/TRAEEGA128F4272FF8.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/E/TRAEEGO12903CF7D27.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/E/TRAEEBP128EF3673A7.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/E/TRAEELW128F933D255.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/E/TRAEELO128F425BD8F.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/E/TRAEEMP128F92F4B9C.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/G/TRAEGRU128F9358874.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/G/TRAEGGV128F92F29B2.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/G/TRAEGMN128F42690CD.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/G/TRAEGUL128E078B0C1.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/G/TRAEGAZ128F426240C.h5', '/opt/bitnami/spark/datasets/MillionSongSubset/A/E/G/TRAEGJE128F42656F1.h5'

In [23]:
# Lista para guardar los features
data = []
metadata_list = []  # OJO cambiamos el nombre de la lista de metadatos


def extract_features(path):
    try:
        h5 = GETTERS.open_h5_file_read(path)
        segments_timbre = GETTERS.get_segments_timbre(h5)
        if segments_timbre is None or len(segments_timbre) == 0:
            timbre_mean = [0.0] * 12
            timbre_std = [0.0] * 12
        else:
            timbre_mean = np.mean(segments_timbre, axis=0).tolist()
            timbre_std = np.std(segments_timbre, axis=0).tolist()

        features = {
            'tempo': float(GETTERS.get_tempo(h5)),
            'loudness': float(GETTERS.get_loudness(h5)),
            'duration': float(GETTERS.get_duration(h5)),
            'key': float(GETTERS.get_key(h5)),
            'mode': float(GETTERS.get_mode(h5)),
            'time_signature': float(GETTERS.get_time_signature(h5)),
            'song_id': GETTERS.get_song_id(h5).decode('utf-8')
        }

        for i in range(12):
            features[f'timbre_mean_{i}'] = timbre_mean[i]
            features[f'timbre_std_{i}'] = timbre_std[i]

        song_id = GETTERS.get_song_id(h5).decode('utf-8')
        title = GETTERS.get_title(h5).decode('utf-8')
        artist = GETTERS.get_artist_name(h5).decode('utf-8')

        meta = {
            'song_id': song_id,
            'title': title,
            'artist': artist
        }

        h5.close()

        return features, meta

    except Exception as e:
        print(f"Error procesando el fichero: {path} --> {e}")
        return None, None


# Recorremos los ficheros uno a uno
for path in file_paths:
    result, meta = extract_features(path)
    if result is not None:
        data.append(result)
    if meta is not None:
        metadata_list.append(meta)

# Convertimos las listas a pandas
df_pd = pd.DataFrame(data)
df_meta = pd.DataFrame(metadata_list)

# Guardamos los metadatos
df_meta.to_csv("songs_metadata.csv", index=False)

# Y lo convertimos a DataFrame de PySpark
df = spark.createDataFrame(df_pd)

# Visualizamos los primeros registros
df.show(5)

                                                                                

+-------+--------+---------+----+----+--------------+------------------+------------------+-----------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+
|  tempo|loudness| duration| key|mode|time_signature|           song_id|     timbre_mean_0|     timbre_std_0|      timbre_mean_1|      timbre_std_1|      timbre_mean_2|      timbre_std_2|      timbre_mean_3|      timbre_std_3|     timbre_mean_4|      timbre_std_4|      timbre_mean_5|      timbre_std_5|      timbre_mean_6|      timbre_std_6|      timbre_mean_7|      timbre_std_7|     timbre_mean_8|      timbre_std_8|     timbre_mean_9|      timbre_std_9|     tim

#### Entrenar el Modelo

In [24]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

feature_cols = ['tempo', 'loudness', 'duration', 'key', 'mode', 'time_signature'] + \
               [f'timbre_mean_{i}' for i in range(12)] + \
               [f'timbre_std_{i}' for i in range(12)]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
df_assembled = assembler.transform(df)


scaler = StandardScaler(inputCol="features_vec", outputCol="scaled_features")
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)


kmeans = KMeans(featuresCol='scaled_features',
                predictionCol='cluster_id', k=20, seed=42)
model = kmeans.fit(df_scaled)


25/06/17 18:21:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/06/17 18:21:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

#### Recomendación 

In [25]:
import numpy as np
import pandas as pd
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeansModel

# Supongamos que tienes ya definido:
# model, scaler_model, assembler, df_scaled

# Simulamos la nueva canción
new_song = {
    'tempo': 120.0,
    'loudness': -5.0,
    'duration': 210.0,
    'key': 2.0,
    'mode': 1.0,
    'time_signature': 4.0,
    'song_id': 'NEW_SONG_ID'
}

# Simulamos timbre features
for i in range(12):
    new_song[f'timbre_mean_{i}'] = np.random.uniform(-50, 50)
    new_song[f'timbre_std_{i}'] = np.random.uniform(10, 50)

# Convertimos a Spark DataFrame
new_song_df_pd = pd.DataFrame([new_song])
new_song_df = spark.createDataFrame(new_song_df_pd)

#  Ensamblamos y escalamos usando el pipeline original
new_song_vec = assembler.transform(new_song_df)
new_song_scaled = scaler_model.transform(new_song_vec)

# Hacemos la predicción de cluster
new_song_pred = model.transform(new_song_scaled)
predicted_cluster = new_song_pred.select(
    "cluster_id").collect()[0]["cluster_id"]

print(f"La nueva canción ha sido asignada al cluster: {predicted_cluster}")

# Buscamos canciones del dataset original en el mismo cluster

# Si no habías guardado los clusters en el df original:
df_with_cluster = model.transform(df_scaled)

# Filtramos por el mismo cluster
similar_songs = df_with_cluster.filter(
    df_with_cluster.cluster_id == predicted_cluster)

# Mostramos algunas recomendaciones (por ejemplo 3)
similar_songs.select("song_id").limit(3).show()

La nueva canción ha sido asignada al cluster: 15
+------------------+
|           song_id|
+------------------+
|SOHODRU12A81C230CE|
+------------------+



In [26]:
metadata_df = pd.read_csv("songs_metadata.csv")

# Supongamos que estos son los song_ids recomendados que has obtenido de tu modelo
recommended_song_ids = [row['song_id']
                        for row in similar_songs.select("song_id").limit(3).collect()]


# Filtramos los metadatos para obtener nombre de la canción y artista
recommended_songs = metadata_df[metadata_df['song_id'].isin(
    recommended_song_ids)]

# Mostramos el resultado
print(recommended_songs[['song_id', 'title', 'artist']])

               song_id    title                artist
15  SOHODRU12A81C230CE  Genuine  Five Fingers of Funk


#### Guardar el modelo

In [27]:
from pyspark.ml import Pipeline

# Creamos el pipeline con los stages que usaste durante el entrenamiento
pipeline = Pipeline(stages=[assembler, scaler, kmeans])

# Lo ajustamos sobre el dataframe ya escalado
pipeline_model = pipeline.fit(df)

In [28]:
pipeline_model.write().overwrite().save("file:///models/music-recommender-model")

                                                                                

#### Cargar el Modelo

In [30]:
from pyspark.ml import PipelineModel

loaded_model = PipelineModel.load("file:///models/music-recommender-model")

In [36]:
new_song = {
    'tempo': 120.0,
    'loudness': -5.0,
    'duration': 210.0,
    'key': 8.0,
    'mode': 1.0,
    'time_signature': 4.0,
    'song_id': 'NEW_SONG_ID'
}

new_song_pred = model.transform(new_song_scaled)
predicted_cluster = new_song_pred.select(
    "cluster_id").collect()[0]["cluster_id"]

print(f"La nueva canción ha sido asignada al cluster: {predicted_cluster}")

# Buscamos canciones del dataset original en el mismo cluster

# Si no habías guardado los clusters en el df original:
df_with_cluster = model.transform(df_scaled)

# Filtramos por el mismo cluster
similar_songs = df_with_cluster.filter(
    df_with_cluster.cluster_id == predicted_cluster)

# Mostramos algunas recomendaciones (por ejemplo 3)
similar_songs.select("song_id").limit(3).show()

La nueva canción ha sido asignada al cluster: 15
+------------------+
|           song_id|
+------------------+
|SOHODRU12A81C230CE|
+------------------+



In [37]:
metadata_df = pd.read_csv("songs_metadata.csv")

# Supongamos que estos son los song_ids recomendados que has obtenido de tu modelo
recommended_song_ids = [row['song_id']
                        for row in similar_songs.select("song_id").limit(3).collect()]


# Filtramos los metadatos para obtener nombre de la canción y artista
recommended_songs = metadata_df[metadata_df['song_id'].isin(
    recommended_song_ids)]

# Mostramos el resultado
print(recommended_songs[['song_id', 'title', 'artist']])

               song_id    title                artist
15  SOHODRU12A81C230CE  Genuine  Five Fingers of Funk
