In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import Data
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import convert

import torchvision
import torchvision.transforms as transforms

# Choose the compute device: CUDA when it is available, CPU otherwise
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Preprocessing pipeline: convert to a tensor, then normalize each of the
# three channels with mean 0.5 and std 0.5
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# CIFAR-10 splits rooted at ./data; the training split is built first, then
# the test split
train_dataset, test_dataset = (
    torchvision.datasets.CIFAR10(root='./data', train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

# Define graph construction function
def construct_graph(image):
    """Turn a channels-first image tensor into a pixel-grid graph.

    Every pixel becomes one node whose feature vector holds that pixel's
    channel values. Each node is connected to the in-bounds pixels of its
    surrounding 3x3 window, as reported by ``get_neighbors``.

    Args:
        image: Tensor of shape ``(channels, height, width)``.

    Returns:
        A ``Data`` object with ``x`` of shape ``(height * width, channels)``
        and ``edge_index`` of shape ``(2, num_edges)`` with dtype
        ``torch.long``.
    """
    channels, height, width = image.size()
    num_nodes = height * width

    # Put channels last *before* flattening so that row i of ``x`` holds all
    # channels of pixel i in row-major order (the same indexing that
    # get_neighbors uses). Flattening the channels-first tensor directly would
    # make the first height*width entries all come from channel 0.
    x = image.permute(1, 2, 0).reshape(num_nodes, channels)

    # One (source, target) pair per directed edge of the 8-connected grid.
    pairs = [
        (node, neighbor)
        for node in range(num_nodes)
        for neighbor in get_neighbors(node, height, width)
    ]
    # reshape(-1, 2) keeps the result two-dimensional even when there are no
    # edges (e.g. a 1x1 image), so the transpose is always (2, num_edges).
    edge_index = torch.tensor(pairs, dtype=torch.long).reshape(-1, 2).t().contiguous()

    return Data(x=x, edge_index=edge_index)

# Function to get neighbors of a node in a 2D grid
def get_neighbors(node_idx, height, width):
    """List the grid neighbours of a node in a row-major ``height`` x ``width`` grid.

    The node's 3x3 surrounding window is scanned row by row, left to right.
    Positions that fall outside the grid are skipped, and the node itself is
    never included.

    Args:
        node_idx: Flat row-major index of the node.
        height: Number of grid rows.
        width: Number of grid columns.

    Returns:
        A list of flat indices of the neighbouring cells, in scan order.
    """
    r, c = divmod(node_idx, width)
    found = []

    for dr in (-1, 0, 1):
        nr = r + dr
        if nr < 0 or nr >= height:
            # Whole row of the window lies outside the grid
            continue
        for dc in (-1, 0, 1):
            nc = c + dc
            if nc < 0 or nc >= width:
                continue
            candidate = nr * width + nc
            if candidate == node_idx:
                # Centre of the window is the node itself
                continue
            found.append(candidate)

    return found

# Convert CIFAR-10 images to graph data
def _to_labeled_graphs(dataset):
    """Build one labelled graph per ``(image, label)`` pair of ``dataset``.

    Args:
        dataset: Iterable yielding ``(image, label)`` pairs, where ``image``
            is accepted by ``construct_graph`` and ``label`` is an integer
            class index.

    Returns:
        A list of graphs, in dataset order, each carrying its class label in
        ``graph.y`` as a ``torch.long`` tensor of shape ``[1]``.
    """
    graphs = []
    for image, label in dataset:
        graph = construct_graph(image)
        # Store the label with an explicit leading dimension (shape [1]) so
        # it lines up with a one-row logits tensor and supports ``y.size(0)``;
        # a 0-dim tensor has no dimension 0 to query.
        graph.y = torch.tensor([label], dtype=torch.long)
        graphs.append(graph)
    return graphs


train_graphs = _to_labeled_graphs(train_dataset)
test_graphs = _to_labeled_graphs(test_dataset)

# Define the GNN model
class GraphConvNet(MessagePassing):
    """Single-round message-passing classifier that scores one graph at a time.

    Pipeline: per-node linear embedding (3 -> 16), one round of max-aggregated
    neighbour messages, a kernel-size-1 ``Conv1d`` (16 -> 32) applied per node,
    mean pooling over all nodes, and a final linear layer (32 -> 10).
    """

    def __init__(self):
        super(GraphConvNet, self).__init__(aggr='max')

        self.lin = nn.Linear(3, 16)
        self.conv = nn.Conv1d(16, 32, kernel_size=1)
        self.fc = nn.Linear(32, 10)

    def forward(self, x, edge_index):
        """Compute class logits for a single graph.

        Args:
            x: Node features of shape ``(num_nodes, 3)``.
            edge_index: Edge list of shape ``(2, num_edges)``.

        Returns:
            Tensor of shape ``(1, 10)``: one row of class logits for the
            whole graph.
        """
        x = self.lin(x)                                                    # (N, 16)
        x = self.propagate(edge_index, size=(x.size(0), x.size(0)), x=x)   # (N, 16)

        # Conv1d expects (batch, channels, length). The node axis plays the
        # role of "length", so move the 16 feature channels to dim 1 before
        # the convolution and restore the node-major layout afterwards.
        x = self.conv(x.t().unsqueeze(0)).squeeze(0).t()                   # (N, 32)

        # The label is per graph, not per node: average the node embeddings
        # into a single graph embedding before classifying.
        x = x.mean(dim=0, keepdim=True)                                    # (1, 32)
        return self.fc(x)                                                  # (1, 10)

    def message(self, x_j):
        """Use each neighbour's features, unchanged, as the message."""
        return x_j

# Initialize the GNN model
model = GraphConvNet().to(device)

# The graphs built above are already torch_geometric ``Data`` objects, so no
# conversion step is required (torch_geometric.utils.convert provides no
# ``graph_to_pyg_data`` function). Copy the lists so the training/evaluation
# collections are distinct objects from the source lists.
train_data = list(train_graphs)
test_data = list(test_graphs)

# Define the loss criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop: one optimizer step per graph
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for data in train_data:
        data = data.to(device)
        optimizer.zero_grad()

        outputs = model(data.x, data.edge_index)
        # view(-1) gives the label a leading dimension whether it was stored
        # as a 0-dim or a 1-dim tensor, which is what CrossEntropyLoss needs
        # for a (num_graphs, num_classes) logits tensor.
        loss = criterion(outputs, data.y.view(-1))
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # max(..., 1) avoids a ZeroDivisionError when there is no training data
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / max(len(train_data), 1)}")

# Evaluation
model.eval()
total_correct = 0
total_samples = 0

# No gradients are needed for evaluation; skipping them avoids building the
# autograd graph for every test sample.
with torch.no_grad():
    for data in test_data:
        data = data.to(device)
        outputs = model(data.x, data.edge_index)
        targets = data.y.view(-1)
        _, predicted = torch.max(outputs, 1)
        total_correct += (predicted == targets).sum().item()
        total_samples += targets.size(0)

accuracy = total_correct / total_samples if total_samples else 0.0
print(f"Accuracy on test set: {accuracy * 100:.2f}%")


Files already downloaded and verified
Files already downloaded and verified


In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

# Step 1: Load and preprocess the CIFAR-10 dataset
# Images are converted to tensors and each of the three channels is
# normalized with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# Training split first, then the test split, both rooted at ./data
train_dataset, test_dataset = (
    torchvision.datasets.CIFAR10(root='./data', train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

# Mini-batches of 64; only the training loader shuffles its samples
train_loader, test_loader = (
    torch.utils.data.DataLoader(split, batch_size=64, shuffle=reshuffle)
    for split, reshuffle in ((train_dataset, True), (test_dataset, False))
)

# Step 2: Define the CNN model for image classification
class CNNClassifier(nn.Module):
    """Small two-stage convolutional network producing 10 class scores.

    Each stage is a 3x3 convolution (stride 1, padding 1), a ReLU and a 2x2
    max-pool with stride 2. The pooled feature maps are flattened and fed to a
    single linear layer whose input size, 8 * 8 * 32, corresponds to 3-channel
    32x32 inputs after the two pooling steps.
    """

    def __init__(self):
        super().__init__()
        # Stage 1: 3 -> 16 channels, then 2x spatial down-sampling
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Stage 2: 16 -> 32 channels, then 2x spatial down-sampling
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head over the flattened 32 x 8 x 8 feature maps
        self.fc = nn.Linear(in_features=8 * 8 * 32, out_features=10)

    def forward(self, x):
        """Return a ``(batch, 10)`` tensor of class scores for an image batch ``x``."""
        stage1 = self.pool1(self.relu1(self.conv1(x)))
        stage2 = self.pool2(self.relu2(self.conv2(stage1)))
        # Collapse channel and spatial dimensions into one feature vector per sample
        flat = stage2.view(stage2.size(0), -1)
        return self.fc(flat)

# Step 3: Train the CNN model
model = CNNClassifier()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Cross-entropy loss optimized with SGD plus momentum
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
num_epochs = 10

for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)

        # Standard step: clear gradients, forward, backward, update
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Only every 200th step reports; the other steps just keep accumulating
        if (i + 1) % 200 != 0:
            continue
        print(f"Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {running_loss / 200:.4f}")
        running_loss = 0.0

# Step 4: Evaluate the model on the test set
model.eval()
correct, total = 0, 0

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        # Index of the highest score along the class dimension
        predicted = torch.max(outputs.data, 1).indices
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Accuracy on the test set: {(100 * correct / total):.2f}%")


Files already downloaded and verified
Files already downloaded and verified
Epoch [1/10], Step [200/782], Loss: 2.2195
Epoch [1/10], Step [400/782], Loss: 2.0217
Epoch [1/10], Step [600/782], Loss: 1.8803
Epoch [2/10], Step [200/782], Loss: 1.7440
Epoch [2/10], Step [400/782], Loss: 1.6759
Epoch [2/10], Step [600/782], Loss: 1.6173
Epoch [3/10], Step [200/782], Loss: 1.5238
Epoch [3/10], Step [400/782], Loss: 1.4734
Epoch [3/10], Step [600/782], Loss: 1.4545
Epoch [4/10], Step [200/782], Loss: 1.3919
Epoch [4/10], Step [400/782], Loss: 1.3598
Epoch [4/10], Step [600/782], Loss: 1.3426
Epoch [5/10], Step [200/782], Loss: 1.3107
Epoch [5/10], Step [400/782], Loss: 1.2984
Epoch [5/10], Step [600/782], Loss: 1.2523
Epoch [6/10], Step [200/782], Loss: 1.2374
Epoch [6/10], Step [400/782], Loss: 1.2375
Epoch [6/10], Step [600/782], Loss: 1.2146
Epoch [7/10], Step [200/782], Loss: 1.1992
Epoch [7/10], Step [400/782], Loss: 1.1689
Epoch [7/10], Step [600/782], Loss: 1.1594
Epoch [8/10], Step [2