BioPackで抽出した心電図情報のtxtをcsvにするプログラム

In [None]:
import numpy as np
import pandas as pd
from typing import List, Tuple
import os
import matplotlib.pyplot as plt
from scipy import signal
from scipy.spatial import ConvexHull

import pyVHR as vhr
from pyVHR.plot.visualize import *
from pyVHR.BVP import *
vhr.plot.VisualizeParams.renderer = 'notebook'

import mediapipe as mp

In [None]:
# 入力とする動画と動画のファイル名を取得
root_dir = "experimentData\\"
data_dirs = [os.path.join(root_dir, d) for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]
trueValueTxt_paths = []
print("動画ディレクトリ:", data_dirs)

for i in range(len(data_dirs)):
    data_dir = data_dirs[i]
    print(f"Processing video directory: {data_dir}")

    # 動画ファイルのパスを取得
    movie_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')]
    trueValueTxt_paths.extend(movie_files)

print(f"真値ファイルのパス: {trueValueTxt_paths}")

In [None]:
# 真値txtファイルを読み込み、データフレームに変換
for trueValueTxt_path in trueValueTxt_paths:
    print(f"Reading true value file: {trueValueTxt_path}")
    df = pd.read_csv(trueValueTxt_path, header=None, skiprows=6, names=['time', 'ppg'], encoding='shift_jis', sep='\\s+')
    save_path = trueValueTxt_path.replace('.txt', '.csv')
    print(f"Saving to CSV: {save_path}")
    df.to_csv(save_path, index=False, encoding='utf-8-sig')

吉澤先生の精度比較ソフトウェアに入れるRRI_Simpleファイルを作るプログラム

In [None]:
def detect_peaks_from_ecg(ecg_signal, ecg_rate, method="neurokit"):
    """
    ECG信号からピークを検出し、そのインデックスの配列を返す関数

    Args:
        ecg_signal: ECG信号の配列
        ecg_rate: ECGのサンプリングレート
        method: ピーク検出方法 ("neurokit" または "scipy")

    Returns:
        peaks_indices: 検出されたピークのインデックスの配列
    """
    if method == "neurokit":
        # neurokit2を用いてピーク検出
        _ , info_dict = nk.ecg_process(ecg_signal, sampling_rate=ecg_rate, method="neurokit")
        peaks_indices = info_dict['ECG_R_Peaks']
    elif method == "scipy":
        # scipyを用いてピーク検出
        peaks_indices, _ = signal.find_peaks(ecg_signal, distance=ecg_rate * 0.6)
    else:
        raise ValueError("Unsupported method. Use 'neurokit' or 'scipy'.")

    return peaks_indices

In [None]:
# 入力とする動画と動画のファイル名を取得
root_dir = "experimentData\\"
data_dirs = [os.path.join(root_dir, d) for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]
movie_paths = []
movie_names = []
true_value_csv_array = []
print("動画ディレクトリ:", data_dirs)

for i in range(len(data_dirs)):
    data_dir = data_dirs[i]

    # 動画ファイルのパスを取得
    movie_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.avi')]
    movie_paths.extend(movie_files)

    movie_name = os.path.basename(data_dir)
    movie_names.append(movie_name)

    # ppgファイルのパスを取得
    movie_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.csv')]
    true_value_csv_array = [f.replace('.avi', '.csv') for f in movie_paths if f.endswith('.avi')]
print(f"data_dirs: {data_dirs}")
print(f"movie_paths: {movie_paths}")
print(f"movie_names: {movie_names}")
print(f"true_value_csv_array: {true_value_csv_array}")

In [None]:
for i in range(len(movie_paths)):
    trueValueCSVPath = true_value_csv_array[i]
    trueValue = pd.read_csv(trueValueCSVPath)

    ecgRate = 1000
    trueValuePPG = trueValue['ppg'].values
    trueValuePPGTime = trueValue['time'].values

    trueValuePeaks = detect_peaks_from_ecg(trueValuePPG, ecgRate, method="scipy")
    trueValueRRI = np.diff(trueValuePPGTime[trueValuePeaks])  # RRIを計算
    trueValueRRITime = trueValuePPGTime[trueValuePeaks][1:]

    trueValueRRIDataFrame = pd.DataFrame(
        columns=['time', 'RRI', 'BPM']
    )
    trueValueRRIDataFrame['time'] = trueValueRRITime[1:-1]  # 最初と最後のピークは除外
    trueValueRRIDataFrame['RRI'] = trueValueRRI[1:-1]  # 最初と最後のRRIは除外
    trueValueRRIDataFrame['BPM'] = 60 / trueValueRRI[1:-1]  # RRIからBPMを計算

    rri_simple_path = os.path.join(data_dirs[i], 'RRI_Simple_' + movie_names[i] + '.csv')
    pd.DataFrame.to_csv(trueValueRRIDataFrame, rri_simple_path, index=False, encoding='utf-8-sig')

## 肌領域検出でRGB抽出

In [None]:
def visualize_roi(frame, roi_info):
    """
    ROIを可視化する関数
    
    Parameters:
    -----------
    frame : np.ndarray
        元の画像フレーム
    roi_info : dict
        extract_rgb_with_fixed_roi()が返すROI情報
    
    Returns:
    --------
    vis_frame : np.ndarray
        ROIを描画した画像
    """
    vis_frame = frame.copy()
    
    # マスク領域を半透明で表示
    mask_colored = cv2.cvtColor(roi_info['mask'], cv2.COLOR_GRAY2BGR)
    mask_colored[:, :, 1] = roi_info['mask']  # 緑チャンネル
    vis_frame = cv2.addWeighted(vis_frame, 0.7, mask_colored, 0.3, 0)
    
    # 凸包の輪郭を描画
    cv2.polylines(vis_frame, [roi_info['hull_points']], 
                  True, (0, 255, 0), 2)
    
    # ランドマークを描画（オプション）
    for pt in roi_info['landmarks']:
        cv2.circle(vis_frame, tuple(pt), 1, (255, 0, 0), -1)
    
    return vis_frame

In [None]:
# メイン処理
for i in range(len(movie_paths)):
    inputMoviePath = movie_paths[i]
    rootDir = data_dirs[i]
    dataName = movie_names[i]
    save_dir = os.path.join(rootDir)
    print(f"\n{'='*60}")
    print(f"処理中: {dataName}")
    print(f"保存先: {save_dir}")
    print(f"{'='*60}")
    os.makedirs(save_dir, exist_ok=True)

    # 動画情報取得
    cap = cv2.VideoCapture(inputMoviePath)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps
    print(f"動画情報: {total_frames}フレーム, {fps:.2f}fps, {duration:.2f}秒")

    # 1フレーム目でROI検出
    ret, first_frame = cap.read()
    if not ret:
        cap.release()
        print(f"エラー: 動画の読み込みに失敗しました: {inputMoviePath}")
        continue
    
    mp_face_mesh = mp.solutions.face_mesh
    with mp_face_mesh.FaceMesh(
        static_image_mode=True,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5) as face_mesh:
        
        frame_rgb = cv2.cvtColor(first_frame, cv2.COLOR_BGR2RGB)
        results_mp = face_mesh.process(frame_rgb)
        
        if not results_mp.multi_face_landmarks:
            cap.release()
            print(f"エラー: 顔が検出できませんでした: {inputMoviePath}")
            continue
        
        landmarks = results_mp.multi_face_landmarks[0]
        h, w = first_frame.shape[:2]
        
        ldmks = np.array([[int(l.x * w), int(l.y * h)] 
                        for l in landmarks.landmark])
        
        hull = ConvexHull(ldmks)
        hull_points = ldmks[hull.vertices]
        
        mask = np.zeros((h, w), dtype=np.uint8)
        cv2.fillConvexPoly(mask, hull_points, 255)

        roi_info = {
            'mask': mask,
            'hull_points': hull_points,
            'landmarks': ldmks
        }

        vis_frame = visualize_roi(first_frame, roi_info)
        roi_vis_path = os.path.join(save_dir, "roi_visualization.jpg")
        cv2.imwrite(roi_vis_path, vis_frame)
        print(f"ROI可視化画像を保存: {roi_vis_path}")
        print(f"1フレーム目でROI検出完了: {len(hull_points)}個の凸包頂点")

    # 全フレームでRGB信号抽出
    print("全フレームで特徴量抽出中...")
    
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    frame_count = 0

    # 結果を格納する辞書
    results = {
        'frame_number': [], 'timestamp': [],'r_mean': [], 'g_mean': [], 'b_mean': [], 'angle': []
    }

    prev_frame = None
    prev_gray = None

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        timestamp = frame_count / fps
        
        # BGRをRGBに変換
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # マスク領域のピクセルを抽出
        valid_pixels = frame_rgb[mask > 0]
        
        if len(valid_pixels) > 0:
            # RGB統計量
            r_mean = np.mean(valid_pixels[:, 0])
            g_mean = np.mean(valid_pixels[:, 1])
            b_mean = np.mean(valid_pixels[:, 2])

            # RGB正規化
            r_mean_square = r_mean ** 2
            g_mean_square = g_mean ** 2
            b_mean_square = b_mean ** 2

            normalized_r_mean = r_mean / np.sqrt(r_mean_square + g_mean_square + b_mean_square)
            normalized_g_mean = g_mean / np.sqrt(r_mean_square + g_mean_square + b_mean_square)
            normalized_b_mean = b_mean / np.sqrt(r_mean_square + g_mean_square + b_mean_square)

            # 標準化肌ベクトル [0.7682, 0.5121, 0.3841]との角度計算
            skin_vector = np.array([normalized_r_mean, normalized_g_mean, normalized_b_mean])
            reference_vector = np.array([0.7682, 0.5121, 0.3841])
            cosine_similarity = np.dot(skin_vector, reference_vector)
            angle = np.arccos(cosine_similarity) * (180 / np.pi)
            
        else:
            raise ValueError("ROI内に有効なピクセルが存在しません")

        # 結果を保存
        results['frame_number'].append(frame_count)
        results['timestamp'].append(timestamp)
        results['r_mean'].append(r_mean)
        results['g_mean'].append(g_mean)
        results['b_mean'].append(b_mean)
        results['angle'].append(angle)
        
        # 前フレームを保存
        prev_frame = frame.copy()
        prev_gray = gray.copy()
        
        frame_count += 1
        
        if frame_count % 100 == 0:
            print(f"  処理中: {frame_count}/{total_frames} フレーム ({frame_count/total_frames*100:.1f}%)")

    cap.release()
    print(f"特徴量抽出完了: {frame_count}フレーム処理")

    # DataFrameに変換して保存
    signals_df = pd.DataFrame(results)

    save_path = os.path.join(save_dir, "extracted_signals.csv")
    signals_df.to_csv(save_path, index=False)
    print(f"抽出した信号を保存: {save_path}")