In [1]:
pip install -r requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split


# Function to read and vectorize the log file
def vectorize_log_file(file_path, vectorizer=None, fit=True):
    all_sentence = []
    with open(file_path, 'r') as file:
        log_content = file.read().strip().split('\n')
        all_sentence.append(extract_all_word_from_log(log_content))
    if not log_content:
        raise ValueError(f"The log file {file_path} is empty or contains only whitespace.")
    if fit:
        log_vector = vectorizer.fit_transform(all_sentence)
        
    else:
        log_vector = vectorizer.transform(all_sentence)
    return log_vector.toarray()[0]

def extract_all_word_from_log(log_context):
    words = ""
    for line in log_context:
        words += line.split('|')[-1].strip()

    return words

# Function to parse execution trace
def parse_execution_trace(file_path):
    return pd.read_csv(file_path)

def path_to_method_and_line_no(path):
    line_number = path.split('@')[1]
    method_name = '.'.join(path.split('.')[:-1])
    return method_name, line_number

# Function to vectorize execution trace for each method
def vectorize_execution_trace(trace_df, unique_methods):
    execution_vectors = {}
    # new one
    # total_lines_of_code = {
    # 'src/test/java/functions/CalculatorTest.java': 290,
    # 'src/main/java/functions/Calculator.java': 168,
    # 'src/main/java/functions/distributions/Calculator2.java': 27,
    # 'src/main/java/functions/distributions/Calculator3.java': 24,
    # }
    # old one (before remove logging)
    total_lines_of_code = {
    'src/test/java/functions/CalculatorTest.java': 338,
    'src/main/java/functions/Calculator.java': 192,
    'src/main/java/functions/distributions/Calculator2.java': 27,
    'src/main/java/functions/distributions/Calculator3.java': 25,
    }

    for method in unique_methods:
        coverage = {path: [0] * lines for path, lines in total_lines_of_code.items()}
        method_trace = trace_df[trace_df['signature'] == method]

        method = method.replace('src/test/java/functions/CalculatorTest.java;CalculatorTest.', '')
        method = method.replace('#', '')
        for _, row in trace_df.iterrows():
            if str(row['line_no']) in map(str, method_trace['line_no'].values):
                coverage[row['signature'].split(';')[0]][row['line_no']-1] = 1
        execution_vectors[method] = [x for y in coverage.values() for x in y]
    return execution_vectors

In [3]:
trace_file = 'trace_data.csv'
log_directory = 'data/original/log'

trace_df = parse_execution_trace(trace_file)
trace_df['line_no'] = trace_df['path'].apply(lambda x: int(x.split('@')[1]))
unique_methods = trace_df['signature'].unique()

execution_vectors = vectorize_execution_trace(trace_df, unique_methods)

log_vectors = []
execution_vectors_for_model = []
method_names = [] 

vectorizer = CountVectorizer()

for method in unique_methods:
    method = method.replace('src/test/java/functions/CalculatorTest.java;CalculatorTest.', '')
    method = method.replace('#', '')
    log_file_path = os.path.join(log_directory, f"log_{method}.log")
    log_file_path = log_file_path.replace('"', '')

    if os.path.exists(log_file_path):
        try:
            log_vector = vectorize_log_file(log_file_path, vectorizer, fit=True)
            log_vectors.append(log_vector)
            execution_vectors_for_model.append(execution_vectors[method])
            method_names.append(method)
        except ValueError as e:
            print(e)
    else:
        print(f"Log file for method {method} not found.")

if not log_vectors or not execution_vectors_for_model:
    raise ValueError("Log vectors or execution vectors for model are empty. Check your data.")

max_log_length = max(len(v) for v in log_vectors)
max_exec_length = max(len(v) for v in execution_vectors_for_model)

padded_log_vectors = np.array([np.pad(v, (0, max_log_length - len(v)), 'constant') for v in log_vectors])
padded_execution_vectors = np.array([np.pad(v, (0, max_exec_length - len(v)), 'constant') for v in execution_vectors_for_model])

X = np.array(padded_log_vectors)
y = np.array(padded_execution_vectors)

y = y.astype(int)

# Split the data into training (90%) and testing (10%) sets
X_train, X_test, y_train, y_test, method_train, method_test = train_test_split(X, y, method_names, test_size=0.1, random_state=42)

# Print shapes for debugging
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)
print("X_test shape:", X_test.shape)
print("y_test shape:", y_test.shape)

# Train the Random Forest model
model = RandomForestClassifier()
model.fit(X_train, y_train)

X_train shape: (33, 12)
y_train shape: (33, 509)
X_test shape: (4, 12)
y_test shape: (4, 509)


In [8]:

y_pred = model.predict(X_test)

import numpy as np
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score
)


    # new one
# total_lines_of_code = {
#     'src/test/java/functions/CalculatorTest.java': 290,
#     'src/main/java/functions/Calculator.java': 168,
#     'src/main/java/functions/distributions/Calculator2.java': 27,
#     'src/main/java/functions/distributions/Calculator3.java': 24,
#     }
    # old one (before remove logging)
total_lines_of_code = {
    'src/test/java/functions/CalculatorTest.java': 338,
    'src/main/java/functions/Calculator.java': 192,
    'src/main/java/functions/distributions/Calculator2.java': 27,
    'src/main/java/functions/distributions/Calculator3.java': 25,
    }
def extract_result(coverage_vector_result):
    result = {}
    for path, lines in total_lines_of_code.items():
        result[path] = extract_line_from_result(coverage_vector_result[:lines])
        coverage_vector_result = coverage_vector_result[lines:]
    return result

def extract_line_from_result(result):
    lines = []
    for index, value in enumerate(result):
        if value == 1:
            lines.append(index)
    return lines

def align_lists(a, b):
    # Initialize indexes
    i, j = 0, 0

    # New lists with inserted 0s
    new_a, new_b = [], []

    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            new_a.append(a[i])
            new_b.append(b[j])
            i += 1
            j += 1
        elif a[i] < b[j]:
            new_a.append(a[i])
            new_b.append(0)
            i += 1
        else:
            new_a.append(0)
            new_b.append(b[j])
            j += 1

    # Append remaining elements
    while i < len(a):
        new_a.append(a[i])
        new_b.append(0)
        i += 1

    while j < len(b):
        new_a.append(0)
        new_b.append(b[j])
        j += 1

    return new_a, new_b

def evaluate_predictions(method_test, y_test, y_pred):
    prediction_percentage = []
    preision_scores = []
    recall_scores = []
    f1_scores = []

    for i in range(len(y_test)):
        actual_coverage = extract_result(y_test[i])
        predicted_coverage = extract_result(y_pred[i])
        print(f"Method: {method_test[i]}")
        for path in total_lines_of_code.keys():
            actual = actual_coverage[path]
            predicted = predicted_coverage[path]
            actual, predicted = align_lists(actual, predicted)
            print(f"  {path}")
            print(f"    Actual: {actual}")
            print(f"    Predicted: {predicted}")
            print(f'    Accuracy: {accuracy_score(actual, predicted):.2%}')

        # print(f'  Accuracy: {correct/len(y_test[i]):.2%}')
        conf_matrix = confusion_matrix(y_test[i], y_pred[i])
        print("Confusion Metric:")
        print(conf_matrix)
        print()
        acc = accuracy_score(*align_lists(extract_line_from_result(y_test[i]), extract_line_from_result(y_pred[i])))
        prec = precision_score(y_test[i], y_pred[i])
        recall = recall_score(y_test[i], y_pred[i])
        f1 = f1_score(y_test[i], y_pred[i])
        print(f'  Accuracy: {acc:.2%}')
        print(f'  Precision: {prec:.2%}')
        print(f'  Recall: {recall:.2%}')
        print(f'  F1 Score: {f1:.2%}')

        prediction_percentage.append(acc)
        preision_scores.append(prec)
        recall_scores.append(recall)
        f1_scores.append(f1)

    print("Overall Accuracy: ", sum(prediction_percentage)/len(prediction_percentage))
    print("Overall Precision: ", sum(preision_scores)/len(preision_scores))
    print("Overall Recall: ", sum(recall_scores)/len(recall_scores))
    print("Overall F1 Score: ", sum(f1_scores)/len(f1_scores))


# Evaluate the predictions
evaluate_predictions(method_test, y_test, y_pred)

Method: testIsPrime_N003
  src/test/java/functions/CalculatorTest.java
    Actual: [10, 15, 17, 18, 19, 20, 0, 0, 0, 0, 0, 130, 134, 139, 140]
    Predicted: [10, 15, 17, 18, 19, 20, 101, 105, 106, 109, 110, 0, 0, 0, 0]
    Accuracy: 40.00%
  src/main/java/functions/Calculator.java
    Actual: []
    Predicted: []
    Accuracy: nan%
  src/main/java/functions/distributions/Calculator2.java
    Actual: []
    Predicted: []
    Accuracy: nan%
  src/main/java/functions/distributions/Calculator3.java
    Actual: []
    Predicted: []
    Accuracy: nan%
Confusion Metric:
[[494   5]
 [  4   6]]

  Accuracy: 40.00%
  Precision: 54.55%
  Recall: 60.00%
  F1 Score: 57.14%
Method: testFibonacci_N003
  src/test/java/functions/CalculatorTest.java
    Actual: [10, 15, 17, 18, 19, 20, 114, 118, 119, 124, 125, 126]
    Predicted: [10, 15, 17, 18, 19, 20, 0, 0, 0, 0, 0, 0]
    Accuracy: 50.00%
  src/main/java/functions/Calculator.java
    Actual: []
    Predicted: []
    Accuracy: nan%
  src/main/java/f

  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
