## Start

In [1]:
import pandas as pd
import regex as re
import ast
from scipy.stats import chi2_contingency, fisher_exact, spearmanr, kruskal
import sys
import matplotlib.pyplot as plt
from matplotlib import gridspec
import numpy as np
from sklearn.preprocessing import MultiLabelBinarizer
import seaborn as sns
import tqdm as tqdm
import os
import pickle as pkl
from dotenv import load_dotenv
from threading import Thread
from enum import Enum
import shutil
import datetime
import math

load_dotenv(override=True)



False

In [2]:
# Load the tab-separated input tables used by the notebook.
all_information = pd.read_csv('input/all_information.csv', sep='\t')
# Variable metadata; the helper functions below filter it by its 'Scale' column.
variables_overview = pd.read_csv('input/variables_overview.csv', sep='\t')

In [3]:
# Minimum number of observations a label (or group of values) needs to be considered frequent.
FREQ_THRESHOLD = 8
# Marker used to build the name of the "everything but this label" counterpart.
inverse_str = 'not'

# Column holding the bug identifier that rows are joined on.
BUG_ID = 'F1 - Bug ID'


class Aspect(Enum):
    '''Parts an aspect code is split into (members are declared in positional order).'''
    target = 'target'
    topic = 'topic'
    action = 'action'


# All aspects in declaration order; the position doubles as the index into a split aspect code.
ASPECTS = list(Aspect)

In [4]:
def get_variables_df() -> pd.DataFrame:
    '''Reads the coded variables workbook (worksheet 'Sheet1') and returns it as a dataframe'''
    workbook_path = 'input/coded_dataset/variables.xlsx'
    return pd.read_excel(workbook_path, sheet_name='Sheet1')

def get_nom_list_variables() -> pd.DataFrame:
    '''Returns the rows of variables_overview whose scale is nominal, list of nominals or boolean'''
    categorical_scales = ['Nominal', 'List of nominals', 'Boolean']
    is_categorical = variables_overview['Scale'].isin(categorical_scales)
    return variables_overview[is_categorical]

def get_num_variables() -> pd.DataFrame:
    '''Returns the rows of variables_overview whose scale is integer or float'''
    numeric_scales = ['Integer', 'Float']
    is_numeric = variables_overview['Scale'].isin(numeric_scales)
    return variables_overview[is_numeric]

In [5]:
def is_identifier(variable_name: str) -> bool:
    '''Tells whether a variable name denotes an identifier.

    A name counts as an identifier when it starts with 'Identifier' or ends
    with 'ID', 'IDs' or 'hash'.
    '''
    if variable_name.startswith('Identifier'):
        return True
    return variable_name.endswith(('ID', 'IDs', 'hash'))

In [6]:
def get_group(bug, cg, comp):
    ''' Maps the three significance results onto a group label
    Args:
        bug (bool): Whether the difference in distribution (variance) between the variables in the bug group is significant, None if not available
        cg (bool): Whether the difference in distribution (variance) between the variables in the control group is significant, None if not available
        comp (bool): Whether the difference in distribution (variance) between the two groups is significant, None if not available
    Returns:
        str: The group of the variable
    Raises:
        Exception: If the combination of inputs matches none of the known groups
    '''
    within_groups = [bug, cg]

    # nothing could be computed at all
    if bug is None and cg is None and comp is None:
        return 'E.1'

    # data is missing for (at least) one group
    if None in within_groups:
        if True in within_groups:
            return '32'  # significant variance in the group that has data
        if False in within_groups:
            return '31'  # no significant variance in the group that has data

    # no significant variance in both groups
    if bug == False and cg == False:
        return '1'

    # significant variance in both groups: the comparison decides
    if bug == True and cg == True:
        if comp is None:
            return '4.1'  # comparison not possible
        if comp == True:
            return '3.1'  # significant difference between CG and BG
        if comp == False:
            return '2.1'  # no significant difference between CG and BG

    # significant variance in exactly one group, the comparison is not needed
    if bug == True and cg == False:
        return '3.3'
    if bug == False and cg == True:
        return '3.3'

    raise Exception(f'This should not happen... {bug} {cg} {comp}')

In [7]:
# Folder all result files are written to, and its sub folder for figures.
output_path = 'output/group_assignment/'
figure_output_path = output_path + 'figures/'

def get_zip():
    '''Packs the output folder into a zip archive named after the current timestamp'''
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    archive_base_name = f'output_{timestamp}'
    shutil.make_archive(archive_base_name, 'zip', output_path)

In [8]:
# Separator between the original column name and the label in binarized sub columns.
BIN_ENC_SEP = '+++'

def get_sub_column_name(column, label):
    '''Builds the sub column name "<column><separator><label>"'''
    return '{}{}{}'.format(column, BIN_ENC_SEP, label)

def get_labels_from_sub_columns(df: pd.DataFrame, column: str):
    '''Collects the binarized sub columns of a column and the labels they encode

    Only columns named "<column><separator><label>" are considered. Matching on
    the full prefix (including the separator) keeps unrelated columns that merely
    share the beginning of the name out of the result, and guarantees that the
    two returned lists have the same length and order.

    Args:
        df (pd.DataFrame): The dataframe
        column (str): The column name
    Returns:
        list: The nominal labels of column
        list: The sub columns of column
    '''
    prefix = f'{column}{BIN_ENC_SEP}'
    sub_columns = [c for c in df.columns if isinstance(c, str) and c.startswith(prefix)]
    # Slice instead of split so that labels containing the separator stay intact
    nominal_labels = [c[len(prefix):] for c in sub_columns]

    return nominal_labels, sub_columns

def multi_label_binarize(df: pd.DataFrame, column):
    '''Replaces a column holding lists of labels by one boolean sub column per label

    Args:
        df (pd.DataFrame): The dataframe
        column (str): The column name that should be binarized
    Returns:
        pd.DataFrame: The dataframe (index reset) without the original column, extended by
            the boolean sub columns whose names are formatted with the separator
    '''

    # Gather every label that occurs in any of the rows
    observed_labels = set()
    for entry in df[column]:
        if entry is not None:
            observed_labels.update(entry)

    binarizer = MultiLabelBinarizer()
    binarizer.fit([list(observed_labels)])

    sub_column_names = [get_sub_column_name(column, known_class) for known_class in binarizer.classes_]
    indicator_df = pd.DataFrame(binarizer.transform(df[column]), columns=sub_column_names).astype(bool)

    # Reset the index so that the positional indicator rows line up with the remaining data
    remaining_df = df.drop(column, axis=1).reset_index(drop=True)
    return pd.concat([remaining_df, indicator_df], axis=1)

def single_label_binarize(df: pd.DataFrame, column):
    '''One-hot encodes a single-label column

    Args:
        df (pd.DataFrame): The dataframe
        column (str): The column name that should be binarized
    Returns:
        pd.DataFrame: The result of pd.get_dummies for that column; the new column names
            are formatted as sub column names with the separator
    '''
    dummy_options = {
        'columns': [column],
        'prefix': column,
        'prefix_sep': BIN_ENC_SEP,
    }
    return pd.get_dummies(df, **dummy_options)

In [9]:
def remove_suffix(x):
    '''Removes the axial code level topic for simplification of codes

    Everything from the first '::(' onwards is cut off. Lists are processed
    element by element. Values of any other type (numbers, None, ...) are
    returned unchanged instead of being silently replaced by None.

    Args:
        x: A code string, a (possibly nested) list of code strings, or any other value
    Returns:
        The simplified string / list, or x itself for other types
    '''
    if isinstance(x, str):
        return x.split('::(')[0]
    if isinstance(x, list):
        return [remove_suffix(y) for y in x]
    return x

def get_inv_name(name: str) -> str:
    '''Builds the name of the inverse ("rest") counterpart of a variable for One versus Rest encoding'''
    return '{} [{}]'.format(name, inverse_str)

def get_aspect_columns(column):
    '''Derives one column name per aspect ("<column>: <aspect name>") in the order of ASPECTS'''
    aspect_columns = []
    for aspect in ASPECTS:
        aspect_columns.append(f'{column}: {aspect.name}')
    return aspect_columns

def transform_columns(df: pd.DataFrame, columns: list[str], scales: list[str]) -> pd.DataFrame:
    '''Transforms the columns according to their scales and binarizes the columns where necessary.

    Rows with a missing value in any of the given columns are dropped first. Then, per column:
      * 'list of nominals': the cell text is parsed as a Python literal; for aspect columns
        (name starts with 'Aspect') only the requested part of each '_'-separated code is
        kept; the column is replaced by one boolean sub column per label.
      * 'integer': cast to int, otherwise left as is.
      * 'float': left as is.
      * anything else: cast to str and one-hot encoded.
    For the variables C2.1, C2.2, C2.3 and B2 the '::(' suffix of the codes is removed.

    Args:
        df (pd.DataFrame): The dataframe (not modified, a copy is transformed)
        columns (list[str]): The columns that should be transformed, named '<id> - <name>'
        scales (list[str]): The scales of the columns (case-insensitive)
    Returns:
        pd.DataFrame: The transformed dataframe
    '''

    def get_aspect(index, list_of_aspects):
        # Keeps the index-th '_'-separated part of every code that has that many parts
        if list_of_aspects is None:
            return None
        return [parts[index] for parts in [code.split('_') for code in list_of_aspects] if len(parts) > index]

    df = df.copy().dropna(subset=columns)

    for column, scale in zip(columns, scales):
        scale = scale.lower()
        column_parts = column.split(' - ')
        variable_id = column_parts[0]
        name = column_parts[1]
        is_numeric = scale in ['integer', 'float']

        if scale == 'list of nominals':
            df[column] = df[column].apply(lambda x: ast.literal_eval(x) if not pd.isna(x) else None)

            if name.startswith('Aspect'):
                # The aspect is the part after the LAST ': ' (appended by get_aspect_columns),
                # so names that contain ': ' themselves are handled as well
                aspect = Aspect[name.rsplit(': ', 1)[1]]
                aspect_index = ASPECTS.index(aspect)

                df[column] = df[column].apply(lambda x: get_aspect(aspect_index, x) if x is not None else None)

            df = multi_label_binarize(df, column)
        else:
            if not is_numeric:
                df[column] = df[column].astype(str)

            if scale == 'integer':
                df[column] = df[column].astype(int)

            if variable_id in ['C2.1', 'C2.2', 'C2.3', 'B2']:
                df[column] = df[column].apply(lambda x: remove_suffix(x))

            if not is_numeric:
                df = single_label_binarize(df, column)

    return df

In [10]:
def set_error(column_a, column_b, reason, tag=None):
    '''Builds the error record (group 'E') that can substitute the result of the comparison functions'''
    error_record = dict(
        column_a=column_a,
        column_b=column_b,
        group='E',
        method=tag,
        reason=reason,
    )
    return error_record

In [11]:
def save_fig(fig: plt.Figure, file_path):
    '''Writes the figure as a tightly cropped PDF to file_path'''
    save_options = {'format': 'pdf', 'bbox_inches': 'tight'}
    fig.savefig(file_path, **save_options)

In [12]:
def get_valid_labels(df, columns):
    ''' Splits the given (boolean) sub columns into valid and threshold labels based on the frequency.
        Threshold labels are those that meet the frequency threshold for the label itself.
        Valid labels are threshold labels whose inverse (rows where the label is not set)
        meets the frequency threshold as well.

        Returns:
            tuple: (set of valid labels, set of threshold labels)
    '''
    threshold_labels = set()
    valid_labels = set()

    for column in columns:
        flags = df[column]
        label_count = flags.sum()
        inverse_count = (~flags).sum()

        if not label_count >= FREQ_THRESHOLD:
            continue

        threshold_labels.add(column)
        if inverse_count >= FREQ_THRESHOLD:
            valid_labels.add(column)

    return valid_labels, threshold_labels

In [13]:
# Coded variables table; (re)loaded by create_iterator and extended with the aspect columns.
variables_main_df = None

def create_iterator(vars_a, vars_b, test_specific_pair=(None, None)):
    '''Generator over all pairs of columns that should be compared.

    Reloads the global variables_main_df when iteration starts. Identifier variables are
    skipped, variables of vars_b whose ID was already visited on the A side are skipped,
    and 'Aspects' variables are expanded into one column per aspect (the expanded columns
    are added to variables_main_df as copies of the original column).

    Args:
        vars_a (pd.DataFrame): The variables of group A
        vars_b (pd.DataFrame): The variables of group B
        test_specific_pair (tuple): IDs (A, B) to restrict the iteration to, None means no restriction
    Yields:
        tuple: (column_a, column_b, scale_a, scale_b, aspect_a, aspect_b); the aspects are
            None for columns that were not expanded
    '''

    global variables_main_df

    variables_main_df = get_variables_df()

    fields = ['ID', 'Name', 'Scale', 'Aggregation']

    def select(variables, wanted_id):
        # Restricts the variables to a single ID if one was requested
        if wanted_id is None:
            return variables
        return variables[variables['ID'] == wanted_id]

    def expand(variable_id, name, aggregation):
        # Full column name, replaced by its per-aspect columns for 'Aspects' variables
        suffix = f' [{aggregation}]' if pd.notna(aggregation) else ''
        full_name = f'{variable_id} - {name}' + suffix
        if not name.startswith('Aspects'):
            return [full_name]

        aspect_columns = get_aspect_columns(full_name)
        for aspect_column in aspect_columns:
            variables_main_df[aspect_column] = variables_main_df[full_name]
        return aspect_columns

    variables_a_done = set()

    for id_a, name_a, scale_a, aggregation_a in select(vars_a, test_specific_pair[0])[fields].values:
        variables_a_done.add(id_a)
        if is_identifier(name_a):
            continue

        columns_a = expand(id_a, name_a, aggregation_a)

        for position_a, column_a in enumerate(columns_a):
            for id_b, name_b, scale_b, aggregation_b in select(vars_b, test_specific_pair[1])[fields].values:
                if id_b in variables_a_done or is_identifier(name_b):
                    continue

                columns_b = expand(id_b, name_b, aggregation_b)

                for position_b, column_b in enumerate(columns_b):
                    aspect_b = ASPECTS[position_b] if len(columns_b) > 1 else None
                    aspect_a = ASPECTS[position_a] if len(columns_a) > 1 else None

                    yield (column_a, column_b, scale_a, scale_b, aspect_a, aspect_b)


## Nominal Pairs

Cross tabulation and Fisher's exact test

In [27]:
def flatten_crosstab(df: pd.DataFrame, flat_column_name) -> pd.DataFrame:
    '''Turns a crosstab into a single-column frame: one row per (row label, column label)
    pair, indexed by "<row label>_<column label>", holding the frequency'''

    def join_pair(pair):
        return '_'.join(str(part) for part in pair)

    stacked = df.stack()
    stacked.index = stacked.index.map(join_pair)
    return stacked.to_frame(flat_column_name)

def get_comparison_table(crosstab_bug: pd.DataFrame, crosstab_cg: pd.DataFrame) -> pd.DataFrame:
    '''Puts the flattened crosstabs side by side (columns 'bug' and 'cg'); label pairs
    that are missing in one of the groups get the frequency 0'''
    flattened = [
        flatten_crosstab(crosstab_bug, 'bug'),
        flatten_crosstab(crosstab_cg, 'cg'),
    ]
    return pd.concat(flattened, axis=1).fillna(0)

def get_crosstab(variables_df: pd.DataFrame, columns_a, columns_b, inverse, filter_frequency, cg=None):
    '''Cross tabulates the labels of two binarized variables over the bugs they occur in

    Args:
        variables_df (pd.DataFrame): The variables dataframe
        columns_a (list): The sub columns of the first variable
        columns_b (list): The sub columns of the second variable
        inverse (bool): Whether rows in which a label is not set should be counted under its inverse name
        filter_frequency (bool): Whether the frequency threshold should be applied
        cg (bool): True keeps only control group rows, False only the other rows, None keeps all rows

    Returns:
        pd.DataFrame: The cross tabulation
    '''

    # Only the literal booleans restrict the rows
    if cg is True or cg is False:
        variables_df = variables_df[variables_df['CG'] == cg]

    def get_labels_df(columns):
        rows = []
        for column in columns:
            pairs = variables_df[[BUG_ID, column]].values
            set_rows = [{BUG_ID: bug_id, 'label': column} for bug_id, item in pairs if item == 1]

            unset_rows = []
            if inverse:
                inverse_label = get_inv_name(column)
                unset_rows = [{BUG_ID: bug_id, 'label': inverse_label} for bug_id, item in pairs if not item == 1]

            if not filter_frequency:
                rows.extend(set_rows)
                rows.extend(unset_rows)
            elif len(set_rows) >= FREQ_THRESHOLD:
                # The inverse is only kept if the label itself is frequent enough, too
                rows.extend(set_rows)
                if len(unset_rows) >= FREQ_THRESHOLD:
                    rows.extend(unset_rows)

        if not rows:
            # Placeholder row so that the resulting frame still has both columns
            rows = [{BUG_ID: None, 'label': None}]
        return pd.DataFrame(rows)

    labels_a = get_labels_df(columns_a)
    labels_b = get_labels_df(columns_b)

    paired = labels_a.merge(labels_b, on=BUG_ID, how='outer', suffixes=('_a', '_b'))
    return pd.crosstab(paired['label_a'], paired['label_b'])



In [28]:
# One entry (True/False) per Fisher test that produced a usable p-value.
fisher_significance_count = []

def get_fisher_significance(crosstab):
    """ Perform a Fisher's exact test of independence on the crosstab

    The p-value is converted to a plain Python float and the significance to a plain
    bool, so that callers and the collected statistics never carry numpy scalars or
    one-element arrays. Tests with a usable (non-NaN) p-value are recorded in the
    global fisher_significance_count.

    Args:
        crosstab (pd.DataFrame): Crosstab of the two variables
    Returns:
        tuple: (bool, float) Whether the difference in distribution is significant
            (p <= 0.05; False for a NaN p-value), and the p-value
    Raises:
        ValueError: Propagated from scipy if the table cannot be tested
    """
    global fisher_significance_count

    res = fisher_exact(crosstab)
    # .item() unwraps numpy scalars as well as arrays holding a single value
    p_value = np.asarray(res.pvalue, dtype=float).item()
    significant = p_value <= 0.05

    if not np.isnan(p_value):
        fisher_significance_count.append(significant)

    return (significant, p_value)

In [29]:
# Variables with a nominal, list-of-nominals or boolean scale (both sides of the nominal pairs).
nom_bool_vars = get_nom_list_variables()

In [30]:
def get_relation(crosstab_bug, crosstab_cg, column_a, column_b):
    '''Runs the Fisher tests for a pair of variables and derives its group

    The test is run on the control group crosstab, on the bug group crosstab and, unless
    both of them failed, on the comparison table of the two groups.

    Args:
        crosstab_bug (pd.DataFrame): The crosstab of the bug group
        crosstab_cg (pd.DataFrame): The crosstab of the control group
        column_a (str): The name of the first column of the pair
        column_b (str): The name of the second column of the pair
    Returns:
        dict: An error record if neither group could be tested, otherwise the group, the
            reason (one line per test) and the side ('BG' or 'CG' if exactly that group is
            significant, '' otherwise)
    '''

    notes = []

    def run_test(table, tag):
        # None signals that the test could not be run on this table
        try:
            outcome, p_value = get_fisher_significance(table)
            notes.append(f'{tag}: p-value: {p_value}, {table.shape}')
            return outcome
        except ValueError as e:
            notes.append(f'{tag} has no data: {e}')
            return None

    cg_outcome = run_test(crosstab_cg, 'CG')
    bug_outcome = run_test(crosstab_bug, 'BG')

    if bug_outcome is None and cg_outcome is None:
        return set_error(column_a, column_b, '\n'.join(notes))

    comparison_outcome = run_test(get_comparison_table(crosstab_bug, crosstab_cg), 'Comparison Table')

    side = ''
    if bug_outcome is not None and cg_outcome is not None:
        if bug_outcome and not cg_outcome:
            side = 'BG'
        elif cg_outcome and not bug_outcome:
            side = 'CG'

    return {
        'group': get_group(bug_outcome, cg_outcome, comparison_outcome),
        'reason': '\n'.join(notes),
        'side': side
    }

    
def df_get(df: pd.DataFrame, a, b, fallback = 0) -> int:
    '''Returns the value of the dataframe at the index a and column b as int.

    If the value is not available or cannot be converted to int (missing label, NaN,
    non-scalar result, ...), the fallback value is returned. Only regular errors are
    absorbed; KeyboardInterrupt and SystemExit still propagate.
    '''
    try:
        return int(df.loc[a, b])
    except Exception:
        return fallback

In [None]:
# Nominal pairs: cross tabulate every pair of nominal variables per group, run the Fisher
# tests and write the resulting group assignment to an Excel overview.
result = []
fisher_significance_count = []

TEST_SPECIFIC_PAIR = (None, None)
# TEST_SPECIFIC_PAIR = (None, 'B2')

result_threads = []

p_bar = tqdm.tqdm(create_iterator(nom_bool_vars, nom_bool_vars, TEST_SPECIFIC_PAIR))
for column_a, column_b, scale_a, scale_b, _, _ in p_bar:
    p_bar.set_description(f'Processing {column_a}')
    p_bar.set_postfix_str(f'{column_b}')

    # variables_main_df is (re)loaded by create_iterator and contains the expanded aspect columns
    variables_df = transform_columns(variables_main_df, [column_a, column_b], [scale_a, scale_b])

    _, column_a_sub_columns = get_labels_from_sub_columns(variables_df, column_a)
    _, column_b_sub_columns = get_labels_from_sub_columns(variables_df, column_b)

    add_inverse = False
    filter_frequency = False

    crosstab_bug = get_crosstab(variables_df, column_a_sub_columns, column_b_sub_columns, add_inverse, filter_frequency, cg=False)
    crosstab_cg = get_crosstab(variables_df, column_a_sub_columns, column_b_sub_columns, add_inverse, filter_frequency, cg=True)

    result_ovr_ab = get_relation(crosstab_bug, crosstab_cg, column_a, column_b)

    result.append({
        'column_a': column_a,
        'column_b': column_b,
        'group': result_ovr_ab['group'],
        'reason': result_ovr_ab['reason'],
        # error records carry no 'side'
        'onesided_trend': result_ovr_ab.get('side', '')
    })

relation_overview_df = pd.DataFrame(result)
relation_overview_df.to_excel(output_path + 'relation_overview_fisher.xlsx', index=False)

# Guard against an empty run (e.g. a specific pair that yields nothing): no division by zero
if fisher_significance_count:
    print(f'significant results: {sum(fisher_significance_count) / len(fisher_significance_count)}')
else:
    print('significant results: n/a (no Fisher test produced a p-value)')

print(f'Done - {datetime.datetime.now()}')

# Last expression of the cell: group frequencies (nothing to show for an empty result)
relation_overview_df['group'].value_counts() if result else None

Processing B1 - Build tools: : 41it [00:01, 26.40it/s, B2 - Dependency resolution]                                         

significant results: [0.12380952]
Done - 2025-04-10 21:31:11.421694





group
1      18
31     12
32      6
3.3     4
2.1     1
Name: count, dtype: int64

## Nominal/Numeric Pairs

Boxplots

In [14]:
# Variable metadata for the nominal/numeric pairs: categorical variables on the A side,
# integer/float variables on the B side.
nom_bool_vars = get_nom_list_variables()
num_vars = get_num_variables()

In [15]:
# Load the coded variables table from the Excel input.
variables_df = get_variables_df()

# Disabled ad-hoc filter (presumably used to inspect a single pair by hand):
# variables_df = variables_df.copy().dropna(subset=['I8 - does it have a wiki? or does it have a specification? [any]', 'C4 - Commit file bugginess'])
# variables_df = variables_df[variables_df['CG'] == True]
# variables_df['I8 - does it have a wiki? or does it have a specification? [any]'] = variables_df['I8 - does it have a wiki? or does it have a specification? [any]'].astype(bool).astype(str)

In [16]:

def sort_based_on_order(list_to_sort) -> list:
    '''Sorts labels consistently.

    If every label (compared case-insensitively as string) is part of the predefined
    order, that order is used; otherwise the labels are sorted alphabetically,
    ignoring case.
    '''
    known_order = (
        'undecided', 'low', 'medium', 'high', 'critical',
        'core', 'peripheral', 'not involved',
        'bug', 'feature', 'improvement',
        'yes', 'no',
        'true', 'false',
    )

    def normalise(value):
        return str(value).lower()

    rank = {}
    for position, known_label in enumerate(known_order):
        rank[known_label] = position

    if all(normalise(entry) in rank for entry in list_to_sort):
        return sorted(list_to_sort, key=lambda entry: rank[normalise(entry)])

    return sorted(list_to_sort, key=normalise)



def draw_barchart_distribution(variables_ab_df: pd.DataFrame, column_a, ax5: plt.Axes, ax6: plt.Axes):
    '''Draws bar charts of the distribution of the nominal variable in both groups

    ax5 shows, per label, the share of bugs of the group (BG/CG) that carry the label;
    ax6 shows the number of distinct bug IDs per group. Bars backed by fewer than
    FREQ_THRESHOLD observations are greyed out.

    Args:
        variables_ab_df (pd.DataFrame): The transformed dataframe containing the sub columns of column_a
        column_a (str): The name of the nominal variable
        ax5 (plt.Axes): The axes for the label distribution
        ax6 (plt.Axes): The axes for the ID counts
    Returns:
        str: A hint with the absolute number of distinct bug IDs per group
    '''

    nominal_labels, _ = get_labels_from_sub_columns(variables_ab_df, column_a)

    variables_ab_df_bg = variables_ab_df[variables_ab_df['CG'] == False]
    variables_ab_df_cg = variables_ab_df[variables_ab_df['CG'] == True]

    id_count_bg = len(variables_ab_df_bg[BUG_ID].unique())
    id_count_cg = len(variables_ab_df_cg[BUG_ID].unique())

    labels = sort_based_on_order(nominal_labels)
    sub_columns = [get_sub_column_name(column_a, label) for label in labels]

    label_counts_bg = variables_ab_df_bg[sub_columns].sum().values
    label_counts_cg = variables_ab_df_cg[sub_columns].sum().values

    def draw_barchart(ax: plt.Axes, counts_a: list[int], counts_b: list[int], labels, div_a = 1, div_b = 1):
        width = 0.35
        x = np.arange(len(labels))

        # avoid a division by zero for empty groups
        div_a = max(div_a, 1)
        div_b = max(div_b, 1)

        shares_a = [count/div_a for count in counts_a]
        shares_b = [count/div_b for count in counts_b]

        bars_1 = ax.bar(x - width/2, shares_a, width, label='BG')
        bars_2 = ax.bar(x + width/2, shares_b, width, label='CG')

        def bar_texts(counts, div):
            # A divisor of 1 means "absolute counts"; otherwise the share is printed as a
            # real percentage (fraction * 100), followed by the absolute count
            if div > 1:
                return [f'{round(count/div * 100, 2)}% ({count})' for count in counts]
            return [f'{count}' for count in counts]

        ax.bar_label(bars_1, labels=bar_texts(counts_a, div_a))
        ax.bar_label(bars_2, labels=bar_texts(counts_b, div_b))

        for bar, count in list(zip(bars_1, counts_a)) + list(zip(bars_2, counts_b)):
            if count < FREQ_THRESHOLD:
                bar.set_facecolor('gray')
                bar.set_alpha(0.2)

        ax.set_xticks(x)
        ax.set_ylabel('Frequency')
        ax.set_xlabel(column_a)
        ax.set_xticklabels(labels, rotation=45, ha='right')
        ax.legend(loc='upper right')

    draw_barchart(ax5, label_counts_bg, label_counts_cg, labels, id_count_bg, id_count_cg)
    draw_barchart(ax6, [id_count_bg], [id_count_cg], ['ID count'])

    return f'Abs. count: {id_count_bg} (BG); {id_count_cg} (CG)'



def draw_num_distribution_with_boxplot(variables_df: pd.DataFrame, column_a, column_b, ax1: plt.Axes, ax2: plt.Axes, cg=None) -> tuple[bool, str]:
    ''' Draws boxplots of the numeric variable per label of the nominal variable and runs a
        Kruskal-Wallis test across the labels

        For variables with more than two labels, a box for the inverse ("not <label>") is
        drawn next to every label. Boxes of inverse labels are coloured, boxes with fewer
        than FREQ_THRESHOLD values are greyed out. The test only uses the (non-inverse)
        labels with at least FREQ_THRESHOLD values.

        Args:
            variables_df (pd.DataFrame): The transformed dataset containing the sub columns of column_a
            column_a (str): The name of the nominal/boolean variable
            column_b (str): The name of the numeric variable
            ax1 (plt.Axes): The axes for the boxplots without outliers
            ax2 (plt.Axes): The axes for the boxplots with outliers, None to skip that plot
            cg (bool): True keeps only control group rows, False only the other rows, None keeps all rows
        Returns:
            tuple[bool, str]: True (the plot was drawn) and a hint with the test outcome,
                prefixed with the group ('CG' if cg is truthy, 'BG' otherwise)
    '''
    labels, _ = get_labels_from_sub_columns(variables_df, column_a)
    labels = sort_based_on_order(labels)

    if cg is True:
        variables_df = variables_df[variables_df['CG'] == True]
    elif cg is False:
        variables_df = variables_df[variables_df['CG'] == False]

    plot_data = []
    plot_data_no_inverse = []
    plot_labels = []
    plot_labels_count = []

    for label in labels:
        plot_labels.append(label)
        sub_column_name = get_sub_column_name(column_a, label)
        variables_column_b = variables_df[variables_df[sub_column_name] == True][column_b].dropna()
        plot_labels_count.append(f'{label} ({variables_column_b.count()})')
        plot_data.append(variables_column_b)
        plot_data_no_inverse.append(variables_column_b)

        # With two labels the inverse of one label is the other label, so it is only added beyond that
        if len(labels) > 2:
            plot_labels.append(get_inv_name(label))
            variables_column_b_not = variables_df[variables_df[sub_column_name] == False][column_b].dropna()
            plot_labels_count.append(f'{get_inv_name(label)} ({variables_column_b_not.count()})')
            plot_data.append(variables_column_b_not)

    def draw(ax: plt.Axes, fliers, tick_labels):
        ax.set_ylabel(column_b)
        ax.set_xlabel(column_a)

        boxplots = ax.boxplot(plot_data, showfliers=fliers, patch_artist=True)

        for boxplot, data, label in zip(boxplots['boxes'], plot_data, plot_labels):
            if label.endswith(f'[{inverse_str}]'):
                boxplot.set_facecolor('skyblue')
            if len(data) < FREQ_THRESHOLD:
                boxplot.set_facecolor('gray')
                boxplot.set_alpha(0.2)

        ax.set_xticks(range(1, len(tick_labels) + 1))
        ax.set_xticklabels(tick_labels, rotation=45, ha='right')

    draw(ax1, False, plot_labels_count)
    if ax2 is not None:
        draw(ax2, True, plot_labels_count)

    group_tag = 'CG' if cg else 'BG'
    kruskal_data = [data for data in plot_data_no_inverse if len(data) >= FREQ_THRESHOLD]

    # The group prefix is part of both hints so that the BG and CG messages stay distinguishable
    if len(kruskal_data) > 1:
        _, p = kruskal(*kruskal_data)
        hint = f'{group_tag}: Kruskal-Wallis - Significant difference {p <= 0.05}'
    else:
        hint = f'{group_tag}: Not enough data for Kruskal-Wallis'

    return True, hint

In [None]:
# Nominal/numeric pairs: draw boxplots of every numeric variable per label of every nominal
# variable (BG left, CG right), save them as PDF and write an Excel overview with hints.
relation_overview = {}

variables_a_done = set()
save_threads = []

TEST_SPECIFIC_PAIR = (None, None)
# TEST_SPECIFIC_PAIR = ('R4', 'I7')

boxplot_dir = f'{figure_output_path}boxplots'

p_bar = tqdm.tqdm(create_iterator(nom_bool_vars, num_vars, TEST_SPECIFIC_PAIR))
for column_a, column_b, scale_a, scale_b, aspect, _ in p_bar:
    p_bar.set_description(f'Processing {column_a}')
    p_bar.set_postfix_str(f'{column_b}')

    # transform_columns works on its own copy of the dataframe
    variables_ab_df = transform_columns(variables_main_df, [column_a, column_b], [scale_a, scale_b])

    nominal_labels, sub_columns_a = get_labels_from_sub_columns(variables_ab_df, column_a)

    hints = []

    filename = [column_a.split(' - ')[0], column_b.split(' - ')[0]]
    if aspect is not None:
        filename.append(aspect.name)
    file_path = f'{boxplot_dir}/{"_".join(filename)}_histogram_boxplot.pdf'

    variables_ab_df_bg = variables_ab_df[variables_ab_df['CG'] == False]
    variables_ab_df_cg = variables_ab_df[variables_ab_df['CG'] == True]

    if len(variables_ab_df_bg) == 0 and len(variables_ab_df_cg) == 0:
        relation_overview[f'{column_a} {column_b}'] = set_error(column_a, column_b, 'No data for both groups')
        continue

    valid_labels_bg, semi_valid_labels_bg = get_valid_labels(variables_ab_df_bg, sub_columns_a)
    valid_labels_cg, semi_valid_labels_cg = get_valid_labels(variables_ab_df_cg, sub_columns_a)

    if len(semi_valid_labels_bg) == 0 and len(semi_valid_labels_cg) == 0:
        relation_overview[f'{column_a} {column_b}'] = set_error(column_a, column_b, 'No frequent labels for both groups')
        continue

    # From here on at least one group has a frequent label
    if len(valid_labels_bg & valid_labels_cg) == 0:
        hints.append('BG and CG: no common fully valid labels')
    if len(semi_valid_labels_bg) == 0:
        hints.append('BG: no frequent labels')
    elif len(valid_labels_bg) == 0:
        hints.append('BG: no fully valid labels')
    if len(semi_valid_labels_cg) == 0:
        hints.append('CG: no frequent labels')
    elif len(valid_labels_cg) == 0:
        hints.append('CG: no fully valid labels')
    if len(valid_labels_bg) == 0 and len(valid_labels_cg) == 0:
        relation_overview[f'{column_a} {column_b}'] = set_error(column_a, column_b, 'No fully valid labels for both groups. Cannot compare.')
        continue

    plt.rcParams.update({'font.size': 16})
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 7))
    # The plots with outliers (ax3, ax4) and the bar charts (ax5, ax6) are currently disabled
    ax3, ax4, ax5, ax6 = None, None, None, None

    drew_cg, hint_cg = draw_num_distribution_with_boxplot(variables_ab_df, column_a, column_b, ax2, ax4, cg=True) if len(semi_valid_labels_cg) else (None, None)
    drew_bug, hint_bg = draw_num_distribution_with_boxplot(variables_ab_df, column_a, column_b, ax1, ax3, cg=False) if len(semi_valid_labels_bg) else (None, None)

    hints.append(hint_bg)
    hints.append(hint_cg)

    if (drew_bug or drew_cg) and None not in [ax5, ax6]:
        hint = draw_barchart_distribution(variables_ab_df, column_a, ax5, ax6)
        hints.append(hint)

    # Creates missing parent folders as well and does not fail if the folder exists
    os.makedirs(boxplot_dir, exist_ok=True)

    if drew_bug or drew_cg:
        plt.tight_layout()
        # NOTE(review): the figure is saved on a background thread while plt.close('all')
        # runs on the main thread below - confirm that this is safe with the backend in use
        thread = Thread(target=save_fig, args=(fig, file_path), daemon=True)
        thread.start()
        save_threads.append(thread)

    relation_overview[f'{column_a} {column_b}'] = {
        'column_a': column_a,
        'column_b': column_b,
        'group': None,
        'reason': None,
        'hint': '\n'.join([hint for hint in hints if hint is not None]),
        'hyperlink': f'=HYPERLINK(J1 & "{file_path.replace(output_path, "")}", "CLICK")' if drew_bug or drew_cg else None
    }

    plt.close('all')

for thread in tqdm.tqdm(save_threads, desc='Saving figures'):
    thread.join()

print(f'All {len(save_threads)} done! - {datetime.datetime.now()}')

relation_overview_pd = pd.DataFrame(relation_overview.values())
relation_overview_pd.to_excel(output_path + 'relation_overview_boxplots.xlsx', index=False)

Processing R4 - Reviewer types: : 1it [00:00,  7.40it/s, I7 - # Introducing issue commits [sum]]
Saving figures: 100%|██████████| 1/1 [00:00<00:00, 22.98it/s]

All 1 done! - 2025-04-11 09:32:13.679610





## Numeric Pairs

Spearman's rho

In [89]:
# Variables with an integer or float scale (both sides of the numeric pairs).
num_vars = get_num_variables()

In [90]:
# Numeric pairs: Spearman's rho per group (BG/CG) for every pair of numeric variables,
# group assignment based on the significances, results written to an Excel overview.
relation_overview = {}
spearman_significance_count = []

variables_df = get_variables_df()

variables_a_done = set()

save_threads = []

TEST_SPECIFIC_PAIR = (None, None)
# TEST_SPECIFIC_PAIR = ('I5', 'ML5')

data_a = num_vars[num_vars['ID'] == TEST_SPECIFIC_PAIR[0]] if TEST_SPECIFIC_PAIR[0] is not None else num_vars
for id_a, name_a, scale_a, aggregation_a in tqdm.tqdm(data_a[['ID', 'Name', 'Scale', 'Aggregation']].values):
    if is_identifier(name_a):
        continue

    column_a = f'{id_a} - {name_a}' + (f' [{aggregation_a}]' if pd.notna(aggregation_a) else '')

    # Marking A as done also skips the pair (A, A) and every pair that was already visited as (B, A)
    variables_a_done.add(id_a)

    data_b = num_vars[num_vars['ID'] == TEST_SPECIFIC_PAIR[1]] if TEST_SPECIFIC_PAIR[1] is not None else num_vars
    for id_b, name_b, scale_b, aggregation_b in data_b[['ID', 'Name', 'Scale', 'Aggregation']].values:
        if is_identifier(name_b):
            continue

        if id_b in variables_a_done:
            continue

        column_b = f'{id_b} - {name_b}' + (f' [{aggregation_b}]' if pd.notna(aggregation_b) else '')

        # transform_columns works on its own copy of the dataframe
        variables_ab_df = transform_columns(variables_df, [column_a, column_b], [scale_a, scale_b])

        variables_ab_df_bg = variables_ab_df[variables_ab_df['CG'] == False]
        variables_ab_df_cg = variables_ab_df[variables_ab_df['CG'] == True]

        rho_bg, p_value_bg = spearmanr(variables_ab_df_bg[column_a], variables_ab_df_bg[column_b])
        rho_cg, p_value_cg = spearmanr(variables_ab_df_cg[column_a], variables_ab_df_cg[column_b])

        # A NaN p-value means the correlation could not be computed: significance stays None
        bg_significant = None
        cg_significant = None
        if not math.isnan(p_value_bg):
            bg_significant = p_value_bg <= 0.05
            spearman_significance_count.append(bg_significant)
        if not math.isnan(p_value_cg):
            cg_significant = p_value_cg <= 0.05
            spearman_significance_count.append(cg_significant)

        if bg_significant and abs(rho_bg) < 0.2:
            print("WARNING: BG significant but low correlation for ", column_a, column_b)
        if cg_significant and abs(rho_cg) < 0.2:
            print("WARNING: CG significant but low correlation for ", column_a, column_b)

        if bg_significant is None and cg_significant is None:
            relation_overview[f'{column_a} {column_b}'] = set_error(column_a, column_b, 'BG and CG: Not enough data')
            continue

        # The groups count as different if their correlation coefficients differ by more than 0.2
        relation_different = abs(rho_bg - rho_cg) > 0.2

        group = get_group(bg_significant, cg_significant, relation_different)

        relation_overview[f'{column_a} {column_b}'] = {
            'column_a': column_a,
            'column_b': column_b,
            'group': group,
            'reason': f'Spearman correlation: BG: {round(rho_bg, 2)} (p-value: {p_value_bg}), CG: {round(rho_cg, 2)} (p-value: {p_value_cg})'
        }

# Guard against an empty run (e.g. a specific pair that yields nothing): no division by zero
if spearman_significance_count:
    print(f'significant results: {sum(spearman_significance_count) / len(spearman_significance_count)} out of {len(spearman_significance_count)}')
else:
    print('significant results: n/a out of 0')

print(f'Done - {datetime.datetime.now()}')

relation_overview_pd = pd.DataFrame(relation_overview.values())
relation_overview_pd.to_excel(output_path + 'relation_overview_spearmanr.xlsx', index=False)

# Last expression of the cell: group frequencies (nothing to show for an empty result)
relation_overview_pd['group'].value_counts() if relation_overview else None

  rho_bg, p_value_bg = spearmanr(variables_ab_df_bg[column_a], variables_ab_df_bg[column_b])
100%|██████████| 15/15 [00:00<00:00, 117.71it/s]

significant results: 0.17699115044247787 out of 113
Done - 2025-04-10 16:40:55.620176





group
31     51
1      20
E      18
32     10
2.1     4
3.3     2
Name: count, dtype: int64

## Zip

In [91]:
# Pack the output folder into a timestamped zip archive.
get_zip()

In [92]:
# get_zip()

In [93]:
# Overall share of significant results across the Spearman and the Fisher tests.
print(len([i for i in spearman_significance_count if i is not None]), len(fisher_significance_count))

significance_count = spearman_significance_count + fisher_significance_count

# Guard against empty result lists (e.g. when the test cells have not produced anything)
if significance_count:
    print(f'significant results: {sum(significance_count) / len(significance_count)}')
else:
    print('significant results: n/a (no test results collected)')



113 138
significant results: [0.16733068]
