In [1]:
import zipfile
from pathlib import Path

data_path = "/content"
image_path = data_path + "/data"

with zipfile.ZipFile("//content/drive/MyDrive/HT9/Dataset/RSCD dataset-1million.zip", "r") as zip_ref:
      zip_ref.extractall(image_path)

In [131]:
import torch
import torch.nn as nn
import torchvision.models as models

class ResNet18(nn.Module):
    def __init__(self, num_classes):
        super(ResNet18, self).__init__()
        self.resnet = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            nn.Sequential(*list(models.resnet18(weights=None).children())[4:-1])
        )
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x


In [132]:
from torch.nn.modules.loss import CrossEntropyLoss
import torch.optim as optim
from torch.utils.data import DataLoader

# Define hyperparameters
num_epochs = 10
batch_size = 32
learning_rate = 0.001

# Initialize model and optimizer
model = ResNet18(num_classes=6)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
loss_fn = CrossEntropyLoss()

In [5]:
import os
from PIL import Image

# Set the path to the directory
path = '/content/data/RSCD dataset-1million/train'

# Initialize an empty list to store the subdirectories
subdirs = []

# Loop through all the directories and files in the directory
for dirpath, dirnames, filenames in os.walk(path):
    # Loop through all the subdirectories
    for dirname in dirnames:
        # Append the subdirectory to the list
        subdirs.append(os.path.join(dirpath, dirname))

# Initialize an empty list to store the images
image_list = []
counter = 0

labels_names = ['dry', 'fresh', 'ice', 'melted', 'water', 'wet']
dictionary = {}

for counter in range(len(subdirs)):
  # Loop through all the files in the directory
  for filename in os.listdir(subdirs[counter]):
    # Check if the file is an image
    if filename.endswith('.jpg') or filename.endswith('.png'):
      for name in labels_names:
        if name in filename:
          if name not in dictionary:
            dictionary[name] = []
          dictionary[name].append(subdirs[counter] + "/" + filename)



In [6]:
for name in labels_names:
  print(f'{name} - {len(dictionary[name])}')

dry - 355509
fresh - 73560
ice - 54092
melted - 61093
water - 147257
wet - 267486


In [148]:
dict2 = {}

for name in labels_names:
  if name not in dict2:
    dict2[name] = []
  dict2[name] = dictionary[name][:100]

In [149]:
for name in labels_names:
  print(f'{name} - {len(dict2[name])}')

dry - 100
fresh - 100
ice - 100
melted - 100
water - 100
wet - 100


In [150]:
dict2["dry"][0]

'/content/data/RSCD dataset-1million/train/dry_concrete_slight/20220704230245451-dry-concrete-slight.jpg'

In [151]:
import os
from PIL import Image

image_list = []

arr = []

for name in dict2:
  for filename in dict2[name]:
    arr.append(filename)

# Loop through all the files in the directory
for filename in arr:
  img = Image.open(filename)
  # Append the image to the list
  image_list.append(img)

In [152]:
len(arr)

600

In [153]:
labels = []

for filename in arr:
  for name in labels_names:
    if name in filename:
      labels.append(name)
      break


# for filename in arr:
#   labels.append([name for name in labels_names if name in filename])
#   if 1 < len([name for name in labels_names if name in filename]):
#     print(filename)



In [154]:
labels_names
len(image_list)


600

In [164]:
from sklearn import preprocessing
import torch

le = preprocessing.LabelEncoder()
targets = le.fit_transform(labels_names)
# targets: array([0, 1, 2, 3, 4])

targets = torch.as_tensor(targets)

In [166]:
labels2 = labels
labels = le.transform(labels)

In [169]:
labels

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3,

In [170]:
import torch
from torchvision import transforms

# Define the transformations to apply to the images
transform = transforms.Compose([
    transforms.ToTensor()
])

# Define the dataset
class ImageListDataset(torch.utils.data.Dataset):
    def __init__(self, image_list, labels, transform=None):
        self.image_list = image_list
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_list)

    def __getitem__(self, index):
        image = self.image_list[index]
        if self.transform:
            image = self.transform(image)
        label = self.labels[index]
        return image, label

dataset = ImageListDataset(image_list, labels, transform=transform)

# Define the data loader

from torch.utils.data import random_split

[training_data, testing_data] = random_split(dataset, [0.7, 0.3], generator=torch.Generator().manual_seed(69))

torch.manual_seed(69)

dataloader_train = torch.utils.data.DataLoader(training_data, batch_size=32, shuffle=True)
dataloader_test = torch.utils.data.DataLoader(testing_data, batch_size=32, shuffle=True)

In [171]:
def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer):
    # Put model in train mode
    model.train()
    
    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0
    
    # Loop through data loader data batches
    for batch, (X, y) in enumerate(dataloader):
        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item() 

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metric across all batches
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch 
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

In [172]:
def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module):
    # Put model in eval mode
    model.eval() 
    
    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0
    
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
    
            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()
            
            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))
            
    # Adjust metrics to get average loss and accuracy per batch 
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [173]:
from tqdm.auto import tqdm

def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5):
  
  results = {"train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": []
    }

  for epoch in tqdm(range(epochs)):
      train_loss, train_acc = train_step(model=model,
                                          dataloader=train_dataloader,
                                          loss_fn=loss_fn,
                                          optimizer=optimizer)
      test_loss, test_acc = test_step(model=model,
          dataloader=test_dataloader,
          loss_fn=loss_fn)
      
      # 4. Print out what's happening
      print(
          f"Epoch: {epoch+1} | "
          f"train_loss: {train_loss:.4f} | "
          f"train_acc: {train_acc:.4f} | "
          f"test_loss: {test_loss:.4f} | "
          f"test_acc: {test_acc:.4f}"
      )

      # 5. Update results dictionary
      results["train_loss"].append(train_loss)
      results["train_acc"].append(train_acc)
      results["test_loss"].append(test_loss)
      results["test_acc"].append(test_acc)

  # 6. Return the filled results at the end of the epochs
  return results

In [174]:
labels

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3,

In [175]:
torch.manual_seed(69)

model_results = train(model, dataloader_train, dataloader_test, optimizer, loss_fn, num_epochs)

  0%|          | 0/10 [00:00<?, ?it/s]

Epoch: 1 | train_loss: 1.0919 | train_acc: 0.5848 | test_loss: 1.9211 | test_acc: 0.4250
Epoch: 2 | train_loss: 1.1439 | train_acc: 0.5826 | test_loss: 1.9601 | test_acc: 0.3969
Epoch: 3 | train_loss: 1.1969 | train_acc: 0.5938 | test_loss: 2.2645 | test_acc: 0.3573
Epoch: 4 | train_loss: 1.1265 | train_acc: 0.5915 | test_loss: 2.3095 | test_acc: 0.4729
Epoch: 5 | train_loss: 1.0463 | train_acc: 0.5781 | test_loss: 3.7173 | test_acc: 0.3917
Epoch: 6 | train_loss: 1.0796 | train_acc: 0.5804 | test_loss: 1.0825 | test_acc: 0.6115
Epoch: 7 | train_loss: 1.0096 | train_acc: 0.6295 | test_loss: 1.3008 | test_acc: 0.5698
Epoch: 8 | train_loss: 1.0161 | train_acc: 0.6094 | test_loss: 1.7976 | test_acc: 0.4115
Epoch: 9 | train_loss: 1.1040 | train_acc: 0.5580 | test_loss: 2.5339 | test_acc: 0.4896
Epoch: 10 | train_loss: 1.0574 | train_acc: 0.5714 | test_loss: 2.4986 | test_acc: 0.5156


In [179]:
torch.save(model.state_dict(), "/content/model_save_0.pth")

In [181]:
model1 = ResNet18(6)
model1.load_state_dict(torch.load("/content/model_save_0.pth"))

<All keys matched successfully>

In [182]:
from torch.autograd.grad_mode import inference_mode
model1.eval()

img1 = dictionary["wet"][1666]

with inference_mode:
  

IndentationError: ignored