In [1]:
import ast
import os
import torch
import random
import pandas as pd

from datasets import Dataset, load_dataset
from transformers import AutoTokenizer, SummarizationPipeline, AutoModelForCausalLM, AutoModelForSeq2SeqLM, BitsAndBytesConfig

In [2]:
def read_files_to_dict(directory):
    data_dict = {}
    
    # Walk through directory and subdirectories
    for root, dirs, files in os.walk(directory):
        for filename in files:
            # Check for CSV files
            if filename.lower().endswith('.csv'):
                file_path = os.path.join(root, filename)
                
                try:
                    # Read the file into a DataFrame
                    df = pd.read_csv(file_path, sep=',', quotechar="'", low_memory=False)
                    # Store DataFrame in the dictionary with the relative file path as the key
                    relative_path = os.path.relpath(file_path, directory)
                    data_dict[relative_path] = df
                except Exception as e:
                    print(f"Error reading {file_path}: {e}")
    
    return data_dict

# Define the directory containing the CSV files
directory_path = "F:/Projects/TestMap/TestMap/Output/"

# Read all files and store in dictionary
data_dict = read_files_to_dict(directory_path)

In [3]:
def combine_csvs(data_dict, keyword):
    # Filter DataFrames based on the keyword in the key
    filtered_dfs = [df for key, df in data_dict.items() if keyword in key]
    
    if filtered_dfs:
        # Concatenate all filtered DataFrames
        combined_df = pd.concat(filtered_dfs, ignore_index=True)
        # Write combined DataFrame to a CSV file
        return combined_df
    else:
        print(f"No files found with keyword '{keyword}'")
        
# Combine all CSVs with 'test_method' in the key and write to a single CSV
test_methods_df = combine_csvs(data_dict, 'test_methods')

# Combine all CSVs with 'test_class' in the key and write to a single CSV
test_classes_df = combine_csvs(data_dict, 'test_classes')

## Initial Formatting

### Test Method Formatting

The CSV format from CSharp needs to formatted for Python.

Test Method records are different than the Test Class records.

They need to formatted differently.

#### Converting Fields to List of Strings

Our list of fields from the testing class is separated with `<<SEP>>`.

In [4]:
def convert_class_fields_to_list(s):
    try:
        if isinstance(s, str):
            s = s.replace("<<NEWLINE>>", "\n")
            s = s.replace("<<SINGLE-QUOTE>>", "\'")
            temp_list = []
            substrs = s.split("<<SEP>>")
            
            for substr in substrs:
                temp_list.append(substr)
                
            return temp_list
        else:
            return []
        
    except (ValueError, SyntaxError) as e:
        print(f"Error converting string to list {e}")
        return []

# Apply the function to the column
test_methods_df['ClassFields'] = test_methods_df['ClassFields'].apply(convert_class_fields_to_list)

#### Converting Using Statements to List of Strings

In [5]:
def convert_usings_to_list(s):
    try:
        if isinstance(s, str):
            s = s.replace("<<NEWLINE>>", "\n")
            s = s.replace("<<SINGLE-QUOTE>>", "\'")
            temp_list = []
            substrs = s.split("<<SEP>>")
            
            for substr in substrs:
                temp_list.append(substr)
                
            return temp_list
        else:
            return []
        
    except (ValueError, SyntaxError) as e:
        print(f"Error converting string to list {e}")
        return []

# Apply the function to the column
test_methods_df['UsingStatements'] = test_methods_df['UsingStatements'].apply(convert_class_fields_to_list)

#### Converting Method Invocations to a List of Tuples

When creating the CSV from the original program, we had a list of tuples in CSharp.

CSharp doesn't print lists to strings like Python would. 

So we had to add keywords and special formatting so we
could convert to a list that Python would understand.

In [6]:
# Function to add brackets and convert to list of tuples
def convert_method_invocations(s):
    try:
        s = s.replace("<<NEWLINE>>", "\n")
        s = s.replace("<<SINGLE-QUOTE>>", "\'")
        temp_list = []
        substrs = s.split("<<SEP>>")
        
        for substr in substrs:
            substr = substr.lstrip("(").rstrip(")")
            sub = substr.split('<<TUPLE>>')
            if sub[-1] != ' ':
                tup = (sub[0].rstrip(', '), sub[-1])
                temp_list.append(tup)
        return temp_list
        
    except (ValueError, SyntaxError) as e:
        print(f"Error converting string to list of tuples: {e}")
        return []

# Apply the function to the column
test_methods_df['MethodInvocations'] = test_methods_df['MethodInvocations'].apply(convert_method_invocations)

#### Formatting Test Method

In [7]:
# Function to add brackets and convert to list of tuples
def format_test_method(s):
    try:
        str = s.replace("<<NEWLINE>>", "\n")
        str = str.replace("<<SINGLE-QUOTE>>", "\'")
        
        return str
        
    except (ValueError, SyntaxError) as e:
        print(f"Error converting string to list of tuples: {e}")
        return ""

# Apply the function to the column
test_methods_df['MethodBody'] = test_methods_df['MethodBody'].apply(convert_method_invocations)

### Test Class Formatting

#### Converting Fields to List of Strings

Our list of fields from the testing class is separated with `<<SEP>>`.

In [8]:
def convert_class_fields_to_list(s):
    try:
        if isinstance(s, str):
            s = s.replace("<<NEWLINE>>", "\n")
            s = s.replace("<<SINGLE-QUOTE>>", "\'")
            temp_list = []
            substrs = s.split("<<SEP>>")
            
            for substr in substrs:
                temp_list.append(substr)
                
            return temp_list
        else:
            return []
        
    except (ValueError, SyntaxError) as e:
        print(f"Error converting string to list {e}")
        return []

# Apply the function to the column
test_classes_df['ClassFields'] = test_classes_df['ClassFields'].apply(convert_class_fields_to_list)

#### Converting Using Statements to List of Strings

In [9]:
def convert_usings_to_list(s):
    try:
        if isinstance(s, str):
            s = s.replace("<<NEWLINE>>", "\n")
            s = s.replace("<<SINGLE-QUOTE>>", "\'")
            temp_list = []
            substrs = s.split("<<SEP>>")
            
            for substr in substrs:
                temp_list.append(substr)
                
            return temp_list
        else:
            return []
        
    except (ValueError, SyntaxError) as e:
        print(f"Error converting string to list {e}")
        return []

# Apply the function to the column
test_classes_df['UsingStatements'] = test_classes_df['UsingStatements'].apply(convert_class_fields_to_list)

#### Format Code

In [13]:
# Function to add brackets and convert to list of tuples
def format_code(s):
    try:
        st = s.replace("<<NEWLINE>>", "\n")
        st = st.replace("<<SINGLE-QUOTE>>", "\'")

        return st
        
    except (ValueError, SyntaxError) as e:
        print(f"Error converting string to list of tuples: {e}")
        return ""

# Apply the function to the column
test_classes_df['ClassBody'] = test_classes_df['ClassBody'].apply(format_code)
test_classes_df['SourceBody'] = test_classes_df['SourceBody'].astype(str).apply(format_code)

### Remove Any Empties

In [14]:
test_methods_df_filtered = test_methods_df[test_methods_df['MethodInvocations'].apply(lambda x: len(x) > 0)]
test_classes_df_filtered = test_classes_df[test_classes_df['SourceBody'].apply(lambda x: len(x) > 0)]

In [15]:
# drop duplicates
df_methods_dropped = test_methods_df_filtered.drop_duplicates(subset=['MethodBody'])
df_classes_dropped = test_classes_df_filtered.drop_duplicates(subset=['ClassBody'])

In [16]:
# Find duplicates based on specific columns
test_methods_dup = df_methods_dropped.duplicated(subset=['MethodBody'])
test_classes_dup = df_classes_dropped.duplicated(subset=['ClassBody'])

# Count the number of duplicate rows based on specified columns
num_method_duplicates_based_on_columns = test_methods_dup.sum()
num_class_duplicates_based_on_columns = test_classes_dup.sum()


print(f"Number of duplicate rows based on specific columns: {num_method_duplicates_based_on_columns}")
print(f"Number of duplicate rows based on specific columns: {num_class_duplicates_based_on_columns}")

Number of duplicate rows based on specific columns: 0
Number of duplicate rows based on specific columns: 0


In [17]:

df_methods_dropped.to_csv(os.path.join("data", "test_methods_full.csv"), index=False)
df_classes_dropped.to_csv(os.path.join("data", "test_classes_full.csv"), index=False)

In [117]:
# Function to add brackets and convert to list of tuples
def format_code(s):
    try:
        str = "// Hello this is a test"

        return str

    except (ValueError, SyntaxError) as e:
        print(f"Error converting string to list of tuples: {e}")
        return ""


# Apply the function to the column
df_methods_dropped['MethodBody'] = df_methods_dropped['MethodBody'].apply(format_code)
df_classes_dropped['ClassBody'] = df_classes_dropped['ClassBody'].apply(format_code)

df_methods_dropped.to_csv(os.path.join("data", "valid_test_methods_initial.csv"), index=False)
df_classes_dropped.to_csv(os.path.join("data", "valid_test_classes_initial.csv"), index=False)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_methods_dropped['MethodBody'] = df_methods_dropped['MethodBody'].apply(format_code)


In [2]:
df_test_method_full = pd.read_csv(os.path.join("data", "test_methods_full.csv"))
df_test_classes_full = pd.read_csv(os.path.join("data", "test_classes_full.csv"))

## Normal Finetune

Does the formatting for the typical instruction based finetuning.


In [4]:
def formatted_prompt(question, answer)-> str:
    return f"<|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant:\n{answer}<|im_end|>"

### Method Level

In [29]:
## Prompts
method_prompts = [
    "Generate a test method for this source code.",
    "Create a test method to cover this source code.",
    "Generate a test method to test the functionality of this code.",
    "Generate a test method based on the following source code.",
    "Write a test method to validate the behavior of this code.",
]

In [50]:
def create_method_question(row):
    method_invocations = ast.literal_eval(row['MethodInvocations'])
    # Joining method and description, discarding if description is missing
    source_code = '\n'.join(
        f"Method: {method}\nDefinition: {description}"
        for method, description in method_invocations
        if description  # This checks if description is not None and not an empty string
    )
    
    # Check if source_code is empty or consists of blank lines
    if not source_code.strip():  # strip() removes whitespace
        return None
    
    class_fields = '\n'.join(ast.literal_eval(row['ClassFields']))
    using_statements = '\n'.join(ast.literal_eval(row['UsingStatements']))
    question = f"""
{random.choice(method_prompts)}
Here is some contextual information:
Source Code and definitions (if any):
{source_code}

Test Namespace: {row['Namespace']}

Test Class Declaration: {row['ClassDeclaration']}

Test Class Fields: 
{class_fields}

Test Using Statements: 
{using_statements}

Test Framework: {row['TestFramework']}

Language Framework: {row['LanguageFramework']}

Please delimit the code with ```
    """
    return question

In [51]:
def create_method_answer(row):
    test_code = ast.literal_eval(row['MethodBody'])
    answer = f"""```
    {test_code[0][0]}
    ```
    """
    return answer

In [56]:
method_instructions = []
method_questions = []
method_answers = []
method_repo = []

for index, row in df_test_method_full.iterrows():
    question = create_method_question(row)
    answer = create_method_answer(row)
    if question:
        instruction = formatted_prompt(question, answer)
        method_questions.append(question)
        method_answers.append(answer)
        method_instructions.append(instruction)
        method_repo.append(row['Repo'])

formatted_method_df = pd.DataFrame({
    'Instructions': method_instructions,
    'Prompt': method_questions,
    'Response': method_answers,
    'Repo': method_repo
})

In [71]:
# Get unique values from 'column1'
unique_values = formatted_method_df['Repo'].unique()

# Convert to a list if needed
unique_values_list = unique_values.tolist()

# Calculate the number of unique values
num_unique = len(unique_values)

# Calculate 5% of the number of unique values
subset_size = max(1, int(num_unique * 0.20))  # Ensure at least one item

# Select a random subset of unique values
random_subset = random.sample(list(unique_values), subset_size)

In [72]:
# Create two DataFrames
formatted_method_df_valid = formatted_method_df[formatted_method_df['Repo'].isin(random_subset)]  # DataFrame with selected subset
formatted_method_df_train = formatted_method_df[~formatted_method_df['Repo'].isin(random_subset)]  

In [76]:
formatted_method_train_dataset = Dataset.from_pandas(formatted_method_df_train)
formatted_method_valid_dataset = Dataset.from_pandas(formatted_method_df_valid)
original_method_dataset = Dataset.from_pandas(df_test_method_full)

In [77]:
# Upload to huggingface
# upload originals
test_method_original_name = "Distaste1194/csharp_test_methods_original"
original_method_dataset.push_to_hub(test_method_original_name, private=True)

# upload formatted
test_method_train = "Distaste1194/csharp_test_methods_formatted_training"
formatted_method_train_dataset.push_to_hub(test_method_train, private=True)

# validation_set
test_method_valid = "Distaste1194/csharp_test_methods_formatted_validation"
formatted_method_valid_dataset.push_to_hub(test_method_valid, private=True)

Uploading the dataset shards:   0%|          | 0/6 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/108 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/108 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/108 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/108 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/108 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/108 [00:00<?, ?ba/s]

Uploading the dataset shards:   0%|          | 0/3 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/109 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/109 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/109 [00:00<?, ?ba/s]

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/65 [00:00<?, ?ba/s]

CommitInfo(commit_url='https://huggingface.co/datasets/Distaste1194/csharp_test_methods_formatted_validation/commit/2fbf4fd3f3041bb7ef12061abfbf1770d69f5839', commit_message='Upload dataset', commit_description='', oid='2fbf4fd3f3041bb7ef12061abfbf1770d69f5839', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/Distaste1194/csharp_test_methods_formatted_validation', endpoint='https://huggingface.co', repo_type='dataset', repo_id='Distaste1194/csharp_test_methods_formatted_validation'), pr_revision=None, pr_num=None)

### Class Level

In [87]:
## Prompts
class_prompts = [
    "Generate a test class for this source code.",
    "Create a test class to cover this source code.",
    "Generate a test class to test the functionality of this code.",
    "Generate a test class based on the following source code.",
    "Write a test class to validate the behavior of this code.",
]

In [86]:
def create_class_question(row):
    source_code = row['SourceBody']
    
    class_fields = '\n'.join(ast.literal_eval(row['ClassFields']))
    using_statements = '\n'.join(ast.literal_eval(row['UsingStatements']))
    question = f"""
{random.choice(class_prompts)}
Here is some contextual information:
Source Code Class:
{source_code}

Test Namespace: {row['Namespace']}

Test Class Declaration: {row['ClassDeclaration']}

Test Class Fields: 
{class_fields}

Test Using Statements: 
{using_statements}

Test Framework: {row['TestFramework']}

Language Framework: {row['LanguageFramework']}

Please delimit the code with ```
    """
    return question

In [88]:
def create_class_answer(row):
    test_code = row['ClassBody']
    answer = f"""```
    {test_code}
    ```
    """
    return answer

In [89]:
class_instructions = []
class_repo = []
for index, row in df_test_classes_full.iterrows():
    question = create_class_question(row)
    answer = create_class_answer(row)
    if question:
        instruction = formatted_prompt(question, answer)
        class_instructions.append(instruction)
        class_repo.append(row['Repo'])

formatted_class_df = pd.DataFrame({
    'Instructions': class_instructions,
    'Repo': class_repo
})

In [90]:
# Get unique values from 'column1'
unique_values = formatted_class_df['Repo'].unique()

# Convert to a list if needed
unique_values_list = unique_values.tolist()

# Calculate the number of unique values
num_unique = len(unique_values)

# Calculate 5% of the number of unique values
subset_size = max(1, int(num_unique * 0.20))  # Ensure at least one item

# Select a random subset of unique values
random_subset = random.sample(list(unique_values), subset_size)

In [91]:
# Create two DataFrames
formatted_class_df_valid = formatted_class_df[formatted_class_df['Repo'].isin(random_subset)]  # DataFrame with selected subset
formatted_class_df_train = formatted_class_df[~formatted_class_df['Repo'].isin(random_subset)]  

In [92]:
formatted_class_train_dataset = Dataset.from_pandas(formatted_class_df_train)
formatted_class_valid_dataset = Dataset.from_pandas(formatted_class_df_valid)
original_class_dataset = Dataset.from_pandas(df_test_classes_full)

In [93]:
# Upload to huggingface
# upload originals
test_class_original_name = "Distaste1194/csharp_test_classes_original"
original_class_dataset.push_to_hub(test_class_original_name, private=True)

# upload formatted
test_class_train = "Distaste1194/csharp_test_classes_formatted_training"
formatted_class_train_dataset.push_to_hub(test_class_train, private=True)

# validation_set
test_class_valid = "Distaste1194/csharp_test_classes_formatted_validation"
formatted_class_valid_dataset.push_to_hub(test_class_valid, private=True)

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/15 [00:00<?, ?ba/s]

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/13 [00:00<?, ?ba/s]

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/3 [00:00<?, ?ba/s]

CommitInfo(commit_url='https://huggingface.co/datasets/Distaste1194/csharp_test_classes_formatted_validation/commit/618acbda044b6bf1619d210a739774417f8ad707', commit_message='Upload dataset', commit_description='', oid='618acbda044b6bf1619d210a739774417f8ad707', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/Distaste1194/csharp_test_classes_formatted_validation', endpoint='https://huggingface.co', repo_type='dataset', repo_id='Distaste1194/csharp_test_classes_formatted_validation'), pr_revision=None, pr_num=None)

## Retrieval Augmented Generation

### Method Level

In [120]:
## Prompts

### Class Level

In [121]:
## Prompts

## Chain-of-thoughts Finetune

### Method Level

In [122]:
## Prompts

### Class Level

In [123]:
## Prompts