In [1]:
import torch
import torch.nn as nn

from transformers import BertModel, BertForSequenceClassification, BertTokenizer
import pandas as pd
from preprocessing import Preprocess
from tests import functions
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score



In [2]:

class BertForLineClassification(torch.nn.Module):
    """Dropout + linear classification head over precomputed line embeddings.

    No BERT encoder is instantiated inside this module: ``forward`` receives
    an embedding tensor that was produced elsewhere (the notebook obtains it
    from ``self.preprocessor``) and maps it to class logits.

    Args:
        hidden_size: Size of the last dimension of the incoming embeddings.
            Defaults to 768, the value previously hard-coded.
        num_labels: Number of logits produced per input row. Defaults to 8,
            the value previously hard-coded.
        dropout: Dropout probability applied before the linear layer.
    """

    def __init__(self, hidden_size=768, num_labels=8, dropout=0.1):
        super().__init__()
        self.dropout = torch.nn.Dropout(dropout)
        self.classifier = torch.nn.Linear(hidden_size, num_labels)
        # Project-local helper used by the notebook to turn source lines into
        # embedding tensors before they are passed to ``forward``.
        self.preprocessor = Preprocess()

    def forward(self, input_tensor):
        """Return the classification logits for ``input_tensor``.

        Dropout is applied first (it is a no-op in eval mode), then the
        linear layer projects the result to ``num_labels`` logits.
        """
        pooled_output = self.dropout(input_tensor)
        return self.classifier(pooled_output)





In [3]:
import numpy as np
from sklearn.metrics import mean_absolute_error
def prepare_train_test_validation(option=None):
    """Load the train and test splits as pandas DataFrames.

    Args:
        option: When equal to ``'saif'`` the Excel splits ``data/train.xlsx``
            and ``data/test.xlsx`` are read and returned unmodified. Any other
            value reads the parquet line datasets and keeps only their
            ``'lines'`` and ``'label'`` columns.

    Returns:
        A ``(train, test)`` tuple of DataFrames. Despite the function name no
        validation split is returned, so the validation parquet is no longer
        read only to be thrown away.
    """
    if option == 'saif':
        train = pd.read_excel('data/train.xlsx')
        test = pd.read_excel('data/test.xlsx')
        return train, test

    # 'lines' holds the source text, 'label' holds the class label.
    wanted_columns = ['lines', 'label']
    train_data = pd.read_parquet('data/data_train_lines_combined.parquet')[wanted_columns]
    test_data = pd.read_parquet('data/data_lines_test.parquet')[wanted_columns]
    return train_data, test_data
    

# Train the model
def train(model, train_data, num_epochs=10, print_every=100, option = None):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        total_loss = 0
        num_samples = 0

        for i, data in enumerate(train_data.iterrows()):
            # Get line and label from data
            if(option == 'saif'):
                line = data[1]['code']
                label = data[1]['label']
            else:    
                line = data[1]['lines']
                label = data[1]['label']
            if label == 1 or label == 2 or label == 3:
                label = torch.tensor([1])
            elif label == 4 or label == 5 or label == 6 or label == 7:
                label = torch.tensor([2])
            else:
                label = torch.tensor([0])

            # label = torch.tensor([label//3])


            # Transform line to tensor using generate_line_embeddings method
            line_tensor = model.preprocessor.generate_line_emdeddings(line)

            # Train the model using line_tensor and label
            logits = model(line_tensor)
            loss = criterion(logits, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * label.size(0)
            num_samples += label.size(0)

            if (i + 1) % print_every == 0:
                avg_loss = total_loss / num_samples
                print(f'Epoch [{epoch + 1}/{num_epochs}], Iteration [{i + 1}/{len(train_data)}], Average Loss: {avg_loss:.4f}')

        # Print statistics for the epoch
        avg_loss = total_loss / num_samples
        print(f'Epoch [{epoch + 1}/{num_epochs}], Average Loss: {avg_loss:.4f}')
        #validate(model, validation_data)

# Test the model

def test(model, test_data , option = None):
    criterion = nn.CrossEntropyLoss()
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0
    all_labels = []
    all_predictions = []
    total_absolute_difference = 0
    with torch.no_grad():
        for data in test_data.iterrows():
            if(option == 'saif'):
                line = data[1]['code']
                label = data[1]['label']
            else:    
                line = data[1]['lines']
                label = data[1]['label']
            if label == 1 or label == 2 or label == 3:
                label = torch.tensor([1])
            elif label == 4 or label == 5 or label == 6 or label == 7:
                label = torch.tensor([2])
            else:
                label = torch.tensor([0])
            # label = torch.tensor([label//3])

            line_tensor = model.preprocessor.generate_line_emdeddings(line)
            logits = model(line_tensor)
            loss = criterion(logits, label)

            total_loss += loss.item()
            _, predicted = torch.max(logits, 1)
            
            predicted = predicted.item()
            label = label.item()
            all_labels.append(label)
            all_predictions.append(predicted)
            correct_predictions += (predicted == label)
            total_samples += 1            
            total_absolute_difference += abs(predicted - label)
            
            
    average_loss = total_loss / len(test_data)
    accuracy = accuracy_score(all_labels, all_predictions)
    accuracy_manual = correct_predictions / total_samples
    precision = precision_score(all_labels, all_predictions, average='macro')
    recall = recall_score(all_labels, all_predictions, average='macro')
    f1 = f1_score(all_labels, all_predictions, average='macro')
    average_absolute_difference = total_absolute_difference / total_samples

    
    # calculate average of absolute difference between label and predicated
    
    print(f'Manual Accuracy: {accuracy_manual:.4f}')
    print(f'Test Loss: {average_loss:.4f}, Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}')
    print(f'Average Absolute Difference: {average_absolute_difference:.4f}')

In [12]:
#create function to save the model
def save_model(model, path):
    """Write the state dict of ``model`` to ``path`` using ``torch.save``."""
    weights = model.state_dict()
    torch.save(weights, path)

In [4]:
# Fresh, untrained classifier head. Its `preprocessor` attribute is also used
# further down to embed lines for inference.
model = BertForLineClassification()

In [7]:
# Load the 'saif' splits (data/train.xlsx and data/test.xlsx).
train_data, test_data = prepare_train_test_validation('saif')

In [None]:

# Five epochs over the 'saif' training split, reporting every 1000 samples.
train(model, train_data, num_epochs=5, print_every=1000, option='saif')

In [None]:
# Evaluate the freshly trained model on the 'saif' test split.
test(model, test_data, option='saif')

Manual Accuracy: 0.5287
Test Loss: 11.4629, Accuracy: 0.5287
Precision: 0.2643, Recall: 0.5000, F1 Score: 0.3458
Average Absolute Difference: 0.4713


  _warn_prf(average, modifier, msg_start, len(result))


In [16]:
# Save the trained model's weights; the relative path places the file in the
# current working directory.
save_model(model, 'bert_model_generic_yarab.pth')

In [5]:
# Load a previously saved checkpoint into a new classifier instance.
# NOTE(review): this variable is named `model_raw` but the checkpoint is
# 'bert_model_generic_2.pth', while `model_generic` further down loads
# 'bert_model_2.pth' — confirm the two file names are not swapped.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source.
model_raw = BertForLineClassification()
model_raw.load_state_dict(torch.load('bert_model_generic_2.pth'))
model_raw.eval()  # Put the model in evaluation mode for inference
# To continue training from this checkpoint instead, something like
# train(model_raw, train_data, 5, 1000) can be run here.

BertForLineClassification(
  (dropout): Dropout(p=0.1, inplace=False)
  (classifier): Linear(in_features=768, out_features=8, bias=True)
)

In [46]:
# NOTE(review): `test_data` was loaded above with option 'saif', but this call
# uses the default option, so test() reads the 'lines' column instead of
# 'code'. The recorded output carries execution count 46, so it may stem from
# a different kernel state — confirm which dataset is intended here.
test(model_raw, test_data)

Manual Accuracy: 0.9245
Test Loss: 0.3722, Accuracy: 0.9245
Precision: 0.2060, Recall: 0.2017, F1 Score: 0.2033
Average Absolute Difference: 0.1657


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [6]:
# Load the second checkpoint into its own classifier instance.
# NOTE(review): `model_generic` loads 'bert_model_2.pth' whereas `model_raw`
# loads 'bert_model_generic_2.pth' — confirm the file names match the
# variable names.
model_generic = BertForLineClassification()
model_generic.load_state_dict(torch.load('bert_model_2.pth'))
model_generic.eval()  # Put the model in evaluation mode for inference

BertForLineClassification(
  (dropout): Dropout(p=0.1, inplace=False)
  (classifier): Linear(in_features=768, out_features=8, bias=True)
)

In [7]:
def parse_inference(lines_list, raw_labels, generic_labels):
    """Print the three input lengths, then one comparison row per line.

    Rows are produced by zipping the three sequences, so output stops at the
    shortest of them. "Label M1" is the entry from ``raw_labels`` and
    "Label M2" the entry from ``generic_labels``.
    """
    sizes = (len(lines_list), len(raw_labels), len(generic_labels))
    print(*sizes)
    for row in zip(lines_list, raw_labels, generic_labels):
        line, raw_label, generic_label = row
        print(f'Line: {line},  Label M1: {raw_label},  Label M2 : {generic_label}')


In [8]:
def infere(model,lines_tensor):
    """Return the index of the largest logit along dimension 1.

    The forward pass runs under ``torch.no_grad()`` because only the
    predicted indices are needed, so no autograd graph has to be recorded.
    The model's train/eval mode is left untouched; callers are expected to
    have called ``model.eval()`` beforehand.

    Args:
        model: Callable mapping ``lines_tensor`` to a logits tensor with at
            least two dimensions.
        lines_tensor: Input forwarded to ``model`` unchanged.

    Returns:
        Tensor of predicted class indices, one per row of the logits.
    """
    with torch.no_grad():
        logits = model(lines_tensor)
    _, predicted = torch.max(logits, 1)
    return predicted

In [9]:
# Embed the first sample function and fetch its cleaned source via the
# project preprocessor. Presumably both yield one entry per source line (the
# recorded run printed matching lengths) — TODO confirm against Preprocess.
lines_tensors =  model.preprocessor.generate_embeddings(functions[0])
cleaned_func = model.preprocessor.clean_function_source(functions[0])
# Per-line predictions of each checkpoint, collected in the same order as
# `lines_tensors`.
raw_inference = []
generic_inference = []
# Both models are queried inside one loop so `lines_tensors` is only
# iterated once.
for tensor in lines_tensors:
    raw_inference.append(infere(model_raw,tensor))
    generic_inference.append(infere(model_generic,tensor))

# Show every cleaned line next to the two predicted labels.
parse_inference(cleaned_func, raw_inference, generic_inference)
    


def simple_function(var_0, var_2):
    var_6 = "SELECT * FROM products WHERE var_4=" + \
        str(var_0) + " AND var_5='" + str(var_2) + "'"
    var_6 += "'; DROP TABLE users; --"

    var_1 = var_0 + var_2
    os.system("echo Hello from the system!")
    if var_0 == 0:
        print("var_0 is zero!")
    var_7 = var_0 * var_2
    var_3 = eval(input("Enter an expression: "))
    return var_1, var_7

def simple_function(var_0, var_2):
    var_6 = "SELECT * FROM products WHERE var_4=" + \
        str(var_0) + " AND var_5='" + str(var_2) + "'"
    var_6 += "'; DROP TABLE users; --"

    var_1 = var_0 + var_2
    os.system("echo Hello from the system!")
    if var_0 == 0:
        print("var_0 is zero!")
    var_7 = var_0 * var_2
    var_3 = eval(input("Enter an expression: "))
    return var_1, var_7

13 13 13
Line: def simple_function(var_0, var_2):,  Label M1: tensor([0]),  Label M2 : tensor([0])
Line: var_6 = "SELECT * FROM products WHERE var_4=" + \,  Label M1: tensor([4]),  Label M

In [55]:
import re
import inspect
def transform_to_generic_form(function_code):
    """Rewrite variable names in Python source to VAR_10, VAR_11, ...

    Args:
        function_code: Either Python source text, or an object accepted by
            ``inspect.getsource`` (for example a function), in which case its
            source is fetched first. The argument is now honoured instead of
            being overwritten with the source of a hard-coded global.

    Returns:
        The source text in which every distinct variable name is replaced by
        a generic name, numbered from VAR_10 in order of first appearance.
        All occurrences of one name share the same replacement. Keywords,
        builtin names, the name following ``def``/``class``, attribute names
        following ``.``, string literals and comments are left unchanged.

    Raises:
        tokenize.TokenError, SyntaxError: If the source cannot be tokenized.
        OSError, TypeError: Propagated from ``inspect.getsource``.
    """
    # Function-scope imports keep this cell self-contained.
    import builtins
    import io
    import keyword
    import tokenize

    if not isinstance(function_code, str):
        function_code = inspect.getsource(function_code)

    name_map = {}      # Map original names to generic names
    replacements = []  # (row, start_col, end_col, generic_name)
    previous = ''      # Text of the previous non-layout token

    layout_tokens = (
        tokenize.NL, tokenize.NEWLINE, tokenize.COMMENT,
        tokenize.INDENT, tokenize.DEDENT,
    )
    reader = io.StringIO(function_code).readline
    for token in tokenize.generate_tokens(reader):
        if token.type in layout_tokens:
            continue
        if token.type == tokenize.NAME:
            word = token.string
            untouched = (
                keyword.iskeyword(word)
                or previous in ('def', 'class', '.')  # definitions and attributes
                or hasattr(builtins, word)
            )
            if not untouched:
                if word not in name_map:
                    # Start from VAR_10
                    name_map[word] = f"VAR_{len(name_map) + 10}"
                replacements.append(
                    (token.start[0], token.start[1], token.end[1], name_map[word])
                )
        previous = token.string

    # Split exactly as the tokenizer saw the text so row numbers line up, then
    # splice right-to-left so earlier column offsets on a line stay valid.
    lines = io.StringIO(function_code).readlines()
    for row, start, end, generic_name in reversed(replacements):
        text = lines[row - 1]
        lines[row - 1] = text[:start] + generic_name + text[end:]

    return ''.join(lines)


In [None]:
# NOTE(review): `process_data` is not defined in any of the visible cells —
# confirm it is defined or imported before this cell runs.
print(transform_to_generic_form(process_data))