In [1]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
import pandas as pd

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Dataset 
class AudioDataset(Dataset):
    def __init__(self, X, y) -> None:
        super().__init__()
        # Convert into pytorch tensors
        self.X = torch.tensor(X, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, index):
        features = self.X[index]
        target = self.y[index]
        return features, target

# DataLoader
def data_loader(data_dir,
                batch_size,
                random_seed=42,
                valid_size=0.1,
                shuffle=True,
                test=False):

    if test:
        # Handle test set if necessary
        pass
    
    df = pd.read_pickle('final_data.pkl')
    
    X, y = df.iloc[:, :-1].values, df.iloc[:, -1].values

    # load the dataset
    train_dataset = AudioDataset(X, y)

    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        np.random.seed(random_seed)
        np.random.shuffle(indices)

    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler)
 
    valid_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=valid_sampler)

    return (train_loader, valid_loader)


# CIFAR10 dataset 
train_loader, valid_loader = data_loader(data_dir='./data_cvs',
                                         batch_size=8)

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm1d(out_channels),
            nn.ReLU())
        self.conv2 = nn.Sequential(
            nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(out_channels))
        self.downsample = downsample
        self.relu = nn.ReLU()
        
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3),  # Adjust to Conv1d
            nn.BatchNorm1d(64),  # Adjust to BatchNorm1d
            nn.ReLU())
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)  # Adjust to MaxPool1d
        self.layer0 = self._make_layer(block, 64, layers[0], stride=1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool1d(1)  # Adjust to AvgPool1d
        self.fc = nn.Linear(512, num_classes)
        
    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes:
            downsample = nn.Sequential(
                nn.Conv1d(self.inplanes, planes, kernel_size=1, stride=stride),  # Adjust to Conv1d
                nn.BatchNorm1d(planes),  # Adjust to BatchNorm1d
            )
        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

num_classes = 4
num_epochs = 20
batch_size = 16
learning_rate = 0.01

model = ResNet(ResidualBlock, [3, 4, 6, 3], num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=0.001, momentum=0.9)  

In [2]:
# Train the model
import gc
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):  
        # Move tensors to the configured device
        images = images.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(i)
        
        del images, labels, outputs
        torch.cuda.empty_cache()
        gc.collect()

    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss.item()))
            
    # Validation
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del images, labels, outputs
    
        print('Accuracy of the network on the validation images: {} %'.format(100 * correct / total))

0
1
2
3
4
5
6
Epoch [1/20], Loss: 1.0815
Accuracy of the network on the validation images: 45.45454545454545 %
0
1
2
3
4
5
6
Epoch [2/20], Loss: 0.3863
Accuracy of the network on the validation images: 36.36363636363637 %
0
1
2
3
4
5
6
Epoch [3/20], Loss: 1.2745
Accuracy of the network on the validation images: 27.272727272727273 %
0
1
2
3
4
5
6
Epoch [4/20], Loss: 0.5364
Accuracy of the network on the validation images: 27.272727272727273 %
0
1
2
3
4
5
6
Epoch [5/20], Loss: 1.0899
Accuracy of the network on the validation images: 54.54545454545455 %
0
1
2
3
4
5
6
Epoch [6/20], Loss: 0.6121
Accuracy of the network on the validation images: 36.36363636363637 %
0
1
2
3
4
5
6
Epoch [7/20], Loss: 0.7355
Accuracy of the network on the validation images: 63.63636363636363 %
0
1
2
3
4
5
6
Epoch [8/20], Loss: 0.8653
Accuracy of the network on the validation images: 54.54545454545455 %
0
1
2
3
4
5
6
Epoch [9/20], Loss: 1.3522
Accuracy of the network on the validation images: 45.45454545454545 %

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x0000014CE53B53D0>>
Traceback (most recent call last):
  File "C:\Users\borac\AppData\Roaming\Python\Python312\site-packages\ipykernel\ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 


In [9]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
import pandas as pd
from torch.cuda.amp import GradScaler, autocast
import gc
from torch_audiomentations import Compose, AddBackgroundNoise, Shift

transform = Compose([
    Shift(min_shift=-0.5, max_shift=0.5, p=0.5)
])

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Dataset 
class AudioDataset(Dataset):
    def __init__(self, X, y) -> None:
        super().__init__()
        self.X = torch.tensor(X, dtype=torch.float32).unsqueeze(1)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, index):
        features = self.X[index]
        target = self.y[index]
        return features, target

# DataLoader
def data_loader(data_dir,
                batch_size,
                random_seed=42,
                valid_size=0.1,
                shuffle=True,
                test=False):

    if test:
        pass
    
    df = pd.read_pickle('final_data.pkl')
    
    X, y = df.iloc[:, :-1].values, df.iloc[:, -1].values

    train_dataset = AudioDataset(X, y)

    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        np.random.seed(random_seed)
        np.random.shuffle(indices)

    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler)
 
    valid_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=valid_sampler)

    return (train_loader, valid_loader)

# CIFAR10 dataset 
batch_size = 16
train_loader, valid_loader = data_loader(data_dir='./data_cvs', batch_size=batch_size)

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm1d(out_channels),
            nn.ReLU())
        self.conv2 = nn.Sequential(
            nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(out_channels))
        self.downsample = downsample
        self.relu = nn.ReLU()
        
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm1d(64),
            nn.ReLU())
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
        self.layer0 = self._make_layer(block, 64, layers[0], stride=1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(512, num_classes)
        
    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes:
            downsample = nn.Sequential(
                nn.Conv1d(self.inplanes, planes, kernel_size=1, stride=stride),
                nn.BatchNorm1d(planes),
            )
        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

num_classes = 4
num_epochs = 20
learning_rate = 0.01

model = ResNet(ResidualBlock, [3, 4, 6, 3], num_classes).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=0.001, momentum=0.9)  
scaler = GradScaler()

best_accuracy = 0.0
best_model_wts = model.state_dict()

for epoch in range(num_epochs):
    model.train()
    for i, (images, labels) in enumerate(train_loader):  
        images = images.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        
        with autocast():
            outputs = model(images)
            loss = criterion(outputs, labels)
        
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        del images, labels, outputs
        torch.cuda.empty_cache()
        gc.collect()
    
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss.item()))
            
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del images, labels, outputs
    
        accuracy = 100 * correct / total
        print('Accuracy of the network on the validation images: {} %'.format(accuracy))
        
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_model_wts = model.state_dict()

# Load best model weights
model.load_state_dict(best_model_wts)
for i, (images, labels) in enumerate(train_loader):  
    images = images.to(device)
    labels = labels.to(device)
    
    optimizer.zero_grad()
    
    with autocast():
        outputs = model(images)
        print(outputs, labels)
        loss = criterion(outputs, labels)
    break

# Save the best model weights
torch.save(model.state_dict(), 'best_model.pth')

print(f'Best validation accuracy: {best_accuracy} %')

Epoch [1/20], Loss: 1.7749
Accuracy of the network on the validation images: 18.181818181818183 %
Epoch [2/20], Loss: 0.6628
Accuracy of the network on the validation images: 9.090909090909092 %
Epoch [3/20], Loss: 0.5845
Accuracy of the network on the validation images: 9.090909090909092 %
Epoch [4/20], Loss: 0.9028
Accuracy of the network on the validation images: 27.272727272727273 %
Epoch [5/20], Loss: 1.0201
Accuracy of the network on the validation images: 27.272727272727273 %
Epoch [6/20], Loss: 1.5264
Accuracy of the network on the validation images: 18.181818181818183 %
Epoch [7/20], Loss: 0.9219
Accuracy of the network on the validation images: 18.181818181818183 %
Epoch [8/20], Loss: 0.5500
Accuracy of the network on the validation images: 27.272727272727273 %
Epoch [9/20], Loss: 0.2937
Accuracy of the network on the validation images: 18.181818181818183 %
Epoch [10/20], Loss: 1.0773
Accuracy of the network on the validation images: 63.63636363636363 %
Epoch [11/20], Loss: 0

In [2]:
# Load best model weights
model = ResNet(ResidualBlock, [3, 4, 6, 3], num_classes).to(device)
model.load_state_dict(torch.load("best_model.pth"))
for i, (images, labels) in enumerate(train_loader):  
    images = images.to(device)
    labels = labels.to(device)
    
    optimizer.zero_grad()
    
    with autocast():
        outputs = model(images)
        print(outputs, labels)
        loss = criterion(outputs, labels)
    break


print(f'Best validation accuracy: {best_accuracy} %')

NameError: name 'ResNet' is not defined