In [46]:
import tree_sitter_python as tspython
from tree_sitter import Language, Parser

PY_LANGUAGE = Language(tspython.language())

In [47]:
from collections import Counter

import pandas as pd
import torch
from sklearn.model_selection import train_test_split

# Set up parser
parser = Parser(PY_LANGUAGE)

# Sample code snippets and their binary labels (e.g., 0 = clean, 1 = vulnerable)
samples = [
    (b"def add(a, b): return a + b", 0),
    (b"def eval_input(): eval(input())", 1),  # potentially dangerous use
]

# List all possible node types we'll count
node_types = set()


def walk_tree(node, types):
    types.append(node.type)
    for child in node.children:
        walk_tree(child, types)


def code_to_feature_vector(code):
    tree = parser.parse(code)
    types = []
    walk_tree(tree.root_node, types)
    counts = Counter(types)
    feature_vector = [counts.get(typ, 0) for typ in node_types]
    return torch.tensor(feature_vector, dtype=torch.float32)


df_gen = pd.read_csv("../../data/generated/gen_solutions.csv")
df_gen = df_gen[["spec", "solution"]]
df_gen.columns = ["task", "text"]
df_gen["generated"] = 1
df_real = pd.read_csv("../../data/db_attempts.csv")
df_real = df_real[["task", "programText"]]
df_real.columns = ["task", "text"]
df_real["generated"] = 0
df = pd.concat([df_gen, df_real], axis=0, ignore_index=True)
df = df.dropna(subset=["text"])
df["text"] = df["text"].astype(str)
df.drop_duplicates()
df["id"] = df.index + 1

df = df.reset_index(drop=True)
train_df, val_prep = train_test_split(df, test_size=0.2, stratify=df["generated"])
valid_df, test_df = train_test_split(
    val_prep, test_size=0.25, stratify=val_prep["generated"]
)
train_df = train_df.reset_index(drop=True)
valid_df = valid_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

# First pass: gather all node types
for _, row in train_df.iterrows():
    tree = parser.parse(str.encode(row["text"]))
    types = []
    walk_tree(tree.root_node, types)
    node_types.update(types)

# Create fixed feature vector mapping
node_types = sorted(node_types)
type_to_idx = {typ: i for i, typ in enumerate(node_types)}

# X = torch.stack([code_to_feature_vector(train_df["text"]) for code, _ in samples])
# y = torch.tensor([label for _, label in samples], dtype=torch.long)

In [88]:
node_types

['!=',
 '%',
 '%=',
 '&',
 '(',
 ')',
 '*',
 '**',
 '*=',
 '+',
 '+=',
 ',',
 '-',
 '-=',
 '->',
 '.',
 '/',
 '//',
 '//=',
 '/=',
 ':',
 ':=',
 ';',
 '<',
 '<<',
 '<<=',
 '<=',
 '=',
 '==',
 '>',
 '>=',
 '>>',
 '>>=',
 '@',
 'ERROR',
 '[',
 '\\',
 ']',
 '^',
 '^=',
 '_',
 'and',
 'argument_list',
 'as',
 'as_pattern',
 'as_pattern_target',
 'assert',
 'assert_statement',
 'assignment',
 'attribute',
 'augmented_assignment',
 'binary_operator',
 'block',
 'boolean_operator',
 'break',
 'break_statement',
 'call',
 'case',
 'class',
 'class_definition',
 'comment',
 'comparison_operator',
 'concatenated_string',
 'conditional_expression',
 'constrained_type',
 'continue',
 'continue_statement',
 'def',
 'default_parameter',
 'del',
 'delete_statement',
 'dictionary',
 'dictionary_comprehension',
 'dotted_name',
 'elif',
 'elif_clause',
 'else',
 'else_clause',
 'escape_sequence',
 'except',
 'expression_list',
 'expression_statement',
 'false',
 'float',
 'for',
 'for_in_clause',
 'for_

In [71]:
from torch.utils.data import DataLoader, Dataset


class CustomDataset(Dataset):
    def __init__(self, dataframe):
        self.dataframe = dataframe

    def __getitem__(self, index):
        return (
            self.dataframe["text"].iloc[index],
            code_to_feature_vector(str.encode(self.dataframe["text"].iloc[index])),
            float(self.dataframe["generated"].iloc[index]),
        )

    def __len__(self):
        return len(self.dataframe)


data_train = CustomDataset(dataframe=train_df)
dataloader_train = DataLoader(data_train, batch_size=32)
data_val = CustomDataset(dataframe=valid_df)
dataloader_val = DataLoader(data_val, batch_size=32)
data_test = CustomDataset(dataframe=test_df)
dataloader_test = DataLoader(data_test, batch_size=32)

In [73]:
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import f1_score, recall_score, roc_auc_score
from tqdm import tqdm


class CodeClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim=32):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, 1)  # binary classification

    def forward(self, x):
        return torch.sigmoid(self.fc2(self.relu(self.fc1(x))))


model = CodeClassifier(input_dim=len(node_types))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Simple training loop
for epoch in range(5):
    model.train()
    for _, text, label in tqdm(dataloader_train):
        outputs = model(text)
        outputs = outputs.squeeze()
        loss = criterion(outputs, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    model.eval()
    with torch.no_grad():
        all_predictions = []
        all_truths = []
        for _, text, label in tqdm(dataloader_val):
            outputs = model(text)
            outputs = outputs.squeeze()

            all_predictions.extend(outputs.numpy().tolist())
            all_truths.extend(label.numpy().tolist())

        rounded = [round(x) for x in all_predictions]
        all_truths = [round(x) for x in all_truths]

        score = roc_auc_score(all_truths, rounded)
        f1 = f1_score(all_truths, rounded)
        recall = recall_score(all_truths, rounded)
        print(f"ROC-AUC={score}, F1={f1}, recall={recall}")


print(f"Final Loss: {loss.item():.4f}")

100%|██████████| 232/232 [00:02<00:00, 111.31it/s]
100%|██████████| 44/44 [00:00<00:00, 122.48it/s]


ROC-AUC=0.9836189442012511, F1=0.9816326530612245, recall=0.9717171717171718


100%|██████████| 232/232 [00:02<00:00, 111.92it/s]
100%|██████████| 44/44 [00:00<00:00, 121.88it/s]


ROC-AUC=0.9961903469182304, F1=0.993963782696177, recall=0.997979797979798


100%|██████████| 232/232 [00:02<00:00, 110.18it/s]
100%|██████████| 44/44 [00:00<00:00, 126.95it/s]


ROC-AUC=0.9973101677468978, F1=0.9959677419354839, recall=0.997979797979798


100%|██████████| 232/232 [00:02<00:00, 108.79it/s]
100%|██████████| 44/44 [00:00<00:00, 120.88it/s]


ROC-AUC=0.9977603583426652, F1=0.9959758551307847, recall=1.0


100%|██████████| 232/232 [00:02<00:00, 112.29it/s]
100%|██████████| 44/44 [00:00<00:00, 123.94it/s]

ROC-AUC=0.9919305032406935, F1=0.9879518072289156, recall=0.9939393939393939
Final Loss: 7.1997





In [76]:
from sklearn.metrics import mean_absolute_error, mean_squared_error

with torch.no_grad():
    all_predictions = []
    all_truths = []
    all_real_texts = []
    for real_text, text, label in tqdm(dataloader_test):
        outputs = model(text)
        outputs = outputs.squeeze()

        all_real_texts.extend(real_text)
        all_predictions.extend(outputs.numpy().tolist())
        all_truths.extend(label.numpy().tolist())

    print(
        f"MAE={mean_absolute_error(all_predictions, all_truths)}, MSE={mean_squared_error(all_predictions, all_truths)}"
    )

    rounded = [round(x) for x in all_predictions]
    all_truths = [round(x) for x in all_truths]

    score = roc_auc_score(all_truths, rounded)
    f1 = f1_score(all_truths, rounded)
    recall = recall_score(all_truths, rounded)
    print(f"ROC-AUC={score}, F1={f1}, recall={recall}")

100%|██████████| 15/15 [00:00<00:00, 120.50it/s]

MAE=0.006881102376334462, MSE=0.005617118145371848
ROC-AUC=0.9919361399227171, F1=0.9879518072289156, recall=0.9939393939393939





In [77]:
res = pd.DataFrame(
    {"code": all_real_texts, "real": all_truths, "predicted": all_predictions}
)
res.to_csv("ast.csv")

In [103]:
len(res)

463

In [102]:
new_df = res[abs(res["predicted"] - res["real"]) >= 0.3]
new_df

Unnamed: 0,code,real,predicted
123,"keyboard = {\n 'a': 's', 's': 'd', 'd': 'f'...",0,1.0
178,print(len(input().split())),0,0.546303
326,print(len(input().split())),0,0.546303
353,"\r\nv = int(input(""\nv - bus speed,\r\n""))\r\n...",1,2.562366e-12


In [87]:
print(str(new_df["code"].iloc[3]))


v = int(input("\nv - bus speed,\r\n"))
t = int(input("\nt - travel time.\r\n"))
distance = v * t
print(distance)



In [98]:
code1 = """
a,b = map(int, input().split())
if a > b:
    return 1
return 0
"""

code2 = """
x, y = map(int, input().split())
return int(x>y)
"""

code3 = """
l = map(int, input().split())
if l[0] > l[1] :
    return 1
else:
    return 0
"""

vec1 = code_to_feature_vector(str.encode(code1))
vec2 = code_to_feature_vector(str.encode(code2))
vec3 = code_to_feature_vector(str.encode(code3))

In [99]:
vec1.dot(vec2) / vec1.norm() / vec2.norm()

tensor(0.9536)

In [100]:
vec3.dot(vec2) / vec3.norm() / vec2.norm()

tensor(0.8293)