In [None]:
"""
Although statistically weak, the mixed effects models from past analysis trials point to some variation of stress levels by period of the day. Divide the stress data from each time bin into morning, afternoon, evening/night for all days. Non-parametric ANOVA
"""
"""
1. Import subjective data as before (focus on stress dimension first and then expand to other dimensions)
2. Organise them into 15 minute bins
3. Group these bins into categories of early morning, morning, noon (afternoon) and night. Use the same time divisions as give_binned_vals_category does to bin values
4. Check each group for normality (owing to less number of observations, likely that non-parametric tests needed)
5. Conduct non-parametric (or parametric if applicable) ANOVA on the data and tabulate and visualise results
6. Do the above for per day and all day (all days together)
"""

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from scipy.stats import shapiro
from scipy.stats import kruskal #for independent groups
from scipy.stats import friedmanchisquare #for dependent or paired groups
import seaborn as sns

In [None]:
from l2script_functions import giv_x_y_vals, give_binned_vals, give_binned_vals_category

In [None]:
folder1 = 'empatica'
folder2 = 'saved_figures'

folder11 = 'aggr_p_min'
folder12 = 'avro_files'
folder13 = 'avro2csv'
folder14 = 'preprocessed_files_debug'
folder141 = 'data_preproc_debug'

In [None]:
#first, need to collect all the TET data for the participant
mainfolder = input('enter subject folder: ')
ger = True

dict_TET_x1, dict_TET_y1,  x_new1, y_new1, x1, y1 = giv_x_y_vals(mainfolder, 'q1', ger) #has duplicates (asd_001)
dict_TET_x2, dict_TET_y2,  x_new2, y_new2, x2, y2 = giv_x_y_vals(mainfolder, 'q2', ger) #has duplicates and days with missing data (asd_001)
dict_TET_x3, dict_TET_y3,  x_new3, y_new3, x3, y3 = giv_x_y_vals(mainfolder, 'q3', ger) #has duplicates (hc_002) #has days with missing data (asd_001)
dict_TET_x4, dict_TET_y4,  x_new4, y_new4, x4, y4 = giv_x_y_vals(mainfolder, 'q4', ger) #has days with missing data (asd_001)
dict_TET_x5, dict_TET_y5,  x_new5, y_new5, x5, y5 = giv_x_y_vals(mainfolder, 'q5', ger) #has duplicates and days with missing data (asd_001)
dict_TET_x6, dict_TET_y6,  x_new6, y_new6, x6, y6 = giv_x_y_vals(mainfolder, 'q6', ger) #has duplicates (hc_002) #has days with missing data (asd_001)
dict_TET_x7, dict_TET_y7,  x_new7, y_new7, x7, y7 = giv_x_y_vals(mainfolder, 'q7', ger)
dict_TET_x8, dict_TET_y8,  x_new8, y_new8, x8, y8 = giv_x_y_vals(mainfolder, 'q8', ger) #has duplicates and days with missing data (asd_001) -> but for the day that it had duplicate data (15_3_24_n7_16_3_24_d) the data was identical so all good
dict_TET_x9, dict_TET_y9,  x_new9, y_new9, x9, y9 = giv_x_y_vals(mainfolder, 'q9', ger) #has duplicates (hc_002) #has days with missing data (asd_001)


In [None]:
#Now, obtain the binned dictionaries for each dimension
"""
#e.g:
dim_q8 = {}
for key in dict_TET_x8:
    x_val = (dict_TET_x8[key])*6
    y_val = dict_TET_y8[key]
    dim_q8[key] = give_binned_vals(x_val, y_val, '15')
"""

dict_TET_x = [dict_TET_x1, dict_TET_x2, dict_TET_x3, dict_TET_x4, dict_TET_x5, dict_TET_x6, dict_TET_x7, dict_TET_x8, dict_TET_x9]
dict_TET_y = [dict_TET_y1, dict_TET_y2, dict_TET_y3, dict_TET_y4, dict_TET_y5, dict_TET_y6, dict_TET_y7, dict_TET_y8, dict_TET_y9]

dim_q = {}

for i in range(9):
    dim_q[f'dim_q{i+1}'] = {}
    for key in dict_TET_x[i]:
        x_val = dict_TET_x[i][key] * 6
        y_val = dict_TET_y[i][key]
        dim_q[f'dim_q{i+1}'][key] = give_binned_vals(x_val, y_val, '15')

In [None]:
"""
for dim in dim_q.keys():
    print(dim)
    for day in dim_q[dim].keys():
        print(day)
        """
for dim in dim_q.keys():   
        print(dim)
        for day in dim_q[dim].keys():
            print(day)
            for i in range(0,len(list(dim_q[dim][day].keys()))):
                binStartTime = float(list(dim_q[dim][day].keys())[i].split('_')[0])
                print(binStartTime)

In [None]:
#Group these bins into categories of early morning, morning, noon (afternoon) and night. Use the same time divisions as give_binned_vals_category does with bin values
#grouped dictionaries for every dimension
def group_bin_day_period(dim_q):
    bin_arr = np.arange(0,25,6) #going by #3: Group these bins into categories of early morning, morning, noon (afternoon) and night. Use the same time divisions as give_binned_vals_category does with bin values
    #grouped dictionaries for every dimension
    earlyMorning = {}
    morning = {}
    afterNoon = {}
    night = {}
    
    for dim in dim_q.keys():
        earlyMorning[dim] = {}
        morning[dim] = {}
        afterNoon[dim] = {}
        night[dim] = {}
        for day in dim_q[dim].keys():
            earlyMorning[dim][day] = []
            morning[dim][day] = []
            afterNoon[dim][day] = []
            night[dim][day] = []
            for i in range(0,len(list(dim_q[dim][day].keys()))):
                binStartTime = float(list(dim_q[dim][day].keys())[i].split('_')[0])
                if binStartTime >= bin_arr[0] and binStartTime < bin_arr[1]:
                    earlyMorning[dim][day].append(list(dim_q[dim][day].items())[i])
                elif binStartTime >= bin_arr[1] and binStartTime < bin_arr[2]:
                    morning[dim][day].append(list(dim_q[dim][day].items())[i])
                elif binStartTime >= bin_arr[2] and binStartTime < bin_arr[3]:
                    afterNoon[dim][day].append(list(dim_q[dim][day].items())[i])
                else:
                    night[dim][day].append(list(dim_q[dim][day].items())[i])
            earlyMorning[dim][day] = dict(earlyMorning[dim][day])
            morning[dim][day] = dict(morning[dim][day])
            afterNoon[dim][day] = dict(afterNoon[dim][day])
            night[dim][day] = dict(night[dim][day])

    return earlyMorning, morning, afterNoon, night
                
earlyMorning, morning, afterNoon, night = group_bin_day_period(dim_q)
night    

In [None]:
dayPeriod = {'earlyMorning': earlyMorning, 'morning': morning, 'afterNoon': afterNoon, 'night': night}
for period, periodDict in dayPeriod.items():
    print(period)
    #print(periodDict)
print(period)
#print(periodDict)

In [None]:
#Step 4 onwards requires a separate set one for per day analysis and the other for all days together

#Check each group for normality (owing to less number of observations, likely that non-parametric tests needed)
dayPeriod = {'earlyMorning': earlyMorning, 'morning': morning, 'afterNoon': afterNoon, 'night': night}
shapiroResults = {}
for dim in dim_q:
    shapiroResults[dim] = {}
    for day in dim_q[dim]:
        shapiroResults[dim][day] = {}
        for period, periodDict in dayPeriod.items():
            shapiroResults[dim][day][period] = {}
            #normality test of earlyMorning[dim][day], morning[dim][day], afterNoon[dim][day], night[dim][day]          
            filtered_data = {key: value for key, value in periodDict[dim][day].items() if value != -5000}
            dataValues = list(filtered_data.values())
            if len(dataValues)>2:
                    try:
                        data_range = max(dataValues) - min(dataValues)
                        if data_range == 0:
                            print(f"Warning: Zero range data for {dim}, {day}, {period}")
                            shapiroResults[dim][day][period]['stat'] = None
                            shapiroResults[dim][day][period]['p_val'] = None
                            shapiroResults[dim][day][period]['normal_yes_or_no'] = None
                            shapiroResults[dim][day][period]['data_length'] = len(dataValues)
                            continue
                        #when range != 0, run shapiro
                        stat_eM, p_val_eM = shapiro(dataValues)
                        if np.isnan(stat_eM) and not np.isnan(p_val_eM):
                            print(f"Warning: stat is nan but p val not nan for {dim}, {day}, {period}, but p val is {p_val_eM} and length of data after filteration is {len(dataValues)}")
                        if p_val_eM>0.05:
                            normal_yn = 1 #normal distribution
                        else:
                            normal_yn = 0 #not normal distribution
                        shapiroResults[dim][day][period]['stat'] = stat_eM
                        shapiroResults[dim][day][period]['p_val'] = p_val_eM
                        shapiroResults[dim][day][period]['normal_yes_or_no'] = normal_yn
                        shapiroResults[dim][day][period]['data_length'] = len(dataValues)
                    except Exception as e:
                        print(f"Error in Shapiro test for {dim}, {day}, {period}: {str(e)}")
                        shapiroResults[dim][day][period]['stat'] = None
                        shapiroResults[dim][day][period]['p_val'] = None
                        shapiroResults[dim][day][period]['normal_yes_or_no'] = None
                        shapiroResults[dim][day][period]['data_length'] = len(dataValues)
            else:
                    shapiroResults[dim][day][period]['stat'] = None
                    shapiroResults[dim][day][period]['p_val'] = None
                    shapiroResults[dim][day][period]['normal_yes_or_no'] = None
                    shapiroResults[dim][day][period]['data_length'] = len(dataValues)
        

In [None]:
shapiroResults

In [None]:
dayPeriod

In [None]:
#Conduct non-parametric (or parametric if applicable) ANOVA on the data and tabulate and visualise results
#KRUSKAL
#if a value in any group was missing, the corresponding values in the groups were also taken out. But it is possible that this is not required for a kruskal-wallis test. However, kruskal-wallis test may not be applicable in this use case. So if applicable, evaluate.

npAnovaResults = {}

for dim in dim_q:
    npAnovaResults[dim] = {}
    for day in dim_q[dim]:
        npAnovaResults[dim][day] = {}
        periodDaily = {'earlyMorning': list(earlyMorning[dim][day].values()), 'morning': list(morning[dim][day].values()), 'afterNoon': list(afterNoon[dim][day].values()), 'night': list(night[dim][day].values())}
        dfAnovaDay = pd.DataFrame(periodDaily)
        
        
        #if >50%data ==-5000 in a column, drop the column        
        for col in dfAnovaDay.columns:
            if (dfAnovaDay[col] == -5000).sum() > 0.5 * len(dfAnovaDay):
                dfAnovaDay.drop(columns=[col], inplace=True)
        
       #drop rows where any value in the row is -5000)        
        dfAnovaDay = dfAnovaDay[~dfAnovaDay.isin([-5000]).any(axis=1)]
        
        #each remaining column of the data frame taken as each group
       
        if len(dfAnovaDay.columns) >= 2:
            stat, pVal = kruskal(*[dfAnovaDay[col] for col in dfAnovaDay.columns])        
            npAnovaResults[dim][day]['stat'] = stat
            npAnovaResults[dim][day]['p_val'] = pVal
            npAnovaResults[dim][day]['groups'] = list(dfAnovaDay.columns)
            npAnovaResults[dim][day]['final data length'] = len(dfAnovaDay)
        else:
            npAnovaResults[dim][day]['stat'] = None
            npAnovaResults[dim][day]['p_val'] = None
            npAnovaResults[dim][day]['groups'] = list(dfAnovaDay.columns)
            npAnovaResults[dim][day]['final data length'] = len(dfAnovaDay)

In [None]:
npAnovaResults['dim_q1']

In [None]:
#Conduct non-parametric (or parametric if applicable) ANOVA on the data and tabulate and visualise results
#FRIEDMAN
npAnovaResults = {}
for dim in dim_q:
    npAnovaResults[dim] = {}
    for day in dim_q[dim]:
        npAnovaResults[dim][day] = {}
        periodDaily = {'earlyMorning': list(earlyMorning[dim][day].values()), 'morning': list(morning[dim][day].values()), 'afterNoon': list(afterNoon[dim][day].values()), 'night': list(night[dim][day].values())}
        dfAnovaDay = pd.DataFrame(periodDaily)
        
        #if >50%data ==-5000 in a column, drop the column   
        for col in dfAnovaDay.columns:
            if (dfAnovaDay[col] == -5000).sum() > 0.5 * len(dfAnovaDay):
                dfAnovaDay.drop(columns=[col], inplace=True)
                
        #drop rows where any value in the row is -5000)
        dfAnovaDay = dfAnovaDay[~dfAnovaDay.isin([-5000]).any(axis=1)]
        
        #each remaining column of the data frame taken as each group
        if len(dfAnovaDay.columns) >= 3:
            stat, pVal = friedmanchisquare(*[dfAnovaDay[col] for col in dfAnovaDay.columns])        
            npAnovaResults[dim][day]['stat'] = stat
            npAnovaResults[dim][day]['p_val'] = pVal
            npAnovaResults[dim][day]['groups'] = list(dfAnovaDay.columns)
            npAnovaResults[dim][day]['final data length'] = len(dfAnovaDay)
        else:
            npAnovaResults[dim][day]['stat'] = None
            npAnovaResults[dim][day]['p_val'] = None
            npAnovaResults[dim][day]['groups'] = list(dfAnovaDay.columns)
            npAnovaResults[dim][day]['final data length'] = len(dfAnovaDay)

            
        

In [None]:
npAnovaResults['dim_q8'] 

In [None]:
"""
For all days pooled together
#all day npANOVA
#friedman
"""

npAnovaResultsAllDays = {}

for dim in dim_q:  # Iterate over dimensions
    # Initialize storage for this dimension
    npAnovaResultsAllDays[dim] = {}
    
    # Aggregate data across all days for each period
    periodAllDays = {'earlyMorning': [], 'morning': [], 'afterNoon': [], 'night': []}
    for day in dim_q[dim]:
        for period in periodAllDays.keys():
            # Collect data from all days into the corresponding period
            periodAllDays[period].extend(list(eval(period)[dim][day].values()))
    
    # Create a DataFrame for all days combined
    dfAnovaAllDays = pd.DataFrame(periodAllDays)
    
    # Drop columns if >50% of data is -5000
    for col in dfAnovaAllDays.columns:
        if (dfAnovaAllDays[col] == -5000).sum() > 0.5 * len(dfAnovaAllDays):
            dfAnovaAllDays.drop(columns=[col], inplace=True)
    
    # Drop rows where any value == -5000
    dfAnovaAllDays = dfAnovaAllDays[~dfAnovaAllDays.isin([-5000]).any(axis=1)]
    
    # Perform friedman test if there are at least 2 groups (columns)
    if len(dfAnovaAllDays.columns) >= 3:
        stat, pVal = friedmanchisquare(*[dfAnovaAllDays[col] for col in dfAnovaAllDays.columns])
        
        # Store results
        npAnovaResultsAllDays[dim]['stat'] = stat
        npAnovaResultsAllDays[dim]['p_val'] = pVal
        npAnovaResultsAllDays[dim]['groups'] = list(dfAnovaAllDays.columns)
        npAnovaResultsAllDays[dim]['final data length'] = len(dfAnovaAllDays)
    else:
        # Insufficient data for test
        npAnovaResultsAllDays[dim]['stat'] = None
        npAnovaResultsAllDays[dim]['p_val'] = None
        npAnovaResultsAllDays[dim]['groups'] = list(dfAnovaAllDays.columns)
        npAnovaResultsAllDays[dim]['final data length'] = len(dfAnovaAllDays)


In [None]:
npAnovaResultsAllDays

In [None]:
#NOW TO TEST WHICH DAY PERIODS WERE SIGNIFICANTLY DIFFERENT FROM WHICH

from scipy.stats import wilcoxon
from itertools import combinations
try:
    from scikit_posthocs import posthoc_nemenyi_friedman
except ImportError:
    print("scikit-posthocs not installed. Only Wilcoxon test will be available.")

In [None]:
#run this before visualisation
%matplotlib inline

In [None]:
#all day npANOVA
#friedman

def friedman_posthoc_with_viz(df, dim_name):
    """
    Perform and visualize post-hoc analysis after significant Friedman test
    df: pandas DataFrame where columns are groups
    dim_name: name of dimension being analyzed (for plot titles)
    """
    results = {}
    
    # 1. Wilcoxon with Bonferroni correction
    groups = list(df.columns)
    n_comparisons = len(groups) * (len(groups) - 1) / 2
    alpha = 0.05
    bonferroni_alpha = alpha / n_comparisons
    
    wilcoxon_results = {}
    # Matrix to store p-values for heatmap
    p_value_matrix = np.zeros((len(groups), len(groups)))
    
    for i, j in combinations(range(len(groups)), 2):
        group1, group2 = groups[i], groups[j]
        stat, p_val = wilcoxon(df[group1], df[group2])
        wilcoxon_results[f"{group1} vs {group2}"] = {
            'statistic': stat,
            'p_value': p_val,
            'significant': p_val < bonferroni_alpha
        }
        # Fill both sides of the matrix for the heatmap
        p_value_matrix[i, j] = p_val
        p_value_matrix[j, i] = p_val
    
    results['wilcoxon'] = wilcoxon_results
    
    # Visualization
    plt.figure(figsize=(12, 5))
    
    # 1. Box plot
    plt.subplot(1, 2, 1)
    sns.boxplot(data=df)
    plt.title(f'Distribution of Values Across Groups\n{dim_name}')
    plt.xticks(rotation=45)
    
    # 2. Heatmap of p-values
    plt.subplot(1, 2, 2)
    mask = np.triu(np.ones_like(p_value_matrix, dtype=bool))  # mask upper triangle
    sns.heatmap(p_value_matrix, 
                mask=mask,
                xticklabels=groups,
                yticklabels=groups,
                annot=True,  # Show numbers
                fmt='.3f',   # Format to 3 decimal places
                cmap='RdYlBu_r',  # Red for significant, blue for non-significant
                vmin=0,
                vmax=0.05)
    plt.title('Pairwise Comparison p-values\n(significant if < {:.3f})'.format(bonferroni_alpha))
    plt.xticks(rotation=45)
    plt.tight_layout()
    
    # Print interpretation
    print(f"\nResults Interpretation for {dim_name}:")
    print("=" * 50)
    print(f"Bonferroni-corrected significance level: {bonferroni_alpha:.4f}")
    print("\nSignificant differences found between:")
    significant_pairs = []
    for pair, result in wilcoxon_results.items():
        if result['significant']:
            significant_pairs.append(f"- {pair} (p={result['p_value']:.4f})")
    if significant_pairs:
        print("\n".join(significant_pairs))
    else:
        print("No significant differences found after Bonferroni correction")
    
    return results, plt.gcf()  # Return both results and figure

#end of func


npAnovaResultsAllDays = {}

for dim in dim_q:  # Iterate over dimensions
    # Initialize storage for this dimension
    npAnovaResultsAllDays[dim] = {}
    
    # Aggregate data across all days for each period
    periodAllDays = {'earlyMorning': [], 'morning': [], 'afterNoon': [], 'night': []}
    for day in dim_q[dim]:
        for period in periodAllDays.keys():
            # Collect data from all days into the corresponding period
            periodAllDays[period].extend(list(eval(period)[dim][day].values()))
    
    #Create a DataFrame for all days combined
    dfAnovaAllDays = pd.DataFrame(periodAllDays)

     # Drop columns where >50% of data is -5000
    for col in dfAnovaAllDays.columns:
        if (dfAnovaAllDays[col] == -5000).sum() > 0.5 * len(dfAnovaAllDays):
            dfAnovaAllDays.drop(columns=[col], inplace=True)
    
    # Drop rows where any value is -5000
    dfAnovaAllDays = dfAnovaAllDays[~dfAnovaAllDays.isin([-5000]).any(axis=1)]
    
    # Perform friedmann test if there are at least 2 groups (columns)
    if len(dfAnovaAllDays.columns) >= 3 and len(dfAnovaAllDays) > 2 and all(dfAnovaAllDays[col].notna().sum() > 0 for col in dfAnovaAllDays.columns) and len(set(dfAnovaAllDays[col].notna().sum() for col in dfAnovaAllDays.columns)) == 1:
        stat, pVal = friedmanchisquare(*[dfAnovaAllDays[col] for col in dfAnovaAllDays.columns])
        if pVal < 0.05:  # If Friedman test is significant
            posthoc_results, fig  = friedman_posthoc_with_viz(dfAnovaAllDays, dim)
            plt.savefig(os.path.join(mainfolder, "all_day_subjective_dim_corr_post_hoc.png"), bbox_inches='tight', dpi=300)
            plt.show()
            plt.close()

            npAnovaResultsAllDays[dim]['posthoc'] = posthoc_results
        else:
            npAnovaResultsAllDays[dim]['posthoc'] = []
        # Store results
        npAnovaResultsAllDays[dim]['stat'] = stat
        npAnovaResultsAllDays[dim]['p_val'] = pVal
        npAnovaResultsAllDays[dim]['groups'] = list(dfAnovaAllDays.columns)
        npAnovaResultsAllDays[dim]['final data length'] = len(dfAnovaAllDays)
    else:
        # Insufficient data for test
        npAnovaResultsAllDays[dim]['stat'] = None
        npAnovaResultsAllDays[dim]['p_val'] = None
        npAnovaResultsAllDays[dim]['groups'] = list(dfAnovaAllDays.columns)
        npAnovaResultsAllDays[dim]['final data length'] = len(dfAnovaAllDays)


In [None]:
npAnovaResultsAllDays

In [None]:
dfAnovaAllDays