In [None]:
import os
import sys
from pathlib import Path

In [None]:
# Project HOME directory (adjust to your own environment).
if "google.colab" not in sys.modules:
    # Not running on Google Colaboratory: HOME is the parent of the
    # current working directory.
    HOME = Path("..")
else:
    # Running on Google Colaboratory: HOME lives on Google Drive.
    HOME = Path("/content/drive/MyDrive/signate/NEDOG")

    # Mount Google Drive unless the mount point is already present.
    if not os.path.exists("/content/drive"):
        from google.colab import drive
        drive.mount("/content/drive")

# INPUT / WORKING directories sit directly under HOME.
INPUT = HOME.joinpath("input")
WORKING = HOME.joinpath("working")

In [None]:
import warnings
warnings.simplefilter('ignore', FutureWarning)

In [None]:
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.io as sio

In [None]:
# Subject ids, used as string keys into the loaded .mat dictionaries.
# The last entry is only ever looked up in the reference data.
subjects = ['0001', '0002', '0003', '0004', '0005']

In [None]:
# Column names for the EMG channels. They are assigned positionally
# (name i -> channel i), so the order must match the channel axis of the data.
# NOTE(review): the 'R' / 'L' suffix presumably means right / left side - confirm.
emg_cols = [
    'TA R', 'TA L',
    'LG R', 'LG L',
    'RF R', 'RF L',
    'VL R', 'VL L',
    'ST R', 'ST L',
    'GMAX R', 'GMAX L',
    'EMI R', 'EMI L',
    'DEL R', 'DEL L',
]

In [None]:
# Column names for the three velocity components (the target variable),
# assigned positionally to the component axis of the data.
vel_cols = ['vel_x', 'vel_y', 'vel_z']

# matからDataFrameに変換

In [None]:
# Read the .mat files (train, test, reference) from the INPUT directory.
tr_data, ts_data, rf_data = (
    sio.loadmat(INPUT / mat_name)
    for mat_name in ('train.mat', 'test.mat', 'reference.mat')
)

In [None]:
# DataFrame construction helper
def create_df(data, index, subjects, cols):
    """Flatten per-subject 3-D arrays into one long-format DataFrame.

    The frame is built directly from the array with reshapes instead of one
    ``stack()`` + ``merge()`` per channel. Besides being much cheaper, this
    does not depend on the NaN-dropping behaviour of ``DataFrame.stack``
    (which differs between pandas versions): every (trial, time) row is
    always kept, NaN or not.

    Parameters
    ----------
    data : mapping
        Keyed by subject id. ``data[subject][0][0][index]`` must be a 3-D
        array indexed as ``[trial, channel, time]``.
    index : int
        Which entry of ``data[subject][0][0]`` to convert.
    subjects : iterable of str
        Subject ids to convert; each is stored in the output as ``int(subject)``.
    cols : sequence of str
        Names for the first ``len(cols)`` channels, in channel order.

    Returns
    -------
    pandas.DataFrame
        Columns ``trial`` (1-based), ``time`` (0-based), one column per entry
        of ``cols``, then ``subject``. Rows are ordered by subject (in the
        order given), then trial, then time, with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If ``subjects`` is empty (raised by ``pd.concat``).
    """
    frames = []
    for subject in subjects:
        block = np.asarray(data[subject][0][0][index])
        n_trials, _, n_times = block.shape

        # Trial-major / time-minor keys, matching a C-order flatten of
        # each (trial, time) slice below.
        columns = {
            'trial': np.repeat(np.arange(1, n_trials + 1, dtype=np.int64), n_times),
            'time': np.tile(np.arange(n_times, dtype=np.int64), n_trials),
        }
        for i, col in enumerate(cols):
            # reshape(-1) flattens in index order even for non-contiguous input.
            columns[col] = block[:, i, :].reshape(-1)

        frame = pd.DataFrame(columns)
        frame['subject'] = int(subject)
        frames.append(frame)

    return pd.concat(frames, axis=0).reset_index(drop=True)

In [None]:
# Convert the EMG data.
# Train/test files cover every subject except the last one; the last subject
# is read from the reference file (entry 0 -> rtr, entry 2 -> rts).
tr_emg_df = create_df(data=tr_data, index=0, subjects=subjects[:-1], cols=emg_cols)
ts_emg_df = create_df(data=ts_data, index=0, subjects=subjects[:-1], cols=emg_cols)
rtr_emg_df = create_df(data=rf_data, index=0, subjects=subjects[-1:], cols=emg_cols)
rts_emg_df = create_df(data=rf_data, index=2, subjects=subjects[-1:], cols=emg_cols)

In [None]:
# Convert the target variable (velocity).
# Entry 1 of the train file, and entries 1 / 3 of the reference file.
tr_vel_df = create_df(data=tr_data, index=1, subjects=subjects[:-1], cols=vel_cols)
rtr_vel_df = create_df(data=rf_data, index=1, subjects=subjects[-1:], cols=vel_cols)
rts_vel_df = create_df(data=rf_data, index=3, subjects=subjects[-1:], cols=vel_cols)

# 軸補正

In [None]:
# Median angle to the x axis in the xy plane
def estimate_axis_direction(df):
    """Return the median orientation of the planar velocity, in radians.

    Only rows with ``vel_x**2 + vel_y**2 >= 5`` are used. Their ``arctan2``
    angles are folded by a multiple of pi into [-pi/2, pi/2], so opposite
    directions count as the same axis, and the median of the folded angles
    is returned. ``df`` is not modified.
    """
    vx, vy = df['vel_x'], df['vel_y']
    fast_enough = (vx**2 + vy**2) >= 5

    angles = np.arctan2(vy[fast_enough], vx[fast_enough]).to_numpy()
    # Below -pi/2: shift up by a full turn (the next step then takes off pi).
    angles = np.where(angles < -np.pi/2, angles + np.pi*2, angles)
    # Above +pi/2: shift down by half a turn.
    angles = np.where(angles > +np.pi/2, angles - np.pi, angles)
    return np.median(angles)

In [None]:
# Rotation of the velocity vectors
def rotate_vel(df, angle, xcol='vel_x', ycol='vel_y'):
    """Rotate the (xcol, ycol) vectors of ``df`` by ``angle`` radians.

    Applies the 2x2 rotation matrix [[cos, -sin], [sin, cos]] to every row
    and returns the rotated x and y components as two arrays.
    ``df`` itself is left untouched.
    """
    cos_a, sin_a = np.cos(angle), np.sin(angle)
    rotation = np.array([[cos_a, -sin_a], [sin_a, cos_a]])
    xy = [df[xcol].values, df[ycol].values]
    rotated_x, rotated_y = np.dot(rotation, xy)
    return rotated_x, rotated_y

In [None]:
# Per-subject axis correction.
# For each subject, estimate the median orientation of the planar velocity and
# rotate vel_x / vel_y by the opposite angle. The frames are modified in place.
theta = {}

# Training subjects: every entry of `subjects` except the last. The ids are
# derived from the list (rather than hard-coded) so they cannot drift apart.
for subject in (int(s) for s in subjects[:-1]):
    f = tr_vel_df['subject'] == subject
    theta[subject] = estimate_axis_direction(tr_vel_df[f])

    vel_x, vel_y = rotate_vel(tr_vel_df[f], -theta[subject])
    tr_vel_df.loc[f, 'vel_x'] = vel_x
    tr_vel_df.loc[f, 'vel_y'] = vel_y

# Reference subject: the last entry of `subjects`. The angle is estimated on
# rtr_vel_df only and then applied to both reference frames.
ref_subject = int(subjects[-1])
theta[ref_subject] = estimate_axis_direction(rtr_vel_df)
for ref_df in (rtr_vel_df, rts_vel_df):
    vel_x, vel_y = rotate_vel(ref_df, -theta[ref_subject])
    ref_df['vel_x'] = vel_x
    ref_df['vel_y'] = vel_y

theta

# Train/Testで方向をそろえる

In [None]:
# Flip the sign of the planar velocity components of the training frames
# (tr and rtr). rts_vel_df is intentionally left untouched by this cell.
# Note: the flip happens in place, so running the cell twice undoes it.
tr_vel_df[['vel_x', 'vel_y']] = -tr_vel_df.loc[:, ['vel_x', 'vel_y']]
rtr_vel_df[['vel_x', 'vel_y']] = -rtr_vel_df.loc[:, ['vel_x', 'vel_y']]

# 保存

In [None]:
# Make sure the output directory exists; to_pickle does not create it.
WORKING.mkdir(parents=True, exist_ok=True)

# Write the EMG frames as pickles under WORKING.
tr_emg_df.to_pickle(WORKING / 'prep1_tr_emg.pickle')
ts_emg_df.to_pickle(WORKING / 'prep1_ts_emg.pickle')
rtr_emg_df.to_pickle(WORKING / 'prep1_rtr_emg.pickle')
rts_emg_df.to_pickle(WORKING / 'prep1_rts_emg.pickle')

In [None]:
# Make sure the output directory exists; to_pickle does not create it.
WORKING.mkdir(parents=True, exist_ok=True)

# Write the axis-corrected velocity frames as pickles under WORKING.
tr_vel_df.to_pickle(WORKING / 'prep1_tr_vel.pickle')
rtr_vel_df.to_pickle(WORKING / 'prep1_rtr_vel.pickle')
rts_vel_df.to_pickle(WORKING / 'prep1_rts_vel.pickle')