In [1]:
import pandas as pd
import numpy as np
import librosa
import os

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
import torch.optim as optim
import torch.nn.functional as F


In [21]:
avg = pd.read_csv('/Users/rgu/Desktop/UROPs/UROP4/repo/dataframes/avg_diego_47.csv')
display(avg)

Unnamed: 0,song_id,avg_weight0,avg_weight1,avg_weight2,avg_water_loss1,avg_water_loss2,avg_total_water_loss
0,0,2.1533,2.10955,2.09665,0.04375,0.0129,0.05665
1,1,2.12135,2.08655,2.07245,0.0348,0.0141,0.0489
2,2,1.9158,1.8681,1.86085,0.0477,0.00725,0.05495
3,3,1.82995,1.7755,1.7639,0.05445,0.0116,0.06605
4,4,2.44275,2.3798,2.3705,0.06295,0.0093,0.07225
5,5,2.212,2.16675,2.15565,0.04525,0.0111,0.05635
6,6,2.43565,2.39155,2.3801,0.0441,0.01145,0.05555
7,7,2.48585,2.44075,2.42585,0.0451,0.0149,0.06
8,8,3.2733,3.21245,3.1937,0.06085,0.01875,0.0796
9,9,3.2415,3.19515,3.17785,0.04635,0.0173,0.06365


In [18]:
def load_audio_files(paths, sr = 22050, fixed_length = 1323000):
    '''
    loads audios from audio path for training/testing
    '''
    audio_data = []
    for filepath in paths:
        y, sr = librosa.load(filepath, sr=sr)
        if len(y) < fixed_length: #ensures audios are same length
            y = np.pad(y, (0, fixed_length - len(y)), 'constant')
        else:
            y = y[:fixed_length]

        audio_data.append(y)

    #print(audio_data)
    return np.array(audio_data)

In [4]:
class AudioDataset(Dataset):
    def __init__(self, audio_data, targets):
        self.audio_data = audio_data
        self.targets = targets

    def __len__(self):
        return len(self.audio_data)
    
    def __getitem__(self, idx):
        audio = self.audio_data[idx]
        target = self.targets[idx]
        return torch.tensor(audio, dtype=torch.float32), torch.tensor(target, dtype=torch.float32)

In [19]:
#load audio to an array
path = '/Users/rgu/Desktop/UROPs/UROP4/diego_100_audios'
audio_files = [os.path.join(path, filename) for filename in os.listdir(path) if filename.lower().endswith('.mp3')]
audio_data = load_audio_files(audio_files)

In [30]:
audio_data = audio_data[:47] #REMOVE THIS AFTER EXPERIMENTS, we only have 47 datapoints atm

In [31]:
water_loss = np.array(avg['avg_water_loss1'])  # TRAINING ON WATER LOSS OF MINUTE 1

#create dataset and data loader
train_size = int(len(audio_data)* 0.8)
test_size = len(audio_data) - train_size

dataset = AudioDataset(audio_data, water_loss) #because we only have 47 water losses
train_set, test_set = random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_set, batch_size=2, shuffle=True) 
test_loader = DataLoader(test_set, batch_size=2, shuffle=False)

In [32]:
class WaterLossCNN(nn.Module):
    """
    Define the CNN model
    """
    def __init__(self):
        super(WaterLossCNN, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2, 2)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(32 * (audio_data.shape[1] // 4), 64) #accomodate for flattening
        self.fc2 = nn.Linear(64, 1)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 32 * (audio_data.shape[1] // 4)) #flattens the data
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [33]:
model = WaterLossCNN()

In [34]:
criterion = nn.MSELoss() #loss
optimizer = optim.Adam(model.parameters(), lr=0.001) #weight optimizer

In [35]:
#training
def train_model(model, dataloader, criterion, optimizer, num_epochs = 100):
    for epoch in range(num_epochs):
        for inputs, targets in dataloader:
            inputs = inputs.unsqueeze(1)  #add channel dimension: (batchsize, seq_len) -> (batchsize, 1, seq_len)
            outputs = model(inputs)
            loss = criterion(outputs, targets.unsqueeze(1))
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        #implement validation later when you have more data

        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')
    return model


In [37]:
trained_model = train_model(model, train_loader, criterion, optimizer)

Epoch 1/100, Loss: 326.1878356933594


KeyboardInterrupt: 

In [None]:
from sklearn.metrics import mean_absolute_error

In [None]:
#evaluation
def evaluate_model(model, dataloader, criterion):
    model.eval()

    all_targets = []
    all_predictions = []

    with torch.no_grad():
        total_loss = 0
        for inputs, targets in dataloader:
            inputs = inputs.unsqueeze(1)
            outputs = model(inputs)
            loss = criterion(outputs, targets.unsqueeze(1))
            total_loss += loss.item()

            all_targets.extend(targets.numpy())
            all_predictions.extend(outputs.numpy().flatten())

        all_targets = np.array(all_targets)
        all_predictions = np.array(all_predictions)
        
        mae = mean_absolute_error(all_targets, all_predictions)

        avg_loss = total_loss / len(dataloader)
    
    return f"Average Loss: {avg_loss}, Mean Absolute Error:{mae}"

In [None]:
print(evaluate_model(trained_model, test_loader, criterion))

In [None]:
#predict new audio file's water loss
new_audio_file = 'new_file.wav'
new_audio_data = load_audio_files([new_audio_file])
new_audio_tensor = torch.tensor(new_audio_data, dtype=torch.float32).unsqueeze(1)  # Add batch and channel dimensions

model.eval()
with torch.no_grad():
    predicted_water_loss = model(new_audio_tensor)
    print('Predicted Water Loss:', predicted_water_loss.item())