In [3]:
from datasets import load_dataset


def load_big_clone_bench(split=None):
    """Load the BigCloneBench clone-detection dataset via `datasets.load_dataset`.

    Args:
        split: Name of a single split to return (this file uses "validation"
            and "test"). When None, the whole object returned by
            `load_dataset` is returned instead.

    Returns:
        Either the full loaded dataset (split is None) or the entry stored
        under `split`.
    """
    bcb = load_dataset("code_x_glue_cc_clone_detection_big_clone_bench")
    # Identity check: None is a singleton, so `is` is the correct comparison.
    if split is None:
        return bcb
    return bcb[split]


In [4]:
import difflib
import javalang.tokenizer as java_tokenizer
import pandas as pd
from javalang.tree import Literal, MemberReference, ConstructorDeclaration, IfStatement, WhileStatement, ForStatement
from javalang.parser import Parser as JavaParser
from tqdm import tqdm

# Module-level cache of parse_java() results. get_parse_results() fills it
# lazily, keyed by the example's id1/id2 value, so a function that appears in
# several pairs is only parsed once.
PARSE_RESULTS = dict()


def line_diff(s1: str, s2: str, match_speed="real_quick"):
    """Compute line-level diff counts and a character-level similarity ratio.

    Args:
        s1: First text.
        s2: Second text.
        match_speed: Which `difflib.SequenceMatcher` ratio to use:
            "slow" -> ratio(), "quick" -> quick_ratio(),
            "real_quick" -> real_quick_ratio(). The two quick variants are
            upper bounds on ratio(), not the exact value.

    Returns:
        A dict with the keys "unique_s1_lines", "unique_s2_lines",
        "shared_lines" (line counts from `difflib.Differ`) and
        "similarity_ratio" (float).

    Raises:
        ValueError: If `match_speed` is not one of the supported names.
            Previously this case silently produced a result without the
            "similarity_ratio" key.
    """
    ratio_methods = {
        "slow": "ratio",
        "quick": "quick_ratio",
        "real_quick": "real_quick_ratio",
    }
    # Validate before doing any diff work so a typo fails fast and loudly.
    if match_speed not in ratio_methods:
        raise ValueError(
            f"Unknown match_speed {match_speed!r}; "
            f"expected one of {sorted(ratio_methods)}")

    result = {"unique_s1_lines": 0, "unique_s2_lines": 0, "shared_lines": 0}
    # Differ prefixes every output line with a two-character code. Lines
    # starting with "? " are intraline hints and are deliberately not counted.
    prefix_to_key = {
        "- ": "unique_s1_lines",
        "+ ": "unique_s2_lines",
        "  ": "shared_lines",
    }
    diff = difflib.Differ().compare(s1.splitlines(True), s2.splitlines(True))
    for line in diff:
        key = prefix_to_key.get(line[:2])
        if key is not None:
            result[key] += 1

    # The ratio is computed on the raw strings (characters), not on lines.
    seq_match = difflib.SequenceMatcher(None, s1, s2)
    result["similarity_ratio"] = getattr(seq_match, ratio_methods[match_speed])()
    return result


def parse_java(java_string):
    """Parse a single Java member declaration and extract structural features.

    The source is tokenized and parsed with javalang's
    `parse_member_declaration`, then every node of the resulting tree is
    visited once.

    Args:
        java_string: Java source text of one member declaration.

    Returns:
        A dict with, in this insertion order: "num_literals",
        "num_if_statements", "num_while_statements", "num_for_statements",
        "is_constructor", "num_parameters", "return_type", "num_throws",
        "num_identifiers".

    Errors raised by the javalang tokenizer/parser are not caught here.
    """
    tokens = java_tokenizer.tokenize(java_string)
    ast = JavaParser(tokens).parse_member_declaration()

    # Node types that are simply counted, tested in this order. The dict is
    # seeded from the same table so the key order of the result is stable.
    counted_types = (
        (Literal, "num_literals"),
        (IfStatement, "num_if_statements"),
        (WhileStatement, "num_while_statements"),
        (ForStatement, "num_for_statements"),
    )
    parse_results = {key: 0 for _, key in counted_types}
    unique_identifiers = set()

    # Iterating the tree yields (path, node) pairs; only the node is needed.
    for _, node in ast:
        for node_type, key in counted_types:
            if isinstance(node, node_type):
                parse_results[key] += 1
                break
        else:
            if isinstance(node, MemberReference):
                unique_identifiers.add(node.member)

    # Stored as the strings "True"/"False" rather than a bool; code that
    # branches on isinstance(value, int) would treat a bool as a number.
    if isinstance(ast, ConstructorDeclaration):
        parse_results["is_constructor"] = "True"
    else:
        parse_results["is_constructor"] = "False"

    # Not every declaration kind carries these attributes, hence the checks
    # against the node's declared attribute names.
    attrs = ast.attrs

    parse_results["num_parameters"] = (
        len(ast.parameters) if "parameters" in attrs else 0)

    # NOTE(review): this is whatever javalang stores on the node (presumably a
    # type node, or None) -- confirm how such values compare with == before
    # relying on equality between two parse results.
    parse_results["return_type"] = (
        ast.return_type if "return_type" in attrs else None)

    if "throws" in attrs and ast.throws is not None:
        parse_results["num_throws"] = len(ast.throws)
    else:
        parse_results["num_throws"] = 0

    parse_results["num_identifiers"] = len(unique_identifiers)

    return parse_results


def get_parse_results(example, func_num):
    """Return the parse_java() features for one function of an example.

    Results are memoised in the module-level PARSE_RESULTS dict under the
    value of example["id<func_num>"]; the source text comes from
    example["func<func_num>"].
    """
    cache_key = example[f"id{func_num}"]
    if cache_key in PARSE_RESULTS:
        return PARSE_RESULTS[cache_key]

    # First time this id is seen: parse and remember the result.
    parsed = parse_java(example[f"func{func_num}"])
    PARSE_RESULTS[cache_key] = parsed
    return parsed


def combine_parse_results(s1_parse_results, s2_parse_results):
    """Merge two per-function feature dicts into one pairwise feature dict.

    For every key of `s1_parse_results`:
      * if the first value is an int, both values are kept as
        "<key>_s1" and "<key>_s2";
      * otherwise a single "same_<key>" flag is emitted, 1 when the two
        values compare equal and 0 when they do not.

    `s2_parse_results` must contain every key of `s1_parse_results`.
    """
    combined = {}
    for name, first_value in s1_parse_results.items():
        second_value = s2_parse_results[name]
        if isinstance(first_value, int):
            combined[f"{name}_s1"] = first_value
            combined[f"{name}_s2"] = second_value
            continue
        combined[f"same_{name}"] = 1 if first_value == second_value else 0
    return combined


def merge_dicts(*dict_args):
    """Return a new dict holding the entries of every argument, left to right.

    When a key occurs in more than one argument the value from the later
    argument wins. The inputs themselves are not modified.
    """
    merged = {}
    for mapping in dict_args:
        merged.update(mapping)
    return merged


def big_clone_bench_preprocess(bcb, csv_filename=None):
    """
    Generates DataFrame of features from an input in the format of the BigCloneBench dataset https://huggingface.co/datasets/code_x_glue_cc_clone_detection_big_clone_bench

    Args:
        bcb: Sized, integer-indexable collection of examples. Each example
            must provide "id1", "id2", "func1", "func2" and "label".
        csv_filename: Optional path; when given, the resulting DataFrame is
            also written there with `DataFrame.to_csv`.

    Returns:
        A DataFrame with one row per example: the line_diff() features, the
        combined parse features of both functions, and an integer "target"
        column taken from the example's label.
    """
    print(f"Preprocessing {len(bcb)} examples...")
    # Collect plain dicts and build the frame once at the end. Concatenating
    # onto a DataFrame inside the loop re-copies all previous rows each time.
    rows = []
    for i in tqdm(range(len(bcb))):
        example = bcb[i]
        diff_features = line_diff(example["func1"], example["func2"])
        func1_parse_results = get_parse_results(example, 1)
        func2_parse_results = get_parse_results(example, 2)
        shared_results = combine_parse_results(
            func1_parse_results, func2_parse_results)
        example_dict = merge_dicts(diff_features, shared_results)
        example_dict["target"] = int(example["label"])
        rows.append(example_dict)
    df = pd.DataFrame(rows)
    if csv_filename is not None:
        df.to_csv(csv_filename)
    return df


In [5]:
from data_loader import load_big_clone_bench
from data_preprocess import big_clone_bench_preprocess

import click
from sklearn.metrics import accuracy_score
import pandas as pd
from joblib import load
import matplotlib.pyplot as plt


def load_model(classifier):
    """Load a previously trained classifier from disk.

    Args:
        classifier: 'svm' (loads 'svm.pkl') or 'random_forest'
            (loads 'random_forest.joblib'). Paths are relative to the
            current working directory.

    Returns:
        The object deserialized by joblib's `load`.

    Raises:
        ValueError: If `classifier` is not a supported name. Previously this
            fell through both branches and failed with UnboundLocalError.
    """
    model_files = {
        'svm': 'svm.pkl',
        'random_forest': 'random_forest.joblib',
    }
    if classifier not in model_files:
        raise ValueError(
            f"Unknown classifier {classifier!r}; "
            f"expected one of {sorted(model_files)}")
    # NOTE: loading a serialized model can execute code embedded in the file;
    # only load model files that come from a trusted source.
    return load(model_files[classifier])


def obtain_features(data_df):
    """Return the model input columns of a preprocessed DataFrame.

    The "target" column is dropped. If more than four feature columns remain,
    only the four line-diff features are kept; otherwise all remaining
    columns are returned.
    """
    diff_feature_columns = ['unique_s1_lines', 'unique_s2_lines',
                            'shared_lines', 'similarity_ratio']
    feature_frame = data_df.drop(columns="target")

    # TODO: Remove this when models are trained on new features
    if feature_frame.shape[1] <= 4:
        return feature_frame
    return feature_frame[diff_feature_columns]


def evaluate_clone_detection(classifier, val_output, test_output):
    """
    Evaluate the clone detection model on the validation and test datasets.

    Both splits are loaded and preprocessed, the trained model selected by
    `classifier` is loaded, predictions for each split are written to CSV and
    the accuracy of each split is echoed.

    Args:
        classifier: Model name understood by load_model().
        val_output: Path of the CSV that receives the validation predictions
            (single column "validation_prediction").
        test_output: Path of the CSV that receives the test predictions
            (single column "test_prediction").
    """
    # Load validation dataset
    val_bcb = load_big_clone_bench("validation")
    val_df = big_clone_bench_preprocess(val_bcb)

    # Load test dataset
    test_bcb = load_big_clone_bench("test")
    test_df = big_clone_bench_preprocess(test_bcb)

    # Load the trained model
    trained_model = load_model(classifier)

    # Predict on the validation set and save the predictions
    val_predictions = trained_model.predict(obtain_features(val_df))
    pd.DataFrame({"validation_prediction": val_predictions}
                 ).to_csv(val_output, index=False)
    val_accuracy = accuracy_score(val_df["target"], val_predictions)

    # Predict on the test set and save the predictions. The file must hold
    # the *test* predictions (it previously received the validation ones).
    test_predictions = trained_model.predict(obtain_features(test_df))
    pd.DataFrame({"test_prediction": test_predictions}
                 ).to_csv(test_output, index=False)
    test_accuracy = accuracy_score(test_df["target"], test_predictions)

    # Print results
    click.echo(f'Validation accuracy: {val_accuracy}')
    click.echo(f'Test accuracy: {test_accuracy}')


def predict_custom_data(custom_data, classifier, prediction_output):
    """Predict clone labels for a user-supplied CSV and save them.

    The CSV at `custom_data` is read with the column names id, id1, id2,
    func1, func2, label, converted to a list of record dicts and run through
    big_clone_bench_preprocess(). Predictions from the model selected by
    `classifier` are written to `prediction_output` as a single
    "prediction" column.
    """
    column_names = ["id", "id1", "id2", "func1", "func2", "label"]
    records = pd.read_csv(custom_data, names=column_names).to_dict('records')
    feature_source = big_clone_bench_preprocess(records)

    model = load_model(classifier)
    predicted = model.predict(obtain_features(feature_source))

    prediction_frame = pd.DataFrame({"prediction": predicted})
    prediction_frame.to_csv(prediction_output, index=False)
    click.echo(f'Predictions saved to {prediction_output}')


@click.command()
@click.option(
    '--custom_data',
    type=click.Path(),
    default="",
    help="csv file containing snippets in the same schema as https://huggingface.co/datasets/code_x_glue_cc_clone_detection_big_clone_bench",
)
@click.option(
    '--prediction_output',
    type=click.Path(),
    default="prediction.csv",
    help="Path to save custom dataset prediction",
)
@click.option(
    '--classifier',
    type=click.Choice(['svm', 'random_forest']),
    default='random_forest',
    help='Set the trained model',
)
@click.option(
    '--val_output',
    type=click.Path(),
    default='val_prediction.csv',
    help="Path to save the val prediction",
)
@click.option(
    '--test_output',
    type=click.Path(),
    default='test_prediction.csv',
    help="Path to save the test prediction",
)
def main(custom_data, prediction_output, classifier, val_output, test_output):
    # Command-line entry point. No docstring on purpose: click would use it
    # as the command's help text and so change the --help output.
    #
    # A non-empty --custom_data selects prediction on that file; otherwise
    # the model is evaluated on the validation and test splits.
    if custom_data != "":
        predict_custom_data(custom_data, classifier, prediction_output)
        return
    evaluate_clone_detection(classifier, val_output, test_output)


# Script entry point: main() is a click command, so calling it without
# arguments makes click parse the process's command-line arguments.
# NOTE(review): when this cell is executed inside a Jupyter kernel those
# arguments belong to the kernel launcher, which appears to be the cause of
# the "No such option: --ip" error recorded in this notebook's output. Run
# the code as a script, or invoke the command with an explicit argument
# list -- TODO confirm the intended way to call it from a notebook.
if __name__ == '__main__':
    main()


Usage: ipykernel_launcher.py [OPTIONS]
Try 'ipykernel_launcher.py --help' for help.

Error: No such option: --ip Did you mean --help?


AssertionError: 