# AST parser exploration
from `code_featurization.ipynb`

In [1]:
import requests
import re

response = requests.get("https://raw.githubusercontent.com/tree-sitter/tree-sitter-python/master/src/node-types.json")
response

<Response [200]>

In [2]:
types = re.findall(r'"type": "(.+)"', response.text)
type_to_int = {t: i for i, t in enumerate(list(set(types)))}

In [3]:
from tree_sitter_languages import get_parser

parser = get_parser("python")
tree = parser.parse(bytes("def foo():\n    pass", "utf8"))
root = tree.root_node

In [4]:
import networkx as nx
import tree_sitter

# Filtering for anon nodes is necessary to avoid a lot of noise and non important features??
def tree_to_graph(root: tree_sitter.Node, with_anon: bool = False) -> nx.DiGraph:
    G = nx.DiGraph()
    todo = [root]
    while todo:
        node = todo.pop()
        if with_anon or node.is_named:
            G.add_node(node.id, type=node.type)
        for child in node.children:
            if with_anon or child.is_named:
                G.add_edge(node.id, child.id)
            todo.append(child)
    return G

In [3]:
import json
import networkx as nx

In [4]:
# filter to a list of graphs
with open("features.json") as f:
    repo_features = json.load(f)

graphs = []

for commit_hash, commit_features in repo_features.items():
    if commit_features:
        graphs.append([nx.node_link_graph(file_features["ast"]) for _, file_features in commit_features.items()])

len(graphs)

1273

In [5]:
len(graphs[0]), type(graphs[0][0])

(5, networkx.classes.digraph.DiGraph)

In [6]:
G = graphs[0][0]

[G.nodes[node] for node in G.nodes]

[{'type': 'module'},
 {'type': 'comment'},
 {'type': 'import_from_statement'},
 {'type': 'expression_statement'},
 {'type': 'call'},
 {'type': 'identifier'},
 {'type': 'argument_list'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'keyword_argument'},
 {'type': 'identifier'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'type': 'list'},
 {'type': 'string'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'type': 'list'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'type': 'string'},
 {'type': 'identifier'},
 {'typ

In [7]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GATConv, global_mean_pool
import torch.nn as nn
from torch_geometric.data import Data, DataLoader, Batch
import networkx as nx
import numpy as np

In [8]:
def nx_to_pyg_graph(nx_graph: nx.DiGraph) -> Data:
    x = torch.tensor([[type_to_int[nx_graph.nodes[node]['type']]] for node in nx_graph.nodes], dtype=torch.long)
    edge_index = torch.tensor(list(nx_graph.edges), dtype=torch.long).t().contiguous()
    return Data(x=x, edge_index=edge_index)

In [10]:
class GATFeatureExtractor(torch.nn.Module):
    def __init__(self, num_embeddings, embedding_dim=64, hidden_dim=64):
        super(GATFeatureExtractor, self).__init__()
        self.embedding = torch.nn.Embedding(num_embeddings, embedding_dim)
        self.conv1 = GATConv(embedding_dim, hidden_dim)
        self.conv2 = GATConv(hidden_dim, hidden_dim)

    def forward(self, data):
        # Ensure that x is not None
        assert data.x is not None, "Input feature matrix x is None."
        x, edge_index = data.x, data.edge_index
        embedded_x = self.embedding(x.squeeze())  # Embedding lookup
        x, attn_weights1 = self.conv1(embedded_x, edge_index, return_attention_weights=True)
        x = torch.relu(x)
        x, attn_weights2 = self.conv2(x, edge_index, return_attention_weights=True)
        x = global_mean_pool(x, data.batch)  # Aggregate to graph level
        return x  # Simplified to return only embeddings for clarity



class RegressionModel(nn.Module):
    def __init__(self, input_dim, output_dim=1):
        super(RegressionModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, output_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [9]:
class GATFeatureExtractor(torch.nn.Module):
    def __init__(self, num_embeddings, embedding_dim=64, hidden_dim=64):
        super(GATFeatureExtractor, self).__init__()
        self.embedding = torch.nn.Embedding(num_embeddings, embedding_dim)
        self.conv1 = GATConv(embedding_dim, hidden_dim)
        self.conv2 = GATConv(hidden_dim, hidden_dim)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        embedded_x = self.embedding(x).view(-1, 64)  # Ensure embedding dimension matches
        x = self.conv1(embedded_x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = global_mean_pool(x, batch)  # Aggregate to graph level
        return x

class RegressionModel(nn.Module):
    def __init__(self, input_dim, output_dim=1):
        super(RegressionModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, output_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [10]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device = "cpu"

# Preparing the dataset
prepared_graphs = [[nx_to_pyg_graph(g) for g in sublist] for sublist in graphs]

# Model setup
num_embeddings = len(type_to_int) + 1
Y = torch.tensor(np.arange(1, len(graphs) + 1) * 5, dtype=torch.float, device=device)

In [12]:
import os
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

In [11]:
feature_extractor = GATFeatureExtractor(num_embeddings=num_embeddings, embedding_dim=64, hidden_dim=64).to(device)
regressor = RegressionModel(input_dim=64).to(device)

# Optimization setup
optimizer = torch.optim.Adam(list(feature_extractor.parameters()) + list(regressor.parameters()), lr=0.01)
criterion = nn.MSELoss()

# Training
num_epochs = 100
for epoch in range(num_epochs):
    total_loss = 0
    for i, graph_list in enumerate(prepared_graphs):
        batch_data = Batch.from_data_list(graph_list).to(device)
        optimizer.zero_grad()
        embeddings = feature_extractor(batch_data)
        prediction = regressor(embeddings).squeeze(-1)  # Ensure the prediction dimension matches the target
        loss = criterion(prediction, Y[i].unsqueeze(0).to(device))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch}, Loss: {total_loss / len(prepared_graphs)}")

../aten/src/ATen/native/cuda/Indexing.cu:1290: indexSelectLargeIndex: block: [1,0,0], thread: [64,0,0] Assertion `srcIndex < srcSelectDimSize` failed.
../aten/src/ATen/native/cuda/Indexing.cu:1290: indexSelectLargeIndex: block: [1,0,0], thread: [65,0,0] Assertion `srcIndex < srcSelectDimSize` failed.
../aten/src/ATen/native/cuda/Indexing.cu:1290: indexSelectLargeIndex: block: [1,0,0], thread: [66,0,0] Assertion `srcIndex < srcSelectDimSize` failed.
../aten/src/ATen/native/cuda/Indexing.cu:1290: indexSelectLargeIndex: block: [1,0,0], thread: [67,0,0] Assertion `srcIndex < srcSelectDimSize` failed.
../aten/src/ATen/native/cuda/Indexing.cu:1290: indexSelectLargeIndex: block: [1,0,0], thread: [68,0,0] Assertion `srcIndex < srcSelectDimSize` failed.
../aten/src/ATen/native/cuda/Indexing.cu:1290: indexSelectLargeIndex: block: [1,0,0], thread: [69,0,0] Assertion `srcIndex < srcSelectDimSize` failed.
../aten/src/ATen/native/cuda/Indexing.cu:1290: indexSelectLargeIndex: block: [1,0,0], thread: 

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:
import torch

# Check if CUDA is available
if torch.cuda.is_available():
    # Create two tensors
    a = torch.tensor([1.0, 2.0, 3.0], device='cuda')
    b = torch.tensor([4.0, 5.0, 6.0], device='cuda')
    
    # Perform addition
    c = a + b
    
    # Move the result back to CPU for printing
    c_cpu = c.to('cpu')
    
    print("Result of a + b:", c_cpu)
else:
    print("CUDA is not available. Testing on CPU.")
    
    # Perform the operation on CPU
    a = torch.tensor([1.0, 2.0, 3.0])
    b = torch.tensor([4.0, 5.0, 6.0])
    c = a + b
    
    print("Result of a + b:", c)


Result of a + b: tensor([5., 7., 9.])
