In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import cv2 as cv
from PIL import Image, ImageChops, ImageEnhance
import pandas as pd
from sklearn.metrics import classification_report

import timm
from tqdm.notebook import tqdm

import os
from pathlib import Path

In [None]:
path = "/kaggle/input/auto-dataset/techosmotr/techosmotr/train"   

config = {
    "epoch": 5,
    "batch_size": 4,
    "lr": 1e-3,
    "momentum": 0.9,
    "decay": 0.01
}

In [None]:
# Pretrained model on ImageNet-1000 dataset that is used as a backbone
efficient_net = "efficientnet_b0"

In [None]:
class AutoDataset(Dataset):
    def __init__(self, dir: str, transform):
        self.data = ImageFolder(dir, transform=transform)
            
    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

In [None]:
class Model(nn.Module):
    def __init__(self, num_classes: int=1):
        super(Model, self).__init__()
        self.base_model = timm.create_model(efficient_net, pretrained=True)
        self.features = nn.Sequential(*list(self.base_model.children())[:-1])
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=1280, out_features=num_classes),
            nn.Sigmoid())

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

In [None]:
# Mean and Standard deviation were calculated separately using training_set data
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((256, 256)),
    transforms.Normalize(mean=[0.0256, 0.0259, 0.0263], std=[0.0158, 0.0157, 0.0158])
])

In [None]:
dataset = AutoDataset(dir=path, transform=transform, ela=True)
train_set, val_set, test_set = torch.utils.data.random_split(dataset, [5927, 329, 329])
train_loader = DataLoader(train_set, batch_size=config["batch_size"], shuffle=True)
val_loader = DataLoader(val_set, batch_size=config["batch_size"])
test_loader = DataLoader(test_set, batch_size=config["batch_size"])

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

In [None]:
model = Model().to(device)

optimizer = optim.SGD(params=model.parameters(), 
                      lr=config["lr"], 
                      momentum=config["momentum"],
                      weight_decay=config["decay"])
loss_fn = nn.BCELoss()

In [None]:
train_losses = []
val_losses = []

for epoch in (range(config["epoch"])):
    model.train()
    print("Training started...")
    running_loss = 0
    
    for images, labels in (train_loader):
        images, labels = images.to(device), labels.to(device).to(torch.float32)

        optimizer.zero_grad()
        outputs = model(images).squeeze(1)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * labels.size(0)
    train_loss = running_loss / len(train_loader)
    train_losses.append(train_loss)


    model.eval()
    running_loss = 0.0
    
    with torch.inference_mode():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device).to(torch.float32)

            outputs = model(images).squeeze(1)
            loss = loss_fn(outputs, labels)
            running_loss += loss.item() * labels.size(0)
        val_loss = running_loss / len(val_loader)
        val_losses.append(val_loss)
    
    print(f"Epoch: {epoch}, Train loss: {train_loss}, Val loss: {val_loss}")

In [None]:
plt.plot(train_losses, label="train")
plt.plot(val_losses, label="val")
plt.legend()

In [None]:
def predict(img, label):
    model.eval()
    output = model(img.to(device).unsqueeze(0)).squeeze(1)
    return label, 1 if output>0.5 else 0

In [None]:
y_true = []
y_pred = []

for i in test_set:
    label, output = predict(*i)
    y_true.append(label)
    y_pred.append(output)

In [None]:
print(classification_report(y_true, y_pred))

In [None]:
test_df = pd.read_csv("/kaggle/input/auto-dataset/test.csv")
test_df.shape

In [None]:
pred_df = pd.DataFrame(columns=['file_index', 'class'])

for _, row in test_df.iterrows():
    index = row.iloc[0]
    
    path = f'/kaggle/input/auto-dataset/techosmotr/techosmotr/test/{index}.jpeg'
    # print(path)
    img = Image.open(path)
    img = transform(img)
    output = model(img.to(device).unsqueeze(0)).squeeze(1)
    output = 1 if output>0.5 else 0
    
    pred_df.loc[len(pred_df.index)] = [index, 1-output]

In [None]:
pred_df.to_csv("submission.csv", index=False)