In [None]:
import os
import librosa
import numpy as np
import json
import joblib
import matplotlib.pyplot as plt

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
from collections import Counter

# Sampling rate passed to librosa.load when decoding each recording.
SAMPLE_RATE = 22050
# Minimum number of labelled recordings a raga needs to survive filter_rare_ragas.
MIN_SAMPLES_PER_RAGA = 3

# Fallback labels for recordings without usable JSON metadata.
# Each key is a substring that get_raga_label looks for (case-insensitively)
# in the full file path; the value is the raga name to assign.
filename_raga_map = {
    "Varashiki Vahana": "Hamsadhwani",
    "Bhuvini Dasudane": "Sriranjani",
    # Add more mappings as needed
}

def extract_audio_features(file_path):
    """Summarise one audio file as a flat feature vector.

    The file is decoded at SAMPLE_RATE, four librosa feature matrices are
    computed (13 MFCCs, chroma, spectral contrast, zero-crossing rate), each
    is averaged along axis 1, and the averages are concatenated.

    Returns:
        A 1-D numpy array, or None when loading or feature extraction raises
        (the error is printed together with the file path).
    """
    try:
        signal, rate = librosa.load(file_path, sr=SAMPLE_RATE)
        feature_matrices = (
            librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=13),
            librosa.feature.chroma_stft(y=signal, sr=rate),
            librosa.feature.spectral_contrast(y=signal, sr=rate),
            librosa.feature.zero_crossing_rate(y=signal),
        )
        # Collapse every matrix to one value per row, then join them in order.
        averaged = [np.mean(matrix, axis=1) for matrix in feature_matrices]
        return np.hstack(averaged)
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None

def get_raga_label(file_path):
    """Return the raga name for an audio file, or None if it cannot be determined.

    Lookup order:
      1. A JSON sidecar next to the audio file (same path, ``.json`` extension)
         whose ``"raaga"`` list is non-empty; the first entry's ``"name"`` wins.
      2. The module-level ``filename_raga_map``: the first key that occurs
         (case-insensitively) anywhere in ``file_path``.

    A sidecar that cannot be read or does not have the expected structure is
    reported on stdout and the lookup falls through to step 2.
    """
    # Swap only the final extension; str.replace would also rewrite ".mp3"
    # occurring inside directory names or earlier in the file name.
    json_path = os.path.splitext(file_path)[0] + ".json"
    if os.path.exists(json_path):
        try:
            # JSON is UTF-8 by specification; do not depend on the platform default.
            with open(json_path, encoding="utf-8") as f:
                meta = json.load(f)
            if "raaga" in meta and len(meta["raaga"]) > 0:
                return meta["raaga"][0]["name"]
        except (OSError, ValueError, KeyError, IndexError, TypeError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            print(f"Could not read raga metadata from {json_path}: {e}")

    lowered_path = file_path.lower()
    for key, raga in filename_raga_map.items():
        if key.lower() in lowered_path:
            return raga
    return None

def load_dataset(data_dir):
    """Walk ``data_dir`` and build the feature matrix and label vector.

    Every ``.mp3`` file that has a raga label (see get_raga_label) and whose
    features can be extracted contributes one row.

    Returns:
        (X, y): numpy arrays of feature vectors and raga names. Both are
        empty when nothing usable is found.
    """
    X, y = [], []
    for root, dirs, files in os.walk(data_dir):
        # os.walk yields entries in OS-dependent order; sorting keeps the sample
        # order (and therefore the seeded train/test split) reproducible.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith(".mp3"):
                continue
            full_path = os.path.join(root, file)
            # Resolve the label first: it is cheap, whereas feature extraction
            # decodes the whole recording and would be wasted on unlabelled files.
            raga = get_raga_label(full_path)
            if not raga:
                continue
            features = extract_audio_features(full_path)
            if features is None:
                continue
            X.append(features)
            y.append(raga)
    X = np.array(X)
    y = np.array(y)
    print(f"Loaded {len(X)} samples")
    print(f"Unique ragas: {len(set(y))}")
    print(f"Raga distribution: {Counter(y)}")
    return X, y

def filter_rare_ragas(X, y, min_count=3):
    """Keep only the samples whose raga occurs at least ``min_count`` times in ``y``.

    Prints how many samples and distinct ragas remain, and returns the
    surviving rows and labels as two numpy arrays in their original order.
    """
    frequency = Counter(y)
    survivors = [
        (row, label) for row, label in zip(X, y) if frequency[label] >= min_count
    ]
    kept_rows = [row for row, _ in survivors]
    kept_labels = [label for _, label in survivors]
    print(f"Filtered to {len(kept_rows)} samples and {len(set(kept_labels))} ragas with ≥{min_count} samples")
    return np.array(kept_rows), np.array(kept_labels)

def train_model(X_train, y_train):
    """Fit a RandomForestClassifier on the training split and return it."""
    forest_settings = {"n_estimators": 150, "max_depth": 25, "random_state": 42}
    forest = RandomForestClassifier(**forest_settings)
    forest.fit(X_train, y_train)
    return forest

def predict_raga(audio_file, model):
    """Predict the raga of one audio file with a fitted model.

    Returns the model's prediction for the file's feature vector, or None
    when feature extraction fails.
    """
    feature_vector = extract_audio_features(audio_file)
    if feature_vector is None:
        return None
    # The model expects a batch, so wrap the single vector in a list.
    predictions = model.predict([feature_vector])
    return predictions[0]

def run_full_pipeline(dataset_path, test_file_path=None):
    """Load the dataset, train and evaluate a classifier, then classify one file.

    Steps: load features and labels from ``dataset_path``, drop ragas with
    fewer than MIN_SAMPLES_PER_RAGA samples, make a stratified 80/20 split,
    train, print a classification report, show a confusion matrix, and
    finally print the prediction for ``test_file_path`` if that file exists.
    Returns None; stops early when no samples remain after filtering.
    """
    print("Starting Raga Identification Pipeline...\n")

    raw_features, raw_labels = load_dataset(dataset_path)
    features, labels = filter_rare_ragas(raw_features, raw_labels, MIN_SAMPLES_PER_RAGA)
    if len(features) == 0:
        print("Not enough data to train.")
        return

    split = train_test_split(
        features, labels, test_size=0.2, random_state=42, stratify=labels
    )
    X_train, X_test, y_train, y_test = split
    model = train_model(X_train, y_train)

    # Held-out evaluation: textual report followed by the confusion matrix.
    y_pred = model.predict(X_test)
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, zero_division=0))
    ConfusionMatrixDisplay.from_estimator(model, X_test, y_test, xticks_rotation='vertical')
    plt.tight_layout()
    plt.show()

    have_test_file = bool(test_file_path) and os.path.exists(test_file_path)
    if not have_test_file:
        print("\nNo valid test file provided.")
        return

    prediction = predict_raga(test_file_path, model)
    print(f"\nPredicted Raga for '{os.path.basename(test_file_path)}': {prediction}")

# Paths
# NOTE(review): these are absolute, machine-specific Windows paths; they must
# be edited before the notebook can run on another machine.
dataset_path = r"D:\carnatic\carnatic"
test_file = r"D:\carnatic\carnatic\1\Cherthala Ranganatha Sharma - Bhuvini Dasudane.mp3"

# Train and evaluate on the dataset, then classify the single test recording.
run_full_pipeline(dataset_path, test_file)


🎼 Starting Raga Identification Pipeline...


  "cipher": algorithms.TripleDES,
  "class": algorithms.Blowfish,
  "class": algorithms.TripleDES,


⚠️ No raga info for Mahati - Gopi Gopala Bala.mp3, skipping.
⚠️ No raga info for Chaitra Sairam - Ardhanareeshwaram.mp3, skipping.
⚠️ No raga info for Chaitra Sairam - Gange Maampahi.mp3, skipping.
⚠️ No raga info for Kuldeep Pai - Gange Maam Pahi.mp3, skipping.
⚠️ No raga info for Kuldeep Pai - Vasudeva Sutam Devam.mp3, skipping.
⚠️ No raga info for Sanjay Subrahmanyan - Teerthakarayinile.mp3, skipping.
⚠️ No raga info for KP Nandini - Tillana.mp3, skipping.


In [3]:
# Use the %pip magic so the package is installed into the running kernel's
# environment (a bare `!pip` may resolve to a different Python), and pin the
# version this notebook was run with so results stay reproducible.
%pip install xgboost==3.0.0


Collecting xgboost
  Downloading xgboost-3.0.0-py3-none-win_amd64.whl.metadata (2.1 kB)
Downloading xgboost-3.0.0-py3-none-win_amd64.whl (150.0 MB)
   ---------------------------------------- 0.0/150.0 MB ? eta -:--:--
   ---------------------------------------- 0.8/150.0 MB 4.8 MB/s eta 0:00:32
   ---------------------------------------- 1.6/150.0 MB 4.7 MB/s eta 0:00:32
    --------------------------------------- 2.6/150.0 MB 4.7 MB/s eta 0:00:32
    --------------------------------------- 3.1/150.0 MB 4.5 MB/s eta 0:00:33
   - -------------------------------------- 4.2/150.0 MB 4.3 MB/s eta 0:00:35
   - -------------------------------------- 5.0/150.0 MB 4.1 MB/s eta 0:00:36
   - -------------------------------------- 5.8/150.0 MB 4.1 MB/s eta 0:00:35
   - -------------------------------------- 6.8/150.0 MB 4.2 MB/s eta 0:00:35
   -- ------------------------------------- 7.6/150.0 MB 4.1 MB/s eta 0:00:35
   -- ------------------------------------- 8.4/150.0 MB 4.1 MB/s eta 0:00:35
 

In [None]:
import os
import librosa
import numpy as np
import json
import joblib
import matplotlib.pyplot as plt

from collections import Counter
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, ConfusionMatrixDisplay

# Sampling rate passed to librosa.load when decoding each recording.
SAMPLE_RATE = 22050
# Minimum number of labelled recordings a raga needs to survive filter_rare_ragas.
MIN_SAMPLES_PER_RAGA = 3

def build_filename_raga_map(data_dir):
    """Scan ``data_dir`` for JSON metadata and map recording names to ragas.

    For every ``.json`` file whose ``"raaga"`` list is non-empty, the
    lower-cased file name without extension is mapped to the first entry's
    ``"name"``. Files that cannot be read or lack the expected structure are
    reported on stdout and skipped. If two files in different directories
    share a base name, the one visited last wins.

    Returns:
        dict mapping lower-cased base name -> raga name.
    """
    raga_map = {}
    for root, _, files in os.walk(data_dir):
        for file in files:
            if not file.endswith(".json"):
                continue
            full_path = os.path.join(root, file)
            try:
                # JSON is UTF-8 by specification; do not depend on the platform default.
                with open(full_path, encoding="utf-8") as f:
                    meta = json.load(f)
                if "raaga" in meta and len(meta["raaga"]) > 0:
                    raga_name = meta["raaga"][0]["name"]
                    base_name = os.path.splitext(file)[0].lower()
                    raga_map[base_name] = raga_name
            except (OSError, ValueError, KeyError, IndexError, TypeError) as e:
                # ValueError covers both malformed JSON and undecodable bytes.
                # Report and continue so one bad file does not hide the rest.
                print(f"Skipping metadata file {full_path}: {e}")
    return raga_map

def get_raga_label(file_path, filename_raga_map):
    """Look up the raga for ``file_path`` in ``filename_raga_map``.

    The key is the file's base name without extension, lower-cased.
    Returns None when the map has no entry for it.
    """
    file_name = os.path.basename(file_path)
    stem, _extension = os.path.splitext(file_name)
    return filename_raga_map.get(stem.lower())

def extract_audio_features(file_path):
    """Summarise one audio file as a flat feature vector.

    The file is decoded at SAMPLE_RATE and the following are concatenated:
    row-wise means of 13 MFCCs, chroma, spectral contrast, zero-crossing rate
    and tonnetz; two MFCC-delta summaries; and the mean, standard deviation
    and median of the positive values returned by librosa.piptrack (zeros
    when there are none).

    Returns:
        A 1-D numpy array, or None when loading or feature extraction raises
        (the error is printed together with the file path).
    """
    try:
        y, sr = librosa.load(file_path, sr=SAMPLE_RATE)
        mfcc = np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13), axis=1)
        chroma = np.mean(librosa.feature.chroma_stft(y=y, sr=sr), axis=1)
        contrast = np.mean(librosa.feature.spectral_contrast(y=y, sr=sr), axis=1)
        zcr = np.mean(librosa.feature.zero_crossing_rate(y=y), axis=1)
        # NOTE(review): `mfcc` is already the per-coefficient average, so these
        # deltas appear to be taken across coefficient index rather than across
        # frames, and each reduces to a single number. Deltas of the frame-level
        # MFCC matrix were probably intended, but switching would change the
        # feature-vector length and invalidate saved models, so the layout is
        # kept as-is here. Confirm before changing.
        mfcc_delta = np.mean(librosa.feature.delta(mfcc), axis=0)
        mfcc_delta2 = np.mean(librosa.feature.delta(mfcc, order=2), axis=0)
        tonnetz = np.mean(librosa.feature.tonnetz(y=y, sr=sr), axis=1)
        pitches, _ = librosa.piptrack(y=y, sr=sr)
        pitches = pitches[pitches > 0]
        has_pitch = len(pitches) > 0
        pitch_features = [
            np.mean(pitches) if has_pitch else 0,
            np.std(pitches) if has_pitch else 0,
            np.median(pitches) if has_pitch else 0
        ]
        return np.hstack([mfcc, chroma, contrast, zcr, mfcc_delta, mfcc_delta2, tonnetz, pitch_features])
    except Exception as e:
        # Catch Exception rather than everything so Ctrl-C / kernel interrupt
        # still stops a long dataset scan, and say which file failed and why.
        print(f"Error processing {file_path}: {e}")
        return None

def load_dataset(data_dir):
    """Walk ``data_dir`` and build the feature matrix and label vector.

    Raga labels come from the JSON metadata indexed by
    build_filename_raga_map. Every ``.mp3`` file that has a label and whose
    features can be extracted contributes one row.

    Returns:
        (X, y): numpy arrays of feature vectors and raga names. Both are
        empty when nothing usable is found.
    """
    X, y = [], []
    filename_raga_map = build_filename_raga_map(data_dir)
    for root, dirs, files in os.walk(data_dir):
        # os.walk yields entries in OS-dependent order; sorting keeps the sample
        # order (and therefore the seeded train/test split) reproducible.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith(".mp3"):
                continue
            full_path = os.path.join(root, file)
            # Resolve the label first: it is a dictionary lookup, whereas feature
            # extraction decodes the whole recording and would be wasted on
            # files that end up being skipped.
            raga = get_raga_label(full_path, filename_raga_map)
            if not raga:
                continue
            features = extract_audio_features(full_path)
            if features is None:
                continue
            X.append(features)
            y.append(raga)
    X = np.array(X)
    y = np.array(y)
    print(f"Loaded {len(X)} samples")
    print(f"Unique ragas: {len(set(y))}")
    print(f"Raga distribution: {Counter(y)}")
    return X, y

def filter_rare_ragas(X, y, min_count=3):
    """Drop every sample whose raga appears fewer than ``min_count`` times in ``y``.

    A one-line summary of what remains is printed. The surviving feature
    rows and labels are returned as two numpy arrays, order preserved.
    """
    occurrences = Counter(y)
    retained = [
        pair for pair in zip(X, y) if occurrences[pair[1]] >= min_count
    ]
    rows = [features for features, _ in retained]
    labels = [raga for _, raga in retained]
    print(f"Filtered to {len(rows)} samples and {len(set(labels))} ragas with ≥{min_count} samples")
    return np.array(rows), np.array(labels)

def train_model(X_train, y_train):
    """Fit an XGBClassifier on the training split and return it.

    NOTE(review): recent XGBoost releases expect ``y_train`` to hold integer
    class ids 0..n_classes-1 rather than string labels — confirm the caller
    encodes them.
    """
    # `use_label_encoder` is obsolete: the pinned xgboost 3.0.0 no longer
    # recognises it and only emits an unused-parameter warning, so it is not
    # passed any more.
    clf = XGBClassifier(n_estimators=150, max_depth=10, learning_rate=0.1,
                        eval_metric='mlogloss', random_state=42)
    clf.fit(X_train, y_train)
    return clf

def predict_raga(audio_file, model, scaler, show_pitch_plot=True):
    """Predict the raga of one audio file and optionally plot its pitch contour.

    The file's feature vector is scaled with ``scaler`` and passed to
    ``model``. When ``show_pitch_plot`` is true the file is loaded again and,
    for each frame, the pitch at the strongest piptrack bin is plotted
    (non-positive pitches are shown as gaps). A plotting failure is printed
    and does not affect the returned prediction.

    Returns:
        The model's prediction, or None when feature extraction fails.
    """
    features = extract_audio_features(audio_file)
    if features is None:
        return None

    scaled_row = scaler.transform([features])
    prediction = model.predict(scaled_row)[0]

    if not show_pitch_plot:
        return prediction

    try:
        y, sr = librosa.load(audio_file, sr=SAMPLE_RATE)
        pitches, magnitudes = librosa.piptrack(y=y, sr=sr)

        def _pitch_at(frame):
            # Pitch of the bin with the largest magnitude in this frame;
            # NaN leaves a gap in the line where no positive pitch exists.
            strongest_bin = magnitudes[:, frame].argmax()
            value = pitches[strongest_bin, frame]
            return value if value > 0 else np.nan

        pitch_track = [_pitch_at(frame) for frame in range(pitches.shape[1])]

        plt.figure(figsize=(12, 4))
        plt.plot(pitch_track, label='Pitch Contour', color='magenta')
        plt.xlabel("Frame Index")
        plt.ylabel("Frequency (Hz)")
        plt.title(f"Pitch Contour of '{os.path.basename(audio_file)}'")
        plt.grid(True)
        plt.legend()
        plt.tight_layout()
        plt.show()
    except Exception as e:
        print(f"Failed to generate pitch plot: {e}")
    return prediction

def run_full_pipeline(dataset_path, test_file_path=None):
    """Load the dataset, train and evaluate an XGBoost model, save it, classify one file.

    Steps: load features and raga names from ``dataset_path``, drop ragas with
    fewer than MIN_SAMPLES_PER_RAGA samples, encode raga names as integer
    class ids, make a stratified 80/20 split, standardise features (scaler
    fitted on the training split only), train, print a classification report,
    show a confusion matrix, save the model bundle to
    ``raga_model_pipeline.pkl`` and finally print the predicted raga for
    ``test_file_path`` if that file exists.

    Returns None; stops early when no samples remain after filtering.
    """
    print("Starting Raga Identification Pipeline...\n")
    X, y = load_dataset(dataset_path)
    X, y = filter_rare_ragas(X, y, MIN_SAMPLES_PER_RAGA)
    if len(X) == 0:
        print("Not enough data to train.")
        return

    # XGBClassifier rejects string labels: it requires class ids 0..n-1.
    # np.unique returns the sorted raga names plus, for every sample, the index
    # of its name in that array, which is exactly the encoding needed.
    raga_names, y_codes = np.unique(y, return_inverse=True)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y_codes, test_size=0.2, stratify=y_codes, random_state=42
    )
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    model = train_model(X_train_scaled, y_train)

    # Decode ids back to raga names so the report and plot stay human-readable.
    y_pred = np.asarray(model.predict(X_test_scaled)).astype(int)
    y_test_names = raga_names[y_test]
    y_pred_names = raga_names[y_pred]
    print("\nClassification Report:\n")
    print(classification_report(y_test_names, y_pred_names, zero_division=0))
    ConfusionMatrixDisplay.from_predictions(y_test_names, y_pred_names, xticks_rotation='vertical')
    plt.tight_layout()
    plt.show()

    # 'classes' maps the model's integer outputs back to raga names; without it
    # a reloaded model could only report class ids.
    joblib.dump({'model': model, 'scaler': scaler, 'classes': raga_names}, "raga_model_pipeline.pkl")
    print("Model and scaler saved to 'raga_model_pipeline.pkl'")

    if test_file_path and os.path.exists(test_file_path):
        prediction = predict_raga(test_file_path, model, scaler, show_pitch_plot=True)
        if prediction is not None:
            # predict_raga returns the model's raw class id; translate it.
            prediction = raga_names[int(prediction)]
        print(f"\nPredicted Raga for '{os.path.basename(test_file_path)}': {prediction}")
    else:
        print("\nNo test file provided or file not found.")

# Set paths
# NOTE(review): these are absolute, machine-specific Windows paths; they must
# be edited before the notebook can run on another machine.
dataset_path = r"D:\carnatic\carnatic"
test_file = r"D:\carnatic\carnatic\1\Cherthala Ranganatha Sharma - Bhuvini Dasudane.mp3"

# Train, evaluate and save the model, then classify the single test recording.
run_full_pipeline(dataset_path, test_file)


Starting Raga Identification Pipeline...



  "cipher": algorithms.TripleDES,
  "class": algorithms.Blowfish,
  "class": algorithms.TripleDES,
