<a href="https://colab.research.google.com/github/deepw98/project2/blob/main/project2_0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!cp -r /content/drive/MyDrive/fire_detection_few_shot /content/fire_detection_few_shot

In [None]:
folder_path = '/content/fire_detection_few_shot'

In [None]:
from PIL import Image
import os

def resize_images(folder_path):
    for img_name in os.listdir(folder_path):
        img_path = os.path.join(folder_path, img_name)
        img = Image.open(img_path)
        img = img.resize((224, 224))  # Resize to 224x224
        img.save(img_path)  # Overwrite with resized image

resize_images("fire_detection_few_shot/train/Fire")
resize_images("fire_detection_few_shot/train/No_Fire")


In [None]:
# import random
# import os

# def create_episode(root_dir, N=2, K=5):
#     """
#     Create an episode with:
#     - N classes (fire vs. no fire)
#     - K support images per class
#     - 1 query image
#     """
#     episode = {"support": [], "query": [], "labels": []}

#     # Define class folders
#     class_folders = ["Fire", "No_Fire"]

#     # Loop over classes
#     for label, folder in enumerate(class_folders):
#         folder_path = os.path.join(root_dir, folder)
#         images = os.listdir(folder_path)

#         # Pick K images for support set
#         support_images = random.sample(images, K)
#         episode["support"].extend([(os.path.join(folder_path, img), label) for img in support_images])

#         # Pick 1 query image (not in support set)
#         remaining_images = list(set(images) - set(support_images))
#         query_image = random.choice(remaining_images)
#         episode["query"].append((os.path.join(folder_path, query_image), label))

#     return episode


In [None]:
# from torch.utils.data import Dataset
# from PIL import Image
# import torch

# class FewShotDataset(Dataset):
#     def __init__(self, root_dir, transform=None, N=2, K=5):
#         self.root_dir = root_dir
#         self.transform = transform
#         self.N = N
#         self.K = K

#     def __getitem__(self, index):
#         episode = create_episode(self.root_dir, self.N, self.K)

#         # Load images
#         support_images = [Image.open(img_path).convert("RGB") for img_path, _ in episode["support"]]
#         query_images = [Image.open(img_path).convert("RGB") for img_path, _ in episode["query"]]

#         # Apply transformations
#         if self.transform:
#             support_images = [self.transform(img) for img in support_images]
#             query_images = [self.transform(img) for img in query_images]

#         # Convert labels to tensors
#         support_labels = torch.tensor([label for _, label in episode["support"]])
#         query_labels = torch.tensor([label for _, label in episode["query"]])

#         return support_images, query_images, support_labels, query_labels

#     def __len__(self):
#         return 1000  # Arbitrary large number (episodic dataset)


In [None]:
# from torch.utils.data import DataLoader
# import torchvision.transforms as transforms

# transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
# ])

# dataset = FewShotDataset(root_dir="fire_detection_few_shot/train", transform=transform)
# dataloader = DataLoader(dataset, batch_size=1, shuffle=True)

# # Test data loading
# support_images, query_images, support_labels, query_labels = next(iter(dataloader))
# print("Support Images:", len(support_images))
# print("Query Images:", len(query_images))


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
import numpy as np
import random
import os
from PIL import Image


In [None]:
class SiameseNetwork(nn.Module):
    def __init__(self, backbone='resnet50'):
        super(SiameseNetwork, self).__init__()

        # Load pretrained ResNet50 and remove fully connected layer
        self.backbone = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        self.backbone.fc = nn.Identity()  # Remove classification head

        # Projection head (reduces dimensionality)
        self.fc = nn.Sequential(
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Linear(512, 128)  # Output 128-d embeddings
        )

    def forward(self, x1, x2):
        emb1 = self.fc(self.backbone(x1))
        emb2 = self.fc(self.backbone(x2))
        return emb1, emb2  # Return embeddings


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
import os
from PIL import Image
from torch.utils.data import DataLoader, Dataset

transform = transforms.Compose([
     transforms.Resize((224, 224)),
     transforms.ToTensor(),
     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
 ])

# 1️⃣ Optimized Dataset (Preload Images into RAM)
class FireDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform

        fire_dir = os.path.join(root_dir, "Fire")
        no_fire_dir = os.path.join(root_dir, "No_Fire")

        self.fire_images = [os.path.join(fire_dir, img) for img in os.listdir(fire_dir)]
        self.no_fire_images = [os.path.join(no_fire_dir, img) for img in os.listdir(no_fire_dir)]

        self.min_length = min(len(self.fire_images), len(self.no_fire_images))  # Ensure equal pairs

    def __len__(self):
        return self.min_length  # Number of pairs

    def __getitem__(self, index):
        fire_img = Image.open(self.fire_images[index]).convert("RGB")
        no_fire_img = Image.open(self.no_fire_images[index]).convert("RGB")

        if self.transform:
            fire_img = self.transform(fire_img)
            no_fire_img = self.transform(no_fire_img)

        return fire_img, no_fire_img, torch.tensor(1.0)  # Label = 1 (fire and no-fire pair)



In [None]:
# import torch.nn as nn

# class SiameseNetwork(nn.Module):
#     def __init__(self):
#         super(SiameseNetwork, self).__init__()
#         self.cnn = nn.Sequential(
#             nn.Conv2d(3, 64, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(),
#             nn.BatchNorm2d(64),  # Add BN
#             nn.MaxPool2d(2, 2),

#             nn.Conv2d(64, 128, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(),
#             nn.BatchNorm2d(128),  # Add BN
#             nn.MaxPool2d(2, 2),

#             nn.Conv2d(128, 256, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(),
#             nn.BatchNorm2d(256),  # Add BN
#             nn.MaxPool2d(2, 2),
#         )
#         self.fc = nn.Sequential(
#             nn.Linear(256 * 28 * 28, 512),
#             nn.ReLU(),
#             nn.BatchNorm1d(512),  # Add BN
#             nn.Linear(512, 128),
#         )

#     def forward(self, x1, x2):
#      emb1 = self.cnn(x1)
#      emb1 = emb1.view(emb1.size(0), -1)
#      emb1 = self.fc(emb1)

#      emb2 = self.cnn(x2)
#      emb2 = emb2.view(emb2.size(0), -1)
#      emb2 = self.fc(emb2)

#      return emb1, emb2  # Return both embeddings



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ContrastiveLoss(nn.Module):
    def __init__(self, margin=2.0):  # Increased margin
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        # Compute Euclidean distance
        euclidean_distance = F.pairwise_distance(output1, output2, keepdim=True)

        # Apply contrastive loss function
        loss = torch.mean(
            label * torch.pow(euclidean_distance, 2) +
            (1 - label) * torch.pow(torch.relu(self.margin - euclidean_distance), 2)  # Improved stability
        )
        return loss

        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = torch.nn.functional.pairwise_distance(output1, output2)
        loss = torch.mean(label * torch.pow(euclidean_distance, 2) +
                          (1 - label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
        return loss

In [None]:
def train_siamese(model, dataloader, optimizer, device, num_epochs=500):
    model.train()
    criterion = ContrastiveLoss()  # Use optimized loss

    for epoch in range(num_epochs):
        total_loss = 0
        for img1, img2, label in dataloader:
            img1, img2, label = img1.to(device), img2.to(device), label.to(device)

            optimizer.zero_grad()
            output1, output2 = model(img1, img2)
            loss = criterion(output1, output2, label)  # Use optimized loss
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        if epoch % 50 == 0:
            print(f"Epoch [{epoch}/{num_epochs}], Loss: {total_loss / len(dataloader):.4f}")

  print("Training complete!")

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transform = transforms.Compose([
     transforms.Resize((224, 224)),
     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
 ])

dataset = FireDataset(root_dir="/content/fire_detection_few_shot/train", transform=transform)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=2, pin_memory=True)

model = SiameseNetwork().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.0001)

train_siamese(model, dataloader, optimizer, device, num_epochs=200)



Epoch [0/200], Loss: 0.6476
Epoch [50/200], Loss: 0.0002


KeyboardInterrupt: 

In [None]:
class FireTestDataset(torch.utils.data.Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.fire_images = [os.path.join(root_dir, "Fire", img) for img in os.listdir(os.path.join(root_dir, "Fire"))]
        self.no_fire_images = [os.path.join(root_dir, "No_Fire", img) for img in os.listdir(os.path.join(root_dir, "No_Fire"))]

    def __len__(self):
        return min(len(self.fire_images), len(self.no_fire_images))

    def __getitem__(self, index):
     fire_img_path = self.fire_images[index]  # Get file path
     no_fire_img_path = self.no_fire_images[index]  # Get file path

    # Load images
     fire_img = Image.open(fire_img_path).convert("RGB")
     no_fire_img = Image.open(no_fire_img_path).convert("RGB")

    # Apply transforms
     if self.transform:
        fire_img = self.transform(fire_img)
        no_fire_img = self.transform(no_fire_img)

     return fire_img, no_fire_img, torch.tensor(1)  # Include label

In [None]:
def test_siamese(model, dataloader, device, threshold=0.5):
    model.eval()  # Set model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():
     for img1, img2, label in dataloader:  # Unpack all three values
            img1, img2 = img1.to(device), img2.to(device)

            output1, output2 = model(img1, img2)  # Get embeddings
            distance = torch.abs(output1 - output2)  # Calculate absolute difference
            similarity = torch.sigmoid(torch.sum(distance, dim=1))  # Convert to probability

            predictions = (similarity > threshold).float()  # Predict class (1=similar, 0=dissimilar)
            total += predictions.shape[0]
            correct += (predictions == 1).sum().item()  # Assuming 1 means "fire vs no fire"

    accuracy = correct / total
    print(f"Test Accuracy: {accuracy:.4f}")


In [None]:
# Load test data
test_dataset = FireTestDataset(root_dir="/content/fire_detection_few_shot/test", transform=transform)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=False)

# Run inference
test_siamese(model, test_dataloader, device)


RuntimeError: stack expects each tensor to be equal size, but got [3, 250, 201] at entry 0 and [3, 197, 256] at entry 1

In [None]:
def test_siamese_debug(model, dataloader, device, threshold=0.5):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
     for img1, img2, label in dataloader:  # Unpack all three values
            img1, img2 = img1.to(device), img2.to(device)

            output1, output2 = model(img1, img2)
            distance = torch.abs(output1 - output2)
            similarity = torch.sigmoid(torch.sum(distance, dim=1))  # Convert to probability

            predictions = (similarity > threshold).float()

            print(f"Similarity Scores: {similarity.cpu().numpy()}")  # Debug output

            total += predictions.shape[0]
            correct += (predictions == 1).sum().item()

    accuracy = correct / total
    print(f"Test Accuracy: {accuracy:.4f}")

# Run debug test
test_siamese_debug(model, test_dataloader, device)


RuntimeError: stack expects each tensor to be equal size, but got [3, 250, 201] at entry 0 and [3, 197, 256] at entry 1

In [None]:
import torch.nn.functional as F

def test_siamese_cosine(model, dataloader, device, threshold=0.5):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
     for img1, img2, label in dataloader:  # Unpack all three values
            img1, img2 = img1.to(device), img2.to(device)


            output1, output2 = model(img1, img2)
            similarity = F.cosine_similarity(output1, output2)  # Cosine similarity

            predictions = (similarity > threshold).float()

            print(f"Cosine Similarity Scores: {similarity.cpu().numpy()}")  # Debug output

            total += predictions.shape[0]
            correct += (predictions == 1).sum().item()

    accuracy = correct / total
    print(f"Test Accuracy (Cosine Similarity): {accuracy:.4f}")

# Run test with cosine similarity
test_siamese_cosine(model, test_dataloader, device, threshold=0.9)


RuntimeError: stack expects each tensor to be equal size, but got [3, 250, 201] at entry 0 and [3, 197, 256] at entry 1

In [None]:
sample_batch = next(iter(test_dataloader))
print(len(sample_batch))  # How many items are in the batch?
for i, item in enumerate(sample_batch):
    print(f"Item {i} shape: {item.shape}")


RuntimeError: stack expects each tensor to be equal size, but got [3, 250, 201] at entry 0 and [3, 197, 256] at entry 1

In [None]:
img1, img2 = next(iter(test_dataloader))  # No labels in test set
img1, img2 = img1.to(device), img2.to(device)

# Compute similarity
with torch.no_grad():
    output1, output2 = model(img1, img2)
    similarity = F.cosine_similarity(output1, output2)

# Print results
for i in range(len(similarity)):
    print(f"Pair {i+1} Similarity Score: {similarity[i].item():.4f}")

# Visualize first pair
plt.subplot(1, 2, 1)
plt.imshow(img1[0].cpu().permute(1, 2, 0))  # Convert Tensor to Image
plt.title("Image 1")

plt.subplot(1, 2, 2)
plt.imshow(img2[0].cpu().permute(1, 2, 0))
plt.title("Image 2")

plt.show()


RuntimeError: stack expects each tensor to be equal size, but got [3, 250, 201] at entry 0 and [3, 197, 256] at entry 1

In [None]:
import seaborn as sns
import torch

# Compute embeddings
with torch.no_grad():
    emb1, emb2 = model(img1.to(device), img2.to(device))

# Convert to NumPy for visualization
emb1 = emb1.cpu().numpy()
emb2 = emb2.cpu().numpy()

# Plot heatmaps
sns.heatmap(emb1[0].reshape(1, -1), cmap="coolwarm")
sns.heatmap(emb2[0].reshape(1, -1), cmap="coolwarm")


NameError: name 'img1' is not defined

In [None]:
import torch

img1, img2, _ = next(iter(test_dataloader))
img1, img2 = img1.to(device), img2.to(device)

with torch.no_grad():
    emb1, emb2 = model(img1, img2)

print("Embedding 1 Mean:", torch.mean(emb1, dim=1))
print("Embedding 2 Mean:", torch.mean(emb2, dim=1))


RuntimeError: stack expects each tensor to be equal size, but got [3, 250, 201] at entry 0 and [3, 197, 256] at entry 1

In [None]:
print("Embedding 1 Std:", torch.std(emb1, dim=0))
print("Embedding 2 Std:", torch.std(emb2, dim=0))


Embedding 1 Std: tensor([0.0232, 0.0301, 0.0184, 0.0113, 0.0180, 0.0125, 0.0182, 0.0166, 0.0162,
        0.0183, 0.0144, 0.0178, 0.0123, 0.0130, 0.0219, 0.0110, 0.0163, 0.0174,
        0.0158, 0.0155, 0.0160, 0.0114, 0.0202, 0.0138, 0.0113, 0.0167, 0.0132,
        0.0141, 0.0166, 0.0218, 0.0106, 0.0182, 0.0137, 0.0161, 0.0189, 0.0158,
        0.0144, 0.0146, 0.0122, 0.0110, 0.0281, 0.0265, 0.0106, 0.0186, 0.0140,
        0.0131, 0.0170, 0.0159, 0.0215, 0.0154, 0.0216, 0.0117, 0.0162, 0.0089,
        0.0195, 0.0195, 0.0206, 0.0144, 0.0141, 0.0138, 0.0119, 0.0148, 0.0130,
        0.0126, 0.0108, 0.0191, 0.0203, 0.0202, 0.0195, 0.0127, 0.0241, 0.0178,
        0.0138, 0.0168, 0.0191, 0.0160, 0.0157, 0.0147, 0.0172, 0.0187, 0.0265,
        0.0196, 0.0220, 0.0226, 0.0254, 0.0175, 0.0163, 0.0176, 0.0162, 0.0066,
        0.0175, 0.0162, 0.0157, 0.0223, 0.0105, 0.0120, 0.0177, 0.0137, 0.0133,
        0.0149, 0.0182, 0.0174, 0.0086, 0.0145, 0.0124, 0.0150, 0.0219, 0.0159,
        0.0095, 0.0096,

In [None]:
from torch.nn.functional import pairwise_distance

distances = pairwise_distance(emb1, emb2)
print("Distance Mean:", torch.mean(distances))
print("Distance Std:", torch.std(distances))


Distance Mean: tensor(0.2541, device='cuda:0')
Distance Std: tensor(0.0527, device='cuda:0')
