In [21]:
import numpy as np
import os
from scipy.io import wavfile
from scipy.signal import get_window
from scipy.fft import rfft, dct
from scipy.signal import stft
import matplotlib.pyplot as plt
import seaborn as sns

In [22]:
# Step 1: Load Audio Files

def load_wav_file(file_path):
    sample_rate, audio_data = wavfile.read(file_path)
    return sample_rate, audio_data

def load_audio_files(directory):
    features = []
    track_names = []
    
    for root, _, files in os.walk(directory):
        for filename in files:
            file_path = os.path.join(root, filename)
            if filename.endswith('.wav'):
                try:
                    sample_rate, audio_data = load_wav_file(file_path)
                    if audio_data.ndim == 1:  # Check for mono audio
                        features.append(audio_data)
                        track_names.append(filename)
                    else:
                        print(f"Skipping non-mono audio file: {file_path}")
                except ValueError as e:
                    print(f"Error reading {file_path}: {e}")
                except Exception as e:
                    print(f"Unexpected error with {file_path}: {e}")

    # Convert features to a 2D array if possible
    if features:
        # Pad the features to have the same length
        max_length = max(len(f) for f in features)
        padded_features = np.array([np.pad(f, (0, max_length - len(f)), 'constant') for f in features])
    else:
        padded_features = np.empty((0,))  # Handle the case with no valid features

    return padded_features, track_names

# Step 2: Feature Extraction

def compute_mfcc(audio_data, sample_rate, num_coefficients=13):
    frame_size = 2048
    hop_size = 512
    num_frames = (len(audio_data) - frame_size) // hop_size + 1
    hamming_window = get_window('hamming', frame_size)

    mfccs = []
    
    for i in range(num_frames):
        frame = audio_data[i * hop_size:i * hop_size + frame_size]
        windowed_frame = frame * hamming_window
        spectrum = np.abs(rfft(windowed_frame))**2
        
        mel_filters = np.zeros((num_coefficients, len(spectrum)))
        mel_bins = np.linspace(0, len(spectrum) - 1, num_coefficients + 2).astype(int)

        for j in range(num_coefficients):
            mel_filters[j, mel_bins[j]:mel_bins[j + 1]] = 1
        
        mel_energy = np.dot(mel_filters, spectrum)
        mel_energy = np.log(mel_energy + 1e-10)
        mfcc = dct(mel_energy, type=2, norm='ortho')
        mfccs.append(mfcc)
    
    #print("MFCCS DONE")
    
    return np.mean(mfccs, axis=0)

def compute_chroma_features(audio_data, sample_rate):
    # Compute the short-time Fourier transform (STFT)
    f, t, Zxx = stft(audio_data, fs=sample_rate, nperseg=2048, noverlap=512)
    magnitude = np.abs(Zxx)
    
    # Chroma filter bank
    num_chroma_bins = 12
    chroma = np.zeros((num_chroma_bins, magnitude.shape[1]))
    
    # Map frequencies to chroma bins
    for i in range(len(f)):
        bin_freq = f[i]
        if bin_freq > 0:  # Avoid log(0)
            pitch_class = int(12 * np.log2(bin_freq / 440.0)) % 12
            if 0 <= pitch_class < num_chroma_bins:
                chroma[pitch_class] += magnitude[i, :]
    
    #print("Chroma DONE")
    
    return np.mean(chroma, axis=1)

def compute_spectral_contrast(audio_data):
    spectrum = np.abs(rfft(audio_data))**2
    # Use slicing that maintains the same length
    peaks = np.where((spectrum[:-2] < spectrum[1:-1]) & (spectrum[1:-1] > spectrum[2:]))[0] + 1
    valleys = np.where((spectrum[:-2] > spectrum[1:-1]) & (spectrum[1:-1] < spectrum[2:]))[0] + 1
    
    # Avoid empty arrays when calculating means
    if peaks.size > 0 and valleys.size > 0:
        contrast = np.mean(spectrum[peaks]) - np.mean(spectrum[valleys])
    else:
        contrast = 0  # Default value if no peaks or valleys found

    #print("Contrast Done")
    
    return contrast

def compute_tempo(audio_data):
    frame_size = 2048
    hop_size = 512
    num_frames = (len(audio_data) - frame_size) // hop_size + 1
    energy = np.array([np.sum(audio_data[i * hop_size:i * hop_size + frame_size] ** 2) for i in range(num_frames)])

    #print("Tempo DONE")
    
    return np.mean(energy)

def extract_features(audio_data, sample_rate):
    mfccs = compute_mfcc(audio_data, sample_rate)
    chroma = compute_chroma_features(audio_data, sample_rate)
    spectral_contrast = compute_spectral_contrast(audio_data)
    tempo = compute_tempo(audio_data)
    
    return np.concatenate([mfccs, chroma, [spectral_contrast], [tempo]])


# Step 3: Similarity Checks

def euclidean_distance(a, b):
    return np.sqrt(np.sum((a - b) ** 2))

def manhattan_distance(a, b):
    return np.sum(np.abs(a - b))

def cosine_distance(a, b):
    dot_product = np.dot(a, b)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 1.0
        
    cosine_similarity = dot_product / (norm_a * norm_b)
    return 1 - cosine_similarity  # Convert similarity to distance

def dtw(a, b):
    # Initialize the cost matrix
    n = len(a)
    m = len(b)
    cost = np.zeros((n + 1, m + 1))

    # Initialize the cost matrix with infinity
    cost[0, :] = np.inf
    cost[:, 0] = np.inf
    cost[0, 0] = 0

    # Populate the cost matrix
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            # Calculate the cost
            dist = abs(a[i - 1] - b[j - 1])
            cost[i, j] = dist + min(cost[i - 1, j],    # Insertion
                                    cost[i, j - 1],    # Deletion
                                    cost[i - 1, j - 1] # Match
                                   )

    # Return the last element of the cost matrix, which contains the DTW distance
    return cost[n, m]

def compute_distances(input_features, features):
    euclidean_distances = np.array([euclidean_distance(input_features, feature) for feature in features])
    manhattan_distances = np.array([manhattan_distance(input_features, feature) for feature in features])
    cosine_distances = np.array([cosine_distance(input_features, feature) for feature in features])
    dtw_distances = np.array([dtw(input_features, feature) for feature in features])
    
    return euclidean_distances, manhattan_distances, cosine_distances, dtw_distances

def whiten(features):
    # Center the features by subtracting the mean
    mean = np.mean(features, axis=0)
    centered_features = features - mean
    
    # Calculate the covariance matrix
    covariance_matrix = np.cov(centered_features, rowvar=False)
    
    # Add a small value to the diagonal to ensure numerical stability
    epsilon = 1e-5
    covariance_matrix += epsilon * np.eye(covariance_matrix.shape[0])
    
    # Use SVD to compute the whitening matrix
    U, S, Vt = np.linalg.svd(covariance_matrix)
    
    # Create the whitening matrix
    whitening_matrix = np.dot(U, np.diag(1.0 / np.sqrt(S + epsilon))).dot(U.T)
    
    # Apply the whitening transformation
    whitened_features = np.dot(centered_features, whitening_matrix)
    
    return whitened_features

# Step 4: Recommendation System

def recommend_tracks_by_distance(input_features, features, track_names, N=5):
    # First, whiten the features
    all_features = np.vstack([input_features, features])
    whitened_features = whiten(all_features)

    # Extract the whitened input features and features
    whitened_input_features = whitened_features[0]
    whitened_features = whitened_features[1:]

    #Compute the distances
    #euclidean_distances, manhattan_distances, cosine_distances, dtw_distances = compute_distances(input_features, features)
    euclidean_distances, manhattan_distances, cosine_distances, dtw_distances = compute_distances(whitened_input_features, whitened_features)
    
    # Get the indices of the N closest tracks for each distance metric
    closest_euclidean_indices = np.argsort(euclidean_distances)[:N]
    closest_manhattan_indices = np.argsort(manhattan_distances)[:N]
    closest_cosine_indices = np.argsort(cosine_distances)[:N]
    closest_dtw_indices = np.argsort(dtw_distances)[:N]
    
    recommended_euclidean = [track_names[i] for i in closest_euclidean_indices]
    recommended_manhattan = [track_names[i] for i in closest_manhattan_indices]
    recommended_cosine = [track_names[i] for i in closest_cosine_indices]
    recommended_dtw = [track_names[i] for i in closest_dtw_indices]

    return recommended_euclidean, recommended_manhattan, recommended_cosine, recommended_dtw




In [23]:
import pandas as pd

def save_features_to_csv(features, track_names, file_name='audio_features.csv'):
    df = pd.DataFrame(features)
    df['track_name'] = track_names
    df.to_csv(file_name, index=False)

def load_features_from_csv(file_name='audio_features.csv'):
    df = pd.read_csv(file_name)
    features = df.iloc[:, :-1].values  # All columns except the last one
    track_names = df['track_name'].tolist()  # Last column as track names
    return features, track_names

In [24]:
# Step 5: Main Function for Recommendation Based on Input Track

def recommend_based_on_input(input_file, audio_directory):
    # Load input audio file
    sample_rate, input_audio_data = load_wav_file(input_file)
    if input_audio_data is None:
        return  # Exit if loading failed
    
    # Extract features for the input audio
    input_features = extract_features(input_audio_data, sample_rate)
    
    # Load features from CSV if available, otherwise extract and save
    file_name = "C:/Users/nitin/OneDrive/Desktop/UNI/IITB/CS725-FML/Project/audio_features.csv"

    print("Input Data Done")

    count=0
    
    if os.path.exists(file_name):
        features, track_names = load_features_from_csv(file_name)
    else:
        audio_data_list, track_names = load_audio_files(audio_directory)

        # Initialize an empty list to hold the extracted features
        features_list = []
        for audio_data in audio_data_list:
            if audio_data is not None:
                extracted_features = extract_features(audio_data, sample_rate)
                features_list.append(extracted_features)
                count = count + 1
                if((count%10) == 0):
                    print(f"File Number {count} done....")
        
        features = np.array(features_list)        
        save_features_to_csv(features, track_names, file_name)

    print("Feature Extraction Done")
    
    recommended_euclidean, recommended_manhattan, recommended_cosine, recommended_dtw = recommend_tracks_by_distance(input_features, features, track_names, N=5)
    
    return recommended_euclidean, recommended_manhattan, recommended_cosine, recommended_dtw


In [25]:
# Example Usage
if __name__ == "__main__":
    input_audio_file = "C:/Users/nitin/OneDrive/Desktop/UNI/IITB/CS725-FML/Project/Data/genres_original/pop/pop.00001.wav"
    audio_directory = "C:/Users/nitin/OneDrive/Desktop/UNI/IITB/CS725-FML/Project/Data/genres_original"
    recommendations_euclidean, recommendations_manhattan, recommendations_cosine, recommendations_dtw = recommend_based_on_input(input_audio_file, audio_directory)
    print("Recommended Tracks (Euclidean):", recommendations_euclidean)
    print("Recommended Tracks (Manhattan):", recommendations_manhattan)
    print("Recommended Tracks (Cosine):", recommendations_cosine)
    print("Recommended Tracks (DTW):", recommendations_dtw)

Input Data Done
Feature Extraction Done
Recommended Tracks (Euclidean): ['hiphop.00067.wav', 'pop.00001.wav', 'jazz.00017.wav', 'classical.00058.wav', 'reggae.00032.wav']
Recommended Tracks (Manhattan): ['hiphop.00067.wav', 'classical.00058.wav', 'jazz.00017.wav', 'reggae.00032.wav', 'pop.00001.wav']
Recommended Tracks (Cosine): ['pop.00001.wav', 'pop.00074.wav', 'pop.00059.wav', 'hiphop.00003.wav', 'pop.00088.wav']
Recommended Tracks (DTW): ['hiphop.00067.wav', 'classical.00058.wav', 'jazz.00017.wav', 'reggae.00032.wav', 'pop.00001.wav']
