# Imports

In [108]:
import torch 
from torch import nn 
import os 
import matplotlib.pyplot as plt 
from torch.utils.data import Dataset , DataLoader
import torchaudio
import torchaudio.functional as F
import torchaudio.transforms as T
from torchaudio.utils import download_asset
from IPython.display import Audio
from tqdm.auto import tqdm


# Custom dataset
### Each audio file is preprocessed in the following steps

- load the audio file
- pick the first audio channel
- resample the waveform to 16 kHz
- zero-pad the waveform to a common length of 48,000 samples (3 s at 16 kHz)
- transform the waveform into a spectrogram


In [109]:
class custom_ds(Dataset):
    """Dataset of fixed-size spectrograms built from one folder of audio clips.

    Every file found in ``<cwd>/data/recorded-data/<data_folder>`` is treated
    as one sample and all samples share the same ``label``.

    Per-item pipeline: load the file, keep the first channel, resample to
    ``SAMPLE_RATE``, force the waveform to exactly ``NUM_SAMPLES`` samples and
    compute a spectrogram with ``N_FFT``.

    Args:
        data_folder: Name of the sub-folder of ``data/recorded-data`` to read.
        label: Target value returned alongside every spectrogram.
    """

    SAMPLE_RATE = 16000   # Hz, every clip is resampled to this rate
    NUM_SAMPLES = 48000   # fixed waveform length (3 s at 16 kHz)
    N_FFT = 320           # FFT size passed to the Spectrogram transform

    def __init__(self, data_folder, label):
        super().__init__()
        self.data_folder = data_folder
        self.label = label
        self.root = os.path.join(os.getcwd(), 'data', 'recorded-data', data_folder)
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable between runs.
        self.files = sorted(os.listdir(self.root))
        # Built once instead of once per item; the transform has no per-sample state.
        self._spectrogram = torchaudio.transforms.Spectrogram(n_fft=self.N_FFT)

    def __len__(self):
        return len(self.files)

    @staticmethod
    def _fit_length(wav, num_samples):
        """Return ``wav`` (1-D) with exactly ``num_samples`` samples.

        Shorter waveforms are zero-padded on the left (the audio stays
        right-aligned, as in the original implementation). Longer waveforms
        are cut to their first ``num_samples`` samples; previously they caused
        a negative padding size and a crash in ``torch.zeros``.
        """
        missing = num_samples - wav.shape[0]
        if missing <= 0:
            return wav[:num_samples]
        silence = torch.zeros(missing, dtype=wav.dtype, device=wav.device)
        return torch.cat([silence, wav], dim=0)

    def __getitem__(self, idx):
        """Return ``(spectrogram, label)`` for the ``idx``-th file.

        The spectrogram carries a leading channel dimension of size 1.
        """
        wav, sample_rate = torchaudio.load(os.path.join(self.root, self.files[idx]))
        wav = wav[0, :]  # first channel only
        wav = F.resample(wav, sample_rate, self.SAMPLE_RATE)
        wav = self._fit_length(wav, self.NUM_SAMPLES)

        spectrogram = self._spectrogram(wav).unsqueeze(0)
        return spectrogram, self.label

In [110]:
# One dataset per class folder: clips under 'positive' get target 1.0,
# clips under 'negative' get target 0.0.
positive_ds = custom_ds(data_folder='positive', label=1.)
negative_ds = custom_ds(data_folder='negative', label=0.)

In [111]:
# Single dataset holding the positive samples followed by the negative ones.
dataset = torch.utils.data.ConcatDataset(datasets=[positive_ds, negative_ds])

In [112]:
# Shuffled mini-batches of 8 samples drawn from the combined dataset.
data_loader = DataLoader(dataset=dataset, batch_size=8, shuffle=True)

In [113]:
# Split sizes are derived from the number of batches instead of being
# hard-coded, so the cell keeps working when files are added or removed
# (28 batches still gives the original 22 / 6 split).
n_batches = len(data_loader)
n_train = int(n_batches * 0.8)
train_set, val_set = torch.utils.data.random_split(
    data_loader, [n_train, n_batches - n_train]
)
# NOTE(review): the split is taken over the DataLoader (i.e. over batches), and
# train_step / test_step iterate `.dataset` of what they receive -- for these
# subsets that is the complete `data_loader`. Training and validation therefore
# see the same batches. A real hold-out set needs `dataset` to be split first,
# one DataLoader built per subset, and the step functions iterating the loader
# itself.

# The model
### The model is a basic convolutional network that downsamples the spectrogram before a small classifier head


In [114]:
class model(nn.Module):
    """Small CNN mapping a 1-channel spectrogram to a probability in [0, 1].

    Layers: Conv(1->16, 3x3) -> ReLU -> MaxPool(2x2) -> Conv(16->16, 3x3)
    -> ReLU -> Flatten -> Linear(128) -> Dropout(0.2) -> Linear(1) -> Sigmoid.

    Args:
        input_shape: ``(height, width)`` of the input spectrogram. It is used
            only to size the first linear layer. The default ``(161, 301)``
            yields the 181104 input features that were previously hard-coded.

    Raises:
        ValueError: If ``input_shape`` is too small to survive the two
            convolutions and the pooling layer.
    """

    def __init__(self, input_shape=(161, 301)):
        super().__init__()
        flat_features = self._flat_features(*input_shape)
        # Layer order is unchanged so existing state_dict keys (S.0, S.3, ...) still match.
        self.S = nn.Sequential(
            nn.Conv2d(1, 16, (3, 3)),
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
            nn.Conv2d(16, 16, (3, 3)),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(flat_features, 128),
            nn.Dropout(.2),
            nn.Linear(128, 1),
            nn.Sigmoid(),
        )

    @staticmethod
    def _flat_features(height, width):
        """Number of features left after the conv/pool stack for one sample."""
        def reduced(size):
            # 3x3 conv without padding: -2; 2x2 max-pool: floor halving; 3x3 conv: -2
            return (size - 2) // 2 - 2

        out_h, out_w = reduced(height), reduced(width)
        if out_h < 1 or out_w < 1:
            raise ValueError(
                f"input_shape {(height, width)} is too small for the convolution stack"
            )
        return 16 * out_h * out_w

    def forward(self, inputs):
        """Run the network on a batch shaped ``(N, 1, height, width)``."""
        return self.S(inputs)

In [115]:
# The instance deliberately reuses the class name `model` (later cells refer to
# it by that name). Instantiate only while the name still points at the class,
# so re-running this cell does not call the instance instead of the constructor.
if isinstance(model, type):
    model = model()

# train step 

In [126]:
def train_step(model, dataloader, loss_fn, optimizer):
    """Train ``model`` for one pass over the batches and return the mean loss.

    Args:
        model: Module called on each batch of inputs; its output is flattened
            to 1-D before being handed to ``loss_fn``.
        dataloader: Source of ``(inputs, labels)`` batches. A ``DataLoader`` is
            iterated directly. Any other object is expected to expose the
            batch iterable as ``.dataset`` (e.g. the ``Subset`` obtained from
            ``random_split`` applied to a loader), which is what earlier
            versions of this function always iterated.
        loss_fn: Callable ``loss_fn(predictions, float_labels)``.
        optimizer: Optimizer stepping the parameters of ``model``.

    Returns:
        Average of the per-batch losses, or ``0.0`` when there are no batches.
    """
    batches = dataloader if isinstance(dataloader, DataLoader) else dataloader.dataset

    model.train()
    total_loss = 0.0
    n_batches = 0
    for batch_images, batch_labels in batches:
        preds = model(batch_images).reshape(-1)
        loss = loss_fn(preds, batch_labels.type(torch.float))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        n_batches += 1

    # Average over the batches actually processed; len(dataloader) can differ
    # from that count when a wrapper object is passed in.
    return total_loss / n_batches if n_batches else 0.0
        

# test step 

In [131]:
def test_step(model , dataloader , loss_fn) : 
    model.eval()
    test_loss  = 0 
    with torch.no_grad() : 
        for batch_images , batch_labels in dataloader.dataset : 
            preds = model(batch_images ).reshape(-1)
            loss = loss_fn(preds , batch_labels.type(torch.float) )
            test_loss += loss.item()
            
            
    test_loss = test_loss / len(dataloader)
    return test_loss 

In [132]:
def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5):
    """Alternate one training pass and one evaluation pass for ``epochs`` epochs.

    Each epoch calls ``train_step`` on ``train_dataloader`` and ``test_step``
    on ``test_dataloader``, prints both losses and records them.

    Args:
        model: Network to optimise.
        train_dataloader: Passed to ``train_step``.
        test_dataloader: Passed to ``test_step``.
        optimizer: Optimizer used by ``train_step``.
        loss_fn: Loss passed to both step functions (``CrossEntropyLoss`` unless
            the caller supplies another one).
        epochs: Number of train/evaluate rounds.

    Returns:
        ``{"train_loss": [...], "test_loss": [...]}`` with one entry per epoch.
    """
    train_history, test_history = [], []

    for epoch in tqdm(range(epochs)):
        train_loss = train_step(model, train_dataloader, loss_fn, optimizer)
        test_loss = test_step(model, test_dataloader, loss_fn)

        # Progress line for this epoch.
        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"test_loss: {test_loss:.4f} | "
        )

        train_history.append(train_loss)
        test_history.append(test_loss)

    return {"train_loss": train_history, "test_loss": test_history}

In [133]:
# Adam over all model parameters; binary cross-entropy on the model's
# sigmoid outputs.
optim = torch.optim.Adam(params=model.parameters(), lr=1e-4)
loss_fn = nn.BCELoss()

In [134]:
# Five epochs with the optimizer and loss defined above; the returned dict
# holds the per-epoch train/test losses.
model_0_results = train(
    model,
    train_set,
    val_set,
    optim,
    loss_fn=loss_fn,
    epochs=5,
)

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch: 1 | train_loss: 0.0121 | train_acc: 1.2670 | test_loss: 0.0015 | test_acc: 4.6667
Epoch: 2 | train_loss: 0.0006 | train_acc: 1.2727 | test_loss: 0.0006 | test_acc: 4.6667
Epoch: 3 | train_loss: 0.0001 | train_acc: 1.2727 | test_loss: 0.0001 | test_acc: 4.6667
Epoch: 4 | train_loss: 0.0000 | train_acc: 1.2727 | test_loss: 0.0001 | test_acc: 4.6667
Epoch: 5 | train_loss: 0.0000 | train_acc: 1.2727 | test_loss: 0.0001 | test_acc: 4.6667


In [140]:
# Persist only the learned parameters (the state dict), not the module object.
torch.save(obj=model.state_dict(), f='torch.pth')