### Importing essential libraries

In [1]:
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModel
import torch
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, Bidirectional
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, r2_score
from sklearn.model_selection import train_test_split
import clang.cindex
import tempfile
from sklearn.model_selection import KFold
import logging
import re
# Configure the root logger to emit messages at INFO level and above.
logging.basicConfig(level=logging.INFO)


# Select the GPU when PyTorch reports CUDA as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Data file 

In [2]:
# Load the dataset from 'Data_Ast.csv' into a DataFrame.
data = pd.read_csv('Data_Ast.csv')
# Show the first five rows as the cell output.
data.head(5)

Unnamed: 0.1,Unnamed: 0,Question,Correct_Code,Code_with_Error,Total_Marks,AST_full
0,0,Print the factors of a number,#include <stdio.h>\nvoid printFactors(int numb...,#include <stdio.h>\nvoid printFactors(int numb...,7.0,CursorKind.FUNCTION_DECL printFactors\n Curso...
1,1,Print the factors of a number,#include <stdio.h>\nvoid printFactors(int numb...,#include <stdio.h>\nvoid printFactors(int numb...,8.0,CursorKind.FUNCTION_DECL printFactors\n Curso...
2,2,Print the factors of a number,#include <stdio.h>\nvoid printFactors(int numb...,#include <stdio.h>\nvoid printFactors(int numb...,5.0,CursorKind.FUNCTION_DECL printFactors\n Curso...
3,3,Print the factors of a number,#include <stdio.h>\nvoid printFactors(int numb...,#include <stdio.h>\n\nvoid printFactors(int nu...,7.0,CursorKind.FUNCTION_DECL printFactors\n Curso...
4,4,Print the factors of a number,#include <stdio.h>\nvoid printFactors(int numb...,#include <stdio.h>\n\nvoid printFactors(int nu...,5.0,CursorKind.FUNCTION_DECL printFactors\n Curso...


### Code to full AST and partial AST

In [4]:
####IMPLEMENTATION OF C CODE PARSING USING SIMPLE AST

import clang.cindex
import tempfile

def parse_c_code_from_string(c_code):
    """Parse C source text with libclang and return an indented dump of cursor kinds.

    The source is written to a temporary ``.c`` file because ``Index.parse`` is
    called with a file path. The temporary file is removed before the function
    returns, so applying this to a whole DataFrame column does not leave one
    stray file per row behind.

    Args:
        c_code: C source code as a string.

    Returns:
        A string with one line per cursor below the translation unit. Each line
        is ``str(node.kind)`` indented by two spaces per nesting level; the
        translation-unit cursor itself is not printed.
    """
    import os  # local import: only needed for the temp-file cleanup below

    # delete=False lets the file be closed (and therefore flushed) before
    # libclang opens it by path; it is removed explicitly in the finally block.
    with tempfile.NamedTemporaryFile(suffix=".c", delete=False) as tmp_file:
        tmp_file.write(c_code.encode())
        tmp_file_path = tmp_file.name

    def build_ast_string(node, indent=0):
        """Return the dump for ``node`` and everything beneath it."""
        is_root = node.kind == clang.cindex.CursorKind.TRANSLATION_UNIT
        # The root contributes no line of its own and its children keep the
        # same indent; every other node prints itself and indents its children.
        parts = [] if is_root else ['  ' * indent + f"{node.kind}\n"]
        child_indent = indent if is_root else indent + 1
        parts.extend(
            build_ast_string(child, child_indent) for child in node.get_children()
        )
        return ''.join(parts)

    try:
        index = clang.cindex.Index.create()
        translation_unit = index.parse(tmp_file_path)
        return build_ast_string(translation_unit.cursor)
    finally:
        # Best-effort cleanup: a failure to delete must not mask a parse result
        # or a parse error, so it is logged instead of raised.
        try:
            os.unlink(tmp_file_path)
        except OSError as exc:
            logging.warning("Could not remove temporary file %s: %s", tmp_file_path, exc)

# Run the parser on every 'Code_with_Error' entry and store the resulting AST
# text in the 'AST_full' column (overwriting it if it already exists).
data['AST_full'] = data['Code_with_Error'].apply(parse_c_code_from_string)



In [7]:
# index=False keeps the DataFrame index out of the file; writing it is what
# yields the accumulating 'Unnamed: 0' / 'Unnamed: 0.1' columns once the CSV
# is read back and saved again.
# NOTE(review): the loading cell reads 'Data_Ast.csv' while this writes
# 'Data_AST.csv' - confirm whether both are meant to be the same file.
data.to_csv("Data_AST.csv", index=False)

In [15]:
####IMPLEMENTATION OF C CODE PARSING USING PARTIAL AST

import clang.cindex
import tempfile

def find_parent_manually(root, target_node):
    """Search the tree below ``root`` for the node whose children include ``target_node``.

    The search is depth-first. Only descendants of ``root`` are considered as
    candidate parents: ``root`` itself is never returned, so a direct child of
    ``root`` yields ``None``.

    Args:
        root: Node exposing ``get_children()``; the search starts here.
        target_node: Node to look for, compared with ``==``.

    Returns:
        The first matching parent node, or ``None`` when no match is found.
    """
    for candidate in root.get_children():
        # Is the target one of this candidate's direct children?
        for grandchild in candidate.get_children():
            if grandchild == target_node:
                return candidate
        # Otherwise keep looking further down this branch.
        found = find_parent_manually(candidate, target_node)
        if found:
            return found
    return None

def parse_c_code_from_string(c_code):
    """Parse C source with libclang and return an AST dump interleaved with source lines.

    Every cursor is printed as ``"<kind> <spelling>"``, indented by two spaces
    per nesting level. Raw source lines are added in three situations:

    * a leaf cursor without a right sibling sits on a line that has a clang
      diagnostic (each such line is added at most once);
    * there is a gap of more than one line between a cursor and the next
      cursor in traversal order;
    * a leaf cursor has no following cursor at all and source lines remain
      after it.

    The source is written to a temporary ``.c`` file for parsing; that file is
    removed before the function returns.

    NOTE(review): this shares its name with the simpler kind-only parser
    defined in an earlier cell; whichever cell ran last is the one in effect.

    Args:
        c_code: C source code as a string.

    Returns:
        The annotated AST dump as a single string.
    """
    import os  # local import: only needed for the temp-file cleanup below

    # delete=False lets the file be closed before libclang opens it by path;
    # it is removed explicitly in the finally block.
    with tempfile.NamedTemporaryFile(suffix=".c", delete=False) as tmp_file:
        tmp_file.write(c_code.encode())
        tmp_file.flush()
        tmp_file_path = tmp_file.name

    try:
        index = clang.cindex.Index.create()
        translation_unit = index.parse(tmp_file_path)

        # Source split into lines; index 0 corresponds to clang line number 1.
        original_lines = c_code.splitlines()

        def find_next_brother(node):
            """Return the next sibling of ``node``, climbing to ancestors when it is the last child."""
            parent = find_parent_manually(translation_unit.cursor, node)
            if not parent:
                return None

            siblings = list(parent.get_children())
            current_index = siblings.index(node)

            if current_index + 1 < len(siblings):
                return siblings[current_index + 1]
            # Last child: the "next" node is whatever follows the parent.
            return find_next_brother(parent)

        def right_sibling_exist(node):
            """Return True/False for whether ``node`` has a right sibling, or None when no parent is found."""
            parent = find_parent_manually(translation_unit.cursor, node)
            if not parent:
                return None

            siblings = list(parent.get_children())
            return siblings.index(node) + 1 < len(siblings)

        # Line numbers whose source text was already emitted for a diagnostic,
        # so the same line is not repeated for other cursors on it.
        node_same_line = []

        def build_ast_string(node, indent=0):
            """Return the annotated dump for ``node`` and all of its descendants."""
            pad = '  ' * indent
            node_line = node.location.line
            is_leaf = len(list(node.get_children())) == 0

            # The cursor's own line is always emitted.
            ast_str = pad + f"{node.kind} {node.spelling}\n"

            has_unreported_diagnostic = node_line not in node_same_line and any(
                d.location.line == node_line for d in translation_unit.diagnostics
            )
            if has_unreported_diagnostic and is_leaf and not right_sibling_exist(node):
                # Only quote the source when the line number actually falls
                # inside the snippet; cursors can report line 0 or a line of
                # another file, which would otherwise index the wrong line or
                # raise IndexError.
                if 1 <= node_line <= len(original_lines):
                    ast_str += pad + f"{original_lines[node_line - 1]}\n"
                node_same_line.append(node_line)

            # Next cursor in traversal order, used to detect uncovered lines.
            sibling = find_next_brother(node)

            if sibling and sibling.location.line - node_line > 1:
                # Emit the lines strictly between this cursor and the next one.
                # The upper bound is clamped so positions beyond the snippet
                # cannot raise IndexError.
                last = min(sibling.location.line - 1, len(original_lines))
                for line_num in range(node_line, last):
                    line = original_lines[line_num]
                    # Skip blank lines and lines made only of closing brackets.
                    if line.strip() and not all(char in ')}] ' for char in line):
                        ast_str += pad + f"{line}\n"

            elif sibling is None and len(original_lines) - node_line > 1 and is_leaf:
                # No following cursor: emit the remaining lines except the
                # final one, skipping blank and bracket-only lines.
                for line_num in range(node_line, len(original_lines) - 1):
                    line = original_lines[line_num]
                    if line.strip() and not all(char in '(){}[] ' for char in line):
                        ast_str += pad + f"{line}\n"

            # Recurse into the children one indent level deeper.
            for child in node.get_children():
                ast_str += build_ast_string(child, indent + 1)

            return ast_str

        return build_ast_string(translation_unit.cursor)
    finally:
        # Best-effort cleanup: a failure to delete must not mask the result or
        # a parse error, so it is logged instead of raised.
        try:
            os.unlink(tmp_file_path)
        except OSError as exc:
            logging.warning("Could not remove temporary file %s: %s", tmp_file_path, exc)



### Data preprocessing

In [5]:
def clean_text_code(text):
    """Flatten source text into a single line.

    Lines that are empty or whitespace-only are dropped, every run of two or
    more whitespace characters becomes one space, and any remaining newline is
    replaced by a space.
    """
    kept_lines = [ln for ln in text.split('\n') if ln.strip()]
    collapsed = re.sub(r'\s{2,}', ' ', '\n'.join(kept_lines))
    return collapsed.replace('\n', ' ')

def clean_text_AST(text):
    """Abbreviate an AST dump and flatten it into a single line.

    Steps:
      1. Remove every ``'CursorKind.'`` prefix.
      2. Shorten each whitespace-separated word: a word containing ``_`` keeps
         the first 3 characters of its first part, the first 3 of its second
         part and the first 2 of its third part (further parts are dropped);
         any other word keeps its first 4 characters.
      3. Re-attach each line's leading whitespace as the same number of spaces.
      4. Collapse runs of two or more whitespace characters into one space and
         turn remaining newlines into spaces.
    """

    def _abbreviate(token):
        # Words without an underscore are simply cut to four characters.
        if '_' not in token:
            return token[:4]
        pieces = token.split('_')
        short = pieces[0][:3] + pieces[1][:3]
        if len(pieces) > 2:
            short += pieces[2][:2]
        return short

    without_prefix = text.replace('CursorKind.', '')

    rebuilt_lines = []
    for raw_line in without_prefix.splitlines():
        content = raw_line.lstrip()
        padding = ' ' * (len(raw_line) - len(content))
        shortened = ' '.join(_abbreviate(tok) for tok in content.split())
        rebuilt_lines.append(padding + shortened)

    flattened = re.sub(r'\s{2,}', ' ', '\n'.join(rebuilt_lines))
    return flattened.replace('\n', ' ')

# Derive the flattened text columns: the abbreviated AST goes to
# 'AST_full_Processed' and the whitespace-normalised source goes to
# 'Code_with_Error_Processed'.
data['AST_full_Processed']=data['AST_full'].apply(clean_text_AST)
data['Code_with_Error_Processed']=data['Code_with_Error'].apply(clean_text_code)

### CodeT5 embedding class

In [None]:
import gc
class CodeT5Embeddings:
    """Produce fixed-size embeddings for text with the ``Salesforce/codet5-base`` encoder.

    Each input is tokenized, padded to ``MAX_TOKENS`` and passed through the
    model's encoder; the hidden state at sequence position 0 is returned as the
    embedding. Empty or over-long inputs yield an all-zero vector instead.
    """

    # Width of the zero-vector fallback and the token budget per input.
    EMBEDDING_DIM = 768
    MAX_TOKENS = 512

    def __init__(self):
        """Load the tokenizer and model and move the model to GPU when available."""
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Use CodeT5 tokenizer and model
        self.tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-base")
        self.model = AutoModel.from_pretrained("Salesforce/codet5-base").to(self.device)

    def _zero_embedding(self):
        """Return the ``(1, EMBEDDING_DIM)`` all-zero fallback on ``self.device``."""
        return torch.zeros(1, self.EMBEDDING_DIM).to(self.device)

    def _get_embeddings(self, text):
        """Embed a single text.

        Args:
            text: Input string.

        Returns:
            A ``(1, hidden)`` tensor taken from the encoder output at sequence
            position 0, or a ``(1, EMBEDDING_DIM)`` zero tensor when the text
            is blank or tokenizes to more than ``MAX_TOKENS`` tokens. The
            result carries no autograd graph.
        """
        if not text.strip():  # Check if input text is empty or whitespace
            logging.warning("Empty input text provided.")
            return self._zero_embedding()

        # Pad to a fixed length; there is deliberately no truncation, so
        # over-long inputs are detected by the size check below.
        tokens = self.tokenizer(
            text,
            return_tensors="pt",
            padding="max_length",
            max_length=self.MAX_TOKENS
        ).to(self.device)

        # Check if the tokenizer produced no tokens
        if tokens['input_ids'].size(1) == 0:
            logging.warning(f"Warning: No tokens found for text snippet: {text[:50]}...")
            return self._zero_embedding()

        # Check if the token length exceeds the budget
        if tokens['input_ids'].size(1) > self.MAX_TOKENS:
            logging.warning(f"Warning: Token size exceeds 512 for text: {text[:50]}...")
            return self._zero_embedding()

        # Inference only: without no_grad every returned embedding would keep
        # its autograd graph alive and waste memory.
        with torch.no_grad():
            outputs = self.model.encoder(**tokens)

        # Use the hidden state at sequence position 0 as the embedding of the
        # whole input.
        # NOTE(review): the original comment called this the "CLS token";
        # T5-style models have no dedicated CLS token - confirm that position 0
        # is the intended pooling.
        embeddings = outputs.last_hidden_state[:, 0, :]
        return embeddings

    def _get_batch_embeddings(self, texts):
        """Embed every text in ``texts`` and return them stacked as ``(len(texts), hidden)``.

        Each text goes through ``_get_embeddings`` so blank and over-long
        inputs get the same zero-vector treatment as in the single-text path.
        """
        return torch.cat([self._get_embeddings(text) for text in texts], dim=0)

    def process_embeddings_in_batches(self, texts, batch_size=16):
        """Embed a sequence of texts in chunks of ``batch_size``.

        Args:
            texts: Sliceable sequence of strings.
            batch_size: Number of texts handled per chunk.

        Returns:
            A ``(len(texts), hidden)`` tensor; ``(0, EMBEDDING_DIM)`` when
            ``texts`` is empty.
        """
        if len(texts) == 0:
            # Nothing to concatenate; return an empty result instead of failing.
            return torch.zeros(0, self.EMBEDDING_DIM).to(self.device)

        all_embeddings = []

        # Divide the list into batches of size `batch_size`
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            all_embeddings.append(self._get_batch_embeddings(batch_texts))

        # Combine embeddings from all batches
        final_embeddings = torch.cat(all_embeddings, dim=0)

        # Clear memory
        del all_embeddings
        torch.cuda.empty_cache()
        gc.collect()

        return final_embeddings

    def process(self, code, ast_full):
        """Return ``(code_embedding, ast_embedding)`` for one code string and one AST string."""
        code_embedding = self._get_embeddings(code)
        ast_embedding = self._get_embeddings(ast_full)
        return code_embedding, ast_embedding


class CodeT5Processor:
    """Compute, store, save and reload embeddings for the rows of a DataFrame.

    The DataFrame must provide the columns ``'Code_with_Error_Processed'`` and
    ``'AST_full_Processed'``. ``embedding_model`` must expose a ``tokenizer``
    callable and a ``process(code, ast)`` method returning two tensors.
    """

    def __init__(self, dataframe, embedding_model, embedding_save_path="embeddings.pt"):
        """Store the inputs; no embeddings are computed here."""
        self.dataframe = dataframe
        self.embedding_model = embedding_model
        self.embedding_save_path = embedding_save_path
        self.embeddings = None  # To store the embeddings
        # Index labels of the rows that were embedded by compute_embeddings(),
        # in the same order as the stacked tensors. Stays None until
        # compute_embeddings() has run (it is not restored by load_embeddings()).
        self.kept_indices = None

    def compute_embeddings(self):
        """Embed every row whose code and AST both fit into 512 tokens.

        Rows where either text tokenizes to more than 512 tokens are logged and
        skipped. The index labels of the rows that were kept are recorded in
        ``self.kept_indices`` so the embeddings can be matched back to the
        DataFrame (for example to select the corresponding labels).

        Raises:
            RuntimeError: If no row could be embedded.
        """
        code_embeddings = []
        ast_embeddings = []
        kept_indices = []
        tokenizer = self.embedding_model.tokenizer

        for idx, row in self.dataframe.iterrows():
            code = row['Code_with_Error_Processed']
            ast = row['AST_full_Processed']

            # Tokenize without padding/truncation to measure the real length.
            code_tokens = tokenizer(code, return_tensors="pt")['input_ids']
            ast_tokens = tokenizer(ast, return_tensors="pt")['input_ids']

            # Log and skip rows where token size is greater than 512 for either code or AST
            if len(code_tokens[0]) > 512 or len(ast_tokens[0]) > 512:
                logging.warning(f"Skipping row {idx} due to token size exceeding 512 for code or AST.")
                continue

            # Process embeddings for code and AST
            code_embedding, ast_embedding = self.embedding_model.process(code, ast)

            # Detach before moving to the CPU so no autograd graph is retained
            # for the lifetime of the stored tensors.
            code_embeddings.append(code_embedding.detach().cpu())
            ast_embeddings.append(ast_embedding.detach().cpu())
            kept_indices.append(idx)

            torch.cuda.empty_cache()
            gc.collect()

        if not code_embeddings:
            # torch.stack on an empty list raises an opaque RuntimeError; raise
            # the same exception type with an actionable message instead.
            raise RuntimeError(
                "No rows could be embedded: every row was skipped or the DataFrame is empty."
            )

        # Stack the per-row tensors along a new leading dimension.
        self.embeddings = {
            'code_embeddings': torch.stack(code_embeddings),
            'ast_embeddings': torch.stack(ast_embeddings)
        }
        self.kept_indices = kept_indices

    def save_embeddings(self):
        """Write the computed embeddings to ``embedding_save_path``.

        Raises:
            ValueError: If no embeddings have been computed or loaded yet.
        """
        if self.embeddings is None:
            raise ValueError("No embeddings found. Please compute embeddings first.")

        torch.save(self.embeddings, self.embedding_save_path)
        print(f"Embeddings saved to {self.embedding_save_path}")

    def load_embeddings(self):
        """Replace the stored embeddings with the contents of ``embedding_save_path``."""
        # NOTE(review): torch.load deserializes with pickle; only load files
        # produced by this notebook or another trusted source.
        self.embeddings = torch.load(self.embedding_save_path)
        print(f"Embeddings loaded from {self.embedding_save_path}")

    def get_embeddings(self):
        """Return the stored embeddings dictionary.

        Raises:
            ValueError: If no embeddings have been computed or loaded yet.
        """
        if self.embeddings is None:
            raise ValueError("No embeddings found. Please compute or load embeddings first.")
        return self.embeddings


### Main pipeline: compute, save, and reload the embeddings

In [7]:
# Embedding extraction: build the model wrapper and the processor over `data`.
embedding_model = CodeT5Embeddings()
processor = CodeT5Processor(data, embedding_model)
# Compute the embeddings, write them to the processor's save path, then read
# them back from that file before use.
processor.compute_embeddings()
processor.save_embeddings()
processor.load_embeddings()
embeddings = processor.get_embeddings()

# Print the full code and AST embedding tensors.
print(embeddings['code_embeddings'])
print(embeddings['ast_embeddings'])

Some weights of the model checkpoint at Salesforce/codet5-base were not used when initializing T5Model: ['lm_head.weight']
- This IS expected if you are initializing T5Model from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing T5Model from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


: 