In [1]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
from glitch_this import ImageGlitcher
%matplotlib inline

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, ConcatDataset, random_split
from torchvision import datasets, transforms
from torchinfo import summary
import os
import PIL

In [2]:
def get_images(path):
    images_list = []
    
    for root, dirs, files in os.walk(path):
        for name in files:
            images_list.append(os.path.join(root, name))

    return images_list

In [3]:
class CustomDataset(Dataset):

    def __init__(self, root, target, transform=None):
        self.root = root
        self.images = get_images(root)
        self.targets = [int(target) for i in range(len(self.images))]
        self.transform = transform

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, index):
        image_name = self.images[index]  
        image = PIL.Image.open(image_name).convert('RGB')
        label = torch.tensor(self.targets[index],dtype=torch.float32)
        if self.transform:
            image = self.transform(image)
        return (image, label)

In [4]:
def pixelate(img):
    # Resize down to 128x128 pixels
    imgSmall = img.resize((128,128), resample=PIL.Image.Resampling.BILINEAR)
    
    # Scale back up using NEAREST to original size
    return imgSmall.resize(img.size, PIL.Image.Resampling.NEAREST)

def shuffle_pixel(region):
    temp = np.reshape(region, (-1, region.shape[2]))
    np.random.shuffle(temp)
    region = np.reshape(temp, region.shape)
    return region

def distort(img):
    img = np.array(img)
    
    for i in range(img.shape[0]//14):
        for j in range(img.shape[1]//14):
            if np.random.randint(0,5) == 0:
                img[14*i:14*i+14,14*j:14*j+14,:] = shuffle_pixel(img[14*i:14*i+14,14*j:14*j+14,:])

    return PIL.Image.fromarray(np.uint8(img)).convert('RGB')
            
glitcher = ImageGlitcher()

def glitch(img):
    glitch_amount = np.random.randint(2,9)
    if np.random.randint(0,1) == 0:
        return glitcher.glitch_image(img, glitch_amount, color_offset=False)
    else:
        return glitcher.glitch_image(img, glitch_amount, color_offset=True)

def noise_lambda(img):
    if np.random.randint(0,2) == 0:
        return pixelate(distort(img))
    else:
        return glitch(img)

In [5]:
def crop_black_border(image):
    y_nonzero, _, _ = np.nonzero(np.array(image) > 10)
    return image.crop((0, np.min(y_nonzero), image.size[0], np.max(y_nonzero)))

In [6]:
x = np.array([[3, 1], [1,2], [5, 6]])
np.nonzero(x)

(array([0, 0, 1, 1, 2, 2]), array([0, 1, 0, 1, 0, 1]))

In [7]:
# caltech dataset
# root = "../caltech101/101_ObjectCategories/"

# UCF101 dataset
root = "../Data/505_video_frames/"
batch_size=32

normal_transform = transforms.Compose([
        transforms.Lambda(crop_black_border),
        transforms.Resize((224,224)),           
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])

glitch_transform = transforms.Compose([ 
        transforms.Lambda(crop_black_border),
        transforms.Resize((224,224)),  
        transforms.Lambda(noise_lambda),  
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])

In [8]:
original_dataset = CustomDataset(root=root,target=0,transform=normal_transform)
glitch_dataset = CustomDataset(root=root,target=1,transform=glitch_transform)
dataset = ConcatDataset([original_dataset, glitch_dataset])

In [9]:
train_size = int(0.7 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

In [10]:
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [11]:
class GlitchDetect(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3,16,3,1)
        self.conv2 = nn.Conv2d(16,32,3,1)
        self.conv3 = nn.Conv2d(32,64,3,1)
        self.conv4 = nn.Conv2d(64,128,3,1)
        self.conv5 = nn.Conv2d(128,256,3,1)
        self.fc1 = nn.Linear(6400,2056)
        self.dropout1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(2056,512)
        self.dropout2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(512,1)  
        
    def forward(self,x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x,2,2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x,2,2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x,2,2)
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x,2,2)
        x = F.relu(self.conv5(x))
        x = F.max_pool2d(x,2,2)

        x = x.view(-1,6400)

        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        x = self.fc3(x)
        return F.sigmoid(x)

In [12]:
model = GlitchDetect()

In [13]:
print(summary(model, input_size=(batch_size, 3, 224, 224)))

Layer (type:depth-idx)                   Output Shape              Param #
GlitchDetect                             [32, 1]                   --
├─Conv2d: 1-1                            [32, 16, 222, 222]        448
├─Conv2d: 1-2                            [32, 32, 109, 109]        4,640
├─Conv2d: 1-3                            [32, 64, 52, 52]          18,496
├─Conv2d: 1-4                            [32, 128, 24, 24]         73,856
├─Conv2d: 1-5                            [32, 256, 10, 10]         295,168
├─Linear: 1-6                            [32, 2056]                13,160,456
├─Dropout: 1-7                           [32, 2056]                --
├─Linear: 1-8                            [32, 512]                 1,053,184
├─Dropout: 1-9                           [32, 512]                 --
├─Linear: 1-10                           [32, 1]                   513
Total params: 14,606,761
Trainable params: 14,606,761
Non-trainable params: 0
Total mult-adds (G): 6.83
Input size (MB): 1

In [14]:
class EarlyStopper:
    def __init__(self, patience=2, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.min_validation_loss = np.inf

    def early_stop(self, validation_loss):
        if validation_loss < self.min_validation_loss and abs(validation_loss - self.min_validation_loss) > self.min_delta:
            self.counter = 0
            self.min_validation_loss = validation_loss
        else:
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False

In [15]:
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(),lr=0.00001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer,mode='min',factor=0.1,patience=2,verbose=True)
early_stopper = EarlyStopper(patience=3, min_delta=0.1)

In [16]:
def count_parameters(model):
    params = [p.numel() for p in model.parameters() if p.requires_grad]
    for item in params:
        print(f'{item:>8}')
    print(f'________\n{sum(params):>8}')

count_parameters(model)

     432
      16
    4608
      32
   18432
      64
   73728
     128
  294912
     256
13158400
    2056
 1052672
     512
     512
       1
________
14606761


In [17]:
from tqdm import tqdm

In [None]:
epochs = 15

train_losses = []
test_losses = []

for i in range(epochs):
    for b, (X_train, y_train) in tqdm(enumerate(train_loader)):
        b+=1
        model.train()
        X_train = X_train.to(device='cuda')
        y_train = y_train.to(device='cuda')
        pred = model(X_train)
        loss = criterion(pred, y_train.unsqueeze(dim=-1))
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if b % 1000 == 0:
            print(f'epoch: {i:2}  batch: {b:4} [{batch_size*b:6}/{len(train_dataset)}]  loss: {loss.item()}')

    # train_losses.append(loss)

    running_loss = 0
    model.eval()
    with torch.no_grad():
        for vb, (X_test, y_test) in enumerate(test_loader):
            X_test = X_test.to(device='cuda')
            y_test = y_test.to(device='cuda')
            val = model(X_test)
            loss = criterion(val, y_test.unsqueeze(dim=-1))
            running_loss += loss
    # test_losses.append(loss)

    avg_loss = running_loss / (vb+1)
    print(f'epoch: {i:2}  finished,  validation loss: {loss.item()}')
    if early_stopper.early_stop(loss):             
        break
    scheduler.step(loss)

1000it [04:05,  4.41it/s]

epoch:  0  batch: 1000 [ 32000/127947]  loss: 0.7071014046669006


2000it [07:59,  4.30it/s]

epoch:  0  batch: 2000 [ 64000/127947]  loss: 0.6153396368026733


3000it [11:59,  4.22it/s]

epoch:  0  batch: 3000 [ 96000/127947]  loss: 0.7005798816680908


3999it [15:55,  4.18it/s]


epoch:  0  finished,  validation loss: 0.7158223390579224


793it [02:59,  4.91it/s]

In [None]:
model.eval()
with torch.no_grad():
    correct = 0
    for X_test, y_test in test_loader:
        X_test = X_test.to(device='cuda')
        y_test = y_test.to(device='cuda')
        y_val = model(X_test)
        predicted = torch.round(y_val)
        correct += (predicted == y_test.unsqueeze(dim=-1)).sum()
print(f'Test accuracy: {correct.item()}/{len(test_dataset)} = {correct.item()*100/len(test_dataset):7.3f}%')

# Test sample

In [None]:
newsample_path = './Capture.PNG'
newsample = PIL.Image.open(newsample_path).convert('RGB')
plt.imshow(newsample)

In [None]:
newsample.size

In [None]:
inv_normalize = transforms.Normalize(
    mean=[-0.485/0.229, -0.456/0.224, -0.406/0.225],
    std=[1/0.229, 1/0.224, 1/0.225]
)
# im_inv = inv_normalize(glitch_dataset[4023][0])
# plt.imshow(np.transpose(im_inv.numpy(), (1, 2, 0)));

In [None]:
img = normal_transform(newsample)
img_inv = inv_normalize(img)
plt.imshow(np.transpose(img_inv.numpy(), (1, 2, 0)));

In [None]:
model.eval()
y_val = model(img.to(device='cuda'))

In [None]:
y_val

In [None]:
img_inv = inv_normalize(train_dataset[6][0])
plt.imshow(np.transpose(img_inv.numpy(), (1, 2, 0)));