In [None]:
import os
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.model_selection import train_test_split
from tqdm import tqdm
# Mount Google Drive into the Colab filesystem. The Kaggle config directory
# referenced later in the notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')
from torchvision import datasets, transforms
from sklearn.metrics import confusion_matrix, f1_score, classification_report

In [None]:
import os
# Point the Kaggle CLI's config lookup at a folder on the mounted Drive.
os.environ.update({'KAGGLE_CONFIG_DIR': '/content/drive/MyDrive/kaggle'})

In [None]:
# Download the dataset archive with the Kaggle CLI (IPython shell escape).
# The CLI is pointed at its config through KAGGLE_CONFIG_DIR, set in the cell above.
# NOTE(review): the extraction cell expects the archive at
# /content/satellite-image-classification.zip, so this presumably has to run
# with /content as the working directory — confirm.
!kaggle datasets download -d mahmoudreda55/satellite-image-classification

In [None]:
import zipfile

file_path = '/content/satellite-image-classification.zip'

# Unpack every member of the downloaded archive into /content; the context
# manager closes the zip handle once extraction finishes.
with zipfile.ZipFile(file=file_path, mode='r') as zip_ref:
    zip_ref.extractall(path='/content')

In [None]:
data_path = '/content/data/'

# Preprocessing applied to every image: resize to 180x180, convert to a tensor,
# then normalize each of the three channels with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize((180, 180)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

# One sub-folder of data_path per class.
dataset = datasets.ImageFolder(root=data_path, transform=transform)

In [None]:
from PIL import Image
class SatelliteDataset(Dataset):
    """Image-classification dataset read from a ``root_dir/<class>/<image>`` tree.

    Every sub-directory of ``root_dir`` is treated as one class. Class folders
    are visited in sorted order and numbered from 0, so the folder -> label
    mapping is deterministic (the same convention torchvision's ``ImageFolder``
    uses). Entries of ``root_dir`` that are not directories are ignored.

    Args:
        root_dir: Directory containing one sub-directory per class.
        transform: Optional callable applied to the RGB PIL image of each item.

    After construction the instance exposes ``classes`` (sorted class folder
    names, index == label), ``image_paths`` and ``labels`` (parallel lists).
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # os.listdir returns entries in arbitrary order; sorting keeps the label
        # of each class stable across runs and machines. Non-directories (stray
        # files next to the class folders) would make the inner listdir fail.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )

        for label, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            image_files = sorted(os.listdir(class_dir))

            self.image_paths.extend(os.path.join(class_dir, name) for name in image_files)
            self.labels.extend([label] * len(image_files))

    def __len__(self):
        """Return the number of images found under ``root_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load item ``idx`` and return ``(image, label)``.

        The image is opened with PIL, converted to RGB and passed through
        ``transform`` when one was supplied; ``label`` is the integer class index.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        img = Image.open(img_path).convert('RGB')

        if self.transform:
            img = self.transform(img)

        # Return the per-item label; the previous code returned the undefined
        # name `labels`, which raised NameError on every access.
        return img, label

# The same image tree as `dataset`, read through the hand-written Dataset class.
# NOTE(review): no later visible cell reads `custom_dataset`; the split cell uses `dataset`.
custom_dataset = SatelliteDataset(root_dir=data_path, transform=transform)

In [None]:
# 70 / 20 / 10 train / validation / test split. The test share takes the
# remainder so the three sizes always add up to len(dataset).
dataset_size = len(dataset)
train_size = int(0.7 * dataset_size)
val_size = int(0.2 * dataset_size)
test_size = dataset_size - train_size - val_size

# Seed the split so Restart & Run All reproduces the same partition. An
# unseeded random_split puts different images in train/val/test on every run,
# which makes the reported accuracies incomparable between runs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)

In [None]:
# Batches of 32 with num_workers=0 for all three splits. Only the training
# loader shuffles; validation and test keep the default (unshuffled) order.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=0,
)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, num_workers=0)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, num_workers=0)

In [None]:
# Prefer the GPU when CUDA is usable; otherwise run everything on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
import torch.nn.functional as F

class RCNN(nn.Module):
    """Whole-image classifier: small residual CNN backbone plus a pooled MLP head.

    Despite the name, no region proposals take part in the prediction: the
    backbone feature map is adaptively max-pooled to 7x7 as a whole, flattened
    and classified by two linear layers.

    Args:
        num_classes: Number of output classes (width of the last linear layer).
    """

    def __init__(self, num_classes):
        super(RCNN, self).__init__()
        # Backbone CNN: strided 7x7 conv stem, two residual blocks, two max-pools.
        self.backbone = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=3),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            ResNetBlock(64, 64),
            ResNetBlock(64, 64),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )
        # Region Proposal Network head. It is kept as a submodule so the
        # parameter / state_dict layout stays the same, but forward() does not
        # run it (see the note there).
        self.rpn = RPN(in_channels=64)
        # Pools the full feature map to a fixed 7x7 grid, whatever the input size.
        self.roi_pooling = nn.AdaptiveMaxPool2d((7, 7))
        # Fully connected layers for classification
        self.fc1 = nn.Linear(7 * 7 * 64, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Classify a batch of images.

        Args:
            x: Float tensor of 3-channel images, shaped ``(N, 3, H, W)``.

        Returns:
            Tensor of shape ``(N, num_classes)`` holding log-probabilities
            (``log_softmax`` over the class dimension).
        """
        features = self.backbone(x)
        # The RPN used to be evaluated here and its result thrown away. Nothing
        # downstream consumed it, so it only added a 64->256 convolution of
        # dead compute to every training and inference batch.
        pooled = self.roi_pooling(features)
        # Flatten (N, 64, 7, 7) -> (N, 64 * 7 * 7) for the linear head.
        pooled = pooled.view(pooled.size(0), -1)
        x = F.relu(self.fc1(pooled))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

class ResNetBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers around a skip connection.

    Both convolutions use stride 1 and padding 1, so the spatial size of the
    input is preserved. When ``in_channels == out_channels`` the input is added
    back unchanged. Otherwise the skip path goes through a 1x1 convolution plus
    batch-norm so the channel counts match; previously that case failed on the
    residual addition.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the block.
    """

    def __init__(self, in_channels, out_channels):
        super(ResNetBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        # nn.Identity has no parameters or buffers, so blocks with matching
        # channel counts keep exactly the same state_dict keys as before.
        if in_channels == out_channels:
            self.shortcut = nn.Identity()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        """Apply the block to ``x`` of shape ``(N, in_channels, H, W)``.

        Returns a tensor of shape ``(N, out_channels, H, W)``.
        """
        identity = self.shortcut(x)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += identity  # Residual connection
        out = self.relu(out)
        return out

class RPN(nn.Module):
    """Convolutional head with a shared stem and two 1x1 output branches.

    The stem is a 3x3 conv -> batch-norm -> ReLU producing 256 channels at the
    input's spatial size. On top of it sit a 2-channel classification layer and
    a 4-channel regression layer.

    Args:
        in_channels: Number of channels of the incoming feature map.
    """

    def __init__(self, in_channels):
        super().__init__()
        hidden_channels = 256
        self.conv = nn.Conv2d(in_channels, hidden_channels, kernel_size=3, padding=1)
        self.bn = nn.BatchNorm2d(hidden_channels)
        self.relu = nn.ReLU(inplace=True)
        # Classification branch (2 channels) and regression branch (4 channels).
        self.cls_layer = nn.Conv2d(hidden_channels, 2, kernel_size=1)
        self.reg_layer = nn.Conv2d(hidden_channels, 4, kernel_size=1)

    def forward(self, x):
        """Return ``(cls_output, reg_output)`` computed from feature map ``x``."""
        hidden = self.relu(self.bn(self.conv(x)))
        return self.cls_layer(hidden), self.reg_layer(hidden)

In [None]:
# Hyperparameters for the model and optimizer cells.
num_classes = 4        # passed to the model constructor
learning_rate = 1e-3   # passed to the optimizer as `lr`
# NOTE(review): the DataLoader cells earlier in the notebook pass a literal 32
# and do not read this variable.
batch_size = 32

In [None]:
# Build the network on the selected device, then create its loss and optimizer.
model = RCNN(num_classes)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
def train_model(model, criterion, optimizer, train_loader, num_epochs):
    """Train ``model`` for ``num_epochs`` passes over ``train_loader``.

    After every epoch the per-sample average training loss is printed.

    Args:
        model: ``nn.Module`` to optimize; it is switched to training mode.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
            Assumed to return the batch mean (the default reduction), since
            the value is re-weighted by the batch size below.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable yielding ``(images, labels)`` tensor batches.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None.
    """
    # Run on the device the model already lives on instead of relying on a
    # notebook-global `device` that may not match (or not exist yet).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    model.train()  # Set the model to training mode
    for epoch in range(num_epochs):
        running_loss = 0.0
        samples_seen = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            batch_count = images.size(0)
            # Weight the batch mean by its size so a short last batch counts correctly.
            running_loss += loss.item() * batch_count
            samples_seen += batch_count
        # Normalize by the samples this loader actually produced. The previous
        # code divided by the global `train_dataset`, which is wrong for any
        # other loader. An empty loader reports nan instead of dividing by zero.
        epoch_loss = running_loss / samples_seen if samples_seen else float('nan')
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")

In [None]:
# Train for 50 epochs on the training split.
train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    train_loader=train_loader,
    num_epochs=50,
)

In [None]:
def eval_model(model, loader):
    """Print the classification accuracy of ``model`` over ``loader``.

    The model is switched to evaluation mode and left in that mode. Predictions
    are the arg-max over dimension 1 of the model output.

    Args:
        model: ``nn.Module`` returning per-class scores of shape ``(N, C)``.
        loader: Iterable yielding ``(images, labels)`` tensor batches.

    Returns:
        None. The accuracy (``nan`` for an empty loader) is printed.
    """
    # Run on the device the model already lives on instead of relying on a
    # notebook-global `device`.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # Index of the largest score per row. Under no_grad the legacy
            # `.data` access is unnecessary.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    # An empty loader used to raise ZeroDivisionError here.
    accuracy = correct / total if total else float('nan')
    print("Accuracy: ", accuracy)

In [None]:
# Accuracy on the validation split.
eval_model(model=model, loader=val_loader)

In [None]:
# Accuracy on the test split.
eval_model(model=model, loader=test_loader)