In [1]:
import os
import json
import time
import librosa
import numpy as np
from sklearn.ensemble import RandomForestClassifier
import pickle

# Define your folders
UPLOADS_FOLDER = "uploads/"
RESULTS_FOLDER = "results/"

# Note mapping with frequency ranges
note_mapping = {
    'sa': (240, 260),
    're': (260, 290),
    're#': (290, 310),
    'ga': (310, 330),
    'ma': (330, 370),
    'ma#': (370, 390),
    'pa': (390, 420),
    'dha': (420, 470),
    'ni': (470, 520),
    'ni#': (520, 540)
}

# Extract features from audio
def extract_features(audio_path, sr=22050, hop_length=512):
    y, _ = librosa.load(audio_path, sr=sr)
    stft = librosa.stft(y, n_fft=2048, hop_length=hop_length)
    pitches, magnitudes = librosa.piptrack(S=np.abs(stft), sr=sr)
    features = []

    for i in range(pitches.shape[1]):
        pitch_slice = pitches[:, i]
        mag_slice = magnitudes[:, i]
        if mag_slice.any():
            freq = pitch_slice[np.argmax(mag_slice)]
            if freq > 0:
                features.append(freq)
    return features

# Detect notes in an audio file
def detect_notes(audio_path, model_path="note_classifier.pkl"):
    with open(model_path, "rb") as f:
        model = pickle.load(f)

    features = extract_features(audio_path)
    if not features:
        return []

    predicted_notes = model.predict(np.array(features).reshape(-1, 1))
    return predicted_notes

# Process new files
def process_file(file_name, model_path="note_classifier.pkl"):
    audio_path = os.path.join(UPLOADS_FOLDER, file_name)
    output_file = os.path.join(RESULTS_FOLDER, f"{os.path.splitext(file_name)[0]}.json")

    # Detect notes
    predicted_notes = detect_notes(audio_path, model_path)
    time_per_frame = 512 / 22050  # Hop length / sample rate

    # Create note durations
    detected_notes = []
    previous_note = None
    note_start_time = 0

    for i, note in enumerate(predicted_notes):
        current_time = i * time_per_frame
        if note != previous_note:
            if previous_note is not None:
                duration = current_time - note_start_time
                detected_notes.append({
                    "note": previous_note,
                    "start_time": round(note_start_time, 2),
                    "duration": round(duration, 2)
                })
            previous_note = note
            note_start_time = current_time

    # Add the last note
    if previous_note is not None:
        duration = len(predicted_notes) * time_per_frame - note_start_time
        detected_notes.append({
            "note": previous_note,
            "start_time": round(note_start_time, 2),
            "duration": round(duration, 2)
        })

    # Save to JSON
    with open(output_file, "w") as f:
        json.dump(detected_notes, f, indent=4)

    print(f"Processed {file_name}, results saved to {output_file}")

# Monitor uploads folder
def monitor_uploads():
    processed_files = set()

    while True:
        files = os.listdir(UPLOADS_FOLDER)
        for file in files:
            if file not in processed_files and file.endswith(".wav"):
                print(f"Processing new file: {file}")
                process_file(file)
                processed_files.add(file)
        time.sleep(5)  # Check for new files every 5 seconds

if __name__ == "__main__":
    # Ensure results folder exists
    os.makedirs(UPLOADS_FOLDER, exist_ok=True)
    os.makedirs(RESULTS_FOLDER, exist_ok=True)

    # Train the model once (if needed)
    if not os.path.exists("note_classifier.pkl"):
        print("Training model...")
        # Add your model training code here

    # Start monitoring the uploads folder
    print("Monitoring uploads folder...")
    monitor_uploads()


Monitoring uploads folder...


KeyboardInterrupt: 