In [None]:
# %reset

# Data preparation
These scripts load the reports from source code metrics (NDepend), repository mining (PyDriller) and the profiler (dotTrace), filter them, and rewrite the method names so that the same method is named identically across all reports.

Import libraries

In [None]:
import csv
import os
import re
from xml.etree import ElementTree

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.ticker import StrMethodFormatter
from sklearn import preprocessing


Locations:

In [None]:
# Source code metrics export (read as a ';'-separated CSV with ',' as decimal mark).
source_code_metrics_file = "/ndepend/export_query_v12.csv"
# Repository-mining export (read as a ';'-separated CSV).
repo_mining_file = "/commits-to-v12.0.0.csv"
# Folder that is scanned for profiler '*.xml' reports.
usage_folder = "/profiler/usage/"

# Optional repository-mining export used for the changed-lines data; None skips that step.
repo_mining_file_for_chglines = None

# Optional test coverage report (XML); None skips loading test coverage.
test_coverage_file = None

# Output location; file names are appended directly, so the trailing '/' is required.
save_to_folder = "/merged/"

Helper methods to replace parameter types

In [None]:
# C# keyword aliases mapped to the type names used by the other reports.
type_dict = {
    'byte': 'Byte',
    'sbyte': 'SByte',
    'int': 'Int32',
    'uint': 'UInt32',
    'short': 'Int16',
    'ushort': 'UInt16',
    'long': 'Int64',
    'ulong': 'UInt64',
    'float': 'Single',
    'double': 'Double',
    'char': 'Char',
    'bool': 'Boolean',
    'object': 'Object',
    'string': 'String',
    'decimal': 'Decimal',
    'dynamic': 'Object',
}

def get_type_outbox(param_type):
    """Return the mapped type name for a C# alias; any other value is returned unchanged."""
    return type_dict.get(param_type, param_type)


def handle_token(token, remove_parentclass, replace_type):
    """Normalise one type token.

    token: a single identifier without delimiters.
    remove_parentclass: when true, keep only the text after the last '.'.
    replace_type: when true, map C# aliases through get_type_outbox.
    """
    result = token[token.rfind(".") + 1:] if remove_parentclass else token
    return get_type_outbox(result) if replace_type else result


def replace_by_token(param_type, remove_parentclass, replace_type):
    """Rewrite every token of a parameter declaration while keeping its delimiters.

    The text is cut at the characters '<', '>', ',', '[', ']', '(' and ')'; each piece between
    two delimiters is passed through handle_token and the delimiters are copied as they are.
    """
    delims = "<>,[]()"
    pieces = []
    pending = ''
    for ch in param_type:
        if ch in delims:
            # A delimiter closes the token collected so far (possibly an empty one).
            pieces.append(handle_token(pending, remove_parentclass, replace_type) + ch)
            pending = ''
        else:
            pending += ch
    # Text after the last delimiter is a final token of its own.
    if pending:
        pieces.append(handle_token(pending, remove_parentclass, replace_type))
    return ''.join(pieces)


def replace_types(param_type):
    """Replace every occurrence of each type_dict alias inside the given text.

    The aliases are applied one after another, in dictionary order, as plain substring
    replacements, so an alias that occurs inside a longer identifier is replaced as well.
    """
    for alias, mapped_name in type_dict.items():
        # str.replace leaves the text untouched when the alias does not occur.
        param_type = param_type.replace(alias, mapped_name)
    return param_type


def _replace_ctor_marker(value, marker):
    """Replace `marker` ('..ctor' or '..cctor') in `value` with '.<ClassName>'."""
    idx2 = value.rfind(marker)
    if idx2 < 0:
        return value
    idx1 = value[:idx2].rfind(".")
    if idx1 >= 0:
        # includes the leading '.', e.g. '.Cls' for 'Ns.Cls..ctor'
        cls_name = value[idx1:idx2]
    else:
        # no namespace/parent in front of the class: the whole prefix is the class name
        cls_name = "." + value[:idx2]
    # in case the class has generics keep only the class name
    if '<' in cls_name:
        cls_name = cls_name[:cls_name.find('<')]
    return value.replace(marker, cls_name)


def ctor_to_class_name(value):
    """Rename constructors from the '.ctor' / '.cctor' notation to the class name.

    'Ns.Cls..ctor(Int32)' becomes 'Ns.Cls.Cls(Int32)'; '..cctor' is handled the same way, so
    both notations can map to the same name. Generic arguments of the class are not repeated in
    the constructor name. Values containing neither marker are returned unchanged.
    """
    for marker in ("..ctor", "..cctor"):
        value = _replace_ctor_marker(value, marker)
    return value

## Source code metrics

In [None]:
def change_method_name_metrics(method):
    """Normalise a method name taken from the source code metrics report.

    '+' (inner-class separator) becomes '.', '&' (out/ref marker) is removed, and every
    parameter type is reduced to the part after its last '.', because other reports do not
    include the parent of a parameter type (e.g. SqlMapper.GridReader).
    """
    cleaned = method.replace('+', '.').replace('&', '')
    split_at = cleaned.rfind('(')
    name_part, params_part = cleaned[:split_at], cleaned[split_at:]
    # '()' (or a name without parentheses) has no parameter types to rewrite
    if len(params_part) > 2:
        params_part = replace_by_token(params_part, True, False)
    return name_part + params_part


def get_code_metrics_data(file):
    """Read the metrics CSV (';' separator, ',' decimal mark) and normalise its 'FullName' column."""
    metrics = pd.read_csv(file, sep=';', decimal=',')
    metrics['FullName'] = metrics['FullName'].map(change_method_name_metrics)
    return metrics

Load data

In [None]:
# Load the source code metrics; the method names are normalised while loading.
metrics_data = get_code_metrics_data(source_code_metrics_file)
metrics_data

In [None]:
# Show the methods whose name contains ".Migrations."; regex=False makes the dots literal
# (str.contains treats the pattern as a regular expression by default, where '.' matches any character).
metrics_data[metrics_data['FullName'].str.contains(".Migrations.", regex=False)]

Prepare the data by excluding property setters and getters, operators, anonymous types, migrations, and methods whose LOC is not set or is 0.
The final dataset contains the set of metrics used in the research.

In [None]:
# Treat a LOC of 0 like a missing LOC so both are removed by the same filter below.
# The result is assigned back: replace(..., inplace=True) on a column selection is a chained
# in-place call and is not guaranteed to update the parent frame (pandas copy-on-write).
metrics_data["NbLinesOfCode"] = metrics_data["NbLinesOfCode"].replace({0: np.nan})

no_getters = metrics_data['IsPropertyGetter'] == False
no_setters = metrics_data['IsPropertySetter'] == False
no_operators = metrics_data['IsOperator'] == False
no_empty_method = metrics_data['NbLinesOfCode'].notna()
# regex=False: match the text literally, otherwise every '.' would match any character
annonymous = metrics_data['FullName'].str.contains("f__AnonymousType", regex=False)
migrations = metrics_data['FullName'].str.contains(".Migrations.", regex=False)

filtered_metrics_data = metrics_data.copy()
filtered_metrics_data = filtered_metrics_data[no_setters & no_getters & no_operators & no_empty_method & ~annonymous & ~migrations].reset_index()

# Keep only the metrics used in the research and give them their short names.
sc_metrics_data = filtered_metrics_data[["FullName", "NbLinesOfCode", "CyclomaticComplexity", "NbParameters", "NbVariables",
                                "ILNestingDepth", "NbMethodsCallingMe", "NbMethodsCalledInternal"]].copy()
sc_metrics_data.columns = ["Method", "LOC", "CC", "NP", "NV", "NEST", "Ca", "Ce"]

# Constructors are listed as 'ctor' or 'cctor', replace these with the class name (the actual name of the constructor)
sc_metrics_data['Method'] = sc_metrics_data['Method'].apply(ctor_to_class_name)
# '..ctor' and '..cctor' can end up with the same name; keep the first row of each name
# (the source does not have all the constructors found by NDepend)
sc_metrics_data.drop_duplicates(subset=['Method'], inplace=True, ignore_index=True)
sc_metrics_data

## Repository mining metrics

In [None]:
def change_method_name_commits(method):
    """Normalise a method signature taken from the repository-mining report.

    '::' separators become '.', and the parameter list is reduced to a comma-separated list of
    parameter types: default values, the '[ FromBody ]' attribute, leading modifiers
    (params/this/out/in/ref) and the parameter name (last token) are dropped, 'T ? name'
    becomes 'Nullable<T>', and the types are rewritten by replace_by_token (parent removed,
    C# aliases mapped).

    Values that are not strings (NaN or None for a missing name) are returned unchanged.
    """
    # Missing names arrive as NaN/None. An identity test against np.nan misses NaN floats that
    # are not the np.nan singleton (and None), which would then fail on .replace below.
    if not isinstance(method, str):
        return method

    method = method.replace('::', '.')
    start_parameters = method.rfind('(')
    method_name = method[:start_parameters]
    parameters_text = method[start_parameters:]
    if len(parameters_text) > 2:
        parameters_text = parameters_text[1:-1]  # remove ()
        # remove [FromBody] from parameters text
        parameters_text = parameters_text.replace("[ FromBody ] ", "")
        parameters = parameters_text.split(', ')
        params_types = []
        for param in parameters:
            # cut off a default value
            if "=" in param:
                param = param[:param.find("=")]

            # remove potential spaces from start and end
            param = param.strip(" ")

            p = param.split(' ')
            if " ? " in param:
                # if int ? _ -> Nullable<Int32>
                p_t = replace_by_token(p[0], True, True)
                param_type = "Nullable<" + p_t + ">"
                if "[]" in ''.join(p[0:-1]):
                    param_type = param_type + "[]"
            else:
                start = 0
                end = -1  # the last token is the parameter name
                if (p[0] == "params") or (p[0] == "this") or (p[0] == "out") or (p[0] == "in") or (p[0] == "ref"):
                    # if params string [] _ -> String[] or params Func<T,object> [] _ -> Func<T,Object>[]
                    start = 1
                p_t = ''.join(p[start:end])
                param_type = replace_by_token(p_t, True, True)

            params_types.append(param_type)
        parameters_text = '(' + ','.join(params_types) + ')'

    new_name = method_name + parameters_text
    return new_name


def get_change_metrics_data(file):
    """Read the repository-mining CSV (';' separator) and add the normalised name as 'Method_Parsed'."""
    changes = pd.read_csv(file, sep=';')
    changes['Method_Parsed'] = changes['Method'].map(change_method_name_commits)
    return changes

Load change metrics data

In [None]:
# Load the change metrics; 'Method_Parsed' holds the normalised method names.
change_data = get_change_metrics_data(repo_mining_file)
change_data

Read data for number of changed lines

In [None]:
# Both stay None when no changed-lines export is configured.
change_lines_data_org = None
change_lines_data = None
if repo_mining_file_for_chglines is not None:
    # Full export, with the previous method name normalised the same way as the current one.
    change_lines_data_org = get_change_metrics_data(repo_mining_file_for_chglines)
    change_lines_data_org['Previous_Method_Parsed'] = change_lines_data_org['Previous_name'].apply(change_method_name_commits)
    # Reduced view that is written to disk and used further on.
    change_lines_data = change_lines_data_org[['Method_Parsed', 'Previous_Method_Parsed', 'ChgLines']]
    change_lines_data.to_csv(save_to_folder + "change_lines.csv", sep=';', index=False)
change_lines_data

In [None]:
if change_lines_data_org is not None:
    # Number of rows that have no previous method name.
    print(change_lines_data_org['Previous_name'].isna().sum())
    # Change rows whose raw method name never occurs as a previous name.
    diff_chg = change_data[~change_data['Method'].isin(change_lines_data_org['Previous_name'])]

In [None]:
#change_data[~change_data['Method_Parsed'].isin(change_lines_data['Previous_Method_Parsed'])]

In [None]:
#sc_metrics_data[sc_metrics_data['Method'].isin(change_lines_data['Previous_Method_Parsed'])]


In [None]:
if change_lines_data is not None:
    # Join on the normalised names on both sides: 'Previous_Method_Parsed' has been rewritten by
    # change_method_name_commits, so it has to be compared with 'Method_Parsed' and not with
    # the raw 'Method' text.
    df = pd.merge(change_data, change_lines_data[['Previous_Method_Parsed', 'ChgLines']], how='inner', left_on='Method_Parsed', right_on='Previous_Method_Parsed')

## Profiler metrics

In [None]:
def change_method_name_usage(method):
    """Normalise a method name taken from a profiler report.

    - If the name contains a backtick, the text from the first backtick up to the next '.'
      is removed.
    - Each parameter loses a leading 'params', 'out', 'in' or 'ref' modifier and surrounding
      spaces; the parameters are re-joined with ',' and no spaces.
    - '+' (inner-class separator) becomes '.' in the part before the parameter list.
    """
    tick = method.find('`')
    if tick != -1:
        tail = method[tick + 1:]
        method = method[:tick] + tail[tail.find('.'):]

    open_paren = method.rfind('(')
    method_name, params_text = method[:open_paren], method[open_paren:]

    if len(params_text) > 2:
        cleaned = []
        # [1:-1] removes the surrounding parentheses
        for raw_param in params_text[1:-1].split(','):
            param = raw_param.strip(" ")
            for modifier in ("params", "out", "in", "ref"):
                if param.startswith(modifier + " "):
                    param = param[len(modifier):]
                    break
            cleaned.append(param.strip(" "))
        params_text = '(' + ','.join(cleaned) + ')'

    # for inner classes
    return method_name.replace('+', '.') + params_text


def get_profiler_metrics_data(file):
    """Read one profiler XML report into a DataFrame with the columns 'Method' and 'Calls'.

    Every 'Function' element directly below the root contributes one row: its 'FQN' attribute
    (normalised by change_method_name_usage) and its 'Calls' attribute as an integer.
    """
    functions = ElementTree.parse(file).getroot().findall('Function')
    df = pd.DataFrame(data={
        'Method': [str(fn.get('FQN')) for fn in functions],
        'Calls': [int(fn.get('Calls')) for fn in functions],
    })
    df['Method'] = df['Method'].apply(change_method_name_usage)
    return df


def get_all_profiler_metrics_data(folder):
    """Load every '*.xml' report in `folder` and return the total number of calls per method."""
    with os.scandir(folder) as entries:
        calls_dfs = [
            get_profiler_metrics_data(os.path.join(folder, entry.name))
            for entry in entries
            if entry.is_file() and entry.name.endswith('.xml')
        ]
    combined = pd.concat(calls_dfs)
    # the same method can occur in several reports: add its calls up
    return combined.groupby('Method')['Calls'].sum().reset_index()


Load the profiler metrics and replace 'ctor' and 'cctor' with the class name.

In [None]:
# Total calls per method over all profiler reports in the usage folder.
calls_data = get_all_profiler_metrics_data(usage_folder)

# Name constructors after their class, as is done for the source code metrics.
calls_data['Method'] = calls_data['Method'].apply(ctor_to_class_name)
calls_data

Extra method to collect the metrics from Visual Studio Performance Profiler report

In [None]:
def get_vs_profiler_metrics_data(file="C:/Users/aprodea/work/profiler/vs/Report20201015-1152_FunctionSummary.xml",
                                 method_prefix='GES_GRT'):
    """Collect call counts from a Visual Studio Performance Profiler function summary.

    file: path (or open file object) of the XML report; defaults to the report used originally.
    method_prefix: only functions whose name starts with this text are kept.

    Returns a DataFrame with the columns 'Method', 'Calls' and 'SourceFile', one row per
    'FunctionSummary/Function' element that matches the prefix.
    """
    calls_metrics = {'Method': [], 'Calls': [], 'SourceFile': []}
    root = ElementTree.parse(file).getroot()
    for type_tag in root.findall('FunctionSummary/Function'):
        method = type_tag.get('FunctionName')
        # an element without a name cannot match the prefix; skip it instead of failing
        if method is None or not method.startswith(method_prefix):
            continue
        calls_metrics['Method'].append(str(method))
        calls_metrics['Calls'].append(int(type_tag.get('NumCalls')))
        calls_metrics['SourceFile'].append(str(type_tag.get('SourceFile')))

    return pd.DataFrame(data=calls_metrics)

## Test coverage

In [None]:
# Accumulator filled by get_children: one entry per Method/Constructor element.
test_coverage = {'Method': [], 'TotalStatements': [], 'CoveredStatements': []}

def get_children(all_path, element):
    """Walk a coverage XML element recursively and record its methods in `test_coverage`.

    all_path: dotted name built from the 'Name' attributes of the ancestors.
    element: the element to process; its children are visited afterwards.

    For 'Method' and 'Constructor' elements the full name (without anything from the last ':'
    on) is stored together with 'TotalStatements' (as int) and 'CoveredStatements' (as text).
    """
    name = element.get("Name")
    if element.tag in ('Method', 'Constructor'):
        all_path = all_path + name
        full_name = str(all_path)
        cut = full_name.rfind(':')
        test_coverage['Method'].append(full_name[:cut] if cut > 0 else full_name)
        test_coverage['TotalStatements'].append(int(element.get("TotalStatements")))
        test_coverage['CoveredStatements'].append(str(element.get("CoveredStatements")))
    elif name is not None:
        # containers only extend the path
        all_path = all_path + name + '.'

    for child in element:
        get_children(all_path, child)


def change_name_coverage(method):
    """Normalise a method name taken from the test coverage report.

    In the parameter list the modifiers 'params', 'out', 'ref' and 'in' are removed and every
    type is rewritten by replace_by_token (parent removed, C# aliases mapped). The part before
    the parameter list is returned as it is.
    """
    start_parameters = method.rfind('(')
    method_name = method[:start_parameters]
    parameters_text = method[start_parameters:]
    if len(parameters_text) > 2:
        # Remove the modifiers only where they are whole words: a plain substring replacement
        # of 'in ' / 'out ' / 'ref ' would also cut the end off names such as 'Plugin ' or 'Layout '.
        parameters_text = re.sub(r'\b(?:params|out|ref|in) ', '', parameters_text)
        parameters_text = replace_by_token(parameters_text, True, True)

    new_name = method_name + parameters_text
    return new_name
        
        
def get_test_coverage_data(test_coverage_file):
    """Read the test coverage XML report into a DataFrame.

    Returns one row per method/constructor with the columns 'Method' (normalised by
    change_name_coverage), 'TotalStatements' and 'CoveredStatements'.
    """
    root = ElementTree.parse(test_coverage_file).getroot()

    # get_children() appends to the module-level `test_coverage` lists; empty them first so
    # that calling this function again does not duplicate the rows of an earlier call.
    for collected in test_coverage.values():
        collected.clear()

    # In the new report version the content is below 'Assembly' (it used to be 'Project').
    for type_tag in root.findall('Assembly/*'):
        get_children('', type_tag)

    df = pd.DataFrame(data=test_coverage)
    df['Method'] = df['Method'].apply(change_name_coverage)
    return df

Load test coverage data

In [None]:
# Stays None when no coverage report is configured.
test_coverage_data = None
if test_coverage_file is not None:
    test_coverage_data = get_test_coverage_data(test_coverage_file)
test_coverage_data

Write the resulting datasets to file

In [None]:
# Write the three prepared datasets as ';'-separated CSV files without the index.
sc_metrics_data.to_csv(save_to_folder + "subset_metrics.csv", sep=';', index=False)
calls_data.to_csv(save_to_folder + "all_calls.csv", sep=';', index=False)
change_data.to_csv(save_to_folder + "all_changes.csv", sep=';', index=False)

# The coverage file is only written when a coverage report was loaded.
if test_coverage_data is not None:
    test_coverage_data.to_csv(save_to_folder + "test_coverage.csv", sep=';', index=False)

## Few checks

In [None]:
# Change rows whose normalised method name is not in the source code metrics;
# dropna() additionally removes every row that has a missing value in any column.
diff_ch_and_metrics = change_data[~change_data['Method_Parsed'].isin(sc_metrics_data['Method'])].dropna()
diff_ch_and_metrics.to_csv(save_to_folder + "ch_not_found_in_metrics.csv", sep=';', index=False, na_rep=0)
diff_ch_and_metrics

In [None]:
# Methods from the source code metrics that do not occur in the coverage data
# (only computed when coverage data was loaded).
diff_metrics_cov = None
if test_coverage_data is not None:
    diff_metrics_cov = sc_metrics_data[~sc_metrics_data['Method'].isin(test_coverage_data['Method'])].dropna()
    # print the full names, one per line
    for m in diff_metrics_cov['Method']:
        print(m)
diff_metrics_cov

## Merging data

Select the change metrics we need and rename the columns, and sum the results (in case there are multiple rows for the same method)

In [None]:
# One row per normalised method name with its total number of changes.
sub_change_data = (
    change_data[['Method_Parsed', 'Changes']]
    .rename(columns={'Method_Parsed': 'Method', 'Changes': 'NChg'})
    .groupby('Method')['NChg'].sum()
    .reset_index()
)
sub_change_data

### Merge change data to source code metrics data

In [None]:
# Left join: every method of the source code metrics is kept, NChg is NaN where no change row matches.
merged_left_ch = sc_metrics_data.merge(sub_change_data, how='left', on='Method')
merged_left_ch

In [None]:
# Methods for which the merge found no change data.
missing_ch = merged_left_ch[merged_left_ch['NChg'].isna()]
missing_ch.to_csv(save_to_folder + "missing_ch_in_merged.csv", sep=';', index=False)
missing_ch

In [None]:
# Print the full method names, one per line (the table display can truncate long names).
for m in missing_ch['Method']:
    print(m)

On the merged result, remove the generic type arguments from the method names so that they match the method names in the profiler report.

In [None]:
def remove_generics_as_in_calls(params_decl):
    """Strip generic arguments and '&' characters from a method declaration.

    Everything between '<' and the matching '>' (including nested pairs and the brackets
    themselves) is dropped; '&' is dropped everywhere.
    """
    kept = []
    depth = 0  # number of '<' that are currently open
    for ch in params_decl:
        if ch == '<':
            depth += 1
        elif ch == '>':
            depth -= 1
        elif ch != '&' and depth == 0:
            kept.append(ch)
    return ''.join(kept)


# Helper column that is used only as the join key for the profiler data.
merged_left_ch['Method_Parsed'] = merged_left_ch['Method'].apply(remove_generics_as_in_calls)
merged_left_ch

Select the profiler metrics we need and rename the columns, and sum the results (in case there are multiple rows for the same method)

In [None]:
# One row per profiler method name with its total number of calls.
sub_calls_data = (
    calls_data[['Method', 'Calls']]
    .rename(columns={'Method': 'Method_calls', 'Calls': 'NCall'})
    .groupby('Method_calls')['NCall'].sum()
    .reset_index()
)
sub_calls_data

### Merge also the profiler metrics based on the renamed variable

In [None]:
# Left join on the generics-free name: NCall is NaN for methods the profiler did not report.
merged_left_ca = merged_left_ch.merge(sub_calls_data, how='left', left_on='Method_Parsed', right_on='Method_calls')
# Rows without profiler data, kept for inspection.
missing_merged_left_ca = merged_left_ca[merged_left_ca['NCall'].isna()]
merged_left_ca


Drop the renamed variable and the variable from the profiler, to keep only the previous method name

In [None]:
# The two join-key columns are no longer needed; 'Method' remains the only name column.
merged_left_ca = merged_left_ca.drop(columns=['Method_Parsed', 'Method_calls'])
merged_left_ca

Save the resulting dataset

In [None]:
# Written before the missing values are filled, so NChg/NCall can still be empty in this file.
merged_left_ca.to_csv(save_to_folder + "merged.csv", sep=';', index=False)

Check to see how many rows do not have values

In [None]:
# Number of missing values per column.
merged_left_ca.isna().sum()

Replace missing rows with 1 for change (assuming that every method had at least the initial commit), and 0 for the rest

In [None]:
# Assign the filled columns back: fillna(..., inplace=True) on a column selection is a chained
# in-place call and is not guaranteed to update the frame itself (pandas copy-on-write).
merged_left_ca['NChg'] = merged_left_ca['NChg'].fillna(1)   # at least the initial commit
merged_left_ca['NCall'] = merged_left_ca['NCall'].fillna(0)  # never seen by the profiler

In [None]:
# Re-count the missing values per column after filling NChg and NCall.
merged_left_ca.isna().sum()

In [None]:
# Column types of the merged dataset.
merged_left_ca.dtypes

Save the resulting dataset with filled missing variables

In [None]:
# This file is read back by the test coverage merge below.
merged_left_ca.to_csv(save_to_folder + "merged_filledna.csv", sep=';', index=False)

### Merge metrics with test coverage

In [None]:
from pathlib import Path

metrics_file = save_to_folder + "merged_filledna.csv"
test_cov_file = save_to_folder + "test_coverage.csv"

# Re-read the merged dataset from disk (this rebinds `metrics_data`).
metrics_data = pd.read_csv(metrics_file, sep=';')

# The coverage columns are only added when a coverage file exists in the output folder.
data_combined = None
if Path(test_cov_file).is_file():
    test_data = pd.read_csv(test_cov_file, sep=';')
    data_combined = metrics_data.merge(test_data, on='Method', how='left')
    data_combined.to_csv(save_to_folder + "merged_complete.csv", sep=';', index=False)

data_combined

In [None]:
# Missing values per column of the combined dataset (methods without coverage data have NaN).
if data_combined is not None:
    print(data_combined.isna().sum())

## Check and visualise the data

In [None]:
# (number of methods, number of columns)
merged_left_ca.shape

In [None]:
# Names of the metric columns: every column except the 'Method' name.
list_columns = merged_left_ca.columns.tolist()
list_columns.remove('Method')

In [None]:
# Summary statistics of all metric columns.
merged_left_ca[list_columns].describe()

In [None]:
# Third entry of the summary (the standard deviation for a numeric column). .iloc is used because
# integer keys on a label-indexed Series are deprecated as positional lookups.
print(merged_left_ca['NCall'].describe().iloc[2])

In [None]:
# Total lines of code over all methods in the merged dataset.
merged_left_ca['LOC'].sum()

In [None]:
import statsmodels.api as sm
import matplotlib.pyplot as plt
import scipy.stats as stats


In [None]:
# One QQ plot per metric column on a 3x3 grid, saved as a PDF.
fig, axes = plt.subplots(nrows=3, ncols=3, figsize=(20,15))
ax= axes.flatten()
for i in range(len(list_columns)):
    col_name = list_columns[i]
    # `ax` is the flattened array of Axes, so tick styling has to address a single subplot;
    # tick_params sets the label size for both axes of that subplot.
    ax[i].tick_params(axis='both', labelsize=16)
    sm.qqplot(merged_left_ca[col_name], marker='o', markerfacecolor='none', markeredgecolor='k', alpha=0.5,
              ax = ax[i])
    ax[i].set_ylabel(col_name)

plt.savefig(save_to_folder + 'plots/qqplots_unscaled.pdf', bbox_inches = 'tight', pad_inches = 0)

In [None]:
# Probability plot of the raw change counts against a normal distribution.
stats.probplot(change_data['Changes'], dist="norm", plot=plt)
plt.show()

In [None]:
# Probability plot of the NP column against a normal distribution.
stats.probplot(merged_left_ca['NP'], dist="norm", plot=plt)
plt.show()

## Scale data

In [None]:
from pyclustertend import hopkins, vat, assess_tendency_by_mean_metric_score
from sklearn.preprocessing import scale, MinMaxScaler, minmax_scale, RobustScaler,robust_scale


In [None]:
# Hopkins test on the robust-scaled metrics; the second argument is the number of rows
# (presumably the sampling size - confirm against the pyclustertend documentation).
X = robust_scale(merged_left_ca[list_columns])
hopkins(X, merged_left_ca.shape[0])

In [None]:
# Same test on the min-max-scaled metrics (this rebinds X).
X = minmax_scale(merged_left_ca[list_columns])
hopkins(X, merged_left_ca.shape[0])

In [None]:
# Min-max scale every metric column by hand on a copy, so merged_left_ca stays unscaled.
scaled_data = merged_left_ca.copy()

for col_name in list_columns:
    col = scaled_data[col_name]
    min_col, max_col = col.min(), col.max()
#     print(col_name, min_col, max_col)
    # a column whose minimum equals its maximum divides by zero here
    scaled_data[col_name] = (col - min_col) / (max_col - min_col)
    
scaled_data

In [None]:
# Hopkins test on the manually scaled metrics, again passing the number of rows.
hopkins(scaled_data[list_columns], scaled_data.shape[0])

In [None]:
# QQ plots of the scaled metric columns on a 3x3 grid, saved as a PDF.
fig, axes = plt.subplots(nrows=3, ncols=3, figsize=(20,15))
ax = axes.flatten()
for i, col_name in enumerate(list_columns):
    sm.qqplot(
        scaled_data[col_name],
        marker='o',
        markerfacecolor='none',
        markeredgecolor='k',
        alpha=0.5,
        ax=ax[i],
    )
    ax[i].set_ylabel(col_name)

plt.savefig(save_to_folder + '/plots/qqplots_scaled.pdf', bbox_inches = 'tight', pad_inches = 0)

In [None]:
from collections import Counter

# How often each CC value occurs, ignoring missing values.
x = merged_left_ca['CC']
x = x[x.notna()]
print(Counter(x))

# plt.hist(x)