In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn import metrics
from sklearn.model_selection import RepeatedStratifiedKFold, train_test_split, GridSearchCV, cross_val_score, cross_validate
from imblearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from time import localtime, strftime
import time
import warnings
# Mute every warning for the rest of the session
warnings.filterwarnings('ignore')

# Lift pandas' display limits so wide/long result tables are shown in full
for display_option in ('display.max_columns', 'display.max_rows', 'display.max_colwidth'):
    pd.set_option(display_option, None)

# Load the dataset (comma-separated) from the working directory
dataset_csv = pd.read_csv('dataset_before_preprocessing.csv', sep=',')

# Independent variables (columns of the dataset used as model inputs)
features = [
    'code_churn_avg',
    'contributors_count',
    'hunks_count',
    'issue_tracker_issues',
    'cbo',
    'wmc',
    'dit',
    'lcom',
    'max_nested_blocks',
    'total_refactorings',
    'duplicated_lines_cpd_density',
    'comment_lines_cloc_density',
]

# Project names to run the experiments on.
# NOTE: an earlier, longer list also contained 'gson', 'javacv' and 'vassonic';
# those three are not part of the active list below.
projects = [
    'arduino', 'arthas', 'azkaban', 'cayenne', 'deltaspike',
    'exoplayer', 'fop', 'jclouds', 'joda-time',
    'libgdx', 'maven', 'mina', 'nacos', 'opennlp', 'openrefine',
    'pdfbox', 'redisson', 'RxJava', 'testng', 'wss4j',
    'xxl-job', 'zaproxy',
]

# projects = ['arduino']

In [None]:
# create f2 scorer: F-beta with beta=2, i.e. recall is weighted more heavily
# than precision; used by tune_model both for reporting and for refit
f2_scorer = metrics.make_scorer(metrics.fbeta_score, beta=2)

# create class_inspection scorer
def my_class_inspection(Y_test, y_pred):
    """Return the share of samples that the model flags as positive.

    This equals (TP + FP) / (TN + FN + TP + FP): the fraction of classes a
    developer would have to inspect if every positive prediction is followed
    up.  It is computed straight from the predictions rather than from a
    confusion matrix, because a confusion matrix collapses to 1x1 when a fold
    contains a single label in both truth and prediction, which made the
    previous index-based lookup fail.

    Parameters
    ----------
    Y_test : array-like
        Ground-truth labels.  Unused by the computation; kept because the
        scorer interface passes it.
    y_pred : array-like
        Predicted labels, with 1 denoting the positive class.

    Returns
    -------
    float
        Value in [0, 1]; 0.0 when ``y_pred`` is empty.
    """
    flagged = np.asarray(y_pred).ravel() == 1
    if flagged.size == 0:
        # nothing was predicted, so nothing has to be inspected
        return 0.0
    return float(flagged.mean())
# wrap the metric so it can be passed in a GridSearchCV `scoring` dict
class_inspection_scorer = metrics.make_scorer(my_class_inspection)

# create class_inspection_reduction scorer
def my_class_inspection_reduction(Y_test, y_pred):
    """Return the relative inspection effort saved at the achieved recall.

    Computed as ``(recall - class_inspection) / recall`` where
    ``class_inspection`` is the share of samples predicted positive and
    ``recall`` is TP / (TP + FN) for the positive label 1.

    Parameters
    ----------
    Y_test : array-like
        Ground-truth labels, with 1 denoting the positive class.
    y_pred : array-like
        Predicted labels, with 1 denoting the positive class.

    Returns
    -------
    float
        The reduction ratio.  When recall is zero (no positive was found, or
        the fold holds no positives at all) the ratio is undefined; 0.0 is
        returned instead of nan/-inf so that one degenerate fold does not
        poison the cross-validation mean.
    """
    actual_pos = np.asarray(Y_test).ravel() == 1
    flagged = np.asarray(y_pred).ravel() == 1

    n_actual_pos = int(actual_pos.sum())
    if n_actual_pos == 0:
        return 0.0

    recall = int((actual_pos & flagged).sum()) / n_actual_pos
    if recall == 0:
        return 0.0

    class_inspection = float(flagged.mean())
    return (recall - class_inspection) / recall
class_inspection_reduction_scorer = metrics.make_scorer(
    my_class_inspection_reduction
)

# Containers filled by the experiment loop:
#   results    - nested dict with the raw grid-search output
#   results_df - summary table
results = {}
results_df = pd.DataFrame()

# Get a list of models to evaluate
def get_models(X, Y):
    """Build the classifiers to evaluate together with their search grids.

    Parameters
    ----------
    X : DataFrame
        Feature matrix.  Not used at present; kept for interface stability.
    Y : Series or array
        Target labels (0/1).  Only used to derive the XGBoost class weight.

    Returns
    -------
    dict
        Maps a short model name ('lr', 'svm', 'rf', 'xgb') to a dict with the
        keys 'ini_model' (unfitted estimator) and 'parameters' (a grid
        accepted by GridSearchCV).

    Raises
    ------
    ZeroDivisionError
        If ``Y`` contains no sample of class 1.
    """
    c_grid = [0.01, 0.1, 1, 10, 100, 1000]
    models = dict()

    # Logistic Regression
    model = LogisticRegression(class_weight='balanced', random_state=0)
    parameters = []
    for solver in ('lbfgs', 'newton-cg'):
        parameters.append({'solver': [solver],
                           'penalty': ['l2'],
                           'C': c_grid})
        # Without a penalty the regularisation strength C is ignored, so the
        # unpenalised model is evaluated once per solver instead of once per
        # C value (which only re-fitted the identical model and reported a
        # meaningless C in the best parameters).
        # NOTE(review): newer scikit-learn releases appear to expect None
        # instead of the string 'none' - confirm against the installed version.
        parameters.append({'solver': [solver],
                           'penalty': ['none']})
    parameters.append({'solver': ['liblinear'],
                       'penalty': ['l1', 'l2'],
                       'C': c_grid})
    models['lr'] = {'ini_model': model, 'parameters': parameters}

    # SVM
    model = SVC(class_weight='balanced', max_iter=1000, random_state=0)
    parameters = {'kernel': ['linear', 'rbf'],
                  'C': c_grid}
    models['svm'] = {'ini_model': model, 'parameters': parameters}

    # Random Forest
    model = RandomForestClassifier(class_weight='balanced', random_state=0)
    parameters = {'criterion': ['gini', 'entropy'],
                  'max_depth': [1, 2, 4, None],
                  'min_samples_leaf': [1, 2, 4],
                  'min_samples_split': [2, 5, 10],
                  'n_estimators': [100, 200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]}
    models['rf'] = {'ini_model': model, 'parameters': parameters}

    # XGBoost: has no class_weight option, so the positive class is weighted
    # by the negative/positive ratio of the given labels
    weight_ratio = float(len(Y[Y == 0])) / float(len(Y[Y == 1]))
    model = XGBClassifier(scale_pos_weight=weight_ratio, random_state=0)
    parameters = {'n_estimators': [10, 20, 50, 100, 200, 400, 600, 800, 1000],
                  'max_depth': [1, 2, 5, 10, None]}
    models['xgb'] = {'ini_model': model, 'parameters': parameters}

    return models

# Tune and evaluate a given model using Stratified cross-validation and Grid Search to find optimal hyperparameters and scores 
def tune_model(model, parameters, X, Y):
    """Grid-search ``model`` over ``parameters`` with stratified 10-fold CV.

    All metrics listed below are recorded for every candidate; the candidate
    with the best F2 score is selected and refitted on the full data.

    Returns the fitted GridSearchCV object.
    """
    folds = RepeatedStratifiedKFold(n_splits=10, n_repeats=1, random_state=0)

    metric_scorers = dict(
        accuracy='accuracy',
        precision='precision',
        recall='recall',
        f1='f1',
        f2=f2_scorer,
        class_inspection=class_inspection_scorer,
        class_inspection_reduction=class_inspection_reduction_scorer,
    )

    search = GridSearchCV(
        estimator=model,
        param_grid=parameters,
        scoring=metric_scorers,
        cv=folds,
        refit='f2',
        n_jobs=-1,
        verbose=1,
    )
    return search.fit(X, Y)

def _mean_std(cv_results, best_index, metric):
    """Format 'mean (std)' of one test metric for the selected candidate."""
    return '%.3f (%.3f)' % (cv_results['mean_test_' + metric][best_index],
                            cv_results['std_test_' + metric][best_index])


# Summary rows are gathered in a plain list and turned into a DataFrame once
# at the end: DataFrame.append is no longer available in current pandas.
result_rows = []

# for each project
for project in projects:
    print('======== Starting experiments for %s project ========' % project)

    # fetch rows for a specific project
    dataset = dataset_csv.loc[dataset_csv['project_name'] == project]

    # create X and Y
    X = dataset[features]
    Y = dataset['Max-Ruler']

    # get the models to evaluate
    models = get_models(X, Y)

    # evaluate the models and store results
    results[project] = {}
    for name, model in models.items():
        print('==== Evaluating %s (%s) =====' % (name, strftime('%d-%m-%Y %H:%M:%S', localtime())))

        print('- running Grid Search')
        t0 = time.time()

        # execute Grid Search and get the tuned model
        tuned_model = tune_model(model['ini_model'], model['parameters'], X, Y)

        print('- grid search finished in %.4f s' % (time.time() - t0))
        print('- best parameters: %s' % tuned_model.best_params_)
        print('- best performance: %s' % round(tuned_model.best_score_, 3))

        best_index = tuned_model.best_index_
        cv_results = tuned_model.cv_results_

        # fill results dict (raw grid-search output, used for later analysis)
        results[project][name] = {
            'Best Parameters': tuned_model.best_params_,
            'Best Index': best_index,
            'Performance': cv_results,
        }

        # one summary row per project/model pair
        result_rows.append({
            'Project': project,
            'Model': name,
            'Best Parameters': tuned_model.best_params_,
            'Accuracy': _mean_std(cv_results, best_index, 'accuracy'),
            'Precision': _mean_std(cv_results, best_index, 'precision'),
            'Recall': _mean_std(cv_results, best_index, 'recall'),
            'F1-score': _mean_std(cv_results, best_index, 'f1'),
            'F2-score': _mean_std(cv_results, best_index, 'f2'),
        })

# build and print results dataframe
results_df = pd.DataFrame(result_rows)
results_df.set_index('Project', inplace=True)
results_df

In [None]:
# re-display the summary table (one row per project/model pair)
results_df

In [None]:
# plot model performance for comparison
#
# `results` is nested as results[project][model] with the keys
# 'Best Parameters', 'Best Index' and 'Performance' (the grid search's
# cv_results_).  The per-fold scores of the selected candidate are read from
# the 'split<k>_test_<metric>' entries of cv_results_.

def _best_fold_scores(performance, best_index, metric):
    """Return the per-fold test scores of the selected candidate for a metric."""
    scores = []
    fold = 0
    while 'split%d_test_%s' % (fold, metric) in performance:
        scores.append(float(performance['split%d_test_%s' % (fold, metric)][best_index]))
        fold += 1
    return scores


# (metric key used in the scoring dict, panel title)
plot_metrics = [('precision', 'precision'),
                ('recall', 'recall'),
                ('f1', 'F1'),
                ('f2', 'F2'),
                ('class_inspection', 'Class Inspection')]

# one figure per project, one box per model in every panel
for project_name, project_results in results.items():
    model_names = list(project_results.keys())

    fig, axs = plt.subplots(3, 2, figsize=(16, 14))
    fig.suptitle(project_name)

    for ax, (metric, title) in zip(axs.flat, plot_metrics):
        fold_scores = [_best_fold_scores(model_result['Performance'],
                                         model_result['Best Index'],
                                         metric)
                       for model_result in project_results.values()]
        ax.set_title(title)
        ax.boxplot(fold_scores, labels=model_names, showmeans=True)

    # five metrics on a 3x2 grid: hide the unused sixth panel
    axs[2, 1].axis('off')

    plt.show()

In [None]:
# # create f2 scorer
# f2_scorer = metrics.make_scorer(metrics.fbeta_score, beta=2)

# # create class_inspection scorer
# def my_class_inspection(Y_test, y_pred):
#     cm = metrics.confusion_matrix(Y_test, y_pred)
#     TN = cm[0][0]
#     FN = cm[1][0]
#     TP = cm[1][1]
#     FP = cm[0][1]
#     class_inspection = (TP + FP)/(TN + FN + TP + FP)
#     return class_inspection
# class_inspection_scorer = metrics.make_scorer(my_class_inspection)

# # create class_inspection_reduction scorer
# def my_class_inspection_reduction(Y_test, y_pred):
#     cm = metrics.confusion_matrix(Y_test, y_pred)
#     TN = cm[0][0]
#     FN = cm[1][0]
#     TP = cm[1][1]
#     FP = cm[0][1]
#     class_inspection = (TP + FP)/(TN + FN + TP + FP)
#     recall = metrics.recall_score(Y_test, y_pred, average='binary')
#     class_inspection_reduction = (recall - class_inspection)/recall
#     return class_inspection_reduction
# class_inspection_reduction_scorer = metrics.make_scorer(my_class_inspection_reduction)

# # initialize results dataframe and dict
# results_df = pd.DataFrame()
# results = dict()

# # Get a list of models to evaluate
# def get_models(X, Y):
#     models = dict()
    
#     # Logistic Regression
#     model = LogisticRegression(class_weight='balanced', random_state=0)
#     parameters = {'penalty': ['l1', 'l2', 'elasticnet', 'none'],
#                   'C': [0.01, 0.1, 1, 10, 100, 1000]}
#     models['lr'] = {'ini_model': model, 'parameters': parameters}
# #     # SVM
# #     model = SVC(class_weight='balanced', random_state=0)
# #     parameters = {'kernel': ['linear', 'rbf'],
# #                   'C': [0.01, 0.1, 1, 10, 100, 1000]}
# #     models['svm'] = {'ini_model': model, 'parameters': parameters}
# #     # Random Forest
# #     model = RandomForestClassifier(class_weight='balanced', random_state=0)
# #     parameters = {'model__criterion': ['gini', 'entropy'],
# #                   'model__max_depth': [1, 2, 4, 6, 8, 10, None],
# #                   'model__min_samples_leaf': [1, 2, 4],
# #                   'model__min_samples_split': [2, 5, 10],
# #                   'model__n_estimators': [100, 200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]}
# #     models['rf'] = {'ini_model': model, 'parameters': parameters}
# #     # XGBoost
# #     weight_ratio = float(len(Y[Y == 0]))/float(len(Y[Y == 1]))
# #     model = XGBClassifier(scale_pos_weight=weight_ratio, random_state=0)
# #     parameters = {'model__n_estimators': [5, 10, 50, 100, 200, 400, 600, 800, 1000],
# #                   'model__max_depth': [None, 2, 5, 10]}
# #     models['xgb'] = {'ini_model': model, 'parameters': parameters}
    
#     return models

# # Tune a given model using Grid Search to find optimal hyperparameters
# def tune_model(model, parameters, X, Y):
#     scoring = {'accuracy':'accuracy',
#                'precision':'precision',
#                'recall':'recall',
#                'f1':'f1',
#                'f2':f2_scorer,
#                'class_inspection':class_inspection_scorer,
#                'class_inspection_reduction':class_inspection_reduction_scorer}
#     cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=0)
#     grid_search = GridSearchCV(estimator=model, param_grid=parameters, scoring=f2_scorer, cv=cv, refit=True, n_jobs=-1, verbose = 5)
#     tuned_model = grid_search.fit(X, Y)
#     best_accuracy = tuned_model.best_score_
#     best_parameters = tuned_model.best_params_
    
#     return tuned_model, best_accuracy, best_parameters

# # Evaluate a given model using Stratified cross-validation
# def evaluate_model(model, X, Y):
#     scoring = {'accuracy':'accuracy',
#                'precision':'precision',
#                'recall':'recall',
#                'f1':'f1',
#                'f2':f2_scorer,
#                'class_inspection':class_inspection_scorer,
#                'class_inspection_reduction':class_inspection_reduction_scorer}
#     cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=0)
#     scores = cross_validate(model, X, Y, scoring=scoring, cv=cv, n_jobs=-1)
    
#     return scores

# # for each project
# for project in projects:
#     print('======== Starting experiments for %s project ========' % project)
    
#     # fetch rows for a specific project
#     dataset = dataset_csv.loc[dataset_csv['project_name'] == project]
    
#     # create X and Y
#     X = dataset[features]
#     Y = dataset['Max-Ruler']
    
#     # get the models to evaluate
#     models = get_models(X, Y)
    
#     # evaluate the models and store results
#     results[project] = {}
#     temp_results_df = pd.DataFrame()
#     for name, model in models.items():
#         print('==== Evaluating %s =====' % name)
        
#         # execute Grid Search and get the tuned model
#         print('- running Grid Search')
#         tuned_model, best_accuracy, best_parameters = tune_model(model['ini_model'], model['parameters'], X, Y)
#         # re-evaluate the tuned model using Stratified cross-validation
#         print('- running stratified cross-validation')
#         scores = evaluate_model(tuned_model, X, Y)
        
#         # fill results dict
#         results[project][name] = {}
#         results[project][name]['Best Parameters'] = best_parameters
#         results[project][name]['Performance'] = scores
        
#         # fill results dataframe
#         temp_results_df = temp_results_df.append({
#             'Project': project,
#             'Model': name,
#             'Best Parameters': best_parameters,
#             'Accuracy':'%.3f (%.3f)' % (scores['test_accuracy'].mean(), scores['test_accuracy'].std()),
#             'Precision':'%.3f (%.3f)' % (scores['test_precision'].mean(), scores['test_precision'].std()), 
#             'Recall':'%.3f (%.3f)' % (scores['test_recall'].mean(), scores['test_recall'].std()),
#             'F1-score':'%.3f (%.3f)' % (scores['test_f1'].mean(), scores['test_f1'].std()),
#             'F2-score':'%.3f (%.3f)' % (scores['test_f2'].mean(), scores['test_f2'].std()),
#         }, ignore_index=True)

#     results_df = results_df.append(temp_results_df, ignore_index=True)

# # print results dataframe
# results_df.set_index('Project', inplace=True)
# results_df