In [1]:
import os
import pandas as pd
import numpy as np
import librosa
from tqdm import tqdm
from pymongo import MongoClient

# Define file paths
metadata_path = '/home/abubakar/Documents/BDA/Project/fma_metadata/tracks.csv'
audio_dir = '/home/abubakar/Documents/BDA/Project/fma_medium'

# Load tracks metadata
tracks = pd.read_csv(metadata_path, header=[0, 1], index_col=0)

# Select medium subset tracks
medium_tracks = tracks[tracks['set', 'subset'] == 'medium']

# Get audio files
audio_files = []
for root, dirs, files in os.walk(audio_dir):
    for file in files:
        if file.endswith('.mp3'):
            audio_files.append(os.path.join(root, file))

# Create MongoDB client and database
client = MongoClient('localhost', 27017)
db = client['music']
collection = db['features']

# Create an empty DataFrame to store features
features_df = pd.DataFrame(columns=[
                           'file_path', 'track_id', 'duration', 'mfcc', 'spectral_centroid', 'zero_crossing_rate'])

# Function to extract audio features


def extract_audio_features(file_path, sr=22050):
    try:
        # Load audio file
        audio, sr = librosa.load(file_path, sr=sr)

        # Extract MFCC features
        mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13).flatten()

        # Extract spectral centroid
        spectral_centroid = librosa.feature.spectral_centroid(
            y=audio, sr=sr).flatten()

        # Extract zero-crossing rate
        zero_crossing_rate = librosa.feature.zero_crossing_rate(
            audio).flatten()

        # Calculate duration
        duration = librosa.get_duration(y=audio, sr=sr)

        return duration, mfcc, spectral_centroid, zero_crossing_rate

    except (librosa.util.exceptions.ParameterError, Exception) as e:
        print(f"Error processing file {file_path}: {e}. Skipping this file.")
        return None, None, None, None


def pad_array(arr, max_length):
    current_length = arr.shape[0]
    if current_length < max_length:
        pad_width = ((0, max_length - current_length),) + ((0, 0),) * (arr.ndim - 1)
        return np.pad(arr, pad_width, mode='constant')
    else:
        return arr

max_lengths = {"mfcc": 0, "spectral_centroid": 0, "zero_crossing_rate": 0}

samples = 10

# Iterate over audio files with tqdm, limit to 1500 files
for file_path in tqdm(audio_files[:samples], desc="Processing audio files"):
    track_id = int(os.path.basename(file_path)[:-4])
    duration, mfcc_features, spectral_centroid, zero_crossing_rate = extract_audio_features(
        file_path, sr=22050)
    if len(mfcc_features) > max_lengths["mfcc"]:
        max_lengths["mfcc"] = len(mfcc_features)
    elif len(spectral_centroid) > max_lengths["spectral_centroid"]:
        max_lengths["spectral_centroid"] = len(spectral_centroid)
    elif len(zero_crossing_rate) > max_lengths["zero_crossing_rate"]:
        max_lengths["zero_crossing_rate"] = len(zero_crossing_rate)


for file_path in tqdm(audio_files[:samples], desc="Processing audio files"):
    track_id = int(os.path.basename(file_path)[:-4])
    duration, mfcc_features, spectral_centroid, zero_crossing_rate = extract_audio_features(
        file_path, sr=22050)
    mfcc_features = pad_array(mfcc_features, max_lengths["mfcc"])
    spectral_centroid = pad_array(spectral_centroid, max_lengths["spectral_centroid"])
    zero_crossing_rate = pad_array(zero_crossing_rate, max_lengths["zero_crossing_rate"])

    if duration is not None and mfcc_features is not None:
        # features_df.loc[len(features_df)] = {'file_path': file_path,
        #                                      'track_id': track_id, 'duration': duration,
        #                                      'mfcc': mfcc_features,
        #                                      'spectral_centroid': spectral_centroid,
        #                                      'zero_crossing_rate': zero_crossing_rate}
        # Insert data into MongoDB collection
        # record_obj = {f"mfcc_{i}":mfcc_features[i].item() for i in range(500)}
        record_obj = {}
        # record_obj["mfcc"] = mfcc_features.tolist()
        record_obj["file_path"] = file_path
        record_obj["track_id"] = track_id
        record_obj["duration"] = duration
        record_obj["zero_crossing_rate"] = zero_crossing_rate.tolist()
        # record_obj["spectral_centroid"] = spectral_centroid.tolist()

        collection.insert_one(record_obj)

# # Round duration and convert track_id to int
# features_df['track_id'] = features_df['track_id'].astype(int)
# features_df['duration'] = features_df['duration'].round(2)SSSS

# Save DataFrame to CSV
# features_df.to_csv('audio_features_1425.csv', index=False)
# print("DataFrame saved to 'audio_features_1425.csv'")

# Save DataFrame to MongoDB
# records = features_df.to_dict(orient='records')
# collection.insert_many(records)
print("Data saved to MongoDB")


# Close the MongoClient connection
client.close()

# Print a message indicating completion
print("Data inserted into MongoDB collection 'features'")

Processing audio files: 100%|██████████| 10/10 [00:01<00:00,  5.04it/s]
Processing audio files: 100%|██████████| 10/10 [00:01<00:00,  8.37it/s]

Data saved to MongoDB
Data inserted into MongoDB collection 'features'





In [2]:
type(duration)

float

In [2]:
max_lengths

{'mfcc': 16809, 'spectral_centroid': 1293, 'zero_crossing_rate': 1293}

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Music Recommendation") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/music.features") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/music.features") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .getOrCreate()

24/05/12 20:49:26 WARN Utils: Your hostname, abubakar-Lenovo-Flex-7-14IAU7 resolves to a loopback address: 127.0.1.1; using 192.168.100.63 instead (on interface wlp0s20f3)
24/05/12 20:49:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/abubakar/Downloads/spark-3.1.1-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/abubakar/.ivy2/cache
The jars for the packages stored in: /home/abubakar/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fc9eae27-67a6-4eef-b78f-f03d7c966e36;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;10.3.0 in central
	found org.mongodb#mongodb-driver-sync;4.8.2 in central
	[4.8.2] org.mongodb#mongodb-driver-sync;[4.8.1,4.8.99)
	found org.mongodb#bson;4.8.2 in central
	found org.mongodb#mongodb-driver-core;4.8.2 in central
	found org.mongodb#bson-record-codec;4.8.2 in central
:: resolution report :: resolve 6279ms :: artifacts dl 3ms
	:: modules in use:
	org.mongodb#bson;4.8.2 from central in [default]
	org.mongodb#bson-record-codec;4.8.2 from central in [default]
	org.mongodb#mongodb-driver-core;4.8.2 from central in [default]
	org.mongodb#mongodb-driver-sync;4.8.2 from central in [default]
	org.mongodb.spark#mongo-spark-connec

In [68]:
from pyspark.sql import SparkSession
from pymongo import MongoClient
import pandas as pd
from pyspark.sql.types import StructType,StructField,StringType,DoubleType,IntegerType,ArrayType
# detch data from mongo db 
data=list(collection.find())
# convert mongo db to pandas data frame 
df_pandas=pd.DataFrame(data)

# replace non value with empty list 
df_pandas["mfcc"].fillna(value=pd.Series([[]]*len(df_pandas))),
df_pandas['spectral_centroid'].fillna(value=pd.Series([[]]*len(df_pandas)),inplace=True),
df_pandas['zero_crossing_rate'].fillna(value=pd.Series([[]]*len(df_pandas)),inplace=True)

# Convert duration field to numeric
df_pandas['duration'] = pd.to_numeric(df_pandas['duration'], errors='coerce')



# Define the schema
schema = StructType([
    StructField("_id", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("file_path", StringType(), True),
    StructField("mfcc", ArrayType(DoubleType()), True),
    StructField("spectral_centroid", ArrayType(DoubleType()), True),
    StructField("track_id", IntegerType(), True),
    StructField("zero_crossing_rate", ArrayType(DoubleType()), True)
])


# Convert Pandas DataFrame to Spark DataFrame
df_spark = spark.createDataFrame(df_pandas, schema=schema)

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_pandas['spectral_centroid'].fillna(value=pd.Series([[]]*len(df_pandas)),inplace=True),
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_pandas['zero_crossing_rate'].fillna(value=pd.Series([[]]*len(df_pandas)),inplace=True)


TypeError: field duration: DoubleType can not accept object '/home/abubakar/Documents/BDA/Project/fma_medium/120/120147.mp3' in type <class 'str'>

In [4]:
df_spark = spark.read \
.format("mongodb") \
.load()

In [5]:
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import VectorAssembler
import numpy as np


array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())
df_new = df_spark.select(array_to_vector_udf(df_spark['zero_crossing_rate']).alias('features'))

In [40]:
!$PYSPARK_DRIVER_PYTHON

In [26]:
df_spark

DataFrame[_id: string, duration: double, file_path: string, track_id: int, zero_crossing_rate: array<double>]

In [27]:
# Split the data into training and testing sets
(training, test) = df_spark.randomSplit([0.8, 0.2])


In [28]:
training.show(5)

+--------------------+------------------+--------------------+--------+--------------------+
|                 _id|          duration|           file_path|track_id|  zero_crossing_rate|
+--------------------+------------------+--------------------+--------+--------------------+
|6640c65da1a916e6d...|29.976598639455784|/home/abubakar/Do...|  120167|[0.09716796875, 0...|
|6640c65da1a916e6d...|29.976598639455784|/home/abubakar/Do...|  120475|[0.11328125, 0.13...|
|6640c65da1a916e6d...|29.976598639455784|/home/abubakar/Do...|  120932|[0.01416015625, 0...|
|6640c65ea1a916e6d...|29.988979591836735|/home/abubakar/Do...|  120334|[0.02197265625, 0...|
|6640c65ea1a916e6d...|29.976598639455784|/home/abubakar/Do...|  120163|[0.0927734375, 0....|
+--------------------+------------------+--------------------+--------+--------------------+
only showing top 5 rows



In [7]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import VectorAssembler
import numpy as np

# Function to pad feature vectors to a fixed length
def pad_features(feat, max_len):
    padded_feat = feat + [0.0] * (max_len - len(feat))
    return Vectors.dense(padded_feat)

# UDF to pad feature vectors
pad_features_udf = udf(lambda feat, max_len: pad_features(feat, max_len), VectorUDT())


def apply_pca(df, input_col, k):
    # Find the maximum length of feature vectors
    max_len = df.selectExpr("size({}) as size".format(input_col)).agg({"size": "max"}).collect()[0]["max(size)"]
    
    # Pad feature vectors to the maximum length
    df = df.withColumn("padded_features", pad_features_udf(col(input_col), lit(max_len)))
    
    # Apply PCA
    pca = PCA(k=k, inputCol="padded_features", outputCol="pca_features")
    pca_model = pca.fit(df)
    df_pca = pca_model.transform(df)
    
    return df_pca

In [8]:
df_spark.columns
df_spark.printSchema()


root
 |-- _id: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- file_path: string (nullable = true)
 |-- mfcc: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- spectral_centroid: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- track_id: integer (nullable = true)
 |-- zero_crossing_rate: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [9]:
from pyspark.sql.functions import explode
# explode the array into individual rows
exploded_df = df_spark.select(explode("zero_crossing_rate").alias("zero_crossing_rate"))

assembler=VectorAssembler(inputCols=["zero_crossing_rate"],outputCol="feature_vector")
spark_df = assembler.transform(exploded_df)

spark_df.show()


+------------------+---------------+
|zero_crossing_rate| feature_vector|
+------------------+---------------+
|       0.072265625|  [0.072265625]|
|     0.09130859375|[0.09130859375]|
|       0.111328125|  [0.111328125]|
|     0.06396484375|[0.06396484375]|
|     0.05419921875|[0.05419921875]|
|      0.0517578125| [0.0517578125]|
|        0.05078125|   [0.05078125]|
|         0.0546875|    [0.0546875]|
|      0.0830078125| [0.0830078125]|
|     0.15283203125|[0.15283203125]|
|      0.1826171875| [0.1826171875]|
|     0.24560546875|[0.24560546875]|
|     0.25146484375|[0.25146484375]|
|     0.18212890625|[0.18212890625]|
|      0.1455078125| [0.1455078125]|
|     0.07861328125|[0.07861328125]|
|            0.0625|       [0.0625]|
|      0.0673828125| [0.0673828125]|
|      0.0908203125| [0.0908203125]|
|     0.13427734375|[0.13427734375]|
+------------------+---------------+
only showing top 20 rows



                                                                                

In [10]:
df_spark.printSchema()

root
 |-- _id: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- file_path: string (nullable = true)
 |-- mfcc: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- spectral_centroid: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- track_id: integer (nullable = true)
 |-- zero_crossing_rate: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [11]:
df_spark.select("mfcc").limit(1).collect()[0]

Row(mfcc=None)

In [12]:
array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())
df_new = df_spark.select(array_to_vector_udf(df_spark['zero_crossing_rate']).alias('features'), df_spark['track_id'])

In [13]:
df_new.printSchema()

root
 |-- features: vector (nullable = true)
 |-- track_id: integer (nullable = true)



In [15]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col,monotonically_increasing_id
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
    
    

# Assuming 'df' is your DataFrame with features, and 'features_col' is the name of the column containing features
# 'k' is the number of nearest neighbors you want to find
def approximate_nearest_neighbors(spark_df, features_col='zero', k=5):
    # Create an LSH model

    
    brp_lsh = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", numHashTables=10)
    
    # Fit the model
    model = brp_lsh.fit(spark_df)
    
    
    return model

def find_similar_song(spark_df, query_id, model, k):
    query_df = spark_df.filter(col("_id") == query_id)
    # Get the original ID
    # Approximate nearest neighbor search
    df_nn = model.approxNearestNeighbors(query_df, spark_df, k, distCol="distance")

    df_nn = df_nn.withColumn("track_id", monotonically_increasing_id())
    
    return df_nn.select("track_id", "distance", "hashes")


In [18]:
# Check the structure of the feature_vector column
spark_df.select("feature_vector").show(5, truncate=False)


+---------------+
|feature_vector |
+---------------+
|[0.072265625]  |
|[0.09130859375]|
|[0.111328125]  |
|[0.06396484375]|
|[0.05419921875]|
+---------------+
only showing top 5 rows



                                                                                

In [24]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col,monotonically_increasing_id
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
    
    

# Assuming 'df' is your DataFrame with features, and 'features_col' is the name of the column containing features
# 'k' is the number of nearest neighbors you want to find
def approximate_nearest_neighbors(spark_df, features_col='zero', k=5):
    # Create an LSH model
    assembler = VectorAssembler(inputCols=[features_col], outputCol="features_vector")
    spark_df = assembler.transform(spark_df)
    print(spark_df)

    
    brp_lsh = BucketedRandomProjectionLSH(inputCol="features_vector", outputCol="hashes", numHashTables=10)
    
    # Fit the model
    model = brp_lsh.fit(spark_df)
    
    
    return model

def find_similar_song(spark_df, query_id, model, k):
    query_df = spark_df.filter(col("_id") == query_id)
    # Get the original ID
    # Approximate nearest neighbor search
    df_nn = model.approxNearestNeighbors(query_df, spark_df, k, distCol="distance")

    df_nn = df_nn.withColumn("track_id", monotonically_increasing_id())
    
    return df_nn.select("track_id", "distance", "hashes")

# Example usage:
# df_nn = approximate_nearest_neighbors(df, "features")
# df_nn.show()


In [25]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf

from pyspark.ml.linalg import VectorUDT

def array_to_vector(arr):
    return Vectors.dense(arr)

array_to_vector_udf = udf(array_to_vector, VectorUDT())

from pyspark.sql.types import StructType, StructField, DoubleType

new_df = df_spark.withColumn("mfcc_vector", array_to_vector_udf(col("mfcc")))

In [26]:
new_df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- file_path: string (nullable = true)
 |-- mfcc: vector (nullable = true)
 |-- spectral_centroid: vector (nullable = true)
 |-- track_id: integer (nullable = true)
 |-- zero_crossing_rate: vector (nullable = true)
 |-- mfcc_vector: vector (nullable = true)



In [27]:
df_spark.printSchema()

root
 |-- _id: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- file_path: string (nullable = true)
 |-- mfcc: vector (nullable = true)
 |-- spectral_centroid: vector (nullable = true)
 |-- track_id: integer (nullable = true)
 |-- zero_crossing_rate: vector (nullable = true)



In [31]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


In [32]:
# Convert Pandas DataFrame to PySpark DataFrame

feature_columns = df_spark.columns[2:-1]


In [34]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Split the data into training and testing sets
(training, test) = df_spark.randomSplit([0.8, 0.2])
# Prepare the RandomForestRegressor
rf = RandomForestRegressor(featuresCol="mfcc", labelCol="rating", numTrees=10)

In [36]:
df_spark

DataFrame[_id: string, duration: double, file_path: string, mfcc: vector, spectral_centroid: vector, track_id: int, zero_crossing_rate: vector]