<a href="https://colab.research.google.com/github/AhmadJamal01/Floodead-Inside/blob/main/Deep_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import torch
import torchvision
import numpy as np
import torch.nn as nn
import torchvision.transforms as transforms
import torch.utils.data as data
import torch.optim as optim
from torchvision import datasets, transforms
from torchvision import models, transforms
from torch.utils.data import DataLoader
from torch.optim import lr_scheduler


## Read the dataset

In [2]:
!pip install gdal > /dev/null

In [None]:
# import gdown
# gdown.download("https://drive.google.com/uc?id=1och-QmNa3FAiS-wssgzCwISbmpSezIi_", "dataset.zip", quiet=False)
# gdown.extractall("dataset.zip")
# path = 'dataset/'

# gdown.download("https://drive.google.com/file/d/1YUbTBFrk9QF0ivR5F640G3dhCMC3XQUZ/view?usp=sharing", "dataset.zip", quiet=False, fuzzy=True)

In [6]:
from google.colab import drive
import zipfile

# Mount your Google Drive
drive.mount('/content/drive')

# Specify the path to your zip file in Google Drive
zip_path = '/content/drive/MyDrive/dataset.zip'

# Extract the zip file
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall('/content/dataset/')  # Specify the extraction path

# Access the extracted files
path = '/content/dataset/'  # Update with your extraction path
# You can now work with the extracted files in the extracted_files_path directory


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Prepare the data

In [7]:
data_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

dataset = torchvision.datasets.ImageFolder(root=path, transform=data_transforms)

# Split the dataset into train and test
TRAIN_SIZE = 0.8
VALIDAtION_SIZE = 0.1

train_size = int(TRAIN_SIZE * len(dataset))
validation_size = int(VALIDAtION_SIZE * len(dataset))
test_size = len(dataset) - (train_size + validation_size)
train, validation, test = torch.utils.data.random_split(dataset, (train_size, validation_size, test_size))

batch_size = 32
trainLoader = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True, num_workers=2)
validationLoader = torch.utils.data.DataLoader(validation, batch_size=batch_size, shuffle=False, num_workers=2)
testLoader = torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=False, num_workers=2)



## Device

In [8]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Availabe device for training is: {device}')

Availabe device for training is: cpu


## Model Architecture

### ResNet18

In [None]:
# Define ResNet model 
model = models.resnet18(pretrained=True)

#  Modify the first layer to be able to handle our data
model.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)

# Modify the last layer to work with two classes
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 2)

# Move the model to GPU if possible
model = model.to(device)

# Define loss function, optimizer, and lr schehduler
# criterion: The loss function used for optimization.
criterion = nn.CrossEntropyLoss()
# optimizer: The optimizer to update the model's parameters.
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# step_lr_scheduler: The learning rate scheduler for adjusting the learning rate during training.
step_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)



### DenseNet

In [None]:
# Define DensNet model 
model = models.densenet121(pretrained=True)

# Modify the first layer to be able to handle our data
model.features.conv0 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)

# Modify the last layer to work with two classes
model.classifier = nn.Linear(model.classifier.in_features, 2)

# Move the model to GPU if possible
model = model.to(device)

# Define loss function, optimizer, and lr schehduler
# criterion: The loss function used for optimization.
criterion = nn.CrossEntropyLoss()
# optimizer: The optimizer to update the model's parameters.
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# step_lr_scheduler: The learning rate scheduler for adjusting the learning rate during training.
step_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

### Resenet50

In [26]:
# Define ResNet model 
model = models.resnet50(pretrained=True)

# Modify the last layer to work with two classes
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 2)

# Move the model to GPU if possible
model = model.to(device)

# Define loss function, optimizer, and lr scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
step_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)




## Model Enhancement 

### Dropout
Dropout is a regularization technique that can help prevent overfitting by randomly disabling neurons during training. It can be added after fully connected layers to reduce the model's reliance on specific features and encourage better generalization.

In [27]:
# Add Dropout after the last fully connected layer
num_features = model.fc.in_features
model.fc = nn.Sequential(
    nn.Linear(num_features, 512),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(512, 2)
)
# use model.classifier for the densenet


## Training

In [28]:
import time
import copy

def train_model(model, trainloader, validationloader, device, criterion, optimizer, step_lr_scheduler, epochs):
    '''
    This code is designed to train a model, track its performance during training, and return the best model along with the loss and accuracy values.
    '''
    try:
        model = model.to(device)

        since = time.time()
        best_model_wts = copy.deepcopy(model.state_dict())
        best_acc = 0.0

        train_acc = []
        train_loss = []
        valid_acc = []
        valid_loss = []

        for epoch in range(epochs):
            print('-' * 80)
            print(f'Epoch {epoch + 1}/{epochs}')

            for phase in ['train', 'valid']:
                if phase == 'train':
                    model.train()
                    dataloader = trainloader
                    dataset_size = len(trainloader.dataset)
                else:
                    model.eval()
                    dataloader = validationloader
                    dataset_size = len(validationloader.dataset)

                running_loss = 0.0
                running_corrects = 0

                for data in dataloader:
                    # Get the training data items of the current batch
                    inputs, labels = data
                    inputs, labels = inputs.to(device), labels.to(device)

                    with torch.set_grad_enabled(phase == 'train'):
                        # Forward pass
                        outputs = model(inputs)
                        # Predict the current batch
                        _, preds = torch.max(outputs, 1)
                        # Compute loss
                        loss = criterion(outputs, labels)

                        if phase == 'train':
                            # Set the gradients of all the parameters of the model to zero
                            optimizer.zero_grad()
                            # Backward propagation to calculate the gradient
                            loss.backward()
                            # Update the NN weights by using the gradient
                            optimizer.step()

                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)

                if phase == 'train':
                    step_lr_scheduler.step()

                epoch_loss = running_loss / dataset_size
                epoch_acc = running_corrects.double() / dataset_size

                if phase == 'valid':
                    valid_acc.append(epoch_acc)
                    valid_loss.append(epoch_loss)
                else:
                    train_acc.append(epoch_acc)
                    train_loss.append(epoch_loss)

                print('{} loss: {:.4f} --------------- {} accuracy: {:.4f}'.format(phase, epoch_loss, phase, epoch_acc))

                # deep copy the model
                if phase == 'valid' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())

        time_elapsed = time.time() - since
        print('-' * 80)
        print('Training completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
        print('Best validation accuracy: {:4f}'.format(best_acc))

        # load best model weights
        print('Loading final model weights...')
        model.load_state_dict(best_model_wts)
        return model, train_loss, train_acc, valid_loss, valid_acc

    except Exception as e:
        print(f"An error occurred during training: {str(e)}")
        return None, [], [], [], []


In [30]:
model, train_loss, train_acc, valid_loss, valid_acc = train_model(model, trainLoader, validationLoader, device, criterion, optimizer, step_lr_scheduler, epochs=2)

--------------------------------------------------------------------------------
Epoch 1/2
train loss: 0.0121 --------------- train accuracy: 1.0000
valid loss: 0.0105 --------------- valid accuracy: 1.0000
--------------------------------------------------------------------------------
Epoch 2/2
train loss: 0.0102 --------------- train accuracy: 1.0000
valid loss: 0.0073 --------------- valid accuracy: 1.0000
--------------------------------------------------------------------------------
Training completed in 26m 54s
Best validation accuracy: 1.000000
Loading final model weights...


## Evaluation

In [31]:
def predict(model, dataloader, device):
    y_pred = []
    y_true = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            _, predicted = torch.max(outputs, 1)
            
            y_pred.extend(predicted.cpu().numpy())
            y_true.extend(labels.cpu().numpy())
            
    return np.asarray(y_pred), np.asarray(y_true)

In [32]:
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, f1_score

# Test the model on the test dataset
y_pred, y_true = predict(model, testLoader, device)
        
# Evalaution        
# Calculate accuracy and F1 score
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
print('Accuracy: {:.2f}%'.format(accuracy * 100))
print('F1 Score: {:.2f}'.format(f1))

# Show the whole report
classification_report = classification_report(y_true, y_pred)
print(classification_report)


Accuracy: 100.00%
F1 Score: 0.00
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        93

    accuracy                           1.00        93
   macro avg       1.00      1.00      1.00        93
weighted avg       1.00      1.00      1.00        93



  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))


## Saving the model

In [33]:
# save model
torch.save(model.state_dict(), 'resnet50.pth')