<a href="https://colab.research.google.com/github/Cinnamorix/OS_Project_DEMOs/blob/main/FinalProject_OS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
#Cell 1
!pip install librosa tqdm
!pip install joblib



In [12]:
#Cell 2
import kagglehub

# Dataset handle of the GTZAN genre collection on Kaggle.
GTZAN_DATASET = "carlthome/gtzan-genre-collection"

# Download the latest version; `path` is the location reported for the files.
path = kagglehub.dataset_download(GTZAN_DATASET)

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/gtzan-genre-collection


In [13]:
#Cell 3
# The per-genre folders sit in the "genres" subdirectory of the download.
audio_root = "{}/genres".format(path)
print(audio_root)

/kaggle/input/gtzan-genre-collection/genres


In [14]:
#Cell 4
import os
import librosa
import pandas as pd
import numpy as np
import time
import joblib
from concurrent.futures import ThreadPoolExecutor
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.preprocessing import StandardScaler

#Config
# Reuse the dataset location resolved in Cell 3 (`audio_root`) when that cell
# has been run, so this cell follows wherever kagglehub placed the files.
# If the name is not defined (e.g. this cell is run on its own), fall back to
# the Kaggle mount point that was previously hard-coded here.
AUDIO_ROOT = globals().get("audio_root", "/kaggle/input/gtzan-genre-collection/genres")
# Destination of the extracted feature table.
OUTPUT_CSV = "/content/audio_features_augmented.csv"
# One pickle per (file, augmentation) is stored here so reruns can skip work.
FEATURE_CACHE_DIR = "/content/feature_cache"
os.makedirs(FEATURE_CACHE_DIR, exist_ok=True)

# Feature extraction

def extract_features(y, sr, filename, genre, tag="original"):
    """Summarise one audio signal as a flat row of scalar features.

    Args:
        y: Audio samples handed to librosa (in this notebook, the signal
            returned by ``librosa.load`` or an augmented copy of it).
        sr: Sampling rate belonging to ``y``.
        filename: Base name of the source file; stored as ``"{filename}_{tag}"``.
        genre: Genre label, copied into the row unchanged.
        tag: Name of the augmentation that produced ``y``.

    Returns:
        A dict with the keys ``filename``, ``genre``, ``bpm``, ``pitch`` and
        ``mfcc_0`` .. ``mfcc_12``, or ``None`` if any librosa call raised
        (the error is printed, not re-raised, so one bad file does not stop
        the whole batch).
    """
    try:
        tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
        # The tempo can come back as a one-element array instead of a scalar.
        # Stored as-is it ends up in the CSV as text like "[107.6]", so unwrap
        # it to a plain float here.
        bpm = float(np.atleast_1d(tempo).ravel()[0])
        # Mean fundamental-frequency estimate over all frames.
        pitch = float(librosa.yin(y, fmin=60, fmax=400, sr=sr).mean())
        # Average each of the 13 MFCC coefficients over time.
        mfcc_mean = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13).mean(axis=1)

        row = {
            "filename": f"{filename}_{tag}",
            "genre": genre,
            "bpm": bpm,
            "pitch": pitch,
        }
        row.update({f"mfcc_{i}": val for i, val in enumerate(mfcc_mean)})

        return row
    except Exception as e:
        print(f"❌ Feature extraction failed: {filename}: {e}")
        return None

# process file + augmentations (with cache)

def process_file(args):
    """Return the feature rows for one audio file and its augmentations.

    For each of the four variants ("original", "stretch", "pitch", "noise")
    the cached row in ``FEATURE_CACHE_DIR`` is used when present. The audio is
    loaded, and a variant is synthesised, only when a row is actually missing,
    so a fully cached file costs nothing beyond reading four small pickles.

    Args:
        args: ``(file_path, genre)`` tuple; a single argument so the function
            can be used directly with ``executor.map``.

    Returns:
        List of feature dicts (possibly empty). Errors are printed rather than
        raised; rows collected before an error are still returned.
    """
    file_path, genre = args
    data = []
    try:
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        # (tag, builder) pairs; builders run lazily, only on a cache miss.
        builders = (
            ("original", lambda y, sr: y),
            ("stretch", lambda y, sr: librosa.effects.time_stretch(y, rate=0.9)),
            ("pitch", lambda y, sr: librosa.effects.pitch_shift(y, sr=sr, n_steps=2)),
            # NOTE(review): the noise is drawn from NumPy's unseeded global
            # RNG, so this variant differs between uncached runs.
            ("noise", lambda y, sr: y + 0.005 * np.random.randn(len(y))),
        )
        audio = None  # (y, sr), loaded on the first cache miss only

        for tag, build in builders:
            cache_path = os.path.join(FEATURE_CACHE_DIR, f"{base_name}_{tag}.pkl")
            if os.path.exists(cache_path):
                # Cache entries are pickles written by this function; keep
                # FEATURE_CACHE_DIR pointed at a trusted location.
                row = joblib.load(cache_path)
            else:
                if audio is None:
                    audio = librosa.load(file_path)
                y, sr = audio
                row = extract_features(build(y, sr), sr, base_name, genre, tag=tag)
                if row:
                    joblib.dump(row, cache_path)
            if row:
                data.append(row)

    except Exception as e:
        print(f"❌ Error processing file {file_path}: {e}")
    return data

# Task list: one (file_path, genre) pair per audio file, where the genre is
# the name of the directory that contains the file.
# Directory listings are sorted because os.listdir returns entries in
# arbitrary order; sorting keeps the row order of the CSV (and therefore the
# later random train/test split) reproducible.
AUDIO_EXTENSIONS = (".au", ".wav")

tasks = []
for genre in sorted(os.listdir(AUDIO_ROOT)):
    genre_path = os.path.join(AUDIO_ROOT, genre)
    if not os.path.isdir(genre_path):
        continue
    for file in sorted(os.listdir(genre_path)):
        if file.endswith(AUDIO_EXTENSIONS):
            tasks.append((os.path.join(genre_path, file), genre))


print(f"📂 Processing {len(tasks)} files...")
# Time only the extraction itself, not the directory scan above.
start = time.time()
data = []
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
    # executor.map yields results in the order of `tasks`, so rows stay
    # grouped per file regardless of which worker finishes first.
    for res in executor.map(process_file, tasks):
        data.extend(res)
end = time.time()
print(f"Processing finish in {end - start:.2f} Sec")

#DataFrame
print("✅ Extracting to CSV...")
df = pd.DataFrame(data)
df.to_csv(OUTPUT_CSV, index=False)

📂 Processing 1000 files...
Processing finish in 2148.05 Sec
✅ Extracting to CSV...


In [15]:
#Cell 5
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.preprocessing import StandardScaler

#Load data
df = pd.read_csv("/content/audio_features_augmented.csv")

# The tempo column may contain text such as "[107.6]" (a one-element list
# written to CSV). Strip the brackets as plain text instead of calling eval()
# on file contents; anything that is still not a number becomes NaN below.
df["bpm"] = df["bpm"].astype(str).str.strip("[] ")

# Coerce every feature column to numeric; unparseable cells turn into NaN.
columns_to_convert = ["bpm", "pitch"] + [f"mfcc_{i}" for i in range(13)]
df[columns_to_convert] = df[columns_to_convert].apply(pd.to_numeric, errors="coerce")

# Drop rows with missing values in any feature column.
df = df.dropna(subset=columns_to_convert)

#Define playlist classification rule-based labels
def classify_playlist(row):
    """Assign a single playlist label to one feature row.

    The rules are checked in a fixed priority order and the first one that
    holds decides the label. When no rule holds the label is "relax".

    Args:
        row: Mapping-like object providing "bpm", "pitch" and
            "mfcc_0" .. "mfcc_5".

    Returns:
        The playlist label as a string.
    """
    bpm = row["bpm"]
    pitch = row["pitch"]
    m0, m1, m2, m3, m4, m5 = (row[f"mfcc_{k}"] for k in range(6))

    # (label, rule outcome) in priority order; every rule is evaluated.
    verdicts = (
        ("workout", bpm > 130 and m0 > -150),
        ("study", pitch < 120 and m1 < 0),
        ("chill", bpm < 100 and m2 < 5),
        ("party", m0 > -90 and bpm > 100),
        ("focus", bpm > 80 and m0 < -100),
        ("dance", m1 > 5 and pitch > 130),
        ("energetic", bpm > 120 and m3 > 0),
        ("mood_boost", pitch > 150 and m5 < 3),
        ("relax", bpm < 90 and m4 < -2),
        ("ambient", m2 > 10 and pitch < 100),
        ("sleep", bpm < 60 and m0 < -200),
    )
    matched = [label for label, holds in verdicts if holds]

    # Several rules can hold at once; only the highest-priority one is kept.
    return matched[0] if matched else "relax"

#Apply label function
df["playlist"] = df.apply(classify_playlist, axis=1)


#Features and target
X = df[columns_to_convert]
y = df["playlist"]

#Train-test split
# Split BEFORE scaling so that the held-out rows never influence the scaler's
# mean/variance (fitting the scaler on all rows leaks test-set statistics).
# NOTE(review): the augmented variants of one track can land on both sides of
# this random split, which likely inflates the scores below; consider a
# group-aware split keyed on the source file.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

#Scale features (statistics come from the training rows only)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)
# Scaled copy of the full table, used for the whole-dataset predictions below.
X_scaled = scaler.transform(X)

#Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

#Predict and evaluate
y_pred = model.predict(X_test)

# 📊 Report
print("🎯 Classification Report:")
print(classification_report(y_test, y_pred))

#Predict for the entire dataset
y_prob = model.predict_proba(X_scaled)

#Select the highest probability label for each song
df["playlist_predicted"] = model.classes_[y_prob.argmax(axis=1)]

#Probability of that winning label
df["playlist_probabilities"] = y_prob.max(axis=1)

#Save to new CSV
df.to_csv("/content/audio_features_with_single_playlist_prediction.csv", index=False)
print("✅ Saved: /content/audio_features_with_single_playlist_prediction.csv")


🎯 Classification Report:
              precision    recall  f1-score   support

     ambient       1.00      0.25      0.40         8
       chill       1.00      1.00      1.00       156
       dance       1.00      0.57      0.73        14
   energetic       1.00      0.60      0.75         5
       focus       0.99      1.00      0.99       393
       party       0.94      1.00      0.97       109
       relax       0.74      0.82      0.78        28
       study       1.00      1.00      1.00         1
     workout       1.00      0.99      0.99        86

    accuracy                           0.97       800
   macro avg       0.96      0.80      0.85       800
weighted avg       0.98      0.97      0.97       800

✅ Saved: /content/audio_features_with_single_playlist_prediction.csv


In [16]:
#Cell 6
import joblib

# Write the fitted classifier and its scaler next to the notebook so both can
# be reloaded together later.
for artifact, target in (
    (model, "playlist_classifier_model.joblib"),
    (scaler, "scaler.joblib"),
):
    joblib.dump(artifact, target)
print("✅ Model and scaler saved.")

✅ Model and scaler saved.
