physionetが公開しているデータセットからデータを抽出し、睡眠段階を可視化します

In [None]:
import os
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import mne
from mne.datasets import sleep_physionet
import pandas as pd
from scipy import signal
from sklearn.preprocessing import StandardScaler
import json

# データの取得
subjects = [0]
recording = [1]

# PhysioNetの睡眠データセットを取得
try:
    # データファイルを取得
    raw_fnames = sleep_physionet.age.fetch_data(subjects=subjects, recording=recording)

    # ファイルパスの確認とデバッグ
    print(f"Raw data files type: {type(raw_fnames)}")
    print(f"Raw data files: {raw_fnames}")

    # 最初のファイルパスを取得
    if isinstance(raw_fnames, list) and len(raw_fnames) > 0:
        if isinstance(raw_fnames[0], list):
            raw_file_path = raw_fnames[0][0]  # ネストされたリストの場合
        else:
            raw_file_path = raw_fnames[0]
    else:
        raise ValueError("データファイルが見つかりません")

    print(f"使用するファイルパス: {raw_file_path}")

    # アノテーションファイルの処理は後回し
    annotation_fname = None

except Exception as e:
    print(f"データ取得エラー: {e}")
    # フォールバック: 別の被験者やrecordingを試す
    try:
        subjects = [0]
        recording = [2]  # recording [2]を試す
        raw_fnames = sleep_physionet.age.fetch_data(subjects=subjects, recording=recording)
        if isinstance(raw_fnames[0], list):
            raw_file_path = raw_fnames[0][0]
        else:
            raw_file_path = raw_fnames[0]
        annotation_fname = None
    except:
        print("すべてのフォールバックオプションが失敗しました")
        raise

# データの読み込み
raw = mne.io.read_raw_edf(raw_file_path, preload=True, stim_channel=None)

# アノテーションが利用可能な場合のみ読み込み
if annotation_fname:
    try:
        annotations = mne.read_annotations(annotation_fname)
        raw.set_annotations(annotations)
        print("アノテーションを適用しました")
    except:
        print("アノテーションの読み込みに失敗しました")
else:
    print("アノテーションファイルが利用できません - 生データのみで解析を続行")

print(f"データの長さ: {raw.times[-1]:.1f} 秒")
print(f"サンプリング周波数: {raw.info['sfreq']} Hz")
print(f"チャンネル数: {len(raw.ch_names)}")
print(f"チャンネル名: {raw.ch_names}")

# 利用可能なチャンネルを確認してEOGとEEGチャンネルを選択
available_channels = raw.ch_names

# EOGチャンネルの検索と選択
eog_possible = ['EOG horizontal', 'EOG vertical', 'EOG', 'EOG1', 'EOG2']
eog_channels = [ch for ch in eog_possible if ch in available_channels]

if len(eog_channels) == 0:
    # EOGチャンネルが見つからない場合、EEGチャンネルから代用
    print("EOGチャンネルが見つかりません。EEGチャンネルを使用します。")
    eog_channels = [ch for ch in available_channels if 'EEG' in ch][:2]

# チャンネルが不足している場合の処理
if len(eog_channels) < 2:
    eog_channels = available_channels[:2]  # 最初の2チャンネルを使用

print(f"使用するEOGチャンネル: {eog_channels}")

# EEGチャンネルの検索と選択
eeg_possible = ['EEG Fpz-Cz', 'EEG Pz-Oz', 'EEG C3-A2', 'EEG C4-A1']
eeg_channels = [ch for ch in eeg_possible if ch in available_channels]

if len(eeg_channels) == 0:
    # 指定されたEEGチャンネルが見つからない場合、利用可能なEEGチャンネルを使用
    eeg_channels = [ch for ch in available_channels if 'EEG' in ch][:2]

# チャンネルが不足している場合の処理
if len(eeg_channels) < 2:
    # EOGチャンネルとは異なるチャンネルを選択
    remaining_channels = [ch for ch in available_channels if ch not in eog_channels]
    eeg_channels = remaining_channels[:2]

print(f"使用するEEGチャンネル: {eeg_channels}")

# チャンネルデータの取得
eog_data = raw.copy().pick_channels(eog_channels[:2])  # 最大2チャンネル
eeg_data = raw.copy().pick_channels(eeg_channels[:2])  # 最大2チャンネル

# REM睡眠検知のための特徴量抽出
def extract_rem_features(raw_data, window_size=30, overlap=0.5):
    """
    REM睡眠検知のための特徴量を抽出
    """
    sfreq = raw_data.info['sfreq']
    window_samples = int(window_size * sfreq)
    step_samples = int(window_samples * (1 - overlap))

    features = []
    times = []

    for start in range(0, len(raw_data.times) - window_samples, step_samples):
        end = start + window_samples
        time_center = raw_data.times[start + window_samples // 2]

        # EOGデータの取得
        eog_segment = eog_data.get_data()[:, start:end]

        # EEGデータの取得
        eeg_segment = eeg_data.get_data()[:, start:end]

        # EOG特徴量（急速眼球運動の検知）
        eog_std = np.std(eog_segment, axis=1)
        eog_range = np.ptp(eog_segment, axis=1)

        # EEG特徴量（低振幅・混合周波数の検知）
        eeg_power_bands = {}
        for i, ch_data in enumerate(eeg_segment):
            freqs, psd = signal.welch(ch_data, sfreq, nperseg=min(256, len(ch_data)))

            # 周波数帯域のパワー計算
            delta_power = np.mean(psd[(freqs >= 0.5) & (freqs < 4)])
            theta_power = np.mean(psd[(freqs >= 4) & (freqs < 8)])
            alpha_power = np.mean(psd[(freqs >= 8) & (freqs < 13)])
            beta_power = np.mean(psd[(freqs >= 13) & (freqs < 30)])

            eeg_power_bands[f'ch{i}_delta'] = delta_power
            eeg_power_bands[f'ch{i}_theta'] = theta_power
            eeg_power_bands[f'ch{i}_alpha'] = alpha_power
            eeg_power_bands[f'ch{i}_beta'] = beta_power

        # 特徴量の組み合わせ（ゼロ除算回避）
        feature_vector = [
            np.mean(eog_std),  # EOG標準偏差の平均
            np.mean(eog_range),  # EOG範囲の平均
            eeg_power_bands['ch0_theta'] / (eeg_power_bands['ch0_delta'] + 1e-10),  # シータ/デルタ比
            eeg_power_bands['ch1_theta'] / (eeg_power_bands['ch1_delta'] + 1e-10),
            np.mean([eeg_power_bands['ch0_alpha'], eeg_power_bands['ch1_alpha']]),  # アルファ波パワー
            np.mean([eeg_power_bands['ch0_beta'], eeg_power_bands['ch1_beta']])   # ベータ波パワー
        ]

        features.append(feature_vector)
        times.append(time_center)

    return np.array(features), np.array(times)

# 特徴量抽出
print("特徴量を抽出中...")
features, feature_times = extract_rem_features(raw)

# REM睡眠の簡易検知アルゴリズム
def detect_rem_sleep(features, threshold_percentile=75):
    """
    特徴量を基にREM睡眠を検知
    """
    # 特徴量の正規化
    scaler = StandardScaler()
    features_normalized = scaler.fit_transform(features)

    # REM睡眠スコアの計算（EOG活動 + 低デルタ波活動）
    eog_score = features_normalized[:, 0] + features_normalized[:, 1]  # EOG活動
    theta_delta_ratio = features_normalized[:, 2] + features_normalized[:, 3]  # シータ/デルタ比

    rem_score = eog_score + theta_delta_ratio

    # 閾値による検知
    threshold = np.percentile(rem_score, threshold_percentile)
    rem_detected = rem_score > threshold

    return rem_detected, rem_score

# REM睡眠検知
rem_detected, rem_score = detect_rem_sleep(features)

# 統計情報の計算と出力
total_rem_time = np.sum(rem_detected) * 30 / 60  # 分単位
total_recording_time = len(feature_times) * 30 / 60  # 分単位
rem_percentage = (total_rem_time / total_recording_time) * 100
rem_episodes = int(np.sum(np.diff(rem_detected.astype(int)) == 1))

print(f"\n=== REM睡眠検知結果 ===")
print(f"総記録時間: {total_recording_time:.1f} 分")
print(f"検知されたREM睡眠時間: {total_rem_time:.1f} 分")
print(f"REM睡眠の割合: {rem_percentage:.1f}%")
print(f"REM睡眠エピソード数: {rem_episodes}")

# 詳細な統計情報
print(f"\n=== 詳細統計 ===")
print(f"平均REMスコア: {np.mean(rem_score):.3f}")
print(f"REMスコア標準偏差: {np.std(rem_score):.3f}")
print(f"検知閾値: {np.percentile(rem_score, 75):.3f}")
print(f"最長REM連続時間: {np.max([len(list(g)) for k, g in __import__('itertools').groupby(rem_detected) if k]) * 0.5:.1f} 分")

# データをJSONで保存（後の統合システムで使用）
sleep_data = {
    "session_id": "rem_analysis_001",
    "timestamp": pd.Timestamp.now().isoformat(),
    "total_recording_time": float(total_recording_time),
    "rem_time": float(total_rem_time),
    "rem_percentage": float(rem_percentage),
    "rem_episodes": rem_episodes,
    "channels_used": {
        "eog": eog_channels,
        "eeg": eeg_channels
    },
    "features_shape": list(features.shape),
    "rem_statistics": {
        "mean_score": float(np.mean(rem_score)),
        "std_score": float(np.std(rem_score)),
        "threshold": float(np.percentile(rem_score, 75)),
        "max_continuous_rem_minutes": float(np.max([len(list(g)) for k, g in __import__('itertools').groupby(rem_detected) if k]) * 0.5)
    },
    "plot_saved": False,
    "analysis_method": "EOG_EEG_feature_extraction"
}

# 結果をJSONファイルとして保存
with open('/dreamdive-Baku/rem_analysis_results.json', 'w', encoding='utf-8') as f:
    json.dump(sleep_data, f, ensure_ascii=False, indent=2)

# CSVでも詳細データを保存
detailed_results = pd.DataFrame({
    'time_hours': feature_times / 3600,
    'rem_score': rem_score,
    'rem_detected': rem_detected.astype(int),
    'eog_std': features[:, 0],
    'eog_range': features[:, 1],
    'theta_delta_ratio_ch0': features[:, 2],
    'theta_delta_ratio_ch1': features[:, 3],
    'alpha_power': features[:, 4],
    'beta_power': features[:, 5]
})

detailed_results.to_csv('/dreamdive-Baku/rem_detailed_analysis.csv', index=False)

print(f"\n=== ファイル保存完了 ===")
print(f"JSON結果: rem_analysis_results.json")
print(f"詳細CSV: rem_detailed_analysis.csv")
print(f"使用チャンネル - EOG: {eog_channels}, EEG: {eeg_channels}")

# CBTシステムとの統合用データ準備
integration_data = {
    "sleep_session": sleep_data,
    "ready_for_integration": True,
    "next_step": "dream_report_input"
}

print(f"\n=== 統合システム準備完了 ===")
print(f"CBTシステムとの統合準備が完了しました。")
print(f"夢の報告を受け付ける準備ができています。")

Using default location ~/mne_data for PHYSIONET_SLEEP...
Raw data files type: <class 'list'>
Raw data files: [['/Users/mizuno_shin/mne_data/physionet-sleep-data/SC4001E0-PSG.edf', '/Users/mizuno_shin/mne_data/physionet-sleep-data/SC4001EC-Hypnogram.edf']]
使用するファイルパス: /Users/mizuno_shin/mne_data/physionet-sleep-data/SC4001E0-PSG.edf
Extracting EDF parameters from /Users/mizuno_shin/mne_data/physionet-sleep-data/SC4001E0-PSG.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 7949999  =      0.000 ... 79499.990 secs...
アノテーションファイルが利用できません - 生データのみで解析を続行
データの長さ: 79500.0 秒
サンプリング周波数: 100.0 Hz
チャンネル数: 7
チャンネル名: ['EEG Fpz-Cz', 'EEG Pz-Oz', 'EOG horizontal', 'Resp oro-nasal', 'EMG submental', 'Temp rectal', 'Event marker']
使用するEOGチャンネル: ['EEG Fpz-Cz', 'EEG Pz-Oz']
使用するEEGチャンネル: ['EEG Fpz-Cz', 'EEG Pz-Oz']
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code shoul

In [None]:

import os
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
import kaleido  # noqa

df = detailed_results.copy()
threshold = float(np.percentile(rem_score, 75))

fig = make_subplots(
    rows=3, cols=1, shared_xaxes=True, vertical_spacing=0.03,
    subplot_titles=("REMスコアと閾値", "Theta/Delta 比（ch0/ch1）", "EOG 指標（std/range）")
)

# Row1: REMスコア + 閾値
fig.add_trace(go.Scatter(
    x=df["time_hours"], y=df["rem_score"], name="REM score", line=dict(color="#2E86DE")
), row=1, col=1)
fig.add_trace(go.Scatter(
    x=df["time_hours"], y=[threshold]*len(df), name="threshold(p75)", line=dict(color="#E74C3C", dash="dash")
), row=1, col=1)

# REM検知区間をシェーディング
mask = df["rem_detected"] == 1
segments = []
in_seg = False
start_t = None
for i in range(len(mask)):
    if mask.iloc[i] and not in_seg:
        in_seg = True
        start_t = df["time_hours"].iloc[i]
    if in_seg and (i == len(mask)-1 or not mask.iloc[i+1]):
        end_t = df["time_hours"].iloc[i]
        segments.append((start_t, end_t))
        in_seg = False

for s, e in segments:
    fig.add_vrect(x0=s, x1=e, fillcolor="rgba(0,150,255,0.12)", line_width=0, layer="below")

# Row2: Theta/Delta
fig.add_trace(go.Scatter(
    x=df["time_hours"], y=df["theta_delta_ratio_ch0"], name="Theta/Delta ch0", line=dict(color="#27AE60")
), row=2, col=1)
fig.add_trace(go.Scatter(
    x=df["time_hours"], y=df["theta_delta_ratio_ch1"], name="Theta/Delta ch1", line=dict(color="#F39C12")
), row=2, col=1)

# Row3: EOG
fig.add_trace(go.Scatter(
    x=df["time_hours"], y=df["eog_std"], name="EOG std", line=dict(color="#8E44AD")
), row=3, col=1)
fig.add_trace(go.Scatter(
    x=df["time_hours"], y=df["eog_range"], name="EOG range", line=dict(color="#16A085")
), row=3, col=1)

fig.update_xaxes(title_text="経過時間 [hours]", row=3, col=1)
fig.update_yaxes(title_text="スコア", row=1, col=1)
fig.update_layout(
    title="REM検知 可視化（Plotly）",
    height=900, width=1200, legend_orientation="h", legend_y= -0.06, margin=dict(l=60, r=20, t=60, b=60)
)

fig.show()

# 保存（HTMLは常に、PNGはkaleidoが必要）
out_dir = "/dreamdive-Baku"
html_path = os.path.join(out_dir, "rem_analysis_plot.html")
png_path = os.path.join(out_dir, "rem_analysis_plot.png")

pio.write_html(fig, file=html_path, auto_open=False, include_plotlyjs="cdn")

png_saved = False
try:
    pio.write_image(fig, png_path, width=1400, height=900, scale=2)
    png_saved = True
except Exception:
    print("PNG保存には kaleido が必要です: pip install -U kaleido")

sleep_data["plot_saved"] = bool(png_saved)
with open('/dreamdive-Baku/rem_analysis_results.json', 'w', encoding='utf-8') as f:
    json.dump(sleep_data, f, ensure_ascii=False, indent=2)

print(f"可視化HTML: {html_path}")
if png_saved:
    print(f"可視化PNG : {png_path}")