# Before Analysis...

In [1]:
import numpy as np
import mne
import sklearn
import matplotlib.pyplot as plt
from scipy import stats
from decimal import Decimal, getcontext
import pandas as pd
import ast
import openpyxl as op

# 분석대상 이름 입력
name = input("분석대상 이름을 입력하세요.")
purpose = input("목적을 입력하세요. TFR or loading")

if purpose == "TFR":
    print(f"TFR을 선택하였습니다.")
    # fif 파일 불러오기
    file_path = rf'H:\Mg_EEG\edf_files\{name}_7200_clean.fif'
    clean_data = mne.io.read_raw_fif(file_path, preload=True)

    print(f"fif 파일을 성공적으로 불러왔습니다: {file_path}")
    
    # Computing TFR
    print(f"TFR을 계산 중 입니다.")
    tfr = clean_data.compute_tfr(method='multitaper', freqs=np.arange(1, 31), tmin=0, tmax=7199, n_jobs=-1, reject_by_annotation=False)
    print(f"TFR 계산이 완료되었습니다.")

    # TFR 파일 저장
    saving_path = rf'H:\Mg_EEG\tfr_files\{name}_7200_tfr.h5'
    tfr.save(saving_path, overwrite=True)
    print(f"TFR 파일이 성공적으로 저장되었습니다.")

    # Saving raw file for back-up
    tfr_raw = tfr.copy()

elif purpose == "loading":
    print(f"loading을 선택하였습니다.")
    # Loading saved TFR file
    file_path = rf'H:\Mg_EEG\tfr_files\{name}_7200_tfr.h5'
    tfr = mne.time_frequency.read_tfrs(file_path)
    print(f"성공적으로 tfr 파일을 로딩하였습니다.")

    # Saving raw file for back-up
    tfr_raw = tfr.copy()

else:
    print("올바른 값을 입력하시오")
    quit()

def tfr_mean(tfr):
    mean_electrode = np.mean(tfr.data, axis=0) # Averaging across electrode
    mean_frequency = np.mean(mean_electrode, axis=0) # Averaging across frequency
    return(mean_frequency)

TFR을 선택하였습니다.
Opening raw data file H:\Mg_EEG\edf_files\정광훈2_7200_clean.fif...


  clean_data = mne.io.read_raw_fif(file_path, preload=True)


    Range : 0 ... 1439800 =      0.000 ...  7199.000 secs
Ready.
Reading 0 ... 1439800  =      0.000 ...  7199.000 secs...
fif 파일을 성공적으로 불러왔습니다: H:\Mg_EEG\edf_files\정광훈2_7200_clean.fif
TFR을 계산 중 입니다.


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.


OSError: [WinError 1450] 시스템 리소스가 부족하기 때문에 요청한 서비스를 완성할 수 없습니다

In [None]:
#raw fif file 보고 싶다면 사용
#clean_data.plot()

In [None]:
#데이터가 뭔가 이상할 때 리셋하는 복구 코드
#tfr = tfr_raw

In [None]:
# large artifact data load
csv_file_path = r'C:\Users\esin4\OneDrive\바탕 화면\Github\Mg_infusion_coma\large_artifact.csv'

large_artifact_data = pd.read_csv(csv_file_path, encoding='utf-8-sig')

# 입력한 이름이 데이터프레임에 존재하는지 확인하고 좌표 가져오기
if name in large_artifact_data['Name'].values:
    # 입력한 이름에 해당하는 데이터 가져오기
    coordinates = large_artifact_data[large_artifact_data['Name'] == name]['Coordinates'].values[0]
    
    # 좌표가 'skip'이면 빈 리스트로 설정
    if coordinates == 'skip':
        large_artifact = []
    else:
        # 좌표 문자열을 리스트로 변환
        import ast
        large_artifact = ast.literal_eval(coordinates)
else:
    print(f"{input_name}이(가) 데이터에 없습니다.")
    large_artifact = []

# 결과 출력
print("large_artifact 리스트:")
print(large_artifact)

In [None]:
# 분석한 값 저장할 엑셀 파일
wb = op.load_workbook(r"C:\Users\esin4\OneDrive\바탕 화면\Github\Mg_infusion_coma\Mg_infusion_data.xlsx")
row = int(input("데이터가 입력될 행을 입력하세요."))

# independent t-test

In [None]:
# Before vs After segmentation
tfr_before = tfr.copy().crop(tmin=0, tmax=3600, include_tmax=False)
tfr_after = tfr.copy().crop(tmin=3600, tmax=7199, include_tmax=False)

# Averaging
before_mean = tfr_mean(tfr_before)
after_mean = tfr_mean(tfr_after)

# Indexing about Large artifact
before_idx = []
after_idx = []

for t_start, t_end in large_artifact:
    if t_start < 3600 and t_end < 3600:
        before_idx.extend(range(t_start*200, t_end*200))
    elif t_start < 3600 and t_end >= 3600:
        before_idx.extend(range(t_start*200, 3600*200))
        after_idx.extend(range(0, (t_end-3600)*200))
    else:
        after_idx.extend(range((t_start-3600)*200, (t_end-3600)*200))

# Removing Large artifacts
before_mean_clean = np.delete(before_mean, before_idx)
after_mean_clean = np.delete(after_mean, after_idx)

# independent t-test
# precision
getcontext().prec = 50

print(f"{name}, independent t test")
# mean of before vs after
mean_before = np.mean(before_mean_clean)
mean_after = np.mean(after_mean_clean)
print("mean_before : \n", mean_before)
print("mean_after : \n", mean_after)

# t-test
t_stat, p_value = stats.ttest_ind(before_mean_clean, after_mean_clean)

# print to decimal object
p_value_decimal = Decimal(p_value)
print ("t_statistics : \n", t_stat)
p_value_str = f"{p_value_decimal:.100f}"
print(f"p-value: \n {p_value_str}")

# Define Cohens'D 
def cohens_d(group1, group2):
    # Calculate mean, sd
    mean1, mean2 = np.mean(group1), np.mean(group2)
    std1, std2 = np.std(group1, ddof=1), np.std(group2, ddof=1)
    
    # sample size
    n1, n2 = len(group1), len(group2)
    
    # Calculate pooled sd
    pooled_std = np.sqrt(((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / (n1 + n2 - 2))
    
    # Cohen's d
    d = (mean1 - mean2) / pooled_std
    return d

# Calculate Effect Size
effect_size = cohens_d(before_mean, after_mean)
print("effect size \n", effect_size)

In [None]:
ws_i = wb["independent t"]
ws_i.cell(row=row, column=1).value = f"{name}"
ws_i.cell(row=row, column=2).value = mean_before
ws_i.cell(row=row, column=3).value = mean_after
ws_i.cell(row=row, column=4).value = t_stat
ws_i.cell(row=row, column=5).value = p_value_str
ws_i.cell(row=row, column=7).value = effect_size

# Paired t-test

In [None]:
# Before vs After segmentation
tfr_before = tfr.copy().crop(tmin=0, tmax=3600, include_tmax=False)
tfr_after = tfr.copy().crop(tmin=3600, tmax=7199, include_tmax=False)

# Averaging
before_mean = tfr_mean(tfr_before)
after_mean = tfr_mean(tfr_after)

# trim the data 무조건 7200초 맞춰서 이 부분 안 써도 되도록 해야함. 
if len(before_mean) > len(after_mean):
    before_mean = before_mean[:len(after_mean)]
elif len(before_mean) < len(after_mean):
    after_mean = after_mean[:len(before_mean)]
else:
    quit

len(before_mean) == len(after_mean)

# Indexing about large artifact

before_idx = set()
after_idx = set()

for t_start, t_end in large_artifact:
    if t_start < 3600 and t_end < 3600:
        before_idx.update(range(t_start*200, t_end*200))
        after_idx.update(range(t_start*200, t_end*200))
    elif t_start < 3600 and t_end >= 3600:
        before_idx.update(range(t_start*200, 3600*200))
        after_idx.update(range(t_start*200, 3600*200))
        after_idx.update(range(0, (t_end-3600)*200))
        before_idx.update(range(0, (t_end-3600)*200))
    else:
        after_idx.update(range((t_start-3600)*200, (t_end-3600)*200))
        before_idx.update(range((t_start-3600)*200, (t_end-3600)*200))

# Convert sets back to sorted lists
before_idx = sorted(before_idx)
after_idx = sorted(after_idx)

before_mean_clean = np.delete(before_mean, before_idx)
after_mean_clean = np.delete(after_mean, after_idx)

# Paired t-test
# precision
getcontext().prec = 50

print(f"{name}, paired t test")
# mean of before vs after
mean_before = np.mean(before_mean_clean)
mean_after = np.mean(after_mean_clean)
print("mean_before :\n ", mean_before)
print("mean_after : \n", mean_after)

# Kolmogorov-Smirnov test
difference = before_mean_clean-after_mean_clean
ks_statistic, ks_p_value = stats.kstest(difference, stats.norm.cdf)
print(f"KS Statistic: \n {ks_statistic}")
print(f"KS p-value: \n {ks_p_value}")

# t-test
t_stat, p_value = stats.ttest_rel(before_mean, after_mean)

# print to decimal object
p_value_decimal = Decimal(p_value)
print ("t_statistics : \n", t_stat)
p_value_str = f"{p_value_decimal:.100f}"
print(f"p-value: \n {p_value_str}")

# Define Cohens'D 
def cohens_d(group1, group2):
    # Calculate mean, sd
    mean1, mean2 = np.mean(group1), np.mean(group2)
    std1, std2 = np.std(group1, ddof=1), np.std(group2, ddof=1)
    
    # sample size
    n1, n2 = len(group1), len(group2)
    
    # Calculate pooled sd
    pooled_std = np.sqrt(((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / (n1 + n2 - 2))
    
    # Cohen's d
    d = (mean1 - mean2) / pooled_std
    return d

# Calculate Effect Size
effect_size = cohens_d(before_mean, after_mean)
print("effect size \n", effect_size)

In [None]:
ws_p = wb["paired t"]
ws_p.cell(row=row, column=1).value = f"{name}"
ws_p.cell(row=row, column=2).value = mean_before
ws_p.cell(row=row, column=3).value = mean_after
ws_p.cell(row=row, column=4).value = ks_statistic
ws_p.cell(row=row, column=5).value = ks_p_value
ws_p.cell(row=row, column=7).value = t_stat
ws_p.cell(row=row, column=8).value = p_value_str
ws_p.cell(row=row, column=10).value = effect_size

# Analysis across Band

## Paired t-test with Cohen's D test

In [None]:
freq_ranges = {
    'Delta': (1, 4),
    'Theta': (4, 8),
    'Alpha': (8, 13),
    'Beta': (13, 30),
}

# Averaging by band and timing
for band, (fmin, fmax) in freq_ranges.items():
    tfr_band_before = tfr.copy().crop(tmin=0, tmax=3600, fmin=fmin, fmax=fmax, include_tmax=False)
    tfr_band_after = tfr.copy().crop(tmin=3600, tmax=7199, fmin=fmin, fmax=fmax, include_tmax=False)

    globals()[f'tfr_{band}_before'] = tfr_band_before
    globals()[f'tfr_{band}_after'] = tfr_band_after

    globals()[f'{band}_before_mean'] = tfr_mean(tfr_band_before)
    globals()[f'{band}_after_mean'] = tfr_mean(tfr_band_after)

# Make length same    
for band in freq_ranges.keys():
    before_mean = globals()[f'{band}_before_mean']
    after_mean = globals()[f'{band}_after_mean']

    if len(before_mean) > len(after_mean):
        globals()[f'{band}_before_mean'] = before_mean[:len(after_mean)]
    elif len(before_mean) < len(after_mean):
        globals()[f'{band}_after_mean'] = after_mean[:len(before_mean)]
    else:
        print(f"{band} 대역의 before와 after 길이가 이미 같습니다.")

    print(f"{band} 대역 처리 완료: before 길이 = {len(globals()[f'{band}_before_mean'])}, after 길이 = {len(globals()[f'{band}_after_mean'])}")

# Removing large artifact by band
def remove_artifacts(before_mean, after_mean, large_artifact):
    before_idx = set()
    after_idx = set()

    for t_start, t_end in large_artifact:
        if t_start < 3600 and t_end < 3600:
            before_idx.update(range(t_start*200, t_end*200))
            after_idx.update(range(t_start*200, t_end*200))
        elif t_start < 3600 and t_end >= 3600:
            before_idx.update(range(t_start*200, 3600*200))
            after_idx.update(range(t_start*200, 3600*200))
            after_idx.update(range(0, (t_end-3600)*200))
            before_idx.update(range(0, (t_end-3600)*200))
        else:
            after_idx.update(range((t_start-3600)*200, (t_end-3600)*200))
            before_idx.update(range((t_start-3600)*200, (t_end-3600)*200))

    before_idx = sorted(before_idx)
    after_idx = sorted(after_idx)

    before_mean_clean = np.delete(before_mean, before_idx)
    after_mean_clean = np.delete(after_mean, after_idx)

    return before_mean_clean, after_mean_clean

# 주파수 대역별로 artifact 제거
for band in freq_ranges.keys():
    before_mean = globals()[f'{band}_before_mean']
    after_mean = globals()[f'{band}_after_mean']

    before_mean_clean, after_mean_clean = remove_artifacts(before_mean, after_mean, large_artifact)

    globals()[f'{band}_before_mean_clean'] = before_mean_clean
    globals()[f'{band}_after_mean_clean'] = after_mean_clean

    print(f"{band} 대역 처리 완료:")
    print(f"  Before: 원본 길이 = {len(before_mean)}, 정제 후 길이 = {len(before_mean_clean)}")
    print(f"  After: 원본 길이 = {len(after_mean)}, 정제 후 길이 = {len(after_mean_clean)}")
    print(f"  정제 후 Before와 After 길이 일치: {len(before_mean_clean) == len(after_mean_clean)}")

print(f"\n {name}, paired t test by band")
def run_tests(before_mean, after_mean, band_name):
    print(f"\n=== {band_name} 대역 분석 결과 ===")
    
    # precision
    getcontext().prec = 50

    # mean of before vs after
    mean_before = np.mean(before_mean)
    mean_after = np.mean(after_mean)
    print(f"mean_before: \n {mean_before}")
    print(f"mean_after: \n {mean_after}")

    # Kolmogorov-Smirnov test
    difference = before_mean - after_mean
    ks_statistic, ks_p_value = stats.kstest(difference, stats.norm.cdf)
    print(f"KS Statistic: \n {ks_statistic}")
    print(f"KS p-value: \n {ks_p_value}")

    # t-test
    t_stat, p_value = stats.ttest_rel(before_mean, after_mean)

    # print to decimal object
    p_value_decimal = Decimal(p_value)
    print(f"t_statistics: \n {t_stat}")
    p_value_str = f"{p_value_decimal:.100f}"
    print(f"p-value: \n {p_value_str}")

    # Define Cohens'D 
    def cohens_d(group1, group2):
        mean1, mean2 = np.mean(group1), np.mean(group2)
        std1, std2 = np.std(group1, ddof=1), np.std(group2, ddof=1)
        n1, n2 = len(group1), len(group2)
        pooled_std = np.sqrt(((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / (n1 + n2 - 2))
        d = (mean1 - mean2) / pooled_std
        return d

    # Calculate Effect Size
    effect_size = cohens_d(before_mean, after_mean)
    print(f"Effect size: \n {effect_size}")

# 각 주파수 대역별로 테스트 실행
for band in freq_ranges.keys():
    before_mean = globals()[f'{band}_before_mean_clean']
    after_mean = globals()[f'{band}_after_mean_clean']
    run_tests(before_mean, after_mean, band)

In [None]:
ws_b = wb["paired t band"]
for row_b in [row*4-6, row*4-5, row*4-4, row*4-3]:
    ws_b.cell(row=row_b, column=1).value = f"{name}"
    ws_b.cell(row=row_b, column=4).value = mean_before
    ws_b.cell(row=row_b, column=5).value = mean_after
    ws_b.cell(row=row_b, column=6).value = ks_statistic
    ws_b.cell(row=row_b, column=7).value = ks_p_value
    ws_b.cell(row=row_b, column=9).value = t_stat
    ws_b.cell(row=row_b, column=10).value = p_value_str
    ws_b.cell(row=row_b, column=12).value = effect_size

In [None]:
wb.save(r"C:\Users\esin4\OneDrive\바탕 화면\Github\Mg_infusion_coma\Mg_infusion_data.xlsx")

## RM ANOVA

In [None]:
for band in freq_ranges.keys():
    before_mean = globals()[f'{band}_before_mean_clean']
    after_mean = globals()[f'{band}_after_mean_clean']
    
    before_mean_mean = np.mean(before_mean)
    after_mean_mean = np.mean(after_mean)
    
    globals()[f'{band.lower()}_before_mean_mean'] = before_mean_mean
    globals()[f'{band.lower()}_after_mean_mean'] = after_mean_mean
    
    print(f"\n=== {band} 대역 평균 ===")
    print(f"{band} Before Mean of Mean: {before_mean_mean}")
    print(f"{band} After Mean of Mean: {after_mean_mean}")

In [None]:
data = pd.DataFrame({
    'Band': ['Delta', 'Delta', 'Theta', 'Theta', 'Alpha', 'Alpha', 'Beta', 'Beta'],
    'Time': ['Before', 'After', 'Before', 'After', 'Before', 'After', 'Before', 'After'],
    'Power': [delta_before_mean_mean, delta_after_mean_mean, theta_before_mean_mean, theta_after_mean_mean, 
              alpha_before_mean_mean, alpha_after_mean_mean, beta_before_mean_mean, beta_after_mean_mean]
})

In [None]:
import statsmodels.api as sm
from statsmodels.formula.api import ols
from statsmodels.stats.anova import AnovaRM

# 반복측정 ANOVA 모델 설정
aovrm = AnovaRM(data, 'Power', 'Band', within=['Time'])
res = aovrm.fit()

# 결과 출력
print(res)