In [6]:
!pip install pymongo

Collecting pymongo
  Downloading pymongo-4.7.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (670 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/670.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/670.0 kB[0m [31m6.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m665.6/670.0 kB[0m [31m11.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m670.0/670.0 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.6.1-py3-none-any.whl (307 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/307.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.7/307.7 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collecte

#Phase 1

In [8]:
import numpy as np
import pandas as pd
import pymongo
import librosa
import os
import zipfile
import urllib.request
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# Step 1: Download and Extract the Dataset
# Define the URL and filename of the dataset
dataset_url = "https://www.kaggle.com/datasets/aaronyim/fma-small"
dataset_filename = "fma_small.zip"

# Define the directory to extract the dataset
extract_dir = "fma_small"

# Check if the dataset zip file already exists
if not os.path.exists(dataset_filename):
    # If the dataset zip file does not exist, download it
    print("Downloading dataset...")
    urllib.request.urlretrieve(dataset_url, dataset_filename)
    print("Download complete.")

# Check if the extraction directory already exists
if not os.path.exists(extract_dir):
    # If the extraction directory does not exist, create it
    os.makedirs(extract_dir)

# Extract the dataset
print("Extracting dataset...")
with zipfile.ZipFile(fma_small, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)
print("Extraction complete.")

# Step 2: Load the Dataset
with zipfile.ZipFile('fma_metadata.zip', 'r') as zip_ref:
    zip_ref.extractall('fma_small')

# Load metadata
metadata_df = pd.read_csv('fma_small/tracks.csv')

# Step 3: Feature Extraction
def extract_features(audio_file):
    # Load audio file
    y, sr = librosa.load(audio_file)

    # Extract MFCC (Mel-Frequency Cepstral Coefficients)
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)  # Extract 13 MFCC coefficients

    # Extract spectral centroid
    spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)

    return mfcc, spectral_centroid

# Apply feature extraction to each audio file and store in a DataFrame
features_df = pd.DataFrame(columns=['track_id', 'mfcc', 'spectral_centroid'])
for track_id, audio_file in metadata_df[['track_id', 'path']].iterrows():
    mfcc, spectral_centroid = extract_features(os.path.join('fma_dataset', 'fma_small', 'fma_small', '{:06d}'.format(track_id), '{:06d}.mp3'.format(track_id)))
    features_df = features_df.append({'track_id': track_id, 'mfcc': mfcc.tolist(), 'spectral_centroid': spectral_centroid.tolist()}, ignore_index=True)

# Step 4: Normalization or Standardization
# Apply normalization or standardization to the extracted features
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features_df['features'].values.tolist())
features_df['features'] = scaled_features.tolist()

# Step 5: Dimensionality Reduction
# Apply dimensionality reduction techniques
pca = PCA(n_components=50)
reduced_features = pca.fit_transform(features_df['features'].values.tolist())
features_df['features'] = reduced_features.tolist()

# Step 6: Storage in MongoDB
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client['music_recommendation']
collection = db['audio_features']

# Convert DataFrame to JSON format and insert into MongoDB collection
for row in features_df.itertuples():
    collection.insert_one({'track_id': row.track_id, 'mfcc': row.mfcc, 'spectral_centroid': row.spectral_centroid})

# Print success message
print("Data stored in MongoDB successfully!")


Extracting dataset...


NameError: name 'fma_small' is not defined

#Phase 2


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Step 1: Connect to MongoDB and retrieve the audio features data
spark = SparkSession.builder \
    .appName("MusicRecommendationModel") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/music_recommendation.audio_features") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/music_recommendation.recommendations") \
    .getOrCreate()

# Load audio features data from MongoDB
audio_features_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

# Feature engineering and assembling
feature_columns = audio_features_df.columns  # Assuming all columns except 'track_id' are features
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(audio_features_df)

# Splitting the data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=123)

# Scaling the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(train_data)
train_data_scaled = scaler_model.transform(train_data)
test_data_scaled = scaler_model.transform(test_data)

# Step 2: Training the music recommendation model with hyperparameter tuning
# Initialize ALS model
als = ALS(userCol="track_id", itemCol="track_id", ratingCol="play_count",
          coldStartStrategy="drop", nonnegative=True)

# Create ParamGrid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [10, 20]) \
    .addGrid(als.regParam, [0.01, 0.1]) \
    .build()

# Define evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="play_count", predictionCol="prediction")

# Create TrainValidationSplit
tvs = TrainValidationSplit(estimator=als,
                            estimatorParamMaps=param_grid,
                            evaluator=evaluator,
                            trainRatio=0.8)

# Fit TrainValidationSplit
model = tvs.fit(train_data_scaled)

# Step 3: Evaluating the recommendation model
# Make predictions
predictions = model.transform(test_data_scaled)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) = " + str(rmse))

# Print best parameters
best_model = model.bestModel
print("Best Rank:", best_model.rank)
print("Best MaxIter:", best_model._java_obj.parent().getMaxIter())
print("Best RegParam:", best_model._java_obj.parent().getRegParam())

# Print success message
print("Music recommendation model trained and evaluated successfully!")


#Phase 3


In [None]:
# app.py
from flask import Flask, render_template, request
from kafka import KafkaProducer, KafkaConsumer
import json

app = Flask(__name__)

# Initialize Kafka producer and consumer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
consumer = KafkaConsumer('user_activity', bootstrap_servers=['localhost:9092'], group_id='recommendation_group')

# Define routes
@app.route('/')
def index():
    return render_template('index.html')

@app.route('/playback', methods=['POST'])
def playback():
    # Capture user activity (e.g., track played)
    track_id = request.form['track_id']

    # Publish user activity to Kafka topic
    producer.send('user_activity', json.dumps({'track_id': track_id}).encode('utf-8'))

    return "Playback recorded successfully."

@app.route('/recommendation')
def recommendation():
    # Consume real-time recommendation from Kafka topic
    recommendation = []
    for message in consumer:
        data = json.loads(message.value.decode('utf-8'))
        recommendation.append(data['recommendation'])
        if len(recommendation) >= 5:
            break

    # Display recommendations to the user
    return render_template('recommendation.html', recommendation=recommendation)

if __name__ == '__main__':
    app.run(debug=True)


#Phase 4

In [None]:
music-recommendation-system/
│
├── data/
│   ├── fma_dataset/          # Extracted FMA dataset
│   ├── fma_metadata.zip     # Metadata zip file
│   └── tracks.csv           # Metadata CSV file
│
├── models/                  # Trained recommendation models
│   ├── collaborative_filtering_model.pkl
│   └── ann_model.pkl
│
├── etl_pipeline.py          # ETL pipeline script
├── train_model.py           # Model training script
├── app.py                   # Flask web application
│
├── templates/               # HTML templates for web app
│   ├── index.html
│   └── recommendations.html
│
└── README.md                # Project documentation
