In [None]:
''' TESTING CELL
Run this cell to test the trained model on test set.
Instructions to Run:
    1. Uncomment the drive.mount line to load dataset from drive
    2. Define test_loader to the test set in the inference loop.
    3. Define criterion function to get the result metrics or comment out the line in the inference loop.
    4. Replace the paths to the generator.pt and model.pt
    5. Run the cell to obtain the test results
'''

# Import necessary libraries
from google.colab import drive
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import Dataset
from torchvision import transforms
import numpy as np
from PIL import Image
from PIL import ImageFilter
from tqdm import tqdm
import os
import sys
from sklearn.metrics import accuracy_score, confusion_matrix

# Mount Google Drive if needed
# drive.mount('/content/drive') # Uncomment if loading from Google Drive

# define transformations
transform = transforms.Compose([transforms.Resize((256,256)),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                                ])

# Dataset class for paired original and corrupted images
class PairedImageDataset(Dataset):
    def __init__(self, root_dir_original, root_dir_corrupted, transform=None):
        self.root_dir_original = root_dir_original
        self.root_dir_corrupted = root_dir_corrupted
        self.transform = transform

        self.original_image_list = os.listdir(root_dir_original)
        self.corrupted_image_list = os.listdir(root_dir_corrupted)

    def __len__(self):
        return min(len(self.original_image_list), len(self.corrupted_image_list))

    def __getitem__(self, idx):
        original_img_name = self.original_image_list[idx]
        corrupted_img_name = self.corrupted_image_list[idx]

        original_img_path = os.path.join(self.root_dir_original, original_img_name)
        corrupted_img_path = os.path.join(self.root_dir_corrupted, corrupted_img_name)

        # Load the original and corrupted images
        original_img = Image.open(original_img_path).convert('RGB')
        corrupted_img = Image.open(corrupted_img_path).convert('RGB')

        # Apply transformations
        if self.transform:
            original_img = self.transform(original_img)
            corrupted_img = self.transform(corrupted_img)

        return original_img, corrupted_img, corrupted_img_name

# Generator class for image inpainting
class Generator(nn.Module):
    def __init__(self,num_channels=3,latent_dim=100):
        super(Generator,self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(num_channels, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(512),
            nn.Flatten(),
        )
        self.fc = nn.Linear(512 * 16 * 16, latent_dim)

        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 512 * 16 * 16),
            nn.ReLU(inplace=True),
            nn.Unflatten(1, (512, 16, 16)),
            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.ConvTranspose2d(64, num_channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh()
        )
    def forward(self,x):
        x = self.encoder(x)
        x = self.fc(x)
        x = self.decoder(x)
        return x

# BasicBlock class for ResNet
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += self.shortcut(x)
        out = self.relu(out)
        return out

# ResNet class for image classification
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# Function to create a ResNet-50 model
def resnet50():
    return ResNet(BasicBlock, [3, 4, 6, 3])

# Instantiate models for inpainting and classification
model_inpainting = Generator()
g = torch.load('/content/drive/MyDrive/generator.pt') # Replace with path to generator.pt
model_inpainting.load_state_dict(g)

model_classification = resnet50()
m = torch.load('/content/drive/MyDrive/model.pt') #Replace with path to model.pt
model_classification.load_state_dict(m)

# Set models to evaluation mode
model_inpainting.eval()
model_classification.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move models to CPU or GPU
model_inpainting.to(device)
model_classification.to(device)

p = torch.tensor([])
t = torch.tensor([])

# Test the models on the test_loader
for data, target in test_loader:
    data = data.to(device)

    # Generate inpainted image
    inpainted_img_tensor = model_inpainting(data)

    # Classify the inpainted image
    output = model_classification(inpainted_img_tensor)

    # Calculate and print loss
    loss = criterion(output, target)
    _, pred = torch.max(output, 1)
    # t=torch.cat((t,target.cpu()))
    # p=torch.cat((p,pred.cpu()))
    # acc = accuracy_score(target.cpu(),pred.cpu())
    # print(f'pred: {pred}\n target: {target} \n {acc}')

In [None]:
from google.colab import drive
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import Dataset
from torchvision import transforms
import numpy as np
from PIL import Image
from PIL import ImageFilter
from tqdm import tqdm
import os
import sys
from sklearn.metrics import accuracy_score, confusion_matrix
drive.mount('/content/drive')

In [None]:
!unzip '/content/drive/MyDrive/train+val.zip'
!mkdir train/all
!mkdir train/all_paired
!cp train/Flood/* train/all
!cp train/Hurricane/* train/all
!cp train/Earthquake/* train/all
!cp train/Landslides/* train/all
!cp train/Wildfire/* train/all
!cp train/Earthquake_paired/* train/all_paired
!cp train/Flood_paired/* train/all_paired
!cp train/Hurricane_paired/* train/all_paired
!cp train/Landslides_paired/* train/all_paired
!cp train/Wildfire_paired/* train/all_paired
!rm train/all/*.txt
!rm train/all_paired/*.txt

In [None]:
# @title
class PairedImageDataset(Dataset):
    def __init__(self, root_dir_original, root_dir_corrupted, transform=None):
        self.root_dir_original = root_dir_original
        self.root_dir_corrupted = root_dir_corrupted
        self.transform = transform

        self.original_image_list = os.listdir(root_dir_original)
        self.corrupted_image_list = os.listdir(root_dir_corrupted)

    def __len__(self):
        return min(len(self.original_image_list), len(self.corrupted_image_list))

    def __getitem__(self, idx):
        original_img_name = self.original_image_list[idx]
        corrupted_img_name = self.corrupted_image_list[idx]

        original_img_path = os.path.join(self.root_dir_original, original_img_name)
        corrupted_img_path = os.path.join(self.root_dir_corrupted, corrupted_img_name)

        # Load the original and corrupted images
        original_img = Image.open(original_img_path).convert('RGB')
        corrupted_img = Image.open(corrupted_img_path).convert('RGB')

        # Apply transformations
        if self.transform:
            original_img = self.transform(original_img)
            corrupted_img = self.transform(corrupted_img)

        return original_img, corrupted_img, corrupted_img_name

In [None]:
# Specify the path to your datasets
original_dataset_root = '/content/train/all'
corrupted_dataset_root = '/content/train/all_paired'
transform = transforms.Compose([transforms.Resize((256,256)),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                                ])
# Create the dataset
paired_image_dataset = PairedImageDataset(root_dir_original=original_dataset_root,
                                          root_dir_corrupted=corrupted_dataset_root,
                                          transform=transform)

# Create the dataloader
batch_size = 64
dataloader = DataLoader(paired_image_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

In [None]:
class Generator(nn.Module):
    def __init__(self,num_channels=3,latent_dim=100):
        super(Generator,self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(num_channels, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(512),
            nn.Flatten(),
        )
        self.fc = nn.Linear(512 * 16 * 16, latent_dim)

        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 512 * 16 * 16),
            nn.ReLU(inplace=True),
            nn.Unflatten(1, (512, 16, 16)),
            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.ConvTranspose2d(64, num_channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh()
        )
    def forward(self,x):
        x = self.encoder(x)
        x = self.fc(x)
        x = self.decoder(x)
        return x

In [None]:
class Discriminator(nn.Module):
    def __init__(self, num_channels=3):
        super(Discriminator, self).__init__()

        self.model = nn.Sequential(
            nn.Conv2d(num_channels * 2, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(256),
            nn.Flatten(),
            nn.Linear(256 * 32 * 32, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
patch_size = 16
class LocalDiscriminator(nn.Module):
    def __init__(self, input_channels=3, num_patches=4):
        super(LocalDiscriminator, self).__init__()

        self.model = nn.Sequential(
            nn.Conv2d(input_channels * 2, 64, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2,inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2,inplace=True),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2,inplace=True),
            nn.BatchNorm2d(256),
            nn.Flatten(),
            nn.Linear(256*16*16,1),
            nn.Sigmoid()
        )
        self.fc = nn.Linear(128 * patch_size * patch_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
patch_size = 64
class LocalDiscriminator64(nn.Module):
    def __init__(self, input_channels=3, num_patches=4):
        super(LocalDiscriminator64, self).__init__()

        self.model = nn.Sequential(
            nn.Conv2d(input_channels * 2, 64, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2,inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2,inplace=True),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2,inplace=True),
            nn.BatchNorm2d(256),
            nn.Flatten(),
            nn.Linear(256*8*8,1),
            nn.Sigmoid()
        )
        self.fc = nn.Linear(128 * patch_size * patch_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_channels=3
def gan_loss(generator, discriminator, local_discriminator, original_images, corrupted_images, lambda_recon=100, lambda_global=100, lambda_local=100):
    generator=generator.to(device)
    discriminator=discriminator.to(device)
    local_discriminator=local_discriminator.to(device)
    original_images=original_images.to(device)
    corrupted_images=corrupted_images.to(device)

    # Forward pass through the generator
    inpainted_images = generator(corrupted_images)

    # Concatenate original and inpainted images along the channel dimension
    combined_images = torch.cat((original_images, inpainted_images), dim=1)
    combined_images=combined_images.to(device)

    # Forward pass through the discriminator
    discriminator_output = discriminator(combined_images)

    # Target labels for the discriminator
    real_labels = torch.ones_like(discriminator_output)
    fake_labels = torch.zeros_like(discriminator_output)

    # Reconstruction loss (L1 loss)
    recon_loss = nn.functional.l1_loss(inpainted_images, original_images) * lambda_recon

    # Global discriminator loss (binary cross-entropy)
    global_discriminator_loss = F.binary_cross_entropy(discriminator_output, real_labels) * lambda_global

    # Local discriminator loss (binary cross-entropy on local patches)
    local_discriminator_loss = 0.0
    patch_size = 64

    for i in range(0, original_images.size(2) - patch_size + 1, patch_size):
        for j in range(0, original_images.size(3) - patch_size + 1, patch_size):
            # Extract local patches from original and inpainted images
            original_patch = original_images[:, :, i:i+patch_size, j:j+patch_size]
            inpainted_patch = inpainted_images[:, :, i:i+patch_size, j:j+patch_size]

            # Concatenate patches along the channel dimension
            combined_patch = torch.cat((original_patch, inpainted_patch), dim=1)
            combined_patch = combined_patch.to(device)
            # Forward pass through the local discriminator
            discriminator_output_patch = local_discriminator(combined_patch)
            # Compute local discriminator loss
            local_discriminator_loss += F.binary_cross_entropy(discriminator_output_patch, real_labels) * lambda_local

    # Total GAN loss
    total_loss = recon_loss + global_discriminator_loss + local_discriminator_loss

    return total_loss, recon_loss, global_discriminator_loss, local_discriminator_loss

In [None]:
num_epochs=50
generator = Generator()
discriminator = Discriminator()
local_discriminator = LocalDiscriminator64()

generator = generator.to(device)
discriminator = discriminator.to(device)
local_discriminator = local_discriminator.to(device)

lambda_recon = 10
lambda_global = 1
lambda_local = 1


optimizer_generator = torch.optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
optimizer_discriminator = torch.optim.Adam(discriminator.parameters(), lr=0.00001, betas=(0.5, 0.999))
optimizer_local_discriminator = torch.optim.Adam(local_discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

optimizer_generator.zero_grad()
optimizer_discriminator.zero_grad()
optimizer_local_discriminator.zero_grad()

for epoch in range(num_epochs):
    for i, (original_images, corrupted_images) in tqdm(enumerate(dataloader)):
        # Set model gradients to zero

        # Forward pass and calculate GAN loss
        total_loss, recon_loss, global_discriminator_loss, local_discriminator_loss = gan_loss(generator, discriminator, local_discriminator, original_images, corrupted_images, lambda_recon, lambda_global, lambda_local)

        # Backward pass and optimization steps
        total_loss.backward()
        optimizer_generator.step()
        optimizer_discriminator.step()
        optimizer_local_discriminator.step()
        optimizer_generator.zero_grad()
        optimizer_discriminator.zero_grad()
        optimizer_local_discriminator.zero_grad()

        # Print training statistics
        if i % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(dataloader)}], recon Loss: {recon_loss.item():.6f}, GD Loss: {global_discriminator_loss.item():.6f}, Local Loss: {local_discriminator_loss:.6f}, Total Loss: {total_loss.item():.6f}')
    if epoch%5==4:
        torch.save(generator.state_dict(), '/content/drive/MyDrive/generator.pt')
        torch.save(discriminator.state_dict(), '/content/drive/MyDrive/discriminator.pt')
        torch.save(local_discriminator.state_dict(), '/content/drive/MyDrive/local_discriminator.pt')
        print(f'Models saved successfully on epoch: {epoch+1}')

In [None]:
train_dataset = datasets.ImageFolder('/content/Train',transform=transform)
validation_dataset = datasets.ImageFolder('/content/validation',transform=transform)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=64, shuffle=False)

In [None]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += self.shortcut(x)
        out = self.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

def resnet50():
    return ResNet(BasicBlock, [3, 4, 6, 3])

In [None]:
# Instantiate the model, optimizer, and loss function
model = resnet50()

model = model.to(device)
optimizer = optim.Adam(model.parameters(),lr=0.005,betas=(0.9,0.999))
criterion = nn.CrossEntropyLoss()

# Print the model architecture
print(model)

In [None]:
# Training Loop

num_epochs = 25
model.eval()
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(train_loader):
        optimizer.zero_grad()
        inputs = inputs.to(device)
        outputs = model(inputs)
        labels = labels.to(device)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # Calculate training accuracy
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for inputs, labels in tqdm(validation_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())
    accuracy = accuracy_score(all_labels, all_preds)
    average_loss = running_loss / len(train_loader)
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {average_loss:.4f}, Accuracy: {accuracy * 100:.2f}%')
    if epoch%5==4:
        torch.save(model.state_dict(), '/content/drive/MyDrive/model.pt')

In [None]:
# Prediction
model.eval()
all_predictions = []
with torch.no_grad():
    for inputs, labels in validation_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, predictions = torch.max(outputs, 1)
        predictions = predictions.cpu()
        all_predictions.extend(predictions.numpy())
result = torch.tensor(all_predictions)

In [None]:
validation_loader = DataLoader(validation_dataset, batch_size=1, shuffle=False)
true_values = []
for input, label in validation_loader:
    true_values.extend(label)

In [None]:
pred = np.array(result)
val = np.array(true_values)
accuracy = accuracy_score(val, pred)
print(f"Accuracy: {accuracy:.2f}")

conf_matrix = confusion_matrix(val, pred)
print("Confusion Matrix:")
print(conf_matrix)

Accuracy: 0.78
Confusion Matrix:
[[355   9  17   0   5]
 [ 65 175  64   0   2]
 [ 28  14 153   0   0]
 [  5   7   1  46   2]
 [ 42  12   9   4 269]]


In [None]:
!rm /content/validation/all/*
!rm /content/validation/Earthquake/*
!rm /content/validation/Flood/*
!rm /content/validation/Hurricane/*
!rm /content/validation/Landslides/*
!rm /content/validation/Wildfire/*
!rmdir /content/validation/all
!rmdir /content/validation/Earthquake
!rmdir /content/validation/Flood
!rmdir /content/validation/Hurricane
!rmdir /content/validation/Landslides
!rmdir /content/validation/Wildfire
!rm /content/validation/all_paired/*

rm: cannot remove '/content/validation/all/*': No such file or directory
rmdir: failed to remove '/content/validation/all': No such file or directory
rm: cannot remove '/content/validation/all_paired/*': No such file or directory
