In [1]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

print(f"PyTorch version: {torch.__version__}")

# Check PyTorch has access to MPS (Metal Performance Shader, Apple's GPU architecture)
print(f"Is MPS (Metal Performance Shader) built? {torch.backends.mps.is_built()}")
print(f"Is MPS available? {torch.backends.mps.is_available()}")

# Set the device      
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using device: {device}")

PyTorch version: 2.0.1
Is MPS (Metal Performance Shader) built? True
Is MPS available? True
Using device: mps


In [2]:
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
import torch.nn as nn
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
from torch.autograd import Variable
import sys
from tqdm.notebook import tqdm
import time
import copy

In [3]:
import torch
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import random
import torch.nn as nn

train_data_path = 'Dataset/Train'
transform = transforms.Compose([
    transforms.Resize((200, 200)),
    transforms.CenterCrop(160),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_dataset = datasets.ImageFolder(root=train_data_path, transform=transform)

test_data_path= 'Dataset/Validation'
test_dataset= datasets.ImageFolder(root=test_data_path, transform=transform)

train_loader= DataLoader(train_dataset, batch_size=32, shuffle=True)

In [4]:
from facenet_pytorch import InceptionResnetV1 
vggface = InceptionResnetV1(pretrained='vggface2')

In [5]:
def sinc(band,t_right):
    y_right= torch.sin(2*math.pi*band*t_right)/(2*math.pi*band*t_right)
    y_left= flip(y_right,0)

    y=torch.cat([y_left,Variable(torch.ones(1)).to(device),y_right])

    return y

def flip(x, dim):
    xsize = x.size()
    dim = x.dim() + dim if dim < 0 else dim
    x = x.contiguous()
    x = x.view(-1, *xsize[dim:])
#     x = x.view(x.size(0), x.size(1), -1)[:, getattr(torch.arange(x.size(1)-1, 
#                       -1, -1))().to(device).long(), :]
    x = torch.flip(x, [dim]).to(device)
    return x.view(xsize)

class sinc_conv(nn.Module):

    def __init__(self, N_filt,Filt_dim,fs):
        super(sinc_conv,self).__init__()

        # Mel Initialization of the filterbanks
        low_freq_mel = 80
        high_freq_mel = (2595 * np.log10(1 + (fs / 2) / 700))  # Convert Hz to Mel
        mel_points = np.linspace(low_freq_mel, high_freq_mel, N_filt)  # Equally spaced in Mel scale
        f_cos = (700 * (10**(mel_points / 2595) - 1)) # Convert Mel to Hz
        b1=np.roll(f_cos,1)
        b2=np.roll(f_cos,-1)
        b1[0]=30
        b2[-1]=(fs/2)-100
                
        self.freq_scale=fs*1.0
        self.filt_b1 = nn.Parameter(torch.from_numpy(b1/self.freq_scale))
        self.filt_band = nn.Parameter(torch.from_numpy((b2-b1)/self.freq_scale))

        
        self.N_filt=N_filt
        self.Filt_dim=Filt_dim
        self.fs=fs
        

    def forward(self, x):
        
        filters=Variable(torch.zeros((self.N_filt,self.Filt_dim))).to(device)
        N=self.Filt_dim
        t_right=Variable(torch.linspace(1, (N-1)/2, steps=int((N-1)/2))/self.fs).to(device)
        
        
        min_freq=50.0;
        min_band=50.0;
        
        filt_beg_freq=torch.abs(self.filt_b1)+min_freq/self.freq_scale
        filt_end_freq=filt_beg_freq+(torch.abs(self.filt_band)+min_band/self.freq_scale)
       
        n=torch.linspace(0, N, steps=N)

        # Filter window (hamming)
        window=0.54-0.46*torch.cos(2*math.pi*n/N);
        window=Variable(window.float32().to(device))

        
        for i in range(self.N_filt):
                        
            low_pass1 = 2*filt_beg_freq[i].float32()*sinc(filt_beg_freq[i].float32()*self.freq_scale,t_right)
            low_pass2 = 2*filt_end_freq[i].float32()*sinc(filt_end_freq[i].float32()*self.freq_scale,t_right)
            band_pass=(low_pass2-low_pass1)

            band_pass=band_pass/torch.max(band_pass)

            filters[i,:]=band_pass.to(device)*window

        out=F.conv1d(x, filters.view(self.N_filt,1,self.Filt_dim))
    
        return out

In [14]:
import math
class classifier_vggface(nn.Module):
  def __init__(self):
    super(classifier_vggface, self).__init__()
    self.encoder= vggface
    self.classifier= nn.Sequential(
        sinc_conv(1,99,1600),
        sinc_conv(1,99,1600),
        sinc_conv(1,99,1600),
        nn.Linear(218, 2),
        # nn.BatchNorm1d(512),
        # nn.ReLU(inplace=True),
        # nn.Linear(512, 128),
        # nn.BatchNorm1d(128),
        # nn.ReLU(inplace=True),
        # nn.Linear(128, 2),
    )
  def forward(self, x):
    x= self.encoder(x)
    x= x.unsqueeze(1)
    x= self.classifier(x)
    return x.squeeze()

# model1= InceptionResnetV1(
#     classify=True,
#     pretrained='vggface2',
#     num_classes=2
# )

model1= classifier_vggface().to(torch.float32).to(device)

for img,_ in train_loader:
  print(model1(img.to(device)).shape)
  break


torch.Size([32, 2])


In [22]:
for param in model1.parameters(): #freeze model
    param.requires_grad = False

model1.classifier= nn.Sequential(
        sinc_conv(1,99,1600),
        sinc_conv(1,99,1600),
        sinc_conv(1,99,1600),
        nn.Linear(218, 218),
#         nn.BatchNorm1d(218),
        nn.ReLU(inplace=True),
        nn.Linear(218, 128),
#         nn.BatchNorm1d(128),
        nn.ReLU(inplace=True),
        nn.Linear(128, 2),
    )

model1

classifier_vggface(
  (encoder): InceptionResnetV1(
    (conv2d_1a): BasicConv2d(
      (conv): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
      (bn): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (conv2d_2a): BasicConv2d(
      (conv): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (conv2d_2b): BasicConv2d(
      (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(64, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (maxpool_3a): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv2d_3b): BasicConv2d(
      (conv): Conv2d(64, 80, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(80, eps=0.001, momentum=0.1, affine

In [23]:
from timm.loss import LabelSmoothingCrossEntropy
from torch.optim.lr_scheduler import CosineAnnealingLR
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
import torch.nn as nn
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
from torch.autograd import Variable
from torch.optim.lr_scheduler import CosineAnnealingLR

criterion = LabelSmoothingCrossEntropy()
model1= model1.to(torch.float32).to(device)
criterion = criterion.to(device)
optimizer1 = optim.AdamW(model1.classifier.parameters(), lr=0.001)
exp_lr_scheduler1 = CosineAnnealingLR(optimizer1, T_max=10, eta_min=0.001)

test_loader= DataLoader(test_dataset, batch_size=32, shuffle=True)

dataloaders = {
    "train": train_loader,
    "val": test_loader
}

dataset_sizes = {
    "train": len(train_dataset),
    "val": len(test_dataset)
}

In [24]:
import shutil
import os
def save_model1(model):
    if not os.path.isdir('saved_models'):
        os.mkdir('saved_models')
        
    torch.save(model.state_dict(), os.path.join('saved_models', 'clf_sync.pt'))
    print("Model successfully saved.")

In [25]:
import os

accs=[]
losses=[]

def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    global losses
    global accs
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print("-"*10)
        
        for phase in ['train', 'val']: # We do training and validation phase per epoch
            if phase == 'train':
                model.train() # model to training mode
            else:
                model.eval() # model to evaluate
            
            running_loss = 0.0
            running_corrects = 0.0
            
            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'): # no autograd makes validation go faster
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1) # used for accuracy
                    loss = criterion(outputs, labels)
                    
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                running_loss += loss.item() * 24
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step() # step at end of epoch
            
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc =  running_corrects.float() / dataset_sizes[phase]
            
            save_model1(model)
            print("{} Loss: {:.4f} Acc: {:.4f}".format(phase, epoch_loss, epoch_acc))
            accs.append(epoch_acc)
            losses.append(epoch_loss)
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict()) # keep the best validation accuracy model
        print()
    time_elapsed = time.time() - since # slight error
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print("Best Train Acc: {:.4f}".format(best_acc))
    
    model.load_state_dict(best_model_wts)
    return model

model_ft1 = train_model(model1, criterion, optimizer1, exp_lr_scheduler1, num_epochs=5)

Epoch 0/4
----------


  0%|          | 0/4376 [00:00<?, ?it/s]

  Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


Model successfully saved.
train Loss: 0.3341 Acc: 0.8389


  0%|          | 0/1233 [00:00<?, ?it/s]

Model successfully saved.
val Loss: 0.3680 Acc: 0.8087

Epoch 1/4
----------


  0%|          | 0/4376 [00:00<?, ?it/s]

Model successfully saved.
train Loss: 0.3161 Acc: 0.8556


  0%|          | 0/1233 [00:00<?, ?it/s]

Model successfully saved.
val Loss: 0.3537 Acc: 0.8209

Epoch 2/4
----------


  0%|          | 0/4376 [00:00<?, ?it/s]

Model successfully saved.
train Loss: 0.3096 Acc: 0.8625


  0%|          | 0/1233 [00:00<?, ?it/s]

Model successfully saved.
val Loss: 0.3634 Acc: 0.8061

Epoch 3/4
----------


  0%|          | 0/4376 [00:00<?, ?it/s]

Model successfully saved.
train Loss: 0.3050 Acc: 0.8666


  0%|          | 0/1233 [00:00<?, ?it/s]

Model successfully saved.
val Loss: 0.3115 Acc: 0.8604

Epoch 4/4
----------


  0%|          | 0/4376 [00:00<?, ?it/s]

Model successfully saved.
train Loss: 0.3022 Acc: 0.8697


  0%|          | 0/1233 [00:00<?, ?it/s]

Model successfully saved.
val Loss: 0.3274 Acc: 0.8431

Training complete in 68m 29s
Best Train Acc: 0.8604
