In [52]:
import pandas as pd 
import numpy as np
import librosa
import matplotlib.pyplot as plt
import os
from transformers import Wav2Vec2Processor, Wav2Vec2Model
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
from sklearn.decomposition import PCA

In [53]:
def plot_song(mp3_dir, song_name):
    # Load the mp3 file (Librosa loads audio as a NumPy array)
    y, sr = librosa.load(os.path.join(mp3_dir, song_name), sr=None)

    # Compute the Short-Time Fourier Transform (STFT)
    D = librosa.stft(y)

    # Convert the amplitude to decibels
    S_db = librosa.amplitude_to_db(np.abs(D), ref=np.max)

    # Create the plot
    plt.figure(figsize=(12, 8))
    librosa.display.specshow(S_db, sr=sr, x_axis='time', y_axis='hz')
    plt.colorbar(format='%+2.0f dB')
    plt.title('Spectrogram of the song')
    plt.xlabel('Time')
    plt.ylabel('Frequency (Hz)')
    plt.show()


In [54]:
wav2vec2_processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
wav2vec2_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")




In [4]:
def extract_audio_embeddings(file_path,wav2vec2_model, wav2vec2_processor):
    # Load audio file
    print(file_path)
    audio, sr = librosa.load(file_path, sr=16000)  # Use 16kHz as Wav2Vec2 expects 16kHz input
    input_values = wav2vec2_processor(audio, return_tensors="pt", sampling_rate=16000).input_values

    # Pass through Wav2Vec2 model to get hidden-states (embeddings)
    with torch.no_grad():
        embeddings = wav2vec2_model(input_values).last_hidden_state
    
    # Convert embeddings to numpy array (optional)
    embeddings_np = embeddings.squeeze(0).cpu().numpy()
    
    return embeddings_np

In [5]:
file_path = "/home/hyarrava/Song_generator/data/DSP_LOVE_SONGS/dsp_love_song_14.mp3"
embeddings = extract_audio_embeddings(file_path, wav2vec2_model, wav2vec2_processor)


/home/hyarrava/Song_generator/data/DSP_LOVE_SONGS/dsp_love_song_14.mp3


In [6]:
# Function to extract embeddings for all songs in a folder and save them
def extract_embeddings_for_folder(folder_path):
    embeddings_folder = os.path.join(folder_path, "embeddings")
    
    # Create the embeddings folder if it does not exist
    if not os.path.exists(embeddings_folder):
        os.makedirs(embeddings_folder)
    
    # Iterate over all mp3 files in the specified folder
    for song_file in os.listdir(folder_path):
        if song_file.endswith('.mp3'):
            file_path = os.path.join(folder_path, song_file)
            embeddings = extract_audio_embeddings(file_path)
            
            # Save embeddings as .npy file
            embedding_file_path = os.path.join(embeddings_folder, f'{os.path.splitext(song_file)[0]}_embeddings.npy')
            np.save(embedding_file_path, embeddings)
            print(f"Saved embeddings for {song_file} at {embedding_file_path}")



In [7]:
# Specify the folder path for the songs
#music_folder = "/home/hyarrava/Song_generator/data/THAMAN_HIT_SONGS"
#extract_embeddings_for_folder(music_folder)

In [8]:
### Connecting the Embeddings folder

In [9]:
folder_path = "/home/hyarrava/Song_generator/data/"
DSP_LOVE_SONGS_FOLDER = "DSP_LOVE_SONGS/embeddings/"

In [10]:
def save_pca_embeddings(folder_path, dir_genre_folder):
    full_path = os.path.join(folder_path, dir_genre_folder)
    for file_name in os.listdir(full_path):
        file_path = os.path.join(full_path,file_name)
    
        embedding  = np.load(file_path)
        pca = PCA(n_components=10)
        pca_embedding = pca.fit_transform(embedding)
        pca_embedding_file_path = os.path.join(full_path, f'{os.path.splitext(file_path)[0]}_pca.npy')
        np.save(pca_embedding_file_path, pca_embedding)
        

### Data Preparation

In [11]:
def truncate_or_pad(embedding, target_length=1000):
    # Get current length of the embedding
    current_length = embedding.shape[0]
    
    if current_length > target_length:
        # Truncate the embedding if it's too long
        return embedding[:target_length, :]
    else:
        # Pad the embedding if it's too short
        padding = np.zeros((target_length - current_length, embedding.shape[1]))
        return np.vstack((embedding, padding))


def convert_into_short_embeddings(emb_vector, target_length = 1000):
    current_length = emb_vector.shape[0]
    factor = current_length//target_length

    if factor >1:
        return np.mean(emb_vector[:target_length*factor].reshape(target_length, 
                                                                 factor, emb_vector.shape[1]), axis=1)

    else :
        truncate_or_pad(emb_vector, target_length=1000)

In [12]:
def short_embeddings(folder_path, dir_genre_folder):
    full_path = os.path.join(folder_path, dir_genre_folder)
    #stacked_songs_embeddings = np.array()
    for file_name in os.listdir(full_path):
        if file_name.split(".")[0].endswith("embeddings_pca"):
            file_path = os.path.join(full_path,file_name)
            embedding  = np.load(file_path)
            short_embeddings = convert_into_short_embeddings(embedding, target_length)
            print(short_embeddings.shape)
            

In [13]:
def create_embeddings(folder_path, dsp_folder, thaman_folder, target_length):
    dsp_songs = []
    thaman_songs = []

    for file_name in os.listdir(os.path.join(folder_path, dsp_folder)):
        if file_name.split(".")[0].endswith("embeddings_pca"):
            file_path = os.path.join(folder_path, dsp_folder,file_name)
            embedding  = np.load(file_path)
            short_embeddings = convert_into_short_embeddings(embedding, target_length)
            dsp_songs.append(short_embeddings)

    for file_name in os.listdir(os.path.join(folder_path, thaman_folder)):
        if file_name.split(".")[0].endswith("embeddings_pca"):
            file_path = os.path.join(folder_path,thaman_folder, file_name)
            embedding  = np.load(file_path)
            short_embeddings = convert_into_short_embeddings(embedding, target_length)
            thaman_songs.append(short_embeddings)

    X = np.array(dsp_songs + thaman_songs)  # Combine all embeddings
    y = np.array([0] * len(dsp_songs) + [1] * len(thaman_songs))  # Label: 0 for DSP, 1 for Thaman
    
    return X, y
            

In [14]:
folder_path = "/home/hyarrava/Song_generator/data/"
dsp_folder = "DSP_LOVE_SONGS/embeddings/"
thaman_folder = "THAMAN_LOVE_SONGS/embeddings/"
target_length = 1000
X, y = create_embeddings(folder_path, dsp_folder, thaman_folder, target_length)


In [15]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

print("Train Data Shape:", X_train.shape)
print("Test Data Shape:", X_test.shape)

Train Data Shape: (50, 1000, 10)
Test Data Shape: (13, 1000, 10)


In [16]:
class SongClassificationModel(nn.Module):
    def __init__(self):
        super(SongClassificationModel, self).__init__()
        
        # Define the layers
        self.flatten = nn.Flatten()  # Flatten the (1000, 10) input to (10000,)
        self.fc1 = nn.Linear(10000, 128)  # First Dense layer (input: 10000, output: 128)
        self.dropout_layer1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(128, 64)     # Second Dense layer (input: 128, output: 64)
        self.dropout_layer2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(64, 32)      # Third Dense layer (input: 64, output: 32)
        self.fc4 = nn.Linear(32, 16)       # Output layer (input: 32, output: 1)
        self.fc5 = nn.Linear(16, 1)
        
    def forward(self, x):
        x = self.flatten(x)
        x = F.relu(self.fc1(x))  # ReLU activation
        x = self.dropout_layer1(x)
        x = F.relu(self.fc2(x))  # ReLU activation
        x = self.dropout_layer2(x)
        x = F.relu(self.fc3(x))  # ReLU activation
        x = F.relu(self.fc4(x))
        x = torch.sigmoid(self.fc5(x))  # Sigmoid for binary classification output
        return x

In [17]:
def prepare_dataset(X_train, y_train, X_test, y_test, batch_size=8):
    # Convert numpy arrays to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)  # Add extra dimension for target
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)
    
    # Create DataLoader for batch processing
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, test_loader

In [18]:
# Training function with checkpoint saving
def train_model_with_checkpoint(model, train_loader, criterion, optimizer, num_epochs=20, checkpoint_dir='checkpoints/'):
    # Create a directory to save checkpoints if it doesn't exist
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)
    
    model.train()
    
    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}')
        
        # Save the model checkpoint after every epoch
        checkpoint_path = os.path.join(checkpoint_dir, f'model_epoch_{epoch+1}.pth')
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': running_loss / len(train_loader),
        }, checkpoint_path)
        print(f'Model checkpoint saved at {checkpoint_path}')

    return model


In [19]:
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            predicted = (outputs > 0.5).float()  # Binary classification threshold
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    print(f'Test Accuracy: {accuracy:.2f}%')


In [20]:
batch_size = 8
num_epochs = 30
learning_rate = 0.001

# Prepare data (assuming X_train, y_train, X_test, y_test are already prepared as numpy arrays)
train_loader, test_loader = prepare_dataset(X_train, y_train, X_test, y_test, batch_size)

# Initialize model, criterion (loss function), and optimizer
model = SongClassificationModel()
criterion = nn.BCELoss()  # Binary cross-entropy loss for binary classification
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
model = train_model_with_checkpoint(model, train_loader, criterion, optimizer, num_epochs)
torch.save(model.state_dict(), 'model_weights.pth')
# Evaluate the model
evaluate_model(model, test_loader)

Epoch [1/30], Loss: 0.6875
Model checkpoint saved at checkpoints/model_epoch_1.pth
Epoch [2/30], Loss: 0.5301
Model checkpoint saved at checkpoints/model_epoch_2.pth
Epoch [3/30], Loss: 0.3160
Model checkpoint saved at checkpoints/model_epoch_3.pth
Epoch [4/30], Loss: 0.1844
Model checkpoint saved at checkpoints/model_epoch_4.pth
Epoch [5/30], Loss: 0.0514
Model checkpoint saved at checkpoints/model_epoch_5.pth
Epoch [6/30], Loss: 0.0135
Model checkpoint saved at checkpoints/model_epoch_6.pth
Epoch [7/30], Loss: 0.0035
Model checkpoint saved at checkpoints/model_epoch_7.pth
Epoch [8/30], Loss: 0.0032
Model checkpoint saved at checkpoints/model_epoch_8.pth
Epoch [9/30], Loss: 0.0034
Model checkpoint saved at checkpoints/model_epoch_9.pth
Epoch [10/30], Loss: 0.0005
Model checkpoint saved at checkpoints/model_epoch_10.pth
Epoch [11/30], Loss: 0.0010
Model checkpoint saved at checkpoints/model_epoch_11.pth
Epoch [12/30], Loss: 0.0007
Model checkpoint saved at checkpoints/model_epoch_12.pt

In [21]:
np.save('X_train.npy', X_train)
np.save('y_train.npy', y_train)

# If you have test data
np.save('X_test.npy', X_test)
np.save('y_test.npy', y_test)

## Test unknown audio

In [55]:
test_audio_path = "/home/hyarrava/Song_generator/notebooks/videoplayback.mp3"
test_embeddings = extract_audio_embeddings(test_audio_path, wav2vec2_model, wav2vec2_processor)
pca = PCA(n_components=10)
test_pca_embedding = pca.fit_transform(test_embeddings)
short_pca_emb = convert_into_short_embeddings(test_pca_embedding, target_length = 1000)


/home/hyarrava/Song_generator/notebooks/videoplayback.mp3


In [56]:
model.eval()
test_input = torch.tensor(short_pca_emb, dtype = torch.float32)
test_tensor = test_input.permute(1,0).unsqueeze(0) 

with torch.no_grad():
    output = model(test_tensor)

In [57]:
predicted_label = (output > 0.5).float()
if predicted_label.item() == 1.0:
    print("Predicted Label: DSP")
else:
    print("Predicted Label: Thaman")

Predicted Label: DSP


In [58]:
output

tensor([[0.6272]])