<a href="https://colab.research.google.com/github/LorenzoCorbinelli/MLSA-project/blob/chunking/Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Environment setup

In [56]:
# Install the Hugging Face libraries that the import cell relies on (transformers, datasets).
!pip install transformers datasets



In [57]:
import os
import pandas as pd
import torch
import numpy as np
from transformers import RobertaTokenizer, RobertaConfig, AutoModel, DataCollatorForLanguageModeling, RobertaForMaskedLM
from datasets import Dataset as ds
from torch.utils.data import DataLoader, TensorDataset, Dataset
from torch.optim import Adam

# Load the dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Download the dataset
!wget http://files.srl.inf.ethz.ch/data/py150_files.tar.gz

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
--2025-02-08 17:38:49--  http://files.srl.inf.ethz.ch/data/py150_files.tar.gz
Resolving files.srl.inf.ethz.ch (files.srl.inf.ethz.ch)... 129.132.114.90
Connecting to files.srl.inf.ethz.ch (files.srl.inf.ethz.ch)|129.132.114.90|:80... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://files.sri.inf.ethz.ch/data/py150_files.tar.gz [following]
--2025-02-08 17:38:49--  https://files.sri.inf.ethz.ch/data/py150_files.tar.gz
Resolving files.sri.inf.ethz.ch (files.sri.inf.ethz.ch)... 129.132.114.90
Connecting to files.sri.inf.ethz.ch (files.sri.inf.ethz.ch)|129.132.114.90|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 199067128 (190M) [application/gzip]
Saving to: ‘py150_files.tar.gz.1’


In [None]:
# Unpack the downloaded archive; presumably this is what provides the data.tar.gz
# that is extracted right afterwards — verify if the archive layout changes.
!tar -xvzf py150_files.tar.gz # unzip the folder containing the dataset

In [None]:
!tar -xvzf data.tar.gz # unzip the dataset

After unzipping data.tar.gz, we retrieve the Python files and remove all comments, because they are not useful for our purposes.

Extract the source code of every Python file into a DataFrame: each file becomes a single row that holds the path of the file and its source code.

In [None]:
def _strip_comments(lines):
    """Return `lines` without `#` comment lines and without triple-quoted blocks.

    A line is dropped when it
      * starts (after stripping whitespace) with `#`,
      * contains a triple-quote delimiter (triple double quotes or triple single quotes), or
      * lies between an opening and a closing delimiter of the same kind.
    A line that holds the same delimiter exactly twice is treated as a
    self-contained one-line string and does not open or close a block.
    """
    kept = []
    open_delim = None  # delimiter of the block we are currently inside, if any
    for line in lines:
        stripped = line.strip()
        if open_delim is not None:
            # Inside a block everything is dropped; only the delimiter that opened
            # the block can close it.
            if open_delim in stripped and stripped.count(open_delim) != 2:
                open_delim = None
            continue
        present = [delim for delim in ('"""', "'''") if delim in stripped]
        if present:
            # When both kinds appear on one line, the leftmost one decides.
            delim = min(present, key=stripped.find)
            if stripped.count(delim) != 2:
                open_delim = delim
            continue
        if stripped.startswith('#'):
            continue
        kept.append(line)
    return kept


def load_python_files(root_dir):
    """Loads all Python files in a directory recursively into a DataFrame.

    Parameters:
        root_dir: directory that is walked recursively; only `*.py` files are read.

    Returns:
        A DataFrame with the columns `filepath` and `snippet_of_code`, one row per
        readable file, where `snippet_of_code` is the file content with comment
        lines and triple-quoted blocks removed. The columns exist even when no
        file was found. Rows follow a sorted traversal, so positional slices of
        the result are stable across runs.

    Files that cannot be read or decoded as UTF-8 are reported on stdout and skipped.
    """
    all_files = []
    for dirpath, dirnames, filenames in os.walk(root_dir):
        # os.walk yields entries in arbitrary order; sorting in place also fixes
        # the order in which the sub-directories are visited.
        dirnames.sort()
        for filename in sorted(filenames):
            if not filename.endswith(".py"):  # take only python files
                continue
            filepath = os.path.join(dirpath, filename)
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    content = f.readlines()  # read the file line by line
            except (OSError, UnicodeError) as e:
                print(f"Error reading file {filepath}: {e}")
                continue
            all_files.append({'filepath': filepath, 'snippet_of_code': ''.join(_strip_comments(content))})
    return pd.DataFrame(all_files, columns=['filepath', 'snippet_of_code'])


In [None]:
# Root folder that is scanned for .py files (an absolute path on the Colab VM).
data_dir = "/content/data"
# One row per readable Python file below data_dir, with its comments filtered out.
df_python_files = load_python_files(data_dir)

In [None]:
# Both splits are positional slices of the same column, so they do not overlap.
# NOTE(review): .iloc slicing does not fail when fewer than 3000 files were loaded;
# the splits are simply shorter (or empty) — worth asserting the expected sizes.
df_train = df_python_files["snippet_of_code"].iloc[0:2000] # take only the first 2000 snippets for training
df_eval = df_python_files["snippet_of_code"].iloc[2000:3000] # take 1000 snippets for evaluation

# Tokenization

In [None]:
# Load the tokenizer published with the same checkpoint that is fine-tuned later on.
tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base-mlm')
# tokenizer arguments to properly handle the tokenization of the snippets
# (snippets longer than max_length=500 tokens are truncated, shorter ones are padded)
tokenizer_kwargs = dict(truncation=True, padding=True, max_length=500, add_special_tokens=True)

In [None]:
# DataCollatorForLanguageModeling does not tokenize anything: it batches examples that
# are already tokenized and randomly selects tokens for masking (15% here, set through
# mlm_probability).
# It also generates the labels automatically: the selected positions keep their original
# token id as label, every other position gets the label -100.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm_probability=0.15)

In [None]:
def tokenize_dataset(dataset, tokenizer, **kwargs):
    """Run `tokenizer` over `dataset` and hand back its result unchanged.

    The tokenizer is always asked for PyTorch tensors (`return_tensors='pt'`);
    every additional keyword argument is forwarded to it as-is.
    """
    return tokenizer(dataset, return_tensors='pt', **kwargs)

In [None]:
# Tokenize both splits with the shared tokenizer settings.
train_tensor_dataset = tokenize_dataset(df_train.to_list(), tokenizer, **tokenizer_kwargs)
eval_tensor_dataset = tokenize_dataset(df_eval.to_list(), tokenizer, **tokenizer_kwargs)

# Wrap the tokenizer output in `datasets.Dataset` objects.
datasetTrain = ds.from_dict(train_tensor_dataset)
datasetEval = ds.from_dict(eval_tensor_dataset)

# Expose only the two model inputs, as torch tensors.
datasetTrain.set_format(type='torch', columns=['input_ids', 'attention_mask'])
datasetEval.set_format(type='torch', columns=['input_ids', 'attention_mask'])

# Seed the generator that drives the shuffling so that the order of the training
# batches is the same on every run.
# NOTE(review): the collator's random masking presumably draws from torch's global RNG;
# call torch.manual_seed as well if fully repeatable runs are required.
generator = torch.Generator().manual_seed(42)
train_loader = DataLoader(datasetTrain, batch_size=4, shuffle=True, generator=generator, collate_fn=data_collator)
eval_loader = DataLoader(datasetEval, batch_size=4, collate_fn=data_collator)

In [None]:
model = RobertaForMaskedLM.from_pretrained('microsoft/codebert-base-mlm')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = Adam(model.parameters(), lr=1e-5)

# Report the running mean loss every LOG_EVERY steps instead of dumping one tensor
# repr per step, which buries the epoch messages in the cell output.
LOG_EVERY = 50

# Training loop
for epoch in range(2):
    model.train()
    running_loss = 0.0
    num_steps = 0
    for batch in train_loader:
        optimizer.zero_grad()
        # Forward only the tensors the model accepts as inputs, plus the MLM labels
        # produced by the data collator.
        inputs = {key: val.to(device) for key, val in batch.items() if key in tokenizer.model_input_names}
        inputs['labels'] = batch['labels'].to(device)
        outputs = model(**inputs)
        loss = outputs.loss
        loss.backward()
        optimizer.step()

        # .item() turns the loss into a plain float, so no tensor is kept around.
        running_loss += loss.item()
        num_steps += 1
        if num_steps % LOG_EVERY == 0:
            print(f"Epoch {epoch + 1} step {num_steps}: mean loss {running_loss / num_steps:.4f}")
    if num_steps:  # guard against an empty loader
        print(f"Epoch {epoch + 1} mean loss: {running_loss / num_steps:.4f}")
    print(f"Epoch {epoch + 1} completed")


In [None]:
# Evaluation loop
# NOTE(review): the loop is currently disabled — it is wrapped in a string literal, so
# running this cell executes none of it and only echoes the string as the cell output.
# When re-enabling it, note that the local name `input` shadows the Python builtin.
'''
model.eval()
with torch.no_grad():
    for batch in eval_loader:
        input = {key: val.to(device) for key, val in batch.items()}
        outputs = model(**input)
        print(outputs.loss)'''

In [None]:
# save the model
# NOTE(review): disabled as well — the statements live inside a string literal, so
# nothing is written to disk. `directory` looks like a placeholder that has to be
# replaced by a real path before the block is re-enabled.
'''
directory = 'path_to_the_model_directory'

model.save_pretrained(directory)
tokenizer.save_pretrained(directory)'''

In [None]:
import pandas as pd

# Assuming df_train is a Series and tokenizer is already defined
def split_text_into_chunks(text, max_length=500):
    """Split `text` into consecutive chunks of token ids.

    The text is tokenized once, without truncation and without special tokens.
    Every chunk holds at most `max_length` minus the number of special tokens the
    tokenizer adds to a single sequence, so that a chunk still fits into
    `max_length` once those special tokens are added around it.

    Parameters:
        text: source code to split.
        max_length: token budget of one chunk, special tokens included.

    Returns:
        A list of lists of token ids (without special tokens); an empty list when
        `text` produces no tokens.

    Raises:
        ValueError: if `max_length` leaves no room for any content token.
    """
    # Number of tokens the tokenizer wraps around one sequence.
    reserved = tokenizer.num_special_tokens_to_add(pair=False)
    chunk_size = max_length - reserved
    if chunk_size <= 0:
        raise ValueError(f"max_length must be greater than {reserved}, got {max_length}")

    # Tokenize the full text first (without truncating)
    tokens = tokenizer.encode(text, add_special_tokens=False)

    # Split into chunks of chunk_size content tokens
    return [tokens[i:i + chunk_size] for i in range(0, len(tokens), chunk_size)]

def get_token_length(text):
    """Return how many tokens `text` is encoded into, special tokens included."""
    token_ids = tokenizer.encode(text, add_special_tokens=True)
    return len(token_ids)

# Collect the snippets of the chunked training set
new_snippets = []

# Walk over the training Series (df_train)
for text in df_train:
    if get_token_length(text) <= 500:
        # Short enough: the original snippet is kept untouched
        new_snippets.append(text)
        continue

    # Too long: every chunk of token ids is decoded back into text
    # and becomes a snippet of its own
    new_snippets.extend(
        tokenizer.decode(chunk, skip_special_tokens=True)
        for chunk in split_text_into_chunks(text)
    )

# Wrap the collected snippets in a new Series
df_train_updated = pd.Series(new_snippets)

# Show the updated Series
print(df_train_updated)

In [None]:
from tabulate import tabulate
def print_result(outputs):
    """Print the predictions in `outputs` as a grid table.

    Each prediction is a mapping that provides `sequence`, `token_str` and
    `score`; one table row is printed per prediction.
    """
    def _as_row(prediction):
        # Wrapping the token in quotes keeps a leading space visible in the table.
        quoted_token = f'"{prediction["token_str"]}"'
        return [prediction['sequence'], quoted_token, prediction['score']]

    rows = [_as_row(prediction) for prediction in outputs]

    print("The suggested code completions are:")
    print(tabulate(rows, headers=["Completion", "Predicted token", "Score"], tablefmt="grid", colalign=("left", "left", "center")) )

In [None]:
from transformers import pipeline
def code_completion(code_example, iterations: int = 1):
    '''
    Complete a snippet of code token by token and print the candidates of every step.

    - code_example: snippet of code that needs to be completed. No <mask> token needed.
    - iterations: number of subsequent code completions to be generated.
                  Each sequence generated after the first one will be based only on the previous sequence with the highest score.

    Returns the predictions of the last iteration (an empty list when iterations is 0).
    '''
    # The training loop leaves the model in train mode; switch to eval mode so that
    # dropout does not randomize the predictions.
    model.eval()
    # Keep the pipeline on the device the model currently lives on.
    fill_mask = pipeline('fill-mask', model=model, tokenizer=tokenizer, device=model.device)

    current_example = code_example + "<mask>"  # Start with the initial code
    outputs = []  # stays empty when no iteration is requested

    for _ in range(iterations):
        outputs = fill_mask(current_example)

        # Take the first prediction and append <mask> to continue completion
        best_prediction = outputs[0]["sequence"]
        current_example = best_prediction + " <mask>"
        print_result(outputs)
    return outputs

In [None]:
# Single completion step (iterations defaults to 1): predict the token after "x==".
result = code_completion("def is_zero(x): return x==")

In [None]:
# Two chained completions: the second one extends the best-scoring result of the first.
result = code_completion("def add(a, b): return a", 2)

In [None]:
# Two chained completions that start from an unfinished for-loop header.
result = code_completion("for element ", 2)