<a href="https://colab.research.google.com/github/Kashara-Alvin-Ssali/ML-models/blob/main/GAT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [36]:
!pip install torch torchvision torchaudio torch-geometric networkx scipy numpy opencv-python matplotlib




In [37]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [38]:
import os
import cv2
import numpy as np
import networkx as nx
from scipy.spatial import Delaunay
import torch
from torch_geometric.data import Data, Dataset, DataLoader
from torch_geometric.nn import GATConv, global_mean_pool
import torch.nn.functional as F
import torch.optim as optim

In [39]:
# Define dataset path in Google Drive
dataset_path = "/content/drive/MyDrive/Dataset4"

In [40]:
# Function to convert image to graph using ORB + Delaunay Triangulation
def computeORBGraph(image):
    """
    Extracts keypoints from an image using ORB and constructs a graph representation.
    Nodes represent keypoints, and edges are formed using Delaunay Triangulation.
    """
    orb = cv2.ORB_create(nfeatures=700, scaleFactor=1.2, nlevels=8, edgeThreshold=15)
    keypoints, descriptors = orb.detectAndCompute(image, None)

    if not keypoints or descriptors is None:
        return None, None, None  # No keypoints detected

    points = np.array([kp.pt for kp in keypoints], dtype=np.float32)
    G = nx.Graph()

    # Add nodes with descriptors
    for i, (x, y) in enumerate(points):
        G.add_node(i, pos=(x, y), descriptor=descriptors[i])

    # Create edges using Delaunay Triangulation
    if len(points) > 2:
        tri = Delaunay(points)
        for simplex in tri.simplices:
            for i in range(3):
                G.add_edge(simplex[i], simplex[(i+1) % 3])

    return G, keypoints, descriptors


In [41]:
# Custom PyTorch Geometric Dataset for Currency Notes
class CurrencyGraphDataset(Dataset):
    def __init__(self, root, transform=None, pre_transform=None):
        self.data_list = [] # Initialize data_list here, before calling super().__init__
        super().__init__(root, transform, pre_transform)
        self.process()

    @property
    def processed_file_names(self):
        """
        Returns a list of filenames that represent the processed dataset.
        This is required by the PyTorch Geometric Dataset class to check if the
        data has already been processed.
        """
        return ['data.pt']  # You can change this to a more descriptive name or a list of files if needed.

    def process(self):
        for label, folder in enumerate(['Real', 'Fake']):
            folder_path = os.path.join(dataset_path, 'Training', folder)
            for filename in os.listdir(folder_path):
                img_path = os.path.join(folder_path, filename)
                image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

                G, keypoints, descriptors = computeORBGraph(image)
                if G is None:
                    continue

                # Convert graph to PyTorch Geometric format
                edge_index = torch.tensor(list(G.edges), dtype=torch.long).t().contiguous()
                x = torch.tensor(np.array([G.nodes[i]['descriptor'] for i in G.nodes]), dtype=torch.float)
                y = torch.tensor([label], dtype=torch.long)  # Graph-level label

                data = Data(x=x, edge_index=edge_index, y=y)
                self.data_list.append(data)

    def len(self):
        return len(self.data_list)

    def get(self, idx):
        return self.data_list[idx]

In [42]:
# Load dataset
dataset = CurrencyGraphDataset(root=dataset_path)
train_loader = DataLoader(dataset, batch_size=4, shuffle=True)

print(f"Loaded {len(dataset)} graphs.")

Processing...
Done!


Loaded 128 graphs.


In [43]:
from torch_geometric.nn import GATConv

# Define the GAT model
class GATClassifier(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim=64, output_dim=2, heads=2):
        super(GATClassifier, self).__init__()
        self.conv1 = GATConv(input_dim, hidden_dim, heads=heads)
        self.conv2 = GATConv(hidden_dim * heads, hidden_dim, heads=1)
        self.fc = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = global_mean_pool(x, data.batch)  # Graph-level pooling
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

# Get feature dimension from dataset
input_dim = dataset[0].x.shape[1]

# Initialize model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = GATClassifier(input_dim=input_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.005, weight_decay=5e-4)
criterion = torch.nn.NLLLoss()


In [44]:
def train_model(model, train_loader, optimizer, criterion, epochs=50):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for data in train_loader:
            data = data.to(device)
            optimizer.zero_grad()
            out = model(data)
            loss = criterion(out, data.y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss:.4f}")

# Train for 50 epochs
train_model(model, train_loader, optimizer, criterion, epochs=50)


Epoch 1/50, Loss: 147.4035
Epoch 2/50, Loss: 45.5892
Epoch 3/50, Loss: 21.7696
Epoch 4/50, Loss: 22.8014
Epoch 5/50, Loss: 21.9992
Epoch 6/50, Loss: 21.4423
Epoch 7/50, Loss: 21.8487
Epoch 8/50, Loss: 21.4344
Epoch 9/50, Loss: 22.3454
Epoch 10/50, Loss: 22.9387
Epoch 11/50, Loss: 22.1473
Epoch 12/50, Loss: 21.8968
Epoch 13/50, Loss: 21.4367
Epoch 14/50, Loss: 21.1652
Epoch 15/50, Loss: 21.1892
Epoch 16/50, Loss: 21.1146
Epoch 17/50, Loss: 21.5060
Epoch 18/50, Loss: 20.6573
Epoch 19/50, Loss: 21.0229
Epoch 20/50, Loss: 21.0376
Epoch 21/50, Loss: 21.3461
Epoch 22/50, Loss: 22.8562
Epoch 23/50, Loss: 22.2794
Epoch 24/50, Loss: 22.1875
Epoch 25/50, Loss: 21.9205
Epoch 26/50, Loss: 21.7044
Epoch 27/50, Loss: 21.7820
Epoch 28/50, Loss: 22.1425
Epoch 29/50, Loss: 22.1083
Epoch 30/50, Loss: 22.1052
Epoch 31/50, Loss: 21.9903
Epoch 32/50, Loss: 21.8884
Epoch 33/50, Loss: 21.8125
Epoch 34/50, Loss: 20.3929
Epoch 35/50, Loss: 22.6596
Epoch 36/50, Loss: 22.2184
Epoch 37/50, Loss: 22.2946
Epoch 38/

In [45]:
def evaluate(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            data = data.to(device)
            out = model(data)
            pred = out.argmax(dim=1)
            correct += (pred == data.y).sum().item()
            total += data.y.size(0)
    return correct / total

# Load test dataset
test_dataset_path = os.path.join(dataset_path, 'Testing')
test_dataset = CurrencyGraphDataset(root=test_dataset_path)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

accuracy = evaluate(model, test_loader)
print(f"Test Accuracy: {accuracy * 100:.2f}%")


Processing...
Done!


Test Accuracy: 68.75%
