In [None]:
import os
from abc import ABC, abstractmethod
from functools import partial
from warnings import filterwarnings

import numpy as np
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn import preprocessing
from sklearn.base import clone
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectPercentile, chi2, f_classif, RFECV, mutual_info_classif
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, brier_score_loss
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import BernoulliNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier

In [None]:
# Suppress every warning for the rest of the session.
# NOTE(review): this is a blanket filter, so deprecation and convergence
# warnings raised by the estimators below are hidden too — consider narrowing it.
filterwarnings(action='ignore')

In [None]:
def get_all_classifiers():
    """Build the candidate classifiers together with their tuning grids.

    Returns:
        tuple: ``(models, params)``. Both are dicts sharing the same keys in
        the same order: ``models`` maps a classifier name to a freshly
        constructed, unfitted estimator, and ``params`` maps that name to the
        hyper-parameter grid to search (an empty dict means "no tuning").
    """
    # One row per classifier: (name, estimator instance, parameter grid).
    catalogue = [
        ('LinearDiscriminantAnalysis', LinearDiscriminantAnalysis(), {}),
        ('QuadraticDiscriminantAnalysis', QuadraticDiscriminantAnalysis(), {}),
        ('LogisticRegression', LogisticRegression(), {'C': list(np.logspace(-4, 4, 3))}),
        ('BernoulliNaiveBayes', BernoulliNB(), {}),
        ('K-NearestNeighbor', KNeighborsClassifier(), {}),
        ('DecisionTree', DecisionTreeClassifier(), {'criterion': ['gini', 'entropy']}),
        ('RandomForest', RandomForestClassifier(), {'n_estimators': [10, 100]}),
        ('SupportVectorMachine', SVC(), {'C': [0.1, 100]}),
        ('MultilayerPerceptron', MLPClassifier(), {'hidden_layer_sizes': [(17, 8, 17)],
                                                   'activation': ['tanh', 'relu']}),
    ]

    models = {name: estimator for name, estimator, _ in catalogue}
    params = {name: grid for name, _, grid in catalogue}
    return models, params


In [None]:
def get_all_methods():
    """Build the feature-selection methods, keyed by a short method name.

    Returns:
        dict: six ``SelectPercentile`` selectors (chi2, mutual information and
        ANOVA F-score, each keeping 20% and 50% of the features) followed by
        one ``RFECV`` selector wrapped around a random forest.
    """
    # (key, scoring function, percentile of features to keep)
    percentile_selectors = [
        ('chi2_20p', chi2, 20),
        ('chi2_50p', chi2, 50),
        ('mutual_info_classif_20p', mutual_info_classif, 20),
        ('mutual_info_classif_50p', mutual_info_classif, 50),
        ('f_classif_20', f_classif, 20),
        ('f_classif_50', f_classif, 50),
    ]

    selection_methods = {
        key: SelectPercentile(score_func, percentile=keep)
        for key, score_func, keep in percentile_selectors
    }
    # Recursive elimination is added last so it keeps its position in the dict.
    selection_methods['recursive_elimination'] = RFECV(
        RandomForestClassifier(), min_features_to_select=3, step=1, cv=5, scoring='f1')
    return selection_methods


In [None]:
# Module-level registries: every candidate classifier with its parameter grid,
# and every feature-selection method.
# Presumably handed to Analysis.set_classifiers / Analysis.set_selection_methods
# by the driver code — TODO confirm at the call site.
models, params = get_all_classifiers()
methods = get_all_methods()

In [None]:

class Analysis(ABC):
    """Template for an eleven-step pipeline that trains and scores classifiers on the ``Bugged`` column.

    Subclasses provide ``build_datasets`` and ``handle_missing_values``; all
    other steps are implemented here and run in order by ``analyse``. Every
    step logs its progress as ``<metric> | <project> | k/11 | ...`` and most
    steps hand their result to ``self.caching``.
    """

    class FailedBuildDataset(Exception):
        """Raised by ``build_datasets`` implementations when a dataset cannot be built."""
        pass

    class FailedSplit(Exception):
        """Raised by ``split_dataset`` when at least one per-version dataset is missing."""
        pass

    # Class-level defaults: a single SVM with a small grid over C. The complete
    # classifier set is produced by get_all_classifiers() and can be installed
    # on an instance with set_classifiers().
    models = {'SupportVectorMachine': SVC()}
    params = {'SupportVectorMachine': {'C': [0.1, 100]}}

    # Default feature-selection methods, keyed by method name. Can be replaced
    # per instance with set_selection_methods().
    selection_methods = {
        'chi2_20p': SelectPercentile(chi2, percentile=20),
        'chi2_50p': SelectPercentile(chi2, percentile=50),
        'mutual_info_classif_20p': SelectPercentile(mutual_info_classif, percentile=20),
        'mutual_info_classif_50p': SelectPercentile(mutual_info_classif, percentile=50),
        'f_classif_20': SelectPercentile(f_classif, percentile=20),
        'f_classif_50': SelectPercentile(f_classif, percentile=50),
        'recursive_elimination': RFECV(RandomForestClassifier(), min_features_to_select=3, step=1, cv=5, scoring='f1')
    }

    def __init__(self, log_name, project: ProjectName, metric: str):
        """Set up logging and caching and load the project's version list.

        Args:
            log_name: name passed to both ``Logs`` and ``Caching``.
            project: project descriptor; ``project.github()`` supplies the
                project name used in log lines and file names.
            metric: label of the metric family being analysed; used in log
                lines and passed to ``Caching``.
        """
        self.logs = Logs(log_name)
        self.project = project
        self.project_name = project.github()
        self.metric = metric
        # Step 1/11 runs eagerly: reading the versions file happens at construction.
        self.versions = self._get_versions()
        self.caching = Caching(self.project_name, self.metric, log_name)

    def set_classifiers(self, models, params):
        """Override the classifiers and parameter grids for this instance.

        Args:
            models: dict mapping classifier name to estimator.
            params: dict mapping classifier name to its parameter grid.

        Returns:
            This instance, so calls can be chained.
        """
        self.models = models
        self.params = params
        return self

    def set_selection_methods(self, methods):
        """Override the feature-selection methods for this instance.

        Args:
            methods: dict mapping method name to a selector.

        Returns:
            This instance, so calls can be chained.
        """
        self.selection_methods = methods
        return self

    def _get_versions(self):
        """Step 1/11: read the project's versions from ``paper/versions/<project_name>.csv``.

        The directory is resolved through ``Config.get_work_dir_path``.

        Returns:
            list: values of the ``version`` column, in file order.
        """
        self.logs.general("{0} | {1} | 1/11 | Getting Versions ...".format(self.metric, self.project_name))
        versions_dir = Config.get_work_dir_path(os.path.join("paper", "versions"))
        versions_path = os.path.join(versions_dir, self.project_name + ".csv")
        versions = pd.read_csv(versions_path)['version'].to_list()
        self.logs.success("{0} | {1} | 1/11 | Got Versions.".format(self.metric, self.project_name))
        return versions

    def analyse(self):
        """Run steps 2-11 for this project and log whether the project succeeded.

        Any ``Exception`` raised by a step is logged as a failure and swallowed,
        so one failing project does not abort a batch. ``KeyboardInterrupt``
        and ``SystemExit`` are deliberately not intercepted.

        Returns:
            None.
        """
        try:
            datasets = self.build_datasets(self.versions)
            training_df, testing_df = self.split_dataset(datasets)
            training_df, testing_df = self.handle_missing_values(training_df, testing_df)
            selected_features, selected_training = self.select_features(training_df)
            oversampled_training = self.oversample(selected_training, training_df)
            selected_testing = self.get_selected_testing(testing_df, selected_features)
            summaries = self.hyper_parameterize(oversampled_training)
            top_summaries = self.get_top_summaries(summaries)
            configurations = self.get_configurations(top_summaries)
            self.calculate_scores(configurations, oversampled_training, selected_testing)
            self.logs.summary("{0} | {1} | project succeeded.".format(self.metric, self.project_name))

        except Analysis.FailedBuildDataset:
            self.logs.failure("{0} | {1} | 2/11 | Failed BUILDING dataset".format(self.metric, self.project_name),
                              verbose=True)
            self.logs.summary("{0} | {1} | project failed.".format(self.metric, self.project_name))

        except Analysis.FailedSplit:
            self.logs.failure("{0} | {1} | 3/11 | There are missing datasets".format(self.metric, self.project_name))
            self.logs.summary("{0} | {1} | project failed.".format(self.metric, self.project_name))

        # `Exception`, not a bare `except`, so Ctrl-C / interpreter shutdown
        # still stop a long-running batch.
        except Exception:
            self.logs.failure("{0} | {1} | Failed to analyse project".format(self.metric, self.project_name),
                              verbose=True)
            self.logs.summary("{0} | {1} | project failed.".format(self.metric, self.project_name))

    @abstractmethod
    def build_datasets(self, versions):
        """Step 2/11: build one dataset per version (implemented by subclasses).

        The base implementation only logs the start of the step; overrides can
        call it through ``super()`` to emit that line.

        Args:
            versions: versions to build datasets for.

        Returns:
            Overrides return a sequence of datasets, one per version, in the
            same order (``split_dataset`` treats the last one as the test set).
        """
        self.logs.general("{0} | {1} | 2/11 | Building Datasets ...".format(self.metric, self.project_name))
        pass

    def split_dataset(self, datasets):
        """Step 3/11: split the per-version datasets into training and testing frames.

        All datasets except the last are concatenated into the training frame;
        the last one becomes the testing frame. The ``File`` and ``Class``
        columns are dropped from both, and both are handed to the cache.

        Args:
            datasets: sequence of DataFrames (or ``None`` for a missing one).

        Returns:
            tuple: ``(training_df, testing_df)``.

        Raises:
            Analysis.FailedSplit: if any entry of ``datasets`` is ``None``.
        """
        self.logs.general("{0} | {1} | 3/11 | Splitting dataset ...".format(self.metric, self.project_name))
        if any(dataset is None for dataset in datasets):
            raise Analysis.FailedSplit()
        training_df = pd.concat(datasets[:-1], ignore_index=True).drop(["File", "Class"], axis=1)
        testing_df = datasets[-1].drop(["File", "Class"], axis=1)
        self.caching.store_datasets(training_df, testing_df)
        self.logs.success(
            "{0} | {1} | 3/11 | Splitted training and testing datasets.".format(self.metric, self.project_name))
        return training_df, testing_df

    @abstractmethod
    def handle_missing_values(self, training_df, testing_df):
        """Step 4/11: clean missing values (implemented by subclasses).

        The base implementation only logs the start of the step.

        Args:
            training_df: training frame from ``split_dataset``.
            testing_df: testing frame from ``split_dataset``.

        Returns:
            Overrides return the cleaned ``(training_df, testing_df)`` pair.
        """
        self.logs.general("{0} | {1} | 4/11 | Handling missing values".format(self.metric, self.project_name))
        pass

    def select_features(self, training_df):
        """Step 5/11: run every configured feature-selection method on the training data.

        ``Bugged`` is the target; all remaining columns are candidate features.
        The input frame is not modified.

        Args:
            training_df: training frame containing a ``Bugged`` column.

        Returns:
            tuple: ``(selected_features, selected_dataset)`` as produced by
            ``FeatureSelectionHelper``; both are consumed downstream as dicts
            keyed by selection-method name.
        """
        self.logs.general("{0} | {1} | 5/11 | Selecting Features ...".format(self.metric, self.project_name))
        dataset = training_df.copy()  # pop() below must not touch the caller's frame
        y = dataset.pop('Bugged').values
        X = dataset.values
        features = dataset.columns
        selector = FeatureSelectionHelper(self.selection_methods, features)
        selector.select(X, y)
        selected_features = selector.get_selected_features()
        selected_dataset = selector.get_selected_dataset()
        self.caching.store_selected_features(selected_features, selected_dataset)
        self.logs.success("{0} | {1} | 5/11 | Selected Features.".format(self.metric, self.project_name))
        return selected_features, selected_dataset

    def oversample(self, selected_datasets, training_df):
        """Step 6/11: balance each selected training set with SMOTE.

        Args:
            selected_datasets: dict mapping selection-method name to the
                selected feature matrix.
            training_df: training frame; its ``Bugged`` column is the target.

        Returns:
            dict: selection-method name -> ``(X_resampled, y_resampled)``.
        """
        self.logs.general("{0} | {1} | 6/11 | Oversampling dataset ...".format(self.metric, self.project_name))
        y = training_df['Bugged'].values
        # NOTE(review): SMOTE() is constructed without a random_state, so the
        # resampling is not seeded here.
        oversampled_datasets = {method: SMOTE().fit_resample(X, y) for method, X in selected_datasets.items()}
        self.caching.store_oversamples(oversampled_datasets)
        self.logs.success("{0} | {1} | 6/11 | Oversampled dataset.".format(self.metric, self.project_name))
        return oversampled_datasets

    def hyper_parameterize(self, oversample_datasets):
        """Step 7/11: tune every model on every oversampled training set.

        Args:
            oversample_datasets: dict mapping selection-method name to an
                ``(X, y)`` pair.

        Returns:
            dict: selection-method name -> score summary returned by
            ``EstimatorSelectionHelper.score_summary()``.
        """
        def get_summary(X, y):
            helper = EstimatorSelectionHelper(self.models, self.params)
            helper.fit(X, y)
            return helper.score_summary()

        self.logs.general("{0} | {1} | 7/11 | Tuning models and parameters ...".format(self.metric, self.project_name))
        summaries = {method: get_summary(data[0], data[1])
                     for method, data in oversample_datasets.items()}
        self.caching.store_summaries(summaries)
        self.logs.success("{0} | {1} | 7/11 | Tuned models and parameters.".format(self.metric, self.project_name))
        return summaries

    def get_top_summaries(self, summaries, n=10):
        """Step 8/11: keep the first ``n`` rows of each score summary.

        Args:
            summaries: dict mapping selection-method name to a score summary.
            n: number of leading rows to keep (default 10).

        Returns:
            dict: selection-method name -> truncated summary.
        """
        self.logs.general("{0} | {1} | 8/11 | Getting Top Summaries ...".format(self.metric, self.project_name))
        # presumably the summaries arrive sorted best-first — verify in EstimatorSelectionHelper
        top_summaries = {method: summary[:n] for method, summary in summaries.items()}
        self.caching.store_top_summaries(top_summaries)
        self.logs.success("{0} | {1} | 8/11 | Got Top Summaries.".format(self.metric, self.project_name))
        return top_summaries

    def get_configurations(self, top_summaries):
        """Step 9/11: turn each summary row into a configuration dict.

        The score columns named by ``EstimatorSelectionHelper.get_scores_info()``
        are dropped, null cells are replaced by ``None``, and every remaining
        row becomes one dict.

        Args:
            top_summaries: dict mapping selection-method name to a DataFrame.

        Returns:
            dict: selection-method name -> list of configuration dicts.
        """
        self.logs.general("{0} | {1} | 9/11 | Getting Configurations ...".format(self.metric, self.project_name))
        configurations = {method: list(map(lambda x: x[1].to_dict(),
                                           top_summary.drop(EstimatorSelectionHelper.get_scores_info(),
                                                            axis=1)
                                           .where(pd.notnull(top_summary), None).iterrows()))
                          for method, top_summary in top_summaries.items()}
        self.logs.success("{0} | {1} | 9/11 | Got Configurations.".format(self.metric, self.project_name))
        return configurations

    def get_selected_testing(self, testing_df, selected_features):
        """Step 10/11: project the testing frame onto each method's selected features.

        The caller's ``testing_df`` is left untouched.

        Args:
            testing_df: testing frame containing a ``Bugged`` column.
            selected_features: dict mapping selection-method name to the
                selected feature names.

        Returns:
            dict: selection-method name -> ``(X_test, y_test)``.
        """
        self.logs.general("{0} | {1} | 10/11 | Get Selected Testing Dataset ...".format(self.metric, self.project_name))
        testing_y = testing_df['Bugged'].values
        # Separate the target without mutating the frame the caller still holds.
        testing_features = testing_df.drop(columns=['Bugged'])
        selected_testing_datasets = {
            method: (testing_features[testing_features.columns.intersection(features)].values, testing_y)
            for method, features in selected_features.items()
        }
        self.caching.store_selected_testing_datasets(selected_testing_datasets)
        self.logs.success("{0} | {1} | 10/11 | Got Selected Testing Dataset.".format(self.metric, self.project_name))
        return selected_testing_datasets

    def calculate_scores(self, configurations, oversampled_training, selected_testing):
        """Step 11/11: refit every configuration and score it on the testing set.

        Args:
            configurations: dict mapping selection-method name to a list of
                configuration dicts; each has an ``estimator`` key naming an
                entry of ``self.models`` plus that estimator's parameters.
            oversampled_training: dict mapping selection-method name to the
                ``(X, y)`` training pair.
            selected_testing: dict mapping selection-method name to the
                ``(X, y)`` testing pair.

        Returns:
            pandas.DataFrame: one row per (selection method, configuration)
            with precision, recall, f1-measure, auc-roc and brier score.
        """
        def calculate_score(method_name, training, testing, configuration):
            params = {key: val for key, val in configuration.items() if not (val is None or key == 'estimator')}
            # Fit a fresh clone: the estimators in self.models are shared (by
            # default at class level), so mutating them would leak parameters
            # and fitted state from one configuration or instance into the next.
            estimator = clone(self.models[configuration['estimator']])
            estimator.set_params(**params)
            training_X, training_y = training
            estimator.fit(training_X, training_y)
            testing_X, testing_y = testing
            # NOTE(review): auc-roc and brier score below are computed from
            # hard class predictions, not from probabilities.
            prediction_y = estimator.predict(testing_X)
            return {
                'estimator': configuration['estimator'],
                'configuration': str(params),
                'feature_selection': method_name,
                'precision': precision_score(testing_y, prediction_y),
                'recall': recall_score(testing_y, prediction_y),
                'f1-measure': f1_score(testing_y, prediction_y),
                'auc-roc': roc_auc_score(testing_y, prediction_y),
                'brier score': brier_score_loss(testing_y, prediction_y)
            }

        self.logs.general("{0} | {1} | 11/11 | Calculate Scores ...".format(self.metric, self.project_name))
        # One list of score dicts per selection method.
        scores_dicts = [
            [calculate_score(method_name,
                             oversampled_training[method_name],
                             selected_testing[method_name],
                             configuration)
             for configuration in method_configurations]
            for method_name, method_configurations in configurations.items()
        ]
        scores_df = [pd.DataFrame(score) for score in scores_dicts]
        scores = pd.concat(scores_df)
        self.caching.store_scores(scores)
        self.logs.success("{0} | {1} | 11/11 | Calculated Scores.".format(self.metric, self.project_name))
        return scores


In [None]:
class Designite(Analysis):
    """Analysis whose per-version datasets come from the Designite builder."""

    def build_datasets(self, versions):
        """Step 2/11: build the class-level dataset for every version.

        Args:
            versions: versions to build, in order.

        Returns:
            list: one class-level DataFrame per version.

        Raises:
            Analysis.FailedBuildDataset: if a version's class-level dataset is empty.
        """
        super().build_datasets(versions)  # emits the "Building Datasets ..." line

        datasets = []
        for version in versions:
            builder = Builders.get_designite_builder(self.project, version)
            # The builder returns two frames; only the first one is used here.
            classes_df, _methods_df = builder.build()
            if classes_df.empty:
                raise Analysis.FailedBuildDataset("Designite Smells dataset is empty.")
            datasets.append(classes_df)

        self.logs.success("{0} | {1} | 2/11 | Built Datasets.".format(self.metric, self.project_name))
        return datasets

    def handle_missing_values(self, training_df, testing_df):
        """Step 4/11: drop rows with missing values and cast both frames to int.

        Args:
            training_df: training frame.
            testing_df: testing frame.

        Returns:
            tuple: the cleaned ``(training_df, testing_df)`` as new frames.
        """
        super().handle_missing_values(training_df, testing_df)  # emits the start-of-step line
        cleaned_training, cleaned_testing = (
            frame.dropna().astype(int) for frame in (training_df, testing_df)
        )
        self.logs.success("{0} | {1} | 4/11 | Handling missing values.".format(self.metric, self.project_name))
        return cleaned_training, cleaned_testing
