<a href="https://colab.research.google.com/github/TJSun009/University-Projects/blob/main/Test_Matcher.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Import libraries

In [None]:
! pip install -Uqqq scipy networkx

In [None]:
import os
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import networkx as nx
from glob import iglob
import importlib

### python-graphs dependency

In [None]:
# Graph Generator Approach
# https://arxiv.org/pdf/2208.07461v1.pdf

# install python-graphs on startup
! apt-get -qq -y install graphviz graphviz-dev
! pip install -Uqqq python-graphs

## CODE Dirs
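The Code Directory should be a path to a folder containing the subfolders /raw/src and /raw/test (the dataset class below reads source files from `raw/src` and test files from `raw/test`)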

Your test and src files should have the naming convention *_test.py and *.py respectively
- i.e. divide_test.py is the test for divide.py

The data.zip is also available in the following GitHub [repo](https://github.com/TJSun009/University-Projects/blob/313cc021030769362faab8904c255b58d1327dd4/data.zip), which can be extracted and pointed to

In [None]:
# Root folder of the code corpus. SourceTestDataset (defined later in this notebook)
# lists source files from <CODE_DIR>/raw/src and pairs each with a *_test.py file
# under <CODE_DIR>/raw/test.
CODE_DIR = "PATH TO FILES"

## Feed Data to Graph Network

#### Imports

In [None]:
!pip install -Uqqq torch-scatter torch-sparse torch-geometric -f https://pytorch-geometric.com/whl/torch-1.13.0+cu116.html

In [None]:
# each edge should be weighted differently based on its type, edge should contain types
from python_graphs import program_graph_dataclasses

# for ast class list
import sys, inspect

import torch

### Pytorch Code Data Object

In [None]:
from torch_geometric.data.dataset import to_list
from torch_geometric.data import Data

# CodeData merges one source graph and one test graph from the code corpus into a single PyTorch Geometric Data object
class CodeData:
  """Merge one source graph and one test graph into a single PyTorch Geometric sample.

  Typical use is ``read(src_graph, test_graph)`` followed by ``get_data()``.

  Attributes:
    x: node rows. After ``read`` each row is ``[position, *encoded attributes]``.
    edge_index: two parallel lists holding the source and target position of every edge.
    edge_attr: one ``[edge.type.value]`` entry per edge.
    y: ``[1]`` when the two file names form a source/test pair, otherwise ``[0]``.
    data: initialised to None; no method of this class writes to it.
    types: names of the known edge types and of the classes found in the ``ast`` module.
    vocab: set of every ``ast_value`` seen so far.
    node_id_map: maps an original node id to its position in ``x``.
  """

  def __init__(self):
    self.x = []
    self.edge_index = [[], []]
    self.edge_attr = []
    self.y = []
    self.data = None
    self.types = {
        "edge" : program_graph_dataclasses.EdgeType._member_names_,
        "ast" : [cls.__name__ for _, cls in inspect.getmembers(sys.modules["ast"], inspect.isclass)]
        }
    self.vocab = set()
    self.node_id_map = {}

  def node_value_to_vect(self, k, v, vocab_index=None):
    """Encode a single node attribute as a number.

    Args:
      k: attribute name; "node_type", "ast_type" and "ast_value" are handled.
      v: attribute value.
      vocab_index: optional ``{ast_value: position}`` dict built from ``self.vocab``.
        Passing it avoids rebuilding ``list(self.vocab)`` for every node.

    Returns:
      ``v.value`` for "node_type"; the index of ``v`` in ``self.types["ast"]`` (or -1
      when absent) for "ast_type"; the position of ``v`` in the vocabulary for
      "ast_value"; None for any other attribute name.
    """
    if k == "node_type":
      return v.value
    if k == "ast_type":
      ast_types = self.types["ast"]
      return ast_types.index(v) if v in ast_types else -1
    if k == "ast_value":
      if vocab_index is not None:
        return vocab_index[v]
      # fall back to scanning the vocabulary when no precomputed index is supplied
      return list(self.vocab).index(v)
    return None

  def read(self, src_graph, test_graph):
    """Fill ``x``, ``edge_index``, ``edge_attr`` and ``y`` from a source/test graph pair.

    Both graphs must expose ``filename``, ``all_nodes()`` and ``edges``. Test nodes
    are numbered after the source nodes so both graphs share one index space.
    """
    # label is 1 when the test file name, with "_test.py" turned into ".py", equals the source file name
    self.y = [int(src_graph.filename.replace("_test.py", "") == test_graph.filename.replace("_test.py", ".py"))]

    # offset to differentiate src and test nodes
    offset = 0

    for graph in [src_graph, test_graph]:
      nodes = graph.all_nodes()

      # every ast_value becomes a vocabulary entry; nodes are later encoded by its position
      self.vocab.update(node.ast_value for node in nodes)

      # remember where each original node id lands, and keep the node for the encoding pass below
      for position, node in enumerate(nodes, start=offset):
        self.node_id_map[node.id] = position
        self.x.append((position, node))

      # translate edge endpoints from node ids to positions
      for edge in graph.edges:
        self.edge_index[0].append(self.node_id_map[edge.id1])
        self.edge_index[1].append(self.node_id_map[edge.id2])
        self.edge_attr.append([edge.type.value])

      offset = len(self.x)

    # the vocabulary is complete now, so one index serves every node
    vocab_index = {value: position for position, value in enumerate(self.vocab)}
    skipped = ("id", "ast_node", "instruction", "syntax")

    # replace each (position, node) placeholder by its numeric feature row
    for row, (position, node) in enumerate(self.x):
      self.x[row] = [position] + [
          self.node_value_to_vect(k, v, vocab_index)
          for k, v in node.__dict__.items()
          if k not in skipped
      ]

  def get_data(self):
    """Convert the collected lists to tensors and wrap them in a ``Data`` object.

    The tensors replace the list attributes on this instance.

    Returns:
      The ``Data`` object, or None when ``read`` has not produced a label yet.
    """
    if len(self.y) > 0:
      self.x = torch.tensor(self.x, dtype=torch.float32)
      self.y = torch.tensor(self.y)
      # edge_index holds node positions, so it must be an integer (long) tensor
      self.edge_index = torch.tensor(self.edge_index, dtype=torch.long)
      self.edge_attr = torch.tensor(self.edge_attr, dtype=torch.long)

      return Data(x=self.x, edge_index=self.edge_index, edge_attr=self.edge_attr, y=self.y)

    return None

  def draw(self):
    """Plot the merged graph with networkx; does nothing when no nodes were read.

    Nodes are coloured by the last column of their feature row (presumably the
    ast_value index -- this relies on the attribute order of the node objects) and
    edges by their type value. Parallel edges between the same two nodes collapse
    into one, keeping the type of the last one read.
    """
    if len(self.x) == 0:
      return

    def as_list(values):
      # the attributes are tensors once get_data() has run, plain lists before
      return values.tolist() if hasattr(values, "tolist") else values

    rows = as_list(self.x)
    sources, targets = as_list(self.edge_index)
    edge_types = as_list(self.edge_attr)

    G = nx.DiGraph()
    for row in rows:
      G.add_node(int(row[0]), ast_value=row[-1])
    for source, target, edge_type in zip(sources, targets, edge_types):
      G.add_edge(int(source), int(target), type=edge_type[0])

    # read the colours back from the graph so they line up with its node/edge order
    token_colors = [value for _, value in G.nodes(data="ast_value")]
    edge_type_colors = [value for _, _, value in G.edges(data="type")]

    # normalize the colors between [0, 1]; separate normalizers keep the node
    # range from being reused for the edges
    node_color = plt.Normalize()(token_colors)
    edge_color = plt.Normalize()(edge_type_colors)

    fig, ax = plt.subplots(1, 1, figsize=(10, 10))

    nx.draw_networkx(G, edge_color=edge_color, node_color=node_color, with_labels=True, ax=ax)



## PyTorch Conversion

In [None]:
from torch_geometric.data import Dataset
import glob
from torch_geometric.data.makedirs import makedirs
from itertools import product
from tqdm.notebook import tqdm

In [None]:
import gast
from python_graphs import program_graph
from contextlib import suppress

def get_graph(fpath):
  """Build the program graph of the Python file at ``fpath``.

  Args:
    fpath: path of the file to read (decoded as UTF-8).

  Returns:
    The graph returned by ``program_graph.get_program_graph`` with an extra
    ``filename`` attribute holding the file's base name, or None when the file
    cannot be read, parsed or converted.
  """
  try:
    with open(fpath, encoding="utf-8") as f:
      source = f.read()
    graph = program_graph.get_program_graph(gast.parse(source))
    graph.filename = os.path.basename(fpath)
    return graph
  except Exception:
    # best effort: any failure marks the file as unusable, but KeyboardInterrupt
    # and SystemExit still propagate so a long run can be stopped
    return None

def save_pytorch_data(raw_paths, processed_dir, processed_file_names):
  """Pair every source file with every test file and save each pair as ``data_<idx>.pt``.

  Args:
    raw_paths: file paths; those containing "_test.py" are treated as tests, the
      rest as sources.
    processed_dir: folder the ``.pt`` files are written to.
    processed_file_names: list that receives the name of every file written by
      this call (files that already existed are not added).

  ``idx`` only advances for pairs whose two files could be parsed, so a re-run
  over the same inputs maps each pair to the same file name and skips files that
  are already on disk.
  """
  src_paths, test_paths = [], []

  for path in raw_paths:
    if "_test.py" in path:
      test_paths.append(path)
    else:
      src_paths.append(path)

  source_test_pairs = list(product(src_paths, test_paths))

  idx = 0

  unparseable = set()

  # product() yields all pairs of one source back to back, so remembering the
  # last parsed source avoids re-parsing it for every test file
  last_src_path, last_src_graph = None, None

  for src_path, test_path in (pbar := tqdm(source_test_pairs)):

    if src_path in unparseable or test_path in unparseable:
      continue

    if src_path != last_src_path:
      last_src_path, last_src_graph = src_path, get_graph(src_path)
    src_graph = last_src_graph

    if src_graph is None:
      unparseable.add(src_path)
      pbar.set_description(f"Could not parse {os.path.basename(src_path)}")
      continue

    test_graph = get_graph(test_path)

    if test_graph is None:
      unparseable.add(test_path)
      pbar.set_description(f"Could not parse {os.path.basename(test_path)}")
      continue

    data_file = f"data_{idx}.pt"
    data_path = os.path.join(processed_dir, data_file)

    # already produced by an earlier run: keep the numbering in step and move on
    if os.path.exists(data_path):
      idx += 1
      continue

    pbar.set_description(f"pairing [{os.path.basename(src_path)}, {os.path.basename(test_path)}]")

    paired_data = CodeData()
    paired_data.read(src_graph, test_graph)
    data = paired_data.get_data()

    torch.save(data, data_path)

    processed_file_names.append(data_file)

    pbar.set_description(f"saved {data_file}")

    idx += 1

In [None]:
import pdb
from glob import iglob
class SourceTestDataset(Dataset):
  """PyTorch Geometric dataset of source/test graph pairs stored as ``data_<idx>.pt`` files.

  Raw files are listed from ``CODE_DIR/raw/src`` and ``CODE_DIR/raw/test``;
  processed samples live in ``self.processed_dir``.

  Args:
    root: dataset root folder, forwarded to the base class.
    transform: optional callable applied to each sample after the built-in
      padding performed by ``self.transform``.
    pre_transform: forwarded to the base class.
    pre_filter: forwarded to the base class.
    gcs: accepted but not used by this class.
  """

  def __init__(self, root, transform=None, pre_transform=None, pre_filter=None, gcs=False):
    self.max_props = {"nodes": 0, "edges": 0}
    self.cached_raw_files = []
    self.cached_processed_files = []
    self.vocab = set()

    self.types = {
      "edge" : program_graph_dataclasses.EdgeType._member_names_,
      "ast" : [cls.__name__ for _, cls in inspect.getmembers(sys.modules["ast"], inspect.isclass)]
    }

    # keep the caller's transform so it runs after the padding instead of being dropped
    self._user_transform = transform

    super().__init__(root, self.transform, pre_transform, pre_filter)

  def node_value_to_vect(self, k, v):
    """Encode a node attribute as a number (same rules as ``CodeData.node_value_to_vect``).

    Returns ``v.value`` for "node_type", the index in ``self.types["ast"]`` (or -1)
    for "ast_type", the position in ``self.vocab`` for "ast_value", else None.
    """
    if k == "node_type":
      return v.value
    elif k == "ast_type":
      ast_types = self.types["ast"]
      return ast_types.index(v) if v in ast_types else -1
    elif k == "ast_value":
      # transform vocab to list
      return list(self.vocab).index(v)

  @property
  def raw_file_names(self):
    """Full paths of every source file and of the test file expected for it.

    The listing is done once and cached; later accesses return the same list.
    """
    if not self.cached_raw_files:
      src_dir = os.path.join(CODE_DIR, "raw", "src")
      test_dir = os.path.join(CODE_DIR, "raw", "test")

      for file in os.listdir(src_dir):
        self.cached_raw_files.append(os.path.join(src_dir, file))
        self.cached_raw_files.append(os.path.join(test_dir, file.replace(".py", "_test.py")))

    return self.cached_raw_files

  @property
  def processed_file_names(self):
    """Base names of the processed files currently on disk.

    While ``max_props["nodes"]`` is still 0, every file is also loaded to find the
    largest node count, which ``transform`` uses as its padding target.
    """
    processed_files = []
    need_scan = self.max_props["nodes"] == 0
    max_nodes = 0

    # "[!pre_]" is a character class: names starting with p, r, e or _ are skipped
    # (which leaves out pre_transform.pt and pre_filter.pt)
    for path in iglob(f"{self.processed_dir}/[!pre_]*"):
      processed_files.append(os.path.basename(path))
      if need_scan:
        # iglob already yields the complete path, so load it as-is
        data = torch.load(path)
        max_nodes = max(max_nodes, data.x.size()[0])

    if max_nodes > 0:
      self.max_props["nodes"] = max_nodes

    if len(processed_files) > 0:
      self.cached_processed_files = processed_files

    return self.cached_processed_files

  def download(self):
    # Download to `self.raw_dir`.
    raise NotImplementedError(f"No data in {self.raw_dir} directory. Use the dataset retrieval notebook to retrieve files")

  def process(self):
    # Build and save one Data file per source/test pair.
    save_pytorch_data(self.raw_file_names, self.processed_dir, self.cached_processed_files)

  def len(self):
    """Number of processed files found on disk."""
    return len(self.processed_file_names)

  def transform(self, data):
    """Pad ``data.x`` with zero rows up to the largest known node count and fix dtypes.

    The padding is clamped at zero: a negative value would make ``F.pad`` crop
    node rows that ``edge_index`` may still reference.
    """
    from torch.nn import functional as F
    node_pad = max(0, int(self.max_props["nodes"] - data.x.size()[0]))
    data.x = F.pad(data.x, (0, 0, 0, node_pad)).to(torch.float32)
    data.edge_index = data.edge_index.to(torch.long)
    if self._user_transform is not None:
      data = self._user_transform(data)
    return data

  def get(self, idx):
    """Load and return the sample stored in ``data_<idx>.pt``."""
    data = torch.load(os.path.join(self.processed_dir, f"data_{idx}.pt"))
    return data

## Get Dataset

In [None]:
# Instantiate the dataset with CODE_DIR as its root folder.
dataset = SourceTestDataset(root=CODE_DIR)

### Inspect Dataset

In [None]:
# Summarise the dataset: size, feature count and class count, one item per line
print(
    "",
    f'Dataset: {dataset}:',
    '====================',
    f'Number of graphs: {len(dataset)}',
    f'Number of features: {dataset.num_features}',
    f'Number of classes: {dataset.num_classes}',
    sep="\n",
)


Dataset: SourceTestDataset(1881):
Number of graphs: 1881
Number of features: 4
Number of classes: 2


# Training

### neptune.ai Integration

See [here](https://docs.neptune.ai/setup/installation/) for setting up your own neptune.ai API key and project

If you don't require neptune you can set `run` to None

In [None]:
! pip install -Uqqq neptune-client

In [None]:
import neptune.new as neptune

# Placeholders: fill in your own neptune.ai credentials before running this cell
neptune_api_token = "YOUR TOKEN"
project = "YOUR PROJECT NAME"

# `run` is the handle the training cells log to
run = neptune.init_run(project=project, api_token=neptune_api_token)

  from neptune.version import version as neptune_client_version
  import neptune.new as neptune
  run = neptune.init_run(


https://app.neptune.ai/tjsun009/test-src-classifier/e/TES-44


### Training Parameters

In [None]:
# Hyper-parameters read by the split, loader, model and optimizer cells below
split_ratio = 0.8
batch_size = 32
hidden_channels = 64
learning_rate = 0.01

# log params; skipped when neptune is disabled by setting `run` to None
if run:
  run["parameters"] = {"split_ratio": split_ratio, "batch_size": batch_size, "hidden_channels": hidden_channels, "learning_rate": learning_rate}

### Train/Test split

In [None]:
# Seed torch's RNG, then shuffle the dataset
torch.manual_seed(12345)
dataset = dataset.shuffle()

# The first `split_ratio` share of the shuffled graphs is used for training, the rest for testing
split_idx = int(split_ratio * len(dataset))
train_dataset, test_dataset = dataset[:split_idx], dataset[split_idx:]

print(f'Number of training graphs: {len(train_dataset)}')
print(f'Number of test graphs: {len(test_dataset)}')

Number of training graphs: 1504
Number of test graphs: 377


### Prepare Dataset Loader

In [None]:
from torch_geometric.loader import DataLoader

# Run on the first CUDA device when one is present, otherwise on the CPU
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Extra loader options are only passed when CUDA is available
kwargs = {}
if torch.cuda.is_available():
  kwargs = {'num_workers': 1, 'pin_memory': True}

# Training batches are reshuffled; test batches keep their order
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, **kwargs)

# Print every training batch once as a sanity check
for step, data in enumerate(train_loader):
  print(f'Step {step + 1}:')
  print('=======')
  print(f'Number of graphs in the current batch: {data.num_graphs}')
  print(data)
  print()

Step 1:
Number of graphs in the current batch: 32
DataBatch(x=[5827616, 4], edge_index=[2, 986526], edge_attr=[986526, 1], y=[32], batch=[5827616], ptr=[33])

Step 2:
Number of graphs in the current batch: 32
DataBatch(x=[5827616, 4], edge_index=[2, 420841], edge_attr=[420841, 1], y=[32], batch=[5827616], ptr=[33])

Step 3:
Number of graphs in the current batch: 32
DataBatch(x=[5827616, 4], edge_index=[2, 1080426], edge_attr=[1080426, 1], y=[32], batch=[5827616], ptr=[33])

Step 4:
Number of graphs in the current batch: 32
DataBatch(x=[5827616, 4], edge_index=[2, 1023737], edge_attr=[1023737, 1], y=[32], batch=[5827616], ptr=[33])

Step 5:
Number of graphs in the current batch: 32
DataBatch(x=[5827616, 4], edge_index=[2, 586733], edge_attr=[586733, 1], y=[32], batch=[5827616], ptr=[33])

Step 6:
Number of graphs in the current batch: 32
DataBatch(x=[5827616, 4], edge_index=[2, 856422], edge_attr=[856422, 1], y=[32], batch=[5827616], ptr=[33])

Step 7:
Number of graphs in the current ba

## Training a Graph Neural Network (GNN)

copied from: https://colab.research.google.com/drive/1I8a0DfQ3fI7Njc62__mVXUlcAleUclnb

In [None]:
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool

class GCN(torch.nn.Module):
    """Graph classifier: three GCNConv layers, mean-pool readout, dropout, linear head.

    Input and output sizes are taken from the module-level ``dataset``
    (``num_node_features`` and ``num_classes``).
    """

    def __init__(self, hidden_channels):
        super().__init__()
        # seed before creating the layers, as the original cell does
        torch.manual_seed(12345)
        self.conv1 = GCNConv(dataset.num_node_features, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, dataset.num_classes)

    def forward(self, x, edge_index, batch):
        """Return one row of class scores per graph in the batch."""
        # Node embeddings: ReLU after the first two convolutions, none after the third
        hidden = F.relu(self.conv1(x, edge_index))
        hidden = F.relu(self.conv2(hidden, edge_index))
        hidden = self.conv3(hidden, edge_index)

        # Readout: average the node embeddings of each graph -> [batch_size, hidden_channels]
        pooled = global_mean_pool(hidden, batch)

        # Classifier: dropout (active only in training mode) followed by the linear layer
        pooled = F.dropout(pooled, p=0.5, training=self.training)
        return self.lin(pooled)

# Build the network, move it to the selected device and show its layers
model = GCN(hidden_channels=hidden_channels).to(device)
print(model)

GCN(
  (conv1): GCNConv(4, 64)
  (conv2): GCNConv(64, 64)
  (conv3): GCNConv(64, 64)
  (lin): Linear(in_features=64, out_features=2, bias=True)
)


Here, we again make use of the [`GCNConv`](https://pytorch-geometric.readthedocs.io/en/latest/modules/nn.html#torch_geometric.nn.conv.GCNConv) with $\mathrm{ReLU}(x) = \max(x, 0)$ activation for obtaining localized node embeddings, before we apply our final classifier on top of a graph readout layer.

Let's train our network for a few epochs to see how well it performs on the training as well as test set:

In [None]:
from datetime import datetime
# Adam over all model parameters, using the learning rate set in the parameters cell.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# Cross-entropy between the model output and the labels in data.y (see train()).
criterion = torch.nn.CrossEntropyLoss()

def train():
  """Run one pass over ``train_loader``, updating ``model`` after every batch.

  When ``run`` is set, the batch loss and its square are appended to neptune.
  """
  model.train()

  for batch in train_loader:
    # move only the tensors the forward pass and the loss need
    features = batch.x.to(device)
    edges = batch.edge_index.to(device)
    graph_ids = batch.batch.to(device)

    out = model(features, edges, graph_ids)
    loss = criterion(out, batch.y.to(device))

    if run:
      run["train/loss"].append(loss) # log loss to neptune ai
      run["train/loss-pow-2"].append(loss**2) # log loss squared to neptune ai

    # backpropagate, apply the update, then reset the gradients for the next batch
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

def test(loader):
  """Return the share of graphs in ``loader.dataset`` that ``model`` classifies correctly."""
  model.eval()
  hits = 0
  with torch.no_grad():
    for batch in loader:
      logits = model(batch.x.to(device), batch.edge_index.to(device), batch.batch.to(device))
      # predicted class = index of the highest score
      predicted = logits.argmax(dim=1)
      del logits
      targets = batch.y.to(device)
      hits += int((predicted.to(device) == targets).sum())
  return hits / len(loader.dataset)

In [None]:
# At least 3 times the number of features
pbar = tqdm(range(1, 13))
for epoch in pbar:
  train()
  # evaluate on both splits after every epoch
  train_acc, test_acc = test(train_loader), test(test_loader)

  pbar.set_description(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')
  if not run:
    continue
  # log accuracy to neptune ai
  run["train/accuracy"].append(train_acc)
  run["test/accuracy"].append(test_acc)

# close the neptune run once training has finished
if run:
  run.stop()

  0%|          | 0/12 [00:00<?, ?it/s]

Shutting down background jobs, please wait a moment...
Done!
Waiting for the remaining 2 operations to synchronize with Neptune. Do not kill this process.
All 2 operations synced, thanks for waiting!
Explore the metadata in the Neptune app:
https://app.neptune.ai/tjsun009/test-src-classifier/e/TES-44/metadata
