# Classical statistical hypothesis testing
Approaches to determine correlation by analysing statistical probabilities purely in the data

Author: {{ cookiecutter.author_name }}
Created: {{ cookiecutter.timestamp }}

## How to use the notebook

The following cells:
- specify objective, variables, and variable types,
- set up the statistical tests,
- read dataset,
- present results from the tests,

By default, the notebook is set up to run with an example (wine quality). To see how it works, run the notebook without changing the code.

For your project, adjust the code in the linked cells with your objectives, variables, dataset etc. and then execute all cells in order.

Please refer to classical_ht.board for detailed instructions.

In [ ]:
# Link to project experiments folder hypothesis_experiment_learnings.board (refresh and hit enter on this line to see the link)

### Imports

In [0]:
import numpy as np
import pandas as pd
import scipy.stats as stats
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import STL

from bioinfokit.analys import stat 

import matplotlib.pyplot as plt

### Project

In [0]:
experiment_name = '{{cookiecutter.use_case_name}}'  # please provide a name for the hypothesis testing experiment

### Dataset

In [0]:
time_series = False
path = '{{cookiecutter.data_path}}' # Specify the filepath of the data eg. './data/file.csv'
project_data_folder_prefix = './../../../../data/'

# If you are working in a Halerium project template folder, and the notebook is in the /experiments folder, and the data is in the /data folder
# Uncomment the next line to append the prefix of the relative data path from the experiments folder. 
#path = project_data_folder_prefix + path

if path =='default example':
    # Wine quality example dataset
    path = 'https://raw.githubusercontent.com/erium/halerium-example-data/main/hypothesis_testing/WineQT.csv'

if time_series:
    df = pd.read_csv(path, parse_dates=['Date'], index_col="Date")
else:
    df = pd.read_csv(path, sep=None)
df

### Features

In [0]:
# Array of ['feature name', 'type'] where type is 'continuous', 'binary_categorical', 'multi_categorical'
x = [['residual sugar', 'continuous']]
y = [['quality', 'multi_categorical']]

time_features = []

### Level of Significance

In [0]:
significance = 0.05 # Level of significance

In [0]:
x_cont = [feature[0] for feature in x if feature[1] == 'continuous']
y_cont = [feature[0] for feature in y if feature[1] == 'continuous']
x_binary = [feature[0] for feature in x if feature[1] == 'binary_categorical']
y_binary = [feature[0] for feature in y if feature[1] == 'binary_categorical']
x_multi = [feature[0] for feature in x if feature[1] == 'multi_categorical']
y_multi = [feature[0] for feature in y if feature[1] == 'multi_categorical']

In [0]:
num_samples = df.shape[0]
print('Number of samples:', num_samples)

In [0]:
results_x = {x_para[0]:[] for x_para in x}
results_y = {y_para[0]:[] for y_para in y}
results = {'x': [], 'y': [], 'test': [], 'passed': []}

## Statistical Tests
Some tests may be skipped if there are no x-y pairs that correspond to the test

### Time series Hypothesis Test
1. Check if stationary (Dickey-Fuller test)
2. Look at residuals of the time series - check if they follow normal distribution (D'Agostino and Pearson's)

In [0]:
time_results = {'stationary': [], 'normal residuals': []}
for time_feature in time_features:
    df_time = df[time_feature]
    plt.plot(df_time)
    plt.xlabel('Time')
    plt.ylabel(time_feature)
    plt.show()
    stationarity = adfuller(df_time)
    pvalue = stationarity[1]
    print("Stationarity pvalue:", pvalue)

    results['x'].append(time_feature)
    results['y'].append('-Time Series-')
    results['test'].append('stationarity')
    if pvalue <= significance:
        print(time_feature, "is stationary at significance", significance)
        time_results['stationary'].append(True)
        results['passed'].append(True)
    else:
        print(time_feature, "is not stationary at significance", significance)
        time_results['stationary'].append(False)
        results['passed'].append(False)
    
    stl = STL(df_time, period=7)
    res = stl.fit()
    fig = res.plot()
    resid = res.resid
    k2, p = stats.normaltest(resid)
    print("Normal Residuals pvalue:", p)

    results['x'].append(time_feature)
    results['y'].append('-Time Series-')
    results['test'].append('normal residuals')
    if p > significance:
        print(time_feature, "residuals follow a normal distribution at significance", significance)
        time_results['normal residuals'].append(True)
        results['passed'].append(True)
    else:
        print(time_feature, "residuals do not follow a normal distribution at significance", significance)
        time_results['normal residuals'].append(False)
        results['passed'].append(False)

In [0]:
if time_features:
    time_df = pd.DataFrame(time_results, index=time_features)
    time_df

## Linear Correlation
For continuous-continuous features

In [0]:
df_corr = df.corr()
df_corr[y_cont].loc[x_cont]

In [0]:
fig_h = 5
fig_w = 8

fig, ax = plt.subplots(len(x_cont), len(y_cont))
fig.set_figheight(fig_h * len(x_cont))
fig.set_figwidth(fig_w * len(y_cont))
if len(x_cont) == 1:
    for i in range(len(y)):
        slope, intercept, r, p, stderr = stats.linregress(df[x_cont[0]], df[y_cont[i]])
        ax[i].scatter(df[x_cont[0]], df[y_cont[i]])
        ax[i].plot(df[x_cont[0]], intercept + slope * df[x_cont[0]], color='r')
        ax[i].set_xlabel(x_cont[0])
        ax[i].set_ylabel(y_cont[i])
elif len(y_cont) == 1:
    for i in range(len(x_cont)):
        slope, intercept, r, p, stderr = stats.linregress(df[x_cont[i]], df[y_cont[0]])
        ax[i].scatter(df[x_cont[i]], df[y_cont[0]])
        ax[i].plot(df[x_cont[i]], intercept + slope * df[x_cont[i]], color='r')
        ax[i].set_xlabel(x_cont[i])
        ax[i].set_ylabel(y_cont[0])
else:
    for i in range(len(x_cont)):
        for j in range(len(y_cont)):
            slope, intercept, r, p, stderr = stats.linregress(df[x_cont[i]], df[y_cont[j]])
            ax[i, j].scatter(df[x_cont[i]],df[y_cont[j]])
            ax[i, j].plot(df[x_cont[i]], intercept + slope * df[x_cont[i]], color='r')
            ax[i, j].set_xlabel(x_cont[i])
            ax[i, j].set_ylabel(y_cont[j])
plt.show()

Univariate approach

In [0]:
# Univariate (open to selection bias)
for i in range(len(x_cont)):
    for j in range(len(y_cont)):
        X = sm.add_constant(df[x_cont[i]])
        result = sm.OLS(df[y_cont[j]], X).fit()
        print('Feature:', x_cont[i], 'Compared to:', y_cont[j])
        results_as_html = result.summary().tables[1].as_html()
        results_df = pd.read_html(results_as_html, header=0, index_col=0)[0].iloc[1:]
        #print(result.summary().tables[1])
        significant_corr_features = results_df.loc[results_df['P>|t|'] <= significance]
        results['x'].append(x_cont[i])
        results['y'].append(y_cont[j])
        results['test'].append('uni')
        if not significant_corr_features.empty:
            print("Correlated features at significance:", significance)
            print(significant_corr_features)
            results['passed'].append(True)
        else:
            print("Features not correlated at significance level:", significance)
            results['passed'].append(False)
        print("__________")

Multivariate Approach

In [0]:
# Multivariate (open to confounding bias)
for j in range(len(y_cont)):
    X = sm.add_constant(df[x_cont])
    result = sm.OLS(df[y_cont[j]], X).fit()
    print('Feature:', y_cont[j])
    results_as_html = result.summary().tables[1].as_html()
    results_df = pd.read_html(results_as_html, header=0, index_col=0)[0].iloc[1:]
    #print(result.summary().tables[1])
    significant_corr_features = results_df.loc[results_df['P>|t|'] <= significance]
    if not significant_corr_features.empty:
        print("Correlated features at significance:", significance)
        print(significant_corr_features)
        results_y[y_cont[j]].append([x_cont, 'multi', True])
        for x_para in list(significant_corr_features.index):
            results['x'].append(x_para)
            results['y'].append(y_cont[j])
            results['test'].append('multi')
            results['passed'].append(True)
        for x_para in list(results_df.loc[results_df['P>|t|'] > significance].index):
            results['x'].append(x_para)
            results['y'].append(y_cont[j])
            results['test'].append('multi')
            results['passed'].append(False)
    else:
        print("Features not correlated at significance level:", significance)
        for x_para in list(results_df.loc[results_df['P>|t|'] > significance].index):
            results['x'].append(x_para)
            results['y'].append(y_cont[j])
            results['test'].append('multi')
            results['passed'].append(False)
        
    print("__________")

### ANOVA (Analysis of Variance)
For continuous-non-binary discrete

Null hypothesis: Group means are equal (No effect on the categorical variable)
Alternative hypothesis: At least one group mean is different from other group means (Effect on the categorical variable)

Note: Pairwise comparison using Tukey's honestly significantly differenced test to find which are the significant treatments (discrete)

In [0]:
# One way ANOVA
for x in x_multi:
    discrete_values = list(set(df[x]))
    for y in y_cont:
        discrete_sets = []
        print("Feature:", y, "compared to discrete:", x)
        for value in discrete_values:
            discrete_sets.append(df[y].loc[df[x] == value])
        plt.boxplot(discrete_sets, labels=discrete_values)
        plt.xlabel(x)
        plt.ylabel(y)
        plt.show()
        fvalue, pvalue = stats.f_oneway(*discrete_sets)
        print("fvalue:", fvalue, "pvalue:", pvalue)
        results['x'].append(x)
        results['y'].append(y)
        results['test'].append('anova')
        if pvalue < significance:
            print("Group mean of" , y, "affected at significance:", significance)
            df_discrete = pd.DataFrame(discrete_sets, index=discrete_values).T
            df_discrete = pd.melt(df_discrete.reset_index(),id_vars=['index'], value_vars=discrete_values)
            df_discrete.columns =['index', 'treatments', 'value']
            res = stat()
            res.tukey_hsd(df=df_discrete, res_var='value', xfac_var='treatments', anova_model='value ~ C(treatments)')
            print("Pairwise comparison")
            print(res.tukey_summary)
            results['passed'].append(True)
        else:
            print("Group mean of", y, "not affected at significance:", significance)
            results['passed'].append(False)

## t-test
For binary categorical - continuous

In [0]:
for x in x_binary:
    discrete_values = list(set(df[x]))
    for y in y_cont:
        discrete_sets = []
        print("Feature:", y, "compared to discrete:", x)
        for value in discrete_values:
            discrete_sets.append(df[y].loc[df[x] == value])
        plt.boxplot(discrete_sets, labels=discrete_values)
        plt.xlabel(x)
        plt.ylabel(y)
        plt.show()
        fvalue, pvalue = stats.ttest_ind(*discrete_sets)
        print("fvalue:", fvalue, "pvalue:", pvalue)
        results['x'].append(x)
        results['y'].append(y)
        results['test'].append('t_test')
        if pvalue < significance:
            print("Group mean of" , y, "affected at significance:", significance)
            results['passed'].append(True)
        else:
            print("Group mean of", y, "not affected at significance:", significance)
            results['passed'].append(False)

## Chi-square Test of independence
For multi categorical with contingency table

In [0]:
for x in x_multi:
    discrete_values = list(set(df[x]))
    for y in y_multi:
        print("Feature:", y, "compared to:", x)
        contingency_table = pd.crosstab(index=df[x], columns=df[y], margins=True)
        print(contingency_table)
        chi2, pvalue, dof, ex = stats.chi2_contingency(contingency_table)
        results['x'].append(x)
        results['y'].append(y)
        results['test'].append('chi2')
        if pvalue < significance:
            print("Group mean of" , y, "affected at significance:", significance)
            results['passed'].append(True)
        else:
            print("Group mean of", y, "not affected at significance:", significance)
            results['passed'].append(False)

### Results
Note that both dataframes in 'sorted by x' and 'sorted by y' presents the SAME results with different sorting

In [0]:
results_df = pd.DataFrame(results)
results_df

Sorted by x

In [0]:
results_sort_x = results_df.sort_values(by=['y'])
results_sort_x = results_sort_x.sort_values(by=['x'])
index = pd.MultiIndex.from_frame(results_sort_x[['x', 'y']])
results_sort_x.index = index
results_sort_x = results_sort_x[['test', 'passed']]
results_sort_x

Sorted by y

In [0]:
results_sort_y = results_df.sort_values(by=['x'])
results_sort_y = results_sort_y.sort_values(by=['y'])
index = pd.MultiIndex.from_frame(results_sort_y[['y', 'x']])
results_sort_y.index = index
results_sort_y = results_sort_y[['test', 'passed']]
results_sort_y