In [None]:
%pip install -r ./requirements.txt

# Import Modules

In [None]:
import random
import numpy as np
import torch
import torch.nn.functional as F
import pandas as pd

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

from torch_geometric.data import Data, DataLoader

from Graph import GraphSage, GAT

In [None]:
# Reproducibility configuration: seed every random number generator used in
# this notebook (Python, NumPy, PyTorch CPU and all CUDA devices).
SEED = 42
deterministic = True

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
if deterministic:
    # Ask cuDNN for deterministic kernels and switch off its autotuner, which
    # may otherwise select different algorithms from one run to the next.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Data Pre-processing

In [None]:
# Read the node table (pokemon) and the battle table (combats)
pokemon_data = pd.read_csv('./dataset/pokemon.csv')
combats_data = pd.read_csv('./dataset/combats.csv')

# Re-label Pokemon ids as integer codes; the same fitted encoder is applied to
# every id column of the combats table so both tables share one id space
label_encoder = LabelEncoder()
pokemon_data['#'] = label_encoder.fit_transform(pokemon_data['#'])
for id_column in ['First_pokemon', 'Second_pokemon', 'Winner']:
    combats_data[id_column] = label_encoder.transform(combats_data[id_column])

# Numeric columns that get standardised
features_to_normalize = [
    'HP', 'Attack', 'Defense', 'Speed', 'Generation', 'Sp. Atk', 'Sp. Def',
]

# One-hot column names produced below by get_dummies: all 'Type 1_*' columns
# first, then all 'Type 2_*' columns
type_names = ['Bug', 'Dark', 'Dragon', 'Electric', 'Fairy', 'Fighting',
              'Fire', 'Flying', 'Ghost', 'Grass', 'Ground', 'Ice',
              'Normal', 'Poison', 'Psychic', 'Rock', 'Steel', 'Water']
features_else = [f'Type {slot}_{type_name}'
                 for slot in (1, 2)
                 for type_name in type_names]

# Standardise the numeric stats
scaler = StandardScaler()
normalized_values = scaler.fit_transform(pokemon_data[features_to_normalize])
pokemon_data[features_to_normalize] = normalized_values

# One-hot encode both type columns, then store the indicator columns and the
# 'Legendary' flag as integers
pokemon_data = pd.get_dummies(pokemon_data, columns=['Type 1', 'Type 2'])
pokemon_data = pokemon_data.astype(dict.fromkeys(features_else + ['Legendary'], int))

In [None]:
# Split combats data into train and test
train_combats, test_combats = train_test_split(combats_data, test_size=0.2, random_state=SEED)

# Extract unique vertex values from train_combats and test_combats
train_vertices = set(train_combats['First_pokemon']) | set(train_combats['Second_pokemon'])
test_vertices = set(test_combats['First_pokemon']) | set(test_combats['Second_pokemon'])

# Split pokemon data into train and test based on the vertices.
# .copy() gives each subset its own frame so the column assignment below does
# not write into a slice of pokemon_data (avoids SettingWithCopyWarning).
train_pokemon = pokemon_data[pokemon_data['#'].isin(train_vertices)].copy()
test_pokemon = pokemon_data[pokemon_data['#'].isin(test_vertices)].copy()

# Decode vertex values back to the original ids
train_pokemon['#'] = label_encoder.inverse_transform(train_pokemon['#'])
test_pokemon['#'] = label_encoder.inverse_transform(test_pokemon['#'])

# Set "#" as index for train_pokemon and test_pokemon dataframes
train_pokemon = train_pokemon.set_index('#')
test_pokemon = test_pokemon.set_index('#')

In [None]:
features = features_to_normalize + features_else


def _combat_edges(combats):
    """Build directed edge lists from a combats frame.

    For every row the Pokemon equal to `Winner` is treated as the winner and
    the other participant as the loser (if `Winner` matches `First_pokemon`,
    the second Pokemon is the loser; otherwise the first one is).

    Returns:
        (positive, negative): two lists of (source, target) tuples, where each
        positive edge is (loser, winner) and each negative edge is the same
        pair reversed, (winner, loser).
    """
    positive, negative = [], []
    columns = (combats['First_pokemon'], combats['Second_pokemon'], combats['Winner'])
    for first_pokemon, second_pokemon, winner in zip(*columns):
        if first_pokemon == winner:
            loser, victor = second_pokemon, first_pokemon
        else:
            loser, victor = first_pokemon, second_pokemon
        positive.append((loser, victor))
        negative.append((victor, loser))
    return positive, negative


def _to_edge_index(pairs):
    """Convert a list of (source, target) pairs into a (2, num_edges) long tensor."""
    # reshape(-1, 2) keeps the result two-dimensional even when `pairs` is empty
    return torch.tensor(pairs, dtype=torch.long).reshape(-1, 2).t().contiguous()


# Feature matrix over all Pokemon; used as node features of every graph below.
# NOTE(review): edge endpoints are encoded ids, so this assumes row position in
# pokemon_data equals the encoded '#' value - confirm.
X_data = pokemon_data[features].values
X_data = torch.tensor(X_data, dtype=torch.float)

# for train dataset
X_train = train_pokemon[features].values
X_train = torch.tensor(X_train, dtype=torch.float)
edges_train, neg_edges_train = _combat_edges(train_combats)

neg_edge_index_train = _to_edge_index(neg_edges_train)
edge_index_train = _to_edge_index(edges_train)

data_train = Data(x=X_data, edge_index=edge_index_train, neg_edge_index = neg_edge_index_train)
train_loader = DataLoader([data_train], batch_size=1, shuffle=True)

# for test dataset
X_test = test_pokemon[features].values
X_test = torch.tensor(X_test, dtype=torch.float)
edges_test, neg_edges_test = _combat_edges(test_combats)

edge_index_test = _to_edge_index(edges_test)
neg_edge_index_test = _to_edge_index(neg_edges_test)

data_test = Data(x=X_data, edge_index=edge_index_test, neg_edge_index= neg_edge_index_test)
test_loader = DataLoader([data_test], batch_size=1, shuffle=False)

# for total data
# Concatenate into a new list so edges_train is not extended in place.
# NOTE(review): this graph contains the test edges, whose direction encodes the
# battle outcome; anything evaluated on it may see its own labels - confirm
# this is intended.
edge = edges_train + edges_test
edge_index = _to_edge_index(edge)

data_total = Data(x=X_data, edge_index = edge_index)

# Define Train, Test, and Predict Functions

In [None]:
def train(model, optimizer, train_loader, device):
    """Run one optimisation pass over `train_loader`.

    For each batch the model embeds the nodes, scores the edges in
    `edge_index` (target label 1) and in `neg_edge_index` (target label 0)
    with `model.predict`, and takes one optimizer step on the sum of the two
    binary cross-entropy-with-logits losses.

    Args:
        model: module called as `model(x, edge_index)` and exposing
            `predict(head, tail)`.
        optimizer: optimizer that updates the model.
        train_loader: iterable of batches with `x`, `edge_index`,
            `neg_edge_index` and a `to(device)` method.
        device: device each batch is moved to.

    Returns:
        The summed batch loss divided by `len(train_loader)`.
    """
    model.train()
    running_loss = 0

    for batch in train_loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        embeddings = model(batch.x, batch.edge_index)

        def score(pairs):
            # Row 0 holds the head node of each pair, row 1 the tail node
            return model.predict(embeddings[pairs[0]], embeddings[pairs[1]])

        pos_logits = score(batch.edge_index)
        neg_logits = score(batch.neg_edge_index)

        batch_loss = (
            F.binary_cross_entropy_with_logits(pos_logits, torch.ones_like(pos_logits))
            + F.binary_cross_entropy_with_logits(neg_logits, torch.zeros_like(neg_logits))
        )
        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()

    return running_loss / len(train_loader)

def test(model, loader, device):
    """Compute the mean ROC AUC of the model's edge scores over `loader`.

    For each batch, edges in `edge_index` are labelled 1 and edges in
    `neg_edge_index` are labelled 0; `model.predict` scores for both sets are
    concatenated and passed to `roc_auc_score`.

    Args:
        model: module called as `model(x, edge_index)` and exposing
            `predict(head, tail)`.
        loader: iterable of batches with `x`, `edge_index`, `neg_edge_index`
            and a `to(device)` method.
        device: device each batch is moved to.

    Returns:
        The sum of per-batch AUC values divided by `len(loader)`.
    """
    model.eval()
    auc_sum = 0
    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            embeddings = model(batch.x, batch.edge_index)

            pos_pairs = batch.edge_index
            neg_pairs = batch.neg_edge_index

            pos_scores = model.predict(embeddings[pos_pairs[0]], embeddings[pos_pairs[1]])
            neg_scores = model.predict(embeddings[neg_pairs[0]], embeddings[neg_pairs[1]])

            scores = torch.cat([pos_scores, neg_scores])
            targets = torch.cat([torch.ones_like(pos_scores), torch.zeros_like(neg_scores)])

            # roc_auc_score works on CPU data
            auc_sum += roc_auc_score(targets.cpu(), scores.cpu())

    return auc_sum / len(loader)

def predict(model, head, tail, data, device):
    """Score one node pair in both directions.

    Args:
        model: module called as `model(x, edge_index)` and exposing
            `predict(head, tail)`.
        head: index of the first node in the embedding matrix.
        tail: index of the second node in the embedding matrix.
        data: graph object with `x`, `edge_index` and a `to(device)` method.
        device: device the graph is moved to.

    Returns:
        A pair of floats: sigmoid of `model.predict(head, tail)` and sigmoid
        of `model.predict(tail, head)`.
    """
    model.eval()
    with torch.no_grad():
        graph = data.to(device)
        embeddings = model(graph.x, graph.edge_index)
        head_vec, tail_vec = embeddings[head], embeddings[tail]
        forward_logit = model.predict(head_vec, tail_vec)
        backward_logit = model.predict(tail_vec, head_vec)
        forward_prob = torch.sigmoid(forward_logit).item()
        backward_prob = torch.sigmoid(backward_logit).item()
    return forward_prob, backward_prob

# Train and Evaluate

In [None]:
# Choose model from ["GCN", "GraphSage_mean", "GraphSage_maxpool", "GAT"]
def train_model(epoch, model_type, data_train, data_test, train_loader, test_loader, device):
    """Build the requested model and train it with periodic evaluation.

    Every 10th epoch the train and test AUC are printed and early stopping is
    checked: if that epoch's training loss has not improved on the best loss
    seen at earlier checkpoints for 3 consecutive checkpoints, training stops.

    Args:
        epoch: maximum number of training epochs.
        model_type: one of "GCN", "GraphSage_mean", "GraphSage_maxpool", "GAT".
        data_train: training graph; its `x` determines the model input width.
        data_test: test graph (moved to `device`).
        train_loader: loader passed to `train` and used for the train AUC.
        test_loader: loader used for the test AUC.
        device: device for the model and the graphs.

    Returns:
        The model as it is after the last executed epoch (not a "best" snapshot).

    Raises:
        ValueError: if `model_type` is not one of the supported names.
    """
    graphsage_aggregators = {
        "GCN": "gcn",
        "GraphSage_mean": "mean",
        "GraphSage_maxpool": "maxpool",
    }
    # Validate up front: previously an unknown name left `model` unbound
    if model_type != "GAT" and model_type not in graphsage_aggregators:
        supported = list(graphsage_aggregators) + ["GAT"]
        raise ValueError(f"Unknown model_type {model_type!r}; expected one of {supported}")

    # Take the input width from the data actually passed in, not a notebook global
    dim_in = data_train.x.shape[1]
    if model_type == "GAT":
        model = GAT(dim_in=dim_in, dim_hidden=64, dim_out=8, dropout = 0.5, alpha = 0.2, num_heads = 8)
    else:
        model = GraphSage(num_layers=2, dim_in=dim_in, dim_hidden=64, dim_out=8,
                          agg_type=graphsage_aggregators[model_type])

    model = model.to(device)

    data_train = data_train.to(device)
    data_test = data_test.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=0.01, weight_decay=5e-4)

    patience = 3 # for early stopping; counted in evaluation checkpoints, not epochs
    best_loss = float('inf')
    epochs_no_improve = 0

    # Separate loop variable so the `epoch` argument is not overwritten
    for epoch_idx in range(epoch):
        loss = train(model, optimizer, train_loader, device)
        if epoch_idx % 10 == 0:
            test_auc = test(model, test_loader, device)
            train_auc = test(model, train_loader, device)
            print(f'Epoch: {epoch_idx:03d}, Loss: {loss:.4f}, Train AUC: {train_auc:.4f}, Test AUC: {test_auc:.4f}')
            if loss < best_loss:
                best_loss = loss
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1
                if epochs_no_improve == patience:
                    print("Early stopping triggered")
                    break
    return model

# calculate prediction accuracy
def test_model(model, data_total, data_test, device):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    head_list, tail_list = data_test.edge_index
    head_list = list(head_list)
    tail_list = list(tail_list)

    pos = 0
    for idx in range(len(head_list)):
        head = head_list[idx]
        tail = tail_list[idx]
        prediction, reverse_prediction = predict(model, head, tail, data_total, device)
        if prediction > reverse_prediction:
            pos += 1

    print(f"Prediction accuracy: {pos/len(head_list)}")

In [None]:
# Run on the GPU when CUDA is available, otherwise on the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Names accepted as train_model's model_type argument
model_types = [
    "GCN",
    "GraphSage_mean",
    "GraphSage_maxpool",
    "GAT",
]

In [None]:
# Train the "GCN" variant for at most 300 epochs, then print its test-edge accuracy
gcn_model = train_model(
    epoch=300,
    model_type="GCN",
    data_train=data_train,
    data_test=data_test,
    train_loader=train_loader,
    test_loader=test_loader,
    device=device,
)
test_model(model=gcn_model, data_total=data_total, data_test=data_test, device=device)

In [None]:
# Train the "GraphSage_mean" variant for at most 300 epochs, then print its test-edge accuracy
graphsage_mean_model = train_model(
    epoch=300,
    model_type="GraphSage_mean",
    data_train=data_train,
    data_test=data_test,
    train_loader=train_loader,
    test_loader=test_loader,
    device=device,
)
test_model(model=graphsage_mean_model, data_total=data_total, data_test=data_test, device=device)

In [None]:
# Train the "GraphSage_maxpool" variant for at most 300 epochs, then print its test-edge accuracy
graphsage_max_model = train_model(
    epoch=300,
    model_type="GraphSage_maxpool",
    data_train=data_train,
    data_test=data_test,
    train_loader=train_loader,
    test_loader=test_loader,
    device=device,
)
test_model(model=graphsage_max_model, data_total=data_total, data_test=data_test, device=device)

In [None]:
# Train the "GAT" variant for at most 300 epochs, then print its test-edge accuracy
gat_model = train_model(
    epoch=300,
    model_type="GAT",
    data_train=data_train,
    data_test=data_test,
    train_loader=train_loader,
    test_loader=test_loader,
    device=device,
)
test_model(model=gat_model, data_total=data_total, data_test=data_test, device=device)