In [46]:
import numpy as np
import pandas as pd
from collections import Counter
from scipy import stats
import statsmodels.api as sm
from statsmodels.stats.anova import AnovaRM
from statsmodels.stats.multitest import multipletests

EPOCH_MINUTES = 0.5 #min
WINDOW_SIZE = 5

In [47]:
df = pd.read_excel("Data/ref.xlsx", sheet_name="Problem 4", header=[0, 1])


      被试1                     被试2                     被试3                  \
  Night 1 Night 2 Night 3 Night 1 Night 2 Night 3 Night 1 Night 2 Night 3   
0     4.0     4.0     4.0     4.0     4.0     2.0     4.0     4.0     4.0   
1     4.0     4.0     4.0     4.0     4.0     2.0     4.0     4.0     4.0   
2     4.0     4.0     4.0     4.0     4.0     2.0     4.0     4.0     4.0   
3     4.0     4.0     4.0     4.0     4.0     2.0     4.0     4.0     4.0   
4     4.0     2.0     4.0     4.0     4.0     2.0     4.0     4.0     4.0   

      被试4  ...     被试8     被试9                    被试10                  \
  Night 1  ... Night 3 Night 1 Night 2 Night 3 Night 1 Night 2 Night 3   
0     4.0  ...     4.0       4     4.0     4.0     4.0     4.0     4.0   
1     4.0  ...     4.0       4     4.0     4.0     4.0     4.0     4.0   
2     4.0  ...     4.0       4     4.0     4.0     4.0     4.0     4.0   
3     4.0  ...     4.0       4     4.0     2.0     4.0     4.0     4.0   
4     4.0  ...  

In [48]:
sample_1_1 = df[("被试1", "Night 1")].to_numpy()
sample_1_2 = df[("被试1", "Night 2")].to_numpy()
sample_1_3 = df[("被试1", "Night 3")].to_numpy()
sample_2_1 = df[("被试2", "Night 1")].to_numpy()
sample_2_2 = df[("被试2", "Night 2")].to_numpy()
sample_2_3 = df[("被试2", "Night 3")].to_numpy()
sample_3_1 = df[("被试3", "Night 1")].to_numpy()
sample_3_2 = df[("被试3", "Night 2")].to_numpy()
sample_3_3 = df[("被试3", "Night 3")].to_numpy()

sample_list = [sample_1_1,
    sample_1_2,
    sample_1_3,
    sample_2_1,
    sample_2_2,
    sample_2_3,
    sample_3_1,
    sample_3_2,
    sample_3_3]

print(len(sample_1_1))
nan_count = np.sum(np.isnan(sample_1_1))
nan_count

1064


351

In [49]:
def clear_nan(sample_list: list) -> list:
    cleaned_list = []
    for sample in sample_list:
        not_nan_indices = np.where(~np.isnan(sample))[0]
        if len(not_nan_indices) == 0:
            cleaned_arr = np.array([])
        else:
            last_valid_index = not_nan_indices[-1]
            cleaned_arr = sample[:last_valid_index + 1]
        cleaned_list.append(cleaned_arr)
    return cleaned_list


def check_wake(sample_list: list) -> list:
    checked_list = []
    for sample in sample_list:
        sample = np.array(sample)
        
        # 补充末尾4.0
        if sample.size == 0 or not np.isclose(sample[-1], 4.0):
            sample = np.append(sample, 4.0)
        
        # 计算尾部连续4.0数量
        count_4 = 0
        for v in sample[::-1]:
            if np.isclose(v, 4.0):
                count_4 += 1
            else:
                break
        
        # 截断数组，保留一个4.0
        if count_4 > 1:
            sample = sample[:len(sample) - count_4 + 1]
        
        checked_list.append(sample)
    return checked_list


def smooth_data(sample_list: list, window_size: int = WINDOW_SIZE) -> list:
    assert window_size % 2 == 1, "window_size 必须是奇数"
    half_w = window_size // 2

    smoothed_list = []
    for sample in sample_list:
        sample = np.array(sample)
        length = len(sample)
        smoothed = np.empty_like(sample)

        for i in range(length):
            # 窗口边界
            start = max(0, i - half_w)
            end = min(length, i + half_w + 1)
            window = sample[start:end]

            # 统计窗口内最频繁的阶段
            most_common_stage = Counter(window).most_common(1)[0][0]
            smoothed[i] = most_common_stage
        
        smoothed_list.append(smoothed)
    
    return smoothed_list

def preprocess_data(sample_list: list):
    cleaned_list = clear_nan(sample_list)
    smoothed_list = smooth_data(cleaned_list)
    checked_list = check_wake(smoothed_list)
    return checked_list

In [64]:
def get_total_sleep_time(sample_list: list, epoch_seconds: int = EPOCH_MINUTES) -> list:

    TST_list = []
    for sample in sample_list:
        sample = np.array(sample)
        
        # 非清醒阶段的布尔数组（清醒编码为4）
        sleep_mask = (sample != 4)
        
        # 统计非清醒阶段数目 * 每阶段秒数
        total_sleep_sec = np.sum(sleep_mask) * epoch_seconds
        TST_list.append(total_sleep_sec)
    return TST_list


def get_sleep_efficiency(sample_list: list, epoch_seconds: int = EPOCH_MINUTES) -> list:

    SE_list = []
    for sample in sample_list:
        sample = np.array(sample)
        total_time_sec = len(sample) * epoch_seconds
        sleep_mask = (sample != 4)
        TST_sec = np.sum(sleep_mask) * epoch_seconds
        
        # 防止除零
        if total_time_sec == 0:
            SE = 0.0
        else:
            SE = (TST_sec / total_time_sec) * 100
        
        SE_list.append(SE)
    return SE_list


def get_sleep_onsite_latency(sample_list: list, epoch_seconds: int = EPOCH_MINUTES) -> list:

    SOL_list = []
    for sample in sample_list:
        sample = np.array(sample)
        
        # 找到首次非清醒阶段索引
        sleep_indices = np.where(sample != 4)[0]
        if sleep_indices.size == 0:
            
            # 没有进入睡眠阶段，SOL为整个时间
            sol = len(sample) * epoch_seconds
        else:
            sol = sleep_indices[0] * epoch_seconds
        SOL_list.append(sol)
    return SOL_list

def get_N3_percentage(sample_list: list, epoch_seconds: int = EPOCH_MINUTES) -> list:

    N3_percent_list = []
    for sample in sample_list:
        sample = np.array(sample)
        
        # 总睡眠时间 TST（非清醒）
        sleep_mask = (sample != 4)
        TST_sec = np.sum(sleep_mask) * epoch_seconds
        
        # 深睡眠时间 N3 (编码为3)
        N3_mask = (sample == 3)
        N3_sec = np.sum(N3_mask) * epoch_seconds
        
        # 避免除零错误
        if TST_sec == 0:
            N3_percent = 0.0
        else:
            N3_percent = (N3_sec / TST_sec) * 100
        
        N3_percent_list.append(N3_percent)
    return N3_percent_list


def get_REM_percentage(sample_list: list, epoch_seconds: int = EPOCH_MINUTES) -> list:

    REM_percent_list = []
    for sample in sample_list:
        sample = np.array(sample)
        sleep_mask = (sample != 4)
        TST_sec = np.sum(sleep_mask) * epoch_seconds
        
        REM_mask = (sample == 5)
        REM_sec = np.sum(REM_mask) * epoch_seconds
        
        if TST_sec == 0:
            REM_percent = 0.0
        else:
            REM_percent = (REM_sec / TST_sec) * 100
        
        REM_percent_list.append(REM_percent)
    return REM_percent_list


def get_number_of_awakenings(sample_list: list) -> list:
    awakenings_list = []
    for sample in sample_list:
        sample = np.array(sample)
        is_wake = (sample == 4)
        awakenings = np.sum((~is_wake[:-1]) & (is_wake[1:]))

        # 如果最后状态是清醒，认为最后的醒来是最终起床，不计入
        if is_wake[-1] and awakenings > 0:
            awakenings -= 1
        
        awakenings_list.append(int(awakenings))
    return awakenings_list


def get_key_metrics(preprocessed_sample_list: list) -> dict:
    preprocessed_sample_list = preprocess_data(sample_list)
    TST_list = get_total_sleep_time(preprocessed_sample_list)
    SE_list = get_sleep_efficiency(preprocessed_sample_list)
    SOL_list = get_sleep_onsite_latency(preprocessed_sample_list)
    N3_perc_list = get_N3_percentage(preprocessed_sample_list)
    REM_perc_list = get_REM_percentage(preprocessed_sample_list)
    awakenings_list = get_number_of_awakenings(preprocessed_sample_list)

    key_metrics = {
        "TST": TST_list,
        "SE": SE_list,
        "SOL": SOL_list,
        "N3%": N3_perc_list,
        "REM%": REM_perc_list,
        "Number of Awakenings": awakenings_list
    }
    return key_metrics

"""
preprocessed_sample_list = preprocess_data(sample_list)
TST_list = get_total_sleep_time(preprocessed_sample_list)
SE_list = get_sleep_efficiency(preprocessed_sample_list)
SOL_list = get_sleep_onsite_latency(preprocessed_sample_list)
N3_perc_list = get_N3_percentage(preprocessed_sample_list)
REM_perc_list = get_REM_percentage(preprocessed_sample_list)
awakenings_list = get_number_of_awakenings(preprocessed_sample_list)
print("TST",TST_list[0])
print("SE", SE_list[1])
print("SOL", SOL_list[0])
print("N3", N3_perc_list[0])
print("REM", REM_perc_list[0])
print("awake", awakenings_list[0])
print (preprocessed_sample_list[0])
"""

In [83]:
from scipy.stats import shapiro
import pingouin as pg


def build_df_from_metrics(key_metrics, subject_ids, environments):
    """
    将 key_metrics 字典转成 DataFrame，附加 subject 和 environment 列
    """
    df = pd.DataFrame(key_metrics)
    df['subject'] = subject_ids
    df['environment'] = environments
    return df


def normality_test(df, metric):
    print(f"正态性检验: {metric}")
    for env in df['environment'].unique():
        vals = df[df['environment'] == env][metric].values
        stat, p = shapiro(vals)
        print(f"  环境 {env}: stat={stat:.3f}, p={p:.3f} -> {'近似正态' if p>0.05 else '非正态'}")


def sphericity_test(df, metric):
    df_wide = df.pivot(index='subject', columns='environment', values=metric)
    result = pg.sphericity(df_wide)
    w_stat = result.W
    p_value = result.pval
    print(f"球形检验: {metric} -> W={w_stat:.3f}, p={p_value:.3f}, {'满足球形' if p_value>0.05 else '违背球形'}")

def run_rm_anova(df, metric, subject_col='subject', within_col='environment'):
    """
    对单个指标运行重复测量 ANOVA (RM-ANOVA)
    
    args:
        df (pd.DataFrame): 数据集
        metric (str): 要分析的因变量列名
        subject_col (str): 被试 ID 列名
        within_col (str): 重复测量自变量列名
    
    return:
        DataFrame: RM-ANOVA 结果
    """
    aov = pg.rm_anova(dv=metric, within=within_col, subject=subject_col, data=df, detailed=True)
    return aov

In [84]:
preprocessed_sample_list = preprocess_data(sample_list)
key_metrics_dict = get_key_metrics(preprocessed_sample_list)


subject_ids = [1,1,1,2,2,2,3,3,3]  # 长度=样本数，如 [1,1,1,2,2,2,...]
environments = ['A','B','C','A','B','C','A','B','C'] # 长度=样本数，如 ['A','B','C','A','B','C',...]

key_metrics_df = build_df_from_metrics(key_metrics_dict, subject_ids, environments)
print(df.head())

for metric in key_metrics.keys():
    normality_test(df, metric)
    sphericity_test(df, metric)
    print()

     TST         SE   SOL        N3%       REM%  Number of Awakenings  \
0  314.0  89.971347  15.5  15.286624  20.222930                     9   
1  316.5  91.210375   2.5  24.960506  22.906793                    10   
2  292.5  92.125984   5.5  26.153846  25.470085                     9   
3  329.0  68.828452  16.0  17.021277  31.458967                     4   
4  352.5  83.038869   5.0  31.347518  29.361702                     3   

   subject environment  
0        1           A  
1        1           B  
2        1           C  
3        2           A  
4        2           B  
正态性检验: TST
  环境 A: stat=0.949, p=0.567 -> 近似正态
  环境 B: stat=0.988, p=0.790 -> 近似正态
  环境 C: stat=0.962, p=0.625 -> 近似正态
球形检验: TST -> W=0.128, p=0.358, 满足球形

正态性检验: SE
  环境 A: stat=0.922, p=0.459 -> 近似正态
  环境 B: stat=0.895, p=0.370 -> 近似正态
  环境 C: stat=0.930, p=0.488 -> 近似正态
球形检验: SE -> W=0.015, p=0.121, 满足球形

正态性检验: SOL
  环境 A: stat=0.781, p=0.070 -> 近似正态
  环境 B: stat=0.789, p=0.087 -> 近似正态
  环境 C: stat=0.825

In [None]:
import pingouin as pg

# 假设df为DataFrame，列包含 'subject', 'environment', 'metric_name'

anova_results = {}
for metric in key_metrics.keys():
    aov = pg.rm_anova(dv=metric, within='environment', subject='subject', data=df, detailed=True)
    print(f"ANOVA results for {metric}:\n", aov)
    anova_results[metric] = aov
    
    if aov.loc[0, 'p-unc'] < 0.05:
        print(f"Post-hoc tests for {metric}:")
        posthocs = pg.pairwise_ttests(dv=metric, within='environment', subject='subject', data=df, padjust='bonf')
        print(posthocs[['A', 'B', 'T', 'p-corr', 'p-adjust']])
