In [1]:
#import libraries
import pandas as pd
import os
import zipfile
import shutil

#import torch libraries

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.utils.data import random_split

# Set device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [2]:
# clone the repo
!git clone https://github.com/Prashant-AV/Qualcomm-DL-Hackathon

Cloning into 'Qualcomm-DL-Hackathon'...
remote: Enumerating objects: 10, done.[K
remote: Counting objects: 100% (10/10), done.[K
remote: Compressing objects: 100% (10/10), done.[K
remote: Total 10 (delta 1), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (10/10), 30.68 MiB | 14.90 MiB/s, done.
Resolving deltas: 100% (1/1), done.


In [3]:
# extract the content
extract_dir = "/content/Train"
os.makedirs(extract_dir, exist_ok=True)

#open and extract the zip file
for i in range(1,3):
  with zipfile.ZipFile(f"/content/Qualcomm-DL-Hackathon/train/images part-{i}.zip","r") as zip_ref:
    zip_ref.extractall(extract_dir)

In [4]:
train_data_csv = pd.read_csv("/content/Qualcomm-DL-Hackathon/train/train.csv")
test = pd.read_csv("/content/Qualcomm-DL-Hackathon/test.csv")

In [5]:
# extract images and split them into train and test

destination_folder = "/content/Train/train_images"

if not os.path.exists(destination_folder):
  os.makedirs(destination_folder)

for i in range(1,3):
  source_folder = f"/content/Train/images part-{i}"
  for root, dirs, files in os.walk(source_folder):
    for file in files:
      src_file = os.path.join(root, file)
      shutil.copy2(src_file, destination_folder)



In [6]:
#splitting datasets into train and test
test_images = list(test["image_names"])
source_folder = f"/content/Train/train_images"
destination_folder = "/content/Train/final_test_images"

if not os.path.exists(destination_folder):
  os.makedirs(destination_folder)
count = 0
for root, dirs, files in os.walk(source_folder):
  for file in files:
    # if count < 1:
    #   print(file, type(file), test_images[0],type(test_images[0]))
    count += 1
    if file in test_images:
      # print("file")
      src_file = os.path.join(root, file)
      shutil.move(src_file, destination_folder)


2352


In [7]:
# initialize the res dict to construct final csv at the end
count = 0
res = {"image_names":[], "emergency_or_not":[]}
for root, dirs, files in os.walk(destination_folder):
  for file in files:
    # if count < 1:
    #   print(file, type(file), test_images[0],type(test_images[0]))
    count += 1
    res["image_names"].append(file)


706


In [8]:
# Creating classes to handle image to label mapping with(for train) and without labels(for test)
from torch.utils.data import Dataset
from PIL import Image

class CustomImageDatasetWithLabels(Dataset):
    def __init__(self, csv_file, image_folder, transform=None):
        self.labels_df = pd.read_csv(csv_file)
        self.image_folder = image_folder
        self.transform = transform

    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        img_name = os.path.join(self.image_folder, self.labels_df.iloc[idx, 0])
        image = Image.open(img_name)
        label = int(self.labels_df.iloc[idx, 1])  # Convert label to int
        if self.transform:
            image = self.transform(image)
        return image, label

class CustomImageDatasetWithoutLabels(Dataset):
    def __init__(self, image_folder, transform=None):
        self.image_folder = image_folder
        self.image_names = os.listdir(image_folder)
        self.transform = transform

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        img_name = os.path.join(self.image_folder, self.image_names[idx])
        image = Image.open(img_name)
        if self.transform:
            image = self.transform(image)
        return image

In [9]:
# Tried different combinations with color saturation, vertical flip, etc. 
# Found this combination to be better than others
# Here, we resize the image to 256x256 along with horizontal flip and rotation of 15 degrees.
train_transforms = transforms.Compose([
    transforms.Resize((256,256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229,0.224,0.225])
])

test_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229,0.224,0.225])
])


In [10]:

# Paths to your CSV files and image folders
train_csv = '/content/Qualcomm-DL-Hackathon/train/train.csv'
train_image_folder = '/content/Train/train_images'

# Initialize your datasets
train_dataset = CustomImageDatasetWithLabels(csv_file=train_csv, image_folder=train_image_folder, transform=train_transforms)



In [11]:
# Defining a function to split data into train and test sets
def split_data(data,ratio=0.8):
    train_size = int(ratio * len(data))
    test_size = len(data) - train_size
    train_data, test_data = random_split(data, [train_size, test_size])
    return train_data, test_data

In [133]:
# Defining a function to define data loaders
# Found batch size 128 to be the optimal value based on testing
def create_dataloaders(train_data, test_data,b_train=128,b_test=32):
    train_loader = DataLoader(train_data, batch_size=b_train, shuffle=True)
    test_loader = DataLoader(test_data, batch_size=b_test, shuffle=False)
    return train_loader, test_loader


In [None]:
# define the train and test sets
train_loader, test_loader = create_dataloaders(train_dataset, test_dataset)

In [135]:
# Initializing the model
from torchvision.models import resnet18
model = resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs,2)

model = model.to(device)



In [136]:
# Define the optimizer and scehduler
from torch.optim.lr_scheduler import StepLR
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)


In [None]:
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _,predicted = torch.max(outputs.data,1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100 * correct / total
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")
    scheduler.step()

Epoch 1/40, Loss: 0.2652, Accuracy: 91.49%
Epoch 2/40, Loss: 0.1356, Accuracy: 94.84%
Epoch 3/40, Loss: 0.0819, Accuracy: 97.08%
Epoch 4/40, Loss: 0.0669, Accuracy: 97.45%
Epoch 5/40, Loss: 0.0503, Accuracy: 97.93%
Epoch 6/40, Loss: 0.0474, Accuracy: 98.06%
Epoch 7/40, Loss: 0.0604, Accuracy: 97.81%
Epoch 8/40, Loss: 0.0413, Accuracy: 98.60%
Epoch 9/40, Loss: 0.0415, Accuracy: 98.48%
Epoch 10/40, Loss: 0.0265, Accuracy: 99.21%
Epoch 11/40, Loss: 0.0240, Accuracy: 98.91%
Epoch 12/40, Loss: 0.0124, Accuracy: 99.39%
Epoch 13/40, Loss: 0.0089, Accuracy: 99.70%
Epoch 14/40, Loss: 0.0090, Accuracy: 99.64%
Epoch 15/40, Loss: 0.0089, Accuracy: 99.51%
Epoch 16/40, Loss: 0.0056, Accuracy: 99.94%
Epoch 17/40, Loss: 0.0068, Accuracy: 99.70%
Epoch 18/40, Loss: 0.0071, Accuracy: 99.64%
Epoch 19/40, Loss: 0.0057, Accuracy: 99.76%
Epoch 20/40, Loss: 0.0054, Accuracy: 99.76%
Epoch 21/40, Loss: 0.0054, Accuracy: 99.64%
Epoch 22/40, Loss: 0.0052, Accuracy: 99.70%
Epoch 23/40, Loss: 0.0057, Accuracy: 99.6

In [123]:
# Used to evaluate test data if data is split into train and test
def evaluate_test(test_loader):
    model.eval()
    test_correct = 0
    test_total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            test_total += labels.size(0)
            test_correct += (predicted == labels).sum().item()
    test_accuracy = 100 * test_correct / test_total
    print(f"Test Accuracy: {test_accuracy:.2f}%")

Test Accuracy: 97.58%


In [91]:
# Define a function to predict the outputs for test data
results = {}
def predict_op():
  global res
  global results
  res["emergency_or_not"].clear()
  model.eval()
  test_correct = 0
  test_total = 0
  with torch.no_grad():
      for inputs in test_loader_temp:
          inputs = inputs.to(device)

          outputs = model(inputs)
          _, predicted = torch.max(outputs.data, 1)
          # print([t.item() for t in list(predicted)])
          res["emergency_or_not"].extend([t.item() for t in list(predicted)])
  results = {}
  for i in range(len(res["image_names"])):
    results[res["image_names"][i]] = res["emergency_or_not"][i]


In [73]:
# Define a function to create csv
def create_csv(name):
  temp = []
  import pandas as pd
  data = pd.read_csv("/content/Qualcomm-DL-Hackathon/test.csv")
  for i in list(data["image_names"]):
    temp.append(results[i])
  data["emergency_or_not"] = temp
  data.to_csv(f"{name}.csv", index=False)

In [None]:
# Predict the outputs and create the final csv file
predict_op()
create_csv()