<h2>Python R code analysis</h2>

<h4><u>Parsing and beautifing data</u></h4>

In [None]:
# Install and load the necessary packages
import matplotlib.pyplot as plt
import seaborn as sns
import json
import pandas as pd
import re
import numpy as np
from scipy.stats import kurtosis, skew, shapiro, spearmanr, pearsonr 
from statsmodels.robust.scale import mad
from FisherExact import fisher_exact
from matplotlib import ticker
from relis_types.FieldClassificationType import FieldClassificationType
from relis_types.Multivalue import Multivalue
from relis_types.Policies import Policies
from relis_types.Variable import Variable
from relis_types.DataFrame import DataFrame
from relis_types.NominalVariables import NominalVariables
from relis_types.NominalDataFrame import NominalDataFrame
from relis_types.ContinuousVariables import ContinuousVariables
from relis_types.ContinuousDataFrame import ContinuousDataFrame

Utils

Should we create an utils class/file ?

In [None]:
def removeEmptyStrings(df: pd.DataFrame) -> None:
    df.replace('', np.nan, inplace=True)

def get_variable(field_name: str, variables) -> Variable:
    return variables[field_name].value

def split_multiple_values(value):
    if not pd.isna(value):
        return [item.strip() for item in re.split(rf'\{Multivalue.SEPARATOR.value}', value)] 
    return [value]

def process_multiple_values(values: pd.Series, multiple: bool):
    if (multiple):
        return values.apply(lambda x: split_multiple_values(x))

    return values.apply(lambda x: [x])

Project data objects will not be in JSON files. They will be accessible directly from the relis application environment

In [None]:

with open('../data/relis_classification_rsc_CV.json', 'r', encoding='utf8') as f:
   classification_data: list[dict[str, str]] =  json.loads(f.read())

print(classification_data)

Preprocessing data for analysis 

In [None]:

# Split config file based on data type
def filter_row_by_field_type(paper, field_type):
    pd_row = {key: value["value"] for key, value in paper.items() if value['type'] == field_type}
    return pd_row

nominal_data = pd.DataFrame([filter_row_by_field_type(paper, FieldClassificationType.NOMINAL.value) for paper in classification_data])
continuous_data = pd.DataFrame([filter_row_by_field_type(paper, FieldClassificationType.CONTINUOUS.value) for paper in classification_data])

nominal = NominalDataFrame(nominal_data, NominalVariables)
continuous = ContinuousDataFrame(continuous_data, ContinuousVariables)

# For test purpose ?
if (Policies.DROPNA.value):
    removeEmptyStrings(nominal.data)
    removeEmptyStrings(continuous.data)


<h4><u>DESCRIPTIVE STATS</u></h4>

In [None]:
def beautify_data_desc(field_name: str, data: pd.DataFrame):
    # Get metadata
    variable = get_variable(field_name, NominalVariables)

    # Split the values by the "|" character and flatten the result
    split_values = process_multiple_values(data[field_name], variable.multiple)
    flattened_values = np.concatenate(split_values)

    # Generate the frequency table
    freq_table = pd.Series(flattened_values, dtype=str).value_counts().reset_index()
    freq_table.columns = ['value', 'n']

    # Calculate the percentage
    freq_table['percentage'] = (freq_table['n'] / freq_table['n'].sum()) * 100

    return freq_table

<h5 style="color:orange">Frequency tables<h5>

What do we do with the empty values ? Do they need to be part of the statistics calculation. <br>
Currently, I'll proceed by deleting them in the begining of the file

In [None]:
desc_distr_vector = {NominalVariables[field_name]: beautify_data_desc(field_name, nominal.data)
                      for field_name in nominal.data.columns}

print(desc_distr_vector)

What do we do with the empty values ? Do they need to be part of the statistics calculation

<h5 style="color:orange">Bar Plots<h5>

In [None]:
def generate_bar_plot(field_name: str, data: pd.DataFrame):
    df = beautify_data_desc(field_name, data)
    
    if (len(df) == 0):
        return

    # Set the theme
    sns.set_theme(style="whitegrid")

    # Create the plot
    plt.figure(figsize=(10, 6))
    barplot = sns.barplot(data=df, x="value", y="percentage", hue="n")

    # Get metadata
    variable = get_variable(field_name, NominalVariables)

    # Set labels and title
    title = f"{variable.title} ~ Bar plot"
    barplot.set_title(title)
    barplot.set_xlabel(variable.title)
    barplot.set_ylabel("Percentage")

    return barplot.figure

bar_plot_vector = {NominalVariables[field_name]: generate_bar_plot(field_name, nominal.data)
                    for field_name in nominal.data.columns}

print(desc_distr_vector)

<h5 style="color:orange">Statistics
<h5>

In [None]:
def generate_statistics(field_name: str, data: pd.DataFrame):
    series =  data[field_name]
    
    series.replace('', np.nan, inplace=True)
    
    if (len(data) == 0):
        return

    nan_policy = 'omit' if Policies.DROPNA.value else 'propagate'
    results = {
    "vars": 1,
    "n": series.count(),
    "mean": series.mean(),
    "sd": series.std(),
    "median": series.median(),
    "trimmed": series[series.between(series.quantile(0.25), series.quantile(0.75))].mean(),
    "mad": mad(series),
    "min": series.min(),
    "max": series.max(),
    "range": series.max() - series.min(),
    "skew": skew(series, nan_policy=nan_policy),
    "kurtosis": kurtosis(series, nan_policy=nan_policy, fisher=True),
    "se": series.std() / np.sqrt(series.count())  
    }
    return results

statistics_vector = {ContinuousVariables[field_name]: generate_statistics(field_name, continuous.data)
                      for field_name in continuous.data.columns}
print(statistics_vector)

<h5 style="color:orange">Box Plots<h5>

In [None]:
def generate_box_plot(field_name: str, data: pd.DataFrame):
    series = data[field_name]

    variable = get_variable(field_name, ContinuousVariables)

    # Create the box plot
    plt.figure(figsize=(10, 6))
    sns.boxplot(data=series, color='lightblue')

    # Overlay the mean point
    mean_value = series.mean()
    plt.scatter(x=0, y=mean_value, color='red', s=50, zorder=3)  # s is the size of the point

    # Set the title and labels
    title = f"{variable.title} ~ Box plot"
    plt.title(title)
    plt.ylabel(variable.title)
    plt.xlabel('')

    plt.gca().yaxis.set_major_formatter(ticker.FormatStrFormatter('%0.0f'))
    plt.close()
    return plt

box_plot_vector = {ContinuousVariables[field_name]: generate_box_plot(field_name, continuous.data)
                    for field_name in continuous.data.columns}

box_plot_vector[ContinuousVariables.publication_year].show()

<h5 style="color:orange">Violin Plots<h5>

In [None]:
def generate_violin_plot(field_name: str, data: pd.DataFrame):
    series = data[field_name]
    
    variable = get_variable(field_name, ContinuousVariables)

    plt.figure(figsize=(10, 6))
    sns.violinplot(data=series, color="lightgray")

    plt.title(f"{variable.title} ~ Violin plot")
    plt.ylabel(variable.title)
    plt.xlabel("Density")
    plt.xticks([])

    return plt

violin_plot_vector = {ContinuousVariables[field_name]: generate_violin_plot(field_name, continuous.data)
                       for field_name in continuous.data.columns}
print(violin_plot_vector)

<h4><u>EVOLUTIVE STATS</u></h4>

Is Publication.year a classfication field which is mandatory for each of the project ? <br>
It's currently hard coded and doesn't follow any patern.

In [None]:
def beautify_data_evo(field_name: str, publication_year: pd.Series, variable: Variable, data: pd.DataFrame):
    series = data[field_name]
    
    # Create new DataFrame with specified columns
    subset_data = pd.DataFrame({
        'Year': publication_year,
        'Value': process_multiple_values(series, variable.multiple)
    })
    
    subset_data = subset_data.explode('Value')

    # Remove rows with empty values
    subset_data = subset_data[(subset_data['Value'] != '')]

    subset_data = subset_data.groupby(['Year', 'Value']).size().reset_index(name='Frequency')

    return subset_data

<h5 style="color:pink">Frequency tables<h5>

In [None]:
def expand_data(field_name: str, publication_year: pd.Series, data: pd.DataFrame):
    variable = get_variable(field_name, NominalVariables)

    subset_data = beautify_data_evo(field_name, publication_year, variable, data)

    # Pivoting the data
    subset_data = subset_data.pivot(index='Year', columns='Value', values='Frequency').fillna(0)

    subset_data.columns.name = None
    subset_data.reset_index(inplace=True)

    return subset_data 


evo_distr_vector = {NominalVariables[field_name]: expand_data(field_name, continuous.data["publication_year"], nominal.data)
                       for field_name in nominal.data.columns}

<h5 style="color:pink">Evolution Plots<h5>

In [None]:
def generate_evo_plot(field_name: str, publication_year: pd.Series, data: pd.DataFrame):
    variable = get_variable(field_name, NominalVariables)
    
    subset_data = beautify_data_evo(field_name, publication_year, variable, data)

    # Create a plot
    plt.figure(figsize=(10, 6))
    sns.lineplot(data=subset_data, x='Year', y='Frequency', hue='Value', style='Value', markers=True)

    # Setting title, labels, and theme
    plt.title(f"{variable.title} ~ Evolution plot")
    plt.xlabel('Year')
    plt.ylabel('Frequency')
    plt.grid(True)
    return plt

evolution_plot_vector = {NominalVariables[field_name]: generate_evo_plot(field_name, continuous.data["publication_year"], nominal.data)
                          for field_name in nominal.data.columns}
print(evolution_plot_vector)

<h4><u>COMPARATIVE STATS</u></h4>

In [None]:
def beautify_data_comp(field_name: str, dependency_field_name: str,
                        variable: Variable, dependency_variable: Variable, data: pd.DataFrame):    
    subset_data = pd.DataFrame({
        field_name: data[field_name],
        dependency_field_name: data[dependency_field_name]
    })
    
    # Filtering out rows where any of the variables is empty
    subset_data = subset_data[(subset_data[field_name] != "") & (subset_data[dependency_field_name] != "")]

    # Splitting the strings and expanding into separate rows
    subset_data[field_name] = process_multiple_values(subset_data[field_name], variable.multiple)
    subset_data = subset_data.explode(field_name)

    subset_data[dependency_field_name] = process_multiple_values(subset_data[dependency_field_name],
                                                                  dependency_variable.multiple)
    subset_data = subset_data.explode(dependency_field_name)

    # Counting occurrences
    subset_data = subset_data.groupby([field_name, dependency_field_name]).size().reset_index(name='Frequency')

    return subset_data

def evaluate_comparative_dependency_field(field_name: str, dataFrame: DataFrame, strategy):
    """
    Perform a statistical analysis strategy for each 
    dependency field of a given classification field.
    Act as a wrapper for the comparative statistical
    functions
    """
    field_names = list(dataFrame.data.columns)

    return {dataFrame.variable_type[dependency_field_name]: strategy(field_name, dependency_field_name, dataFrame.data)
             for dependency_field_name in field_names if dependency_field_name != field_name}

<h5 style="color:#98c377">Frequency Tables<h5>

In [None]:
def generate_comparative_violin_plot(field_name: str, dependency_field_name: str, data: pd.DataFrame):
    variable = get_variable(field_name, NominalVariables)
    dependency_variable = get_variable(dependency_field_name, NominalVariables)

    return beautify_data_comp(field_name, dependency_field_name,
                                      variable, dependency_variable, data)

violin_plot_vector = {NominalVariables[field_name]: evaluate_comparative_dependency_field(field_name, nominal, generate_comparative_violin_plot)
                       for field_name in nominal.data.columns}

print(violin_plot_vector[NominalVariables.industrial][NominalVariables.domain])

<h5 style="color:#98c377">Bar Plots<h5>

In [None]:
def generate_stacked_bar_plot(field_name: str, dependency_field_name: str, data: pd.DataFrame):
    variable = get_variable(field_name, NominalVariables)
    dependency_variable = get_variable(dependency_field_name, NominalVariables)

    subset_data = beautify_data_comp(field_name, dependency_field_name,
                                      variable, dependency_variable, data)

    if subset_data.empty: return

    # Pivot the data to get a matrix form
    pivoted_data = subset_data.pivot(index=field_name, columns=dependency_field_name, values='Frequency')

    # Replace NaN values with 0
    pivoted_data = pivoted_data.fillna(0)

    # Plot
    plt.figure(figsize=(10, 6))

    # Bottom value for stacking
    bottom_value = pd.Series([0] * pivoted_data.shape[0], index=pivoted_data.index)

    for col in pivoted_data.columns:
        plt.bar(pivoted_data.index, pivoted_data[col], bottom=bottom_value, label=col)
        bottom_value += pivoted_data[col]

    plt.title(f"{variable.title} and {dependency_variable.title} ~ Stacked bar plot")
    plt.xlabel(variable.title)
    plt.ylabel('Frequency')
    plt.legend(title=dependency_field_name)

    return plt

stacked_bar_plot_vector = {NominalVariables[field_name]: evaluate_comparative_dependency_field(field_name, nominal, generate_stacked_bar_plot)
                       for field_name in nominal.data.columns}
print(stacked_bar_plot_vector)

<h5 style="color:#98c377">Grouped Bar Plots<h5>

Only diverge from the stacked bar plot by the dodge attribute set to True

In [None]:
def generate_grouped_bar_plot(field_name: str, dependency_field_name: str, data: pd.DataFrame):
    variable = get_variable(field_name, NominalVariables)
    dependency_variable = get_variable(dependency_field_name, NominalVariables)

    subset_data = beautify_data_comp(field_name, dependency_field_name,
                                      variable, dependency_variable, data)

    if subset_data.empty: return

    plt.figure(figsize=(10, 6))
    sns.barplot(x=field_name, y='Frequency', hue=dependency_field_name, data=subset_data, dodge=True)

    plt.title(f"{variable.title} and {dependency_variable.title} ~ Grouped bar plot")
    plt.gca().set_xlabel('')
    plt.ylabel('Frequency')

    return plt

grouped_bar_plot_vector = {NominalVariables[field_name]: evaluate_comparative_dependency_field(field_name, nominal, generate_grouped_bar_plot)
                       for field_name in nominal.data.columns}
print(grouped_bar_plot_vector)

<h5 style="color:#98c377">Bubble Charts<h5>

Legend in the original script has more steps for the values (step value of 0.5)<br>
This type of figure will benefit from coloring the dots based on the frequency

In [None]:
def generate_bubble_chart(field_name: str, dependency_field_name: str, data: pd.DataFrame):
    variable = get_variable(field_name, NominalVariables)
    dependency_variable = get_variable(dependency_field_name, NominalVariables)

    subset_data = beautify_data_comp(field_name, dependency_field_name,
                                      variable, dependency_variable, data)

    if subset_data.empty: return

    # Creating the bubble chart
    plt.figure(figsize=(10, 6))
    sns.scatterplot(data=subset_data, x=field_name, y=dependency_field_name, size='Frequency', color='black')

    # Adding labels and title
    plt.title(f"{variable.title} and {dependency_variable.title} ~ Bubble Chart")
    plt.gca().set_xlabel('')
    plt.gca().set_ylabel('')

    return plt

bubble_chart_vector = {NominalVariables[field_name]: evaluate_comparative_dependency_field(field_name, nominal, generate_bubble_chart)
                       for field_name in nominal.data.columns}
print(bubble_chart_vector)

<h5 style="color:#98c377">Fisher's Exact Test<h5>

The library used: https://github.com/maclandrol/FisherExact has an obsolete numpy dtype which causes the test to fail when the simulate_pval is set to True. An issue has been opened to let the devs know about the problem.

simulate_pval is mandatory to make the R results reproducdive in the python env.

To temporary fix the issue, the library should be pull from git and rebuild with the fix

1) change np.float to np.float32 in the fact = np.zeros(wkslimit + 1, dtype=np.float, order='F') assignation 
2) python3 setup.py install to build the library locally

In [None]:
def fisher_exact_test(field_name: str, dependency_field_name: str, data: pd.DataFrame):
    variable = get_variable(field_name, NominalVariables)
    dependency_variable = get_variable(dependency_field_name, NominalVariables)

    subset_data = beautify_data_comp(field_name, dependency_field_name,
                                      variable, dependency_variable, data)

    if subset_data.empty: return

    # Check for the condition where there's only one row and both variables are NaN
    if len(subset_data) == 1 and pd.isna(subset_data[field_name]).all() and pd.isna(subset_data[dependency_field_name]).all():
        return

    # Create contingency table
    contingency_table = pd.crosstab(subset_data[field_name], subset_data[dependency_field_name],
                                     values=subset_data['Frequency'], aggfunc='sum', dropna=False).fillna(0)

    # Perform Fisher's Exact Test
    fisher_result = fisher_exact(contingency_table, simulate_pval=True)

    # return fisher_result
    return fisher_result

fisher_exact_test_vector = {NominalVariables[field_name]: evaluate_comparative_dependency_field(field_name, nominal, fisher_exact_test)
                       for field_name in nominal.data.columns}
print(fisher_exact_test_vector)

<h5 style="color:#98c377">Shapiro Wilk's Correlation Test<h5>

In [None]:
## Shapiro Wilk's Correlation Test

def shapiro_wilk_test(field_name: str, continuous_df: pd.DataFrame):
    subset_data = continuous_df[field_name].fillna(0)

    shapiro_result = shapiro(subset_data)

    return shapiro_result

shapiro_wilk_test_vector = {ContinuousVariables[field_name]: shapiro_wilk_test(field_name, continuous.data)
                          for field_name in continuous.data.columns}
print(shapiro_wilk_test_vector)

<h5 style="color:#98c377">Pearson's Correlation Test<h5>

In [None]:
def pearson_cor_test(field_name: str, dependency_field_name: str, data: pd.DataFrame):
    _, pvalue = shapiro_wilk_test_vector[ContinuousVariables[field_name]]
    _, dpvalue = shapiro_wilk_test_vector[ContinuousVariables[dependency_field_name]]

    if not (pvalue > 0.05 and dpvalue > 0.05): return
    
    # Perform Pearson's correlation test
    pearson_coefficient, p_value = pearsonr(data[field_name].fillna(0), data[dependency_field_name].fillna(0))

    return pearson_coefficient, p_value

pearson_cor_test_vector = {ContinuousVariables[field_name]: evaluate_comparative_dependency_field(field_name, continuous, pearson_cor_test)
                       for field_name in continuous.data.columns}

print(pearson_cor_test_vector)

<h5 style="color:#98c377">Spearman's Correlation Test<h5>

In [22]:
def spearman_cor_test(field_name: str, dependency_field_name: str, data: pd.DataFrame):
    _, pvalue = shapiro_wilk_test_vector[ContinuousVariables[field_name]]
    _, dpvalue = shapiro_wilk_test_vector[ContinuousVariables[dependency_field_name]]

    if  pvalue > 0.05 and dpvalue > 0.05: return
  
    # Perform Spearman's correlation test
    spearman_result = spearmanr(data[field_name].fillna(0), data[dependency_field_name].fillna(0))

    return spearman_result

spearman_cor_test_vector = {ContinuousVariables[field_name]: evaluate_comparative_dependency_field(field_name, continuous, spearman_cor_test)
                       for field_name in continuous.data.columns}

print(spearman_cor_test_vector)

{<ContinuousVariables.publication_year: <relis_types.Variable.Variable object at 0x7fa619bc9fc0>>: {<ContinuousVariables.targeted_year: <relis_types.Variable.Variable object at 0x7fa619bca050>>: SignificanceResult(statistic=0.12423546683420235, pvalue=0.7694442841303335)}, <ContinuousVariables.targeted_year: <relis_types.Variable.Variable object at 0x7fa619bca050>>: {<ContinuousVariables.publication_year: <relis_types.Variable.Variable object at 0x7fa619bc9fc0>>: SignificanceResult(statistic=0.12423546683420235, pvalue=0.7694442841303335)}}
