In [3]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.preprocessing import LabelEncoder

# Load features from CSV file
features_df = pd.read_csv("features.csv")

# Convert all columns to strings
features_df = features_df.astype(str)

# Convert categorical data to numerical data
label_encoder = LabelEncoder()
for column in features_df.columns:
    features_df[column] = label_encoder.fit_transform(features_df[column])

# Check if there are any numeric features
numeric_features_df = features_df.select_dtypes(include=[np.number])
if numeric_features_df.empty:
    print("No numeric features found in the dataset.")
else:
    # Separate the features from the dataframe
    features = numeric_features_df.values

    # Normalization
    scaler = MinMaxScaler()
    normalized_features = scaler.fit_transform(features)

    # Standardization
    scaler = StandardScaler()
    standardized_features = scaler.fit_transform(features)

    # Dimensionality reduction (PCA)
    pca = PCA(n_components=5)  # Assuming you want to reduce to 5 dimensions
    reduced_features = pca.fit_transform(features)

    # Combine all transformed features into a single DataFrame
    transformed_df = pd.DataFrame(normalized_features, columns=[f"Normalized_Feature_{i}" for i in range(normalized_features.shape[1])])
    transformed_df = transformed_df.join(pd.DataFrame(standardized_features, columns=[f"Standardized_Feature_{i}" for i in range(standardized_features.shape[1])]))
    transformed_df = transformed_df.join(pd.DataFrame(reduced_features, columns=[f"Reduced_Component_{i}" for i in range(reduced_features.shape[1])]))

    # Save the transformed features to a single CSV file
    transformed_df.to_csv("transformed_features.csv", index=False)


  features_df = pd.read_csv("features.csv")


In [4]:
features_df

Unnamed: 0,feature,chroma_cens,chroma_cens.1,chroma_cens.2,chroma_cens.3,chroma_cens.4,chroma_cens.5,chroma_cens.6,chroma_cens.7,chroma_cens.8,...,tonnetz.39,tonnetz.40,tonnetz.41,zcr,zcr.1,zcr.2,zcr.3,zcr.4,zcr.5,zcr.6
0,106575,104375,104380,104388,104393,104408,104377,104388,104382,104373,...,104170,103962,103954,104378,2582,104217,1152,213,104320,104272
1,106574,88275,89860,87670,89427,89094,88132,88809,87624,89339,...,103147,102938,102931,11205,1912,103193,920,174,16350,103249
2,106576,104376,104381,104389,104394,104409,104378,104389,104383,104374,...,104169,103961,103953,104379,2583,104218,1153,214,104319,104271
3,47614,103974,103477,99202,93598,94365,103382,94398,99927,104209,...,103584,102959,102947,80892,2373,104157,1096,1,47676,104133
4,54358,96661,104091,101531,99456,95723,103879,93337,88442,90467,...,103708,103039,103276,45798,2380,104153,1081,1,34469,104187
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
106572,44209,26288,79470,98604,3966,95796,98244,11610,103464,86706,...,82433,64073,43086,70800,312,18798,94,9,15414,6574
106573,44210,24724,11443,25910,22388,67434,13280,33917,3773,11426,...,85230,69913,82651,58563,360,22766,92,6,22961,19004
106574,44211,79826,5172,27948,72277,11105,28146,96151,92750,93309,...,66829,28074,47964,42921,394,36088,130,8,12826,16556
106575,44212,8695,70914,2381,75792,64742,44752,41896,80532,34392,...,45709,34039,28768,85594,301,41745,134,11,37282,21425


In [6]:
import os
import librosa
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import concurrent.futures
import warnings

# Suppress PySoundFile and Audioread warnings
warnings.filterwarnings("ignore", message="PySoundFile failed. Trying audioread instead.")

def extract_features(audio_file, sr=22050, n_mfcc=13):
    try:
        # Load audio file
        y, sr = librosa.load(audio_file, sr=sr)
        
        # Extract MFCC features
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
        
        # Compute mean of each MFCC coefficient
        mfcc_mean = np.mean(mfcc, axis=1)
        
        return mfcc_mean
    except Exception as e:
        print(f"Error processing {audio_file}: {e}")
        return None

# Function to process audio files in parallel
def process_audio_files(audio_files):
    features = []
    for audio_file in audio_files:
        mfcc_features = extract_features(audio_file)
        if mfcc_features is not None:
            file_name = os.path.basename(audio_file)  # Extract file name without path
            features.append([file_name, *mfcc_features])
    return features

# Function to normalize features
def normalize_features(features):
    # Min-max normalization
    normalized_features = (features - features.min()) / (features.max() - features.min())
    return normalized_features

# Function to standardize features
def standardize_features(features):
    # Standardization
    scaler = StandardScaler()
    standardized_features = scaler.fit_transform(features)
    return standardized_features

# Path to the folder containing subfolders of audio files
fma_large_folder = "audio"

# Initialize an empty list to store features
features_list = []

# Define the number of threads
num_threads = os.cpu_count()

# Loop through subfolders in fma_large
for folder_name in sorted(os.listdir(fma_large_folder)):
    folder_path = os.path.join(fma_large_folder, folder_name)
    if os.path.isdir(folder_path):
        print("Processing folder:", folder_name)
        
        # List all MP3 files in the current subfolder
        mp3_files = [os.path.join(folder_path, file_name) for file_name in sorted(os.listdir(folder_path)) if file_name.endswith(".mp3")]
        
        # Process audio files in parallel using multithreading
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            results = executor.map(process_audio_files, [mp3_files[i:i + num_threads] for i in range(0, len(mp3_files), num_threads)])
        
        # Flatten the list of features
        for batch in results:
            features_list.extend(batch)

# Convert features list to DataFrame
columns = ['filename'] + [f"mfcc_{i}" for i in range(len(features_list[0]) - 1)]
features_df = pd.DataFrame(features_list, columns=columns)

# Drop any rows with missing values (if any)
features_df.dropna(inplace=True)

# Separate the filename column
file_names = features_df['filename']
features_df.drop(columns=['filename'], inplace=True)

# Apply normalization and standardization to features (excluding filename column)
normalized_features = normalize_features(features_df)
standardized_features = standardize_features(normalized_features)

# Perform dimensionality reduction using PCA
pca = PCA(n_components=10)  # Specify the number of components to keep
reduced_features = pca.fit_transform(standardized_features)

# Convert reduced features to DataFrame
reduced_features_df = pd.DataFrame(reduced_features, columns=[f"pca_{i}" for i in range(reduced_features.shape[1])])

# Concatenate filename column to reduced features DataFrame
reduced_features_df.insert(0, 'filename', file_names)

# Save the reduced features DataFrame to a new CSV file
reduced_features_df.to_csv("reduced_audio_features.csv", index=False)


Processing folder: 000
Processing folder: 001


	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Error processing audio\001\001486.mp3: 


In [7]:
import pandas as pd

# Replace 'path_to_your_csv_file' with the actual path to your CSV file
csv_file_path = 'reduced_audio_features.csv'

# Read the CSV file into a DataFrame
df = pd.read_csv(csv_file_path)

# Display the DataFrame
print(df)


        filename     pca_0     pca_1     pca_2     pca_3     pca_4     pca_5  \
0     000002.mp3 -0.213493 -0.889269  0.300109 -1.396368 -0.202127 -0.782602   
1     000003.mp3  0.381634 -0.571499 -0.498397 -0.911051  0.144306 -0.135533   
2     000005.mp3  1.516205 -0.187136  0.846741 -1.151169  0.056882 -1.181047   
3     000010.mp3 -0.875782 -1.428132  0.076501  0.801126 -0.037359 -0.455641   
4     000020.mp3  1.863987  0.421322 -0.032939 -0.049972  0.339171 -0.044254   
...          ...       ...       ...       ...       ...       ...       ...   
1612  001995.mp3 -0.415149 -0.397714 -0.088522  0.741243 -2.236274  4.584163   
1613  001996.mp3 -1.006367 -3.281262  0.643810  0.356301 -0.995869 -0.420045   
1614  001997.mp3  0.666179  0.840533  0.375378 -0.177088 -0.941006 -0.638792   
1615  001998.mp3 -0.478807  1.101714  0.993000 -0.420857 -0.027245  0.200850   
1616  001999.mp3 -1.197815  2.253566  0.047799 -0.086981  0.215302 -0.164473   

         pca_6     pca_7     pca_8     

In [6]:
reduced_features_df

Unnamed: 0,pc_0,pc_1,pc_2,pc_3,pc_4,pc_5,pc_6,pc_7,pc_8,pc_9
0,2.775385,-0.512451,-1.398603,0.766013,1.904395,2.08756,-0.067536,-0.661068,0.120825,-0.114971
1,2.742665,-0.478032,2.643861,0.371387,0.4753,-0.785631,-0.154473,-0.638914,-0.20686,0.917257
2,-0.779586,-0.885235,-0.654513,-0.18593,-0.46282,-0.079726,0.478409,0.296423,0.761742,0.362223
3,-0.239705,-1.24525,-0.929713,0.350711,0.067621,-0.576786,0.258883,0.597501,0.542174,0.105898
4,2.438098,-0.014551,-0.324895,1.198067,-1.613589,-0.289867,-1.392326,-0.511372,-0.262235,-0.331464
5,-1.123491,5.063447,0.209638,0.614883,-0.085365,0.092426,0.558527,-0.485258,0.41905,-0.162114
6,0.692517,-0.387207,1.58581,-0.821482,0.327681,0.095521,0.386481,0.181995,0.29093,0.129893
7,-2.141782,-1.617599,-0.902444,-1.414973,0.404663,-0.152856,-1.000738,-0.559777,-0.066186,-0.136791
8,5.31636,-0.022638,-0.722914,-1.403744,-0.617986,0.70927,0.29518,1.035913,-0.210943,-0.211891
9,-1.291747,-1.268354,-1.352191,-1.268271,-0.115264,-0.344346,-0.037749,-0.3288,-0.338828,0.045768


In [11]:
import tarfile
import gzip
import os

def extract_tar_gz(file_path, extract_path):
    try:
        # Open the .tar.gz file
        with gzip.open(file_path, 'rb') as f_in:
            # Open the .tar file
            with tarfile.open(fileobj=f_in, mode='r') as tar:
                # Extract the contents to the specified path
                tar.extractall(path=extract_path)
        print("Extraction completed successfully.")
    except Exception as e:
        print(f"An error occurred: {e}")

# Example usage:
file_path = 'hadoop-3.4.0.tar.gz'  # Replace 'hadoop-3.4.0.tar.gz' with the path to your file
extract_path = 'D:/project/'  # Replace 'D:/project/' with your desired extraction path

# Ensure the extraction directory exists
os.makedirs(extract_path, exist_ok=True)

extract_tar_gz(file_path, extract_path)


An error occurred: [Errno 2] No such file or directory: 'D:\\project\\hadoop-3.4.0\\share\\doc\\hadoop\\hadoop-yarn\\hadoop-yarn-server\\hadoop-yarn-server-resourcemanager\\apidocs\\org\\apache\\hadoop\\yarn\\server\\resourcemanager\\monitor\\capacity\\class-use\\ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy.html'


In [6]:
# import pandas as pd
# import pymongo

# # Function to connect to MongoDB
# def connect_to_mongodb():
#     # Connect to MongoDB
#     client = pymongo.MongoClient("mongodb://localhost:27017/")  # Replace with your MongoDB connection string
#     # Create or select a database
#     db = client["music_features_database"]
#     return db

# # Path to the CSV file containing transformed data
# csv_file = "reduced_audio_features.csv"

# # Read the CSV file into a DataFrame
# transformed_data_df = pd.read_csv(csv_file)

# # Connect to MongoDB
# db = connect_to_mongodb()

# # Convert DataFrame to dictionary
# transformed_data_dict = transformed_data_df.to_dict(orient="records")

# # Insert data into MongoDB collection
# collection_name = "audio_features_collection"
# collection = db[collection_name]
# collection.insert_many(transformed_data_dict)

# print("Transformed data has been successfully stored in MongoDB.")


In [7]:
import os
import librosa
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import pymongo
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to extract audio features
def extract_features(audio_file, sr=22050, n_mfcc=13):
    try:
        y, sr = librosa.load(audio_file, sr=sr)
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
        mfcc_mean = np.mean(mfcc, axis=1)
        return mfcc_mean
    except Exception as e:
        logger.error(f"Error extracting features from {audio_file}: {e}")
        return None

# Function to normalize features
def normalize_features(features):
    try:
        normalized_features = (features - features.min()) / (features.max() - features.min())
        return normalized_features
    except Exception as e:
        logger.error(f"Error normalizing features: {e}")
        return None

# Function to standardize features
def standardize_features(features):
    try:
        scaler = StandardScaler()
        standardized_features = scaler.fit_transform(features)
        return standardized_features
    except Exception as e:
        logger.error(f"Error standardizing features: {e}")
        return None

# Function to connect to MongoDB
def connect_to_mongodb(uri, db_name):
    try:
        client = pymongo.MongoClient(uri)
        db = client[db_name]
        return db
    except Exception as e:
        logger.error(f"Error connecting to MongoDB: {e}")
        return None

# Main function to process audio files and store data in MongoDB
def process_audio_files(audio_folder, mongodb_uri, db_name, output_csv):
    try:
        features_df = pd.DataFrame()
        for file in os.listdir(audio_folder):
            if file.endswith(".mp3"):
                audio_file = os.path.join(audio_folder, file)
                mfcc_features = extract_features(audio_file)
                if mfcc_features is not None:
                    df = pd.DataFrame([mfcc_features], columns=[f"mfcc_{i}" for i in range(len(mfcc_features))])
                    features_df = pd.concat([features_df, df], ignore_index=True)

        if features_df.empty:
            logger.error("No audio features extracted.")
            return

        normalized_features = normalize_features(features_df)
        if normalized_features is None:
            return

        standardized_features = standardize_features(normalized_features)
        if standardized_features is None:
            return

        pca = PCA(n_components=10)
        reduced_features = pca.fit_transform(standardized_features)
        reduced_features_df = pd.DataFrame(reduced_features, columns=[f"pc_{i}" for i in range(reduced_features.shape[1])])
        reduced_features_df.to_csv(output_csv, index=False)

        db = connect_to_mongodb(mongodb_uri, db_name)
        if db is not None:
            transformed_data_dict = reduced_features_df.to_dict(orient="records")
            collection_name = "audio_features_collectionn"
            collection = db[collection_name]
            collection.insert_many(transformed_data_dict)
            logger.info("Transformed data has been successfully stored in MongoDB.")
        else:
            logger.error("Failed to connect to MongoDB.")

    except Exception as e:
        logger.error(f"An error occurred during processing: {e}")

# Set parameters
audio_folder = "audio"
mongodb_uri = "mongodb://localhost:27017/"
db_name = "music_features_databasee"
output_csv = "reduced_audio_features.csv"

# Process audio files and store data in MongoDB
process_audio_files(audio_folder, mongodb_uri, db_name, output_csv)


INFO:__main__:Transformed data has been successfully stored in MongoDB.


In [8]:
import pymongo

# MongoDB connection URI
mongo_uri = "mongodb://localhost:27017/"

try:
    # Connect to MongoDB
    client = pymongo.MongoClient(mongo_uri)
    
    # List available databases
    print("Available databases:")
    print(client.list_database_names())
    
    # List collections in a specific database
    db = client["music_features_database"]
    print("Collections in music_features_database:")
    print(db.list_collection_names())
    
    # Close connection
    client.close()
    
    print("Connection successful!")
except Exception as e:
    print("Connection error:", e)


INFO:pymongo.serverSelection:{"message": "Waiting for suitable server to become available", "selector": "Primary()", "operation": "listDatabases", "topologyDescription": "<TopologyDescription id: 663d7a0573acb8a1044114cb, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None>]>", "clientId": {"$oid": "663d7a0573acb8a1044114cb"}, "remainingTimeMS": 30}


Available databases:
['admin', 'config', 'local', 'music_features_databasee']
Collections in music_features_database:
[]
Connection successful!


Training the Recommendation Model with Apache Spark


In [10]:
#if spark works
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MusicRecommendationModelTraining") \
    .getOrCreate()

# Load transformed audio features from MongoDB or CSV file
transformed_data = spark.read.format("mongo").load("mongodb://localhost:27017/music_features_databasee/audio_features_collectionn")
# Example: transformed_data = spark.read.csv("reduced_audio_features.csv", header=True, inferSchema=True)

# Split the data into training and testing sets (80% train, 20% test)
(training_data, test_data) = transformed_data.randomSplit([0.8, 0.2])

# Train ALS model
als = ALS(maxIter=10, regParam=0.01, userCol="user_id", itemCol="item_id", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) =", rmse)

# Stop Spark session
spark.stop()


FileNotFoundError: [WinError 2] The system cannot find the file specified

Exploring Collaborative Filtering and ANN Algorithms


In [None]:
#if spark works

# Collaborative Filtering
from pyspark.ml.recommendation import ALS

# Initialize Spark session
spark = SparkSession.builder \
    .appName("CollaborativeFilteringExample") \
    .getOrCreate()

# Load transformed audio features from MongoDB or CSV file
transformed_data = spark.read.format("mongo").load("mongodb://localhost:27017/music_features_database.reduced_audio_features_collection")
# Example: transformed_data = spark.read.csv("reduced_audio_features.csv", header=True, inferSchema=True)

# Split the data into training and testing sets (80% train, 20% test)
(training_data, test_data) = transformed_data.randomSplit([0.8, 0.2])

# Train ALS model for collaborative filtering
als = ALS(maxIter=10, regParam=0.01, userCol="user_id", itemCol="item_id", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training_data)

# ANN - Approximate Nearest Neighbours
# You can use a library like Annoy, Faiss, or Hnswlib for ANN implementation

# Stop Spark session
spark.stop()


Evaluating the Model with Various Metrics and Hyperparameter Tuning


In [None]:
#if spark works

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize Spark session
spark = SparkSession.builder \
    .appName("ModelEvaluationAndHyperparameterTuning") \
    .getOrCreate()

# Load transformed audio features from MongoDB or CSV file
transformed_data = spark.read.format("mongo").load("mongodb://localhost:27017/music_features_database.reduced_audio_features_collection")
# Example: transformed_data = spark.read.csv("reduced_audio_features.csv", header=True, inferSchema=True)

# Split the data into training and testing sets (80% train, 20% test)
(training_data, test_data) = transformed_data.randomSplit([0.8, 0.2])

# Define ALS model
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop")

# Define parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [10, 20, 30]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Define evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Define cross-validator
cross_validator = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Perform hyperparameter tuning
cv_model = cross_validator.fit(training_data)

# Make predictions on the test data
predictions = cv_model.transform(test_data)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) =", rmse)

# Best model parameters
best_model = cv_model.bestModel
print("Best Rank =", best_model.rank)
print("Best Max Iter =", best_model._java_obj.parent().getMaxIter())
print("Best Regularization Parameter =", best_model._java_obj.parent().getRegParam())

# Stop Spark session
spark.stop()


Deployment - Cell 4:

In [13]:
from pyspark.ml.recommendation import ALSModel
from flask import Flask, render_template, request
from kafka import KafkaProducer

# Initialize Flask app
app = Flask(__name__)

# Load the trained ALS model
saved_model_path = "music_recommendation_model"
trained_model = ALSModel.load(saved_model_path)

# Initialize Kafka producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Route for home page
@app.route('/')
def home():
    return render_template('index.html')

# Route for generating recommendations
@app.route('/recommend', methods=['POST'])
def recommend():
    # Extract user input
    user_id = request.form.get('user_id')
    num_recommendations = int(request.form.get('num_recommendations'))

    # Generate recommendations using the trained model
    recommendations = trained_model.recommendForUserSubset(spark.createDataFrame([(user_id,)], ["user_id"]), num_recommendations)

    # Send recommendations to Kafka topic
    for row in recommendations.collect():
        for rec in row.recommendations:
            producer.send('recommendations_topic', key=str(row.user_id).encode(), value=str(rec.item_id).encode())

    # Return recommendations to user interface
    return render_template('recommendations.html', recommendations=recommendations)

# Route for receiving user activity
@app.route('/user_activity', methods=['POST'])
def user_activity():
    # Extract user activity
    user_id = request.form.get('user_id')
    item_id = request.form.get('item_id')

    # Send user activity to Kafka topic
    producer.send('user_activity_topic', key=user_id.encode(), value=item_id.encode())

    return 'User activity sent to Kafka topic'

if __name__ == '__main__':
    app.run(debug=True)


ModuleNotFoundError: No module named 'flask'