# DataSaur Case 3
---

Due date: **October 15th 2023, 14.00**

## Description
---
Необходимо обучить модель для выявления поддельных фотографий автотранспорта, так как люди ежегодно пытаются избежать технического осмотра, применяя методы подделки, такие как фотошоп и другие.

В данной задаче по бинарной классификации "0" являются правильным фото автотранспорта, "1" являются фиктивными фото (снятые с экрана монитора, фотошопом и тд.)

В папке "фиктивные" вы можете заметить что фото сами также делятся на подклассы. Все они будут являться "1" для этой задачи. Если ваша модель также правильно будет определять подклассы фиктивных фото, покажите работу модели во время защиты решения задач. За это будет отдельный бонус.

На презентацию кода пройдут топ 7 работ по итогу данного соревнования

# Dataset Preparation
___

1.  Download `techosmotr.zip` from the course website to your local machine.
2.  Go to Google Drive and upload the folder.
3.  Run the code block below. It will ask for permission to mount your Google Drive so this colab can access it. Paste the authorization code into the box as requested.

In [1]:
# Load the Drive helper and mount
from google.colab import drive
drive.mount('/content/drive')
%cd  /content/drive/'My Drive'/techosmotr/techosmotr/

Mounted at /content/drive
/content/drive/My Drive/techosmotr/techosmotr


## Make sure you have all libraries

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from PIL import Image
import os
import random
from torchvision import models
import pickle
import pandas as pd


# Dataloader for Binary Classification

In [4]:
batch_size = 64
momentum = 0.9
lr = 0.0075
epochs = 20
log_interval = 100

class CustomImageDataset(Dataset):
    def __init__(self, cat_dir, not_cat_dirs, transform=None):
        # Load cat images
        self.cat_images = [os.path.join(cat_dir, img) for img in os.listdir(cat_dir)]

        # Load not-cat images from multiple directories
        self.not_cat_images = []
        for not_cat_dir in not_cat_dirs:
            self.not_cat_images.extend([os.path.join(not_cat_dir, img) for img in os.listdir(not_cat_dir)])

        # Combining and labeling
        self.all_images = self.cat_images + self.not_cat_images
        self.labels = torch.tensor([0] * len(self.cat_images) + [1] * len(self.not_cat_images) )  # Update labels accordingly

        self.transform = transform

    def __len__(self):
        return len(self.all_images)

    def __getitem__(self, idx):
        image_path = self.all_images[idx]
        image = Image.open(image_path).convert("RGB")
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])



## Load train data
skip this section if you dont want to train model

In [None]:
# Initialize dataset
cat_dir = "train/pravilniye(correct)/0-correct"
not_cat_dirs = ["train/fictivniye(fictitious)/1-not-on-the-brake-stand", "train/fictivniye(fictitious)/2-from-the-screen", "train/fictivniye(fictitious)/3-from-the-screen+photoshop", "train/fictivniye(fictitious)/4-photoshop"]

dataset = CustomImageDataset(cat_dir=cat_dir, not_cat_dirs=not_cat_dirs, transform=transform)

# Split the dataset into train and validation subsets
train_size = int(0.8 * len(dataset))  # 80% for training
val_size = len(dataset) - train_size   # 20% for validation

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=1)
val_loader  = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=1)

# Model


In [5]:


nclasses = 2  # Assuming binary classification

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Load a pre-trained ResNet model and extract features
        self.resnet = models.resnet18(pretrained=True)
        # Remove the last fully connected layer (classification layer)
        self.features = nn.Sequential(*list(self.resnet.children())[:-1])

        # Define new classification layers
        self.fc1 = nn.Linear(512, 256)  # For resnet18, resnet34
        # If you use resnet50, resnet101, or resnet152, use the line below instead:
        # self.fc1 = nn.Linear(2048, 256)

        self.fc1_norm = nn.BatchNorm1d(256)
        self.fc1_drop = nn.Dropout(0.5)
        self.fc2 = nn.Linear(256, nclasses)

    def forward(self, x):
        # Extract features using ResNet
        x = self.features(x)
        # Flatten the output from the ResNet feature extractor
        x = x.view(x.size(0), -1)

        # Forward through the custom classification layer
        x = self.fc1_norm(F.relu(self.fc1(x)))
        x = self.fc1_drop(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

# Training
If you dont want to wait for the model to train (takes couple of hours)<br>
You can skip this part and jump directly to Evaluate and Submission File

In [None]:
model = Net()

# Check if GPU is available and if not, use CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'Using {device} device.')

# Move the model to the device (GPU if available)
model.to(device)

optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)

def train(epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # Move data and target to GPU if available
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

def validation():
    model.eval()
    validation_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in val_loader:
            # Move data and target to GPU if available
            data, target = data.to(device), target.to(device)

            output = model(data)
            validation_loss += F.nll_loss(output, target, reduction="sum").item() # sum up batch loss
            pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
            correct += pred.eq(target.data.view_as(pred)).cpu().sum()

    validation_loss /= len(val_loader.dataset)
    print('\nValidation set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        validation_loss, correct, len(val_loader.dataset),
        100. * correct / len(val_loader.dataset)))


for epoch in range(1, epochs + 1):
    train(epoch)
    validation()
    model_file = 'model_' + str(epoch) + '.pth'
    torch.save(model.state_dict(), model_file)
    print('\nSaved model to ' + model_file + '.')

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 264MB/s]


Using cuda:0 device.


# Evaluate and Submission File


In [28]:

class TestImageDataset(Dataset):
    def __init__(self, dir, transform=None):
        self.image_files = [os.path.join(dir, f) for f in os.listdir(dir)]
        self.transform = transform
    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_path = self.image_files[idx]
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, os.path.basename(image_path)  # return image and filename

# Define your model architecture and replace the checkpoint path
model = Net()  # Your model class here
model.load_state_dict(torch.load("model_8.pth")) ### select best model with best validation score
model.eval()

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Test dataset and loader
test_dir = "test"
test_dataset = TestImageDataset(dir=test_dir, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Collecting predictions and file names
dataframe_dict = {"file_index" : [], "class": []}

# Prediction
with torch.no_grad():
    for images, filenames in test_loader:
        images = images.to(device)
        outputs = model(images)

        # Replace with your prediction logic if not a simple argmax
        _, preds = torch.max(outputs, 1)

        dataframe_dict['file_index'].extend(filenames)
        dataframe_dict['class'].extend(preds.cpu().numpy())

# Save to CSV
df = pd.DataFrame(data=dataframe_dict)
# Remove .jpeg and convert filenames to integer indices
df['file_index'] = df['file_index'].apply(lambda x: int(x.replace(".jpeg", "")))
df.to_csv("predictions.csv", index=False)
print("Written to csv file predictions.csv")



Written to csv file predictions.csv
