In [1]:
import os
import cv2
import numpy as np
import networkx as nx
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

In [4]:
# Root folder that holds one sub-directory of images per class ('Dog', 'Cat').
# The location can be overridden with the GCN_IMAGES_DIR environment variable
# so the notebook is not tied to a single machine; when the variable is unset
# (or empty) the original path is used.
base_dir = os.environ.get('GCN_IMAGES_DIR') or '/Users/alyssonamaral/Documents/gcn/images'

dogs_dir = os.path.join(base_dir, 'Dog')
cats_dir = os.path.join(base_dir, 'Cat')

def load_and_resize_images(folder, target_size=(256, 256)):
    """Load every readable image in ``folder`` as a channel-first float tensor.

    Args:
        folder: Directory whose entries are candidate image files.
        target_size: Size tuple handed to ``cv2.resize`` (OpenCV interprets it
            as ``(width, height)``).

    Returns:
        A list with one ``torch.float32`` tensor per entry that OpenCV could
        decode. Each tensor is the resized image converted from BGR to RGB and
        permuted from height-width-channel to channel-height-width layout.
        Pixel values are not rescaled. Entries are visited in sorted filename
        order so the result is reproducible across runs and platforms.
    """
    images = []
    # os.listdir returns entries in arbitrary order; sorting keeps the image
    # order (and any seeded train/test split built on top of it) stable.
    for filename in sorted(os.listdir(folder)):
        img_path = os.path.join(folder, filename)
        img = cv2.imread(img_path)
        # cv2.imread returns None for entries it cannot decode; skip those.
        if img is None:
            continue
        img_resized = cv2.resize(img, target_size)
        img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)  # convert BGR to RGB
        # Convert to a tensor and move the channel axis first.
        images.append(torch.from_numpy(img_rgb).permute(2, 0, 1).float())
    return images

In [None]:
# Load both classes (dogs first, then cats) with the loader's default size.
dog_images, cat_images = (
    load_and_resize_images(class_dir) for class_dir in (dogs_dir, cats_dir)
)

In [None]:
# Sanity check: display the shape of the first loaded dog image tensor.
dog_images[0].shape

In [None]:
def image_to_graph(image):
    """Build an 8-connected directed pixel graph from a channel-first image.

    Every pixel ``(row, col)`` becomes a node whose ``rgb`` attribute holds the
    pixel's channel vector as a float tensor. Each pixel gets one outgoing edge
    to every in-bounds neighbour (diagonals included), so every adjacent pair
    of pixels ends up linked by two opposite directed edges.

    Args:
        image: Tensor of shape ``(channels, height, width)``.

    Returns:
        ``nx.DiGraph`` whose edges carry ``weight`` (the norm of the difference
        between the two pixels' channel vectors, stored as a 0-dim tensor) and
        ``direction`` (the name of the offset from source to target, e.g.
        ``"down"`` for the edge ``(i, j) -> (i + 1, j)``).
    """
    # (label, row offset, column offset) for the 8-neighbourhood.
    neighbor_offsets = (
        ("down", 1, 0),
        ("up", -1, 0),
        ("right", 0, 1),
        ("left", 0, -1),
        ("down_right", 1, 1),
        ("down_left", 1, -1),
        ("up_right", -1, 1),
        ("up_left", -1, -1),
    )

    _, height, width = image.shape
    graph = nx.DiGraph()

    # Add one node per pixel, carrying its channel values.
    for i in range(height):
        for j in range(width):
            graph.add_node((i, j), rgb=image[:, i, j].float())

    # Add edges weighted by the colour difference between the two pixels.
    for i in range(height):
        for j in range(width):
            pixel = image[:, i, j].float()
            for direction, di, dj in neighbor_offsets:
                ni, nj = i + di, j + dj
                if 0 <= ni < height and 0 <= nj < width:
                    diff = torch.norm(pixel - image[:, ni, nj].float())
                    # Only the outgoing edge is added. The opposite edge is
                    # created when the loop reaches the neighbour itself;
                    # adding it here as well re-wrote every edge a second
                    # time and overwrote its direction label with a
                    # "<opposite>_reverse" value depending on visit order.
                    graph.add_edge((i, j), (ni, nj), weight=diff, direction=direction)

    return graph

In [None]:
class MessagePassingLayer(nn.Module):
    """One round of edge-then-node message passing over a graph.

    The graph is expected to expose ``nodes``, ``edges`` and ``neighbors`` the
    way the graphs built in this notebook do, with an ``"rgb"`` tensor of
    length ``input_dim`` stored on every node.
    """

    def __init__(self, input_dim, hidden_dim):
        super().__init__()

        def two_layer_mlp():
            # Maps two concatenated input_dim vectors back to input_dim.
            return nn.Sequential(
                nn.Linear(2 * input_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, input_dim),
            )

        # Edge update: consumes [source rgb, target rgb].
        self.edge_mlp = two_layer_mlp()
        # Node update: consumes [node rgb, aggregated edge message].
        self.node_mlp = two_layer_mlp()

    def forward(self, graph):
        """Update edge ``"feature"`` and node ``"rgb"`` attributes in place.

        All edge features are computed from the node states as they were on
        entry, then written; all node features are then computed from those
        edge features, then written. Returns the same ``graph`` object.
        """
        nodes, edges = graph.nodes, graph.edges

        # Stage 1: one feature per edge from its two endpoint vectors.
        edge_updates = {}
        for edge in edges:
            src, dst = edge
            endpoints = torch.cat([nodes[src]["rgb"], nodes[dst]["rgb"]])
            edge_updates[edge] = self.edge_mlp(endpoints)
        for edge, feature in edge_updates.items():
            edges[edge]["feature"] = feature

        # Stage 2: each node combines its own vector with the mean feature of
        # the edges leading to its graph.neighbors(); a node without any
        # neighbour receives an all-zero message instead.
        node_updates = {}
        for node in nodes:
            current = nodes[node]["rgb"]
            incident = [edges[(node, nbr)]["feature"] for nbr in graph.neighbors(node)]
            if incident:
                message = torch.stack(incident).mean(dim=0)
            else:
                message = torch.zeros_like(current)
            node_updates[node] = self.node_mlp(torch.cat([current, message]))
        for node, feature in node_updates.items():
            nodes[node]["rgb"] = feature

        return graph

In [None]:
# Pool the node and edge values of a graph into a single vector.
def aggregate_graph_features(graph):
    """Mean-pool node and edge features into one graph-level vector.

    Args:
        graph: Object exposing ``nodes`` and ``edges``; every node must carry
            an ``"rgb"`` tensor and every edge a ``"feature"`` tensor (the
            attribute written by ``MessagePassingLayer.forward``).

    Returns:
        1-D tensor: the mean node vector concatenated with the mean edge
        vector. For a graph without edges the edge half is all zeros with the
        same shape as the node mean, mirroring how the message-passing layer
        treats nodes that have no neighbours.
    """
    # Mean of the node attributes.
    node_features = torch.stack([graph.nodes[node]["rgb"] for node in graph.nodes])
    aggregated_node_features = torch.mean(node_features, dim=0)

    # Mean of the edge attributes.
    edge_feature_list = [graph.edges[edge]["feature"] for edge in graph.edges]
    if edge_feature_list:
        aggregated_edge_features = torch.mean(torch.stack(edge_feature_list), dim=0)
    else:
        # torch.stack rejects an empty list (e.g. the graph of a 1x1 image).
        aggregated_edge_features = torch.zeros_like(aggregated_node_features)

    # Final vector: [node mean, edge mean].
    return torch.cat([aggregated_node_features, aggregated_edge_features])

In [None]:
# Final MLP model used for classification.
class MLP(nn.Module):
    """Two-layer perceptron that scores each input row per class.

    ``forward`` returns raw, unnormalised logits. The notebook trains this
    model with ``nn.CrossEntropyLoss``, which applies log-softmax itself;
    feeding it already-softmaxed values normalises twice and flattens the
    gradients. Call ``predict_proba`` when probabilities are needed. The
    arg-max class is the same for logits and probabilities.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        # Not applied in forward(); used by predict_proba only.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class logits for a ``(batch, input_dim)`` input."""
        x = self.fc1(x)
        x = self.relu(x)
        return self.fc2(x)

    def predict_proba(self, x):
        """Return per-row class probabilities (softmax over dim 1 of the logits)."""
        return self.softmax(self.forward(x))

# Model hyper-parameters.
T = 15  # message-passing rounds applied to every graph
input_dim = 3  # length of each node / edge feature vector (RGB)
hidden_dim = 64
output_dim = 2  # two classes: label 0 = dog, label 1 = cat

message_passing_layer = MessagePassingLayer(input_dim, hidden_dim)
# aggregate_graph_features returns [node mean, edge mean], i.e. 2 * input_dim
# values per graph, so the classifier's first layer must accept that width.
# Sizing it with input_dim alone fails with a shape mismatch on the first
# forward pass.
classification_model = MLP(2 * input_dim, hidden_dim, output_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(list(message_passing_layer.parameters()) + list(classification_model.parameters()), lr=0.001)

# Convert the loaded images into graphs.
dog_graphs = [image_to_graph(img) for img in dog_images]
cat_graphs = [image_to_graph(img) for img in cat_images]

# Split graphs and labels into train and test sets.
graphs = dog_graphs + cat_graphs
labels = [0] * len(dog_graphs) + [1] * len(cat_graphs)
train_graphs, test_graphs, train_labels, test_labels = train_test_split(graphs, labels, test_size=0.2, random_state=42)

# Run the rounds of message passing on each graph and pool the result.
def process_graphs(graphs, T):
    """Apply ``T`` message-passing rounds to each graph and pool it to a vector.

    Uses the module-level ``message_passing_layer`` and
    ``aggregate_graph_features``. The layer updates every graph in place, so
    the graphs passed in are modified.

    Args:
        graphs: Iterable of graphs accepted by ``message_passing_layer``.
        T: Number of message-passing rounds per graph.

    Returns:
        ``torch.float32`` tensor with one row per graph. The tensor carries no
        autograd history. An empty ``graphs`` yields an empty tensor.
    """
    processed_vectors = []
    # The features are computed once, outside the training loop, so autograd
    # history is deliberately not recorded: keeping it would make the second
    # backward() over these features fail and would hold every intermediate
    # of every round in memory. The layer's weights therefore receive no
    # gradient from the classifier loss.
    with torch.no_grad():
        for graph in graphs:
            for _ in range(T):
                graph = message_passing_layer(graph)
            processed_vectors.append(aggregate_graph_features(graph))

    if not processed_vectors:
        return torch.empty(0, dtype=torch.float32)
    # torch.tensor() cannot build a tensor from a list of multi-element
    # tensors; stack the per-graph vectors into a (num_graphs, dim) matrix.
    return torch.stack(processed_vectors).to(torch.float32)

# Run message passing on the train and test graphs, and turn the integer
# labels into class-index tensors.
X_train_tensor = process_graphs(train_graphs, T)
X_test_tensor = process_graphs(test_graphs, T)
y_train_tensor = torch.tensor(train_labels, dtype=torch.long)
y_test_tensor = torch.tensor(test_labels, dtype=torch.long)

# Training: each epoch is a single optimizer step over the whole of
# X_train_tensor (no mini-batching).
num_epochs = 15
for epoch in range(num_epochs):
    optimizer.zero_grad()  # clear gradients left over from the previous step
    outputs = classification_model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

# Evaluation on the held-out split, without tracking gradients.
with torch.no_grad():
    test_outputs = classification_model(X_test_tensor)
    # torch.max over dim 1 returns (values, indices); the index is the predicted class.
    _, predicted = torch.max(test_outputs, 1)
    accuracy = (predicted == y_test_tensor).sum().item() / len(y_test_tensor)
    print(f"Accuracy on test set: {accuracy * 100:.2f}%")