In [9]:
#ライブラリをインポート
import os
import re
import sys
import csv
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [10]:
#定数を定義
BINS = 10000  #ヒストグラムのビンの数
EPSILON = .00001  #スムージングパラメータ
UPPER_LIMIT = 1.1 #静止区間の上限
LOWER_LIMIT = 0.9 #静止区間の加減
STATIONARY_INTERVALS = 5  #静止区間除去のサンプルの間隔(何サンプル静止区間が連続したら除去するか)

In [11]:
#ディレクトリ内のデータセットのファイル名と周波数を取得する関数
def get_filename_and_Hz(path: str) -> list[int, str]:
    filename = os.listdir(path)
    filename_and_Hz=[]

    for file in filename:
        Hz = re.search(r'\d+', file)
        if Hz:  #数字の入っていないファイル名があるとエラーを吐くので、このif文でチェックする
            filename_and_Hz.append([int(Hz.group(0)), file])

    return filename_and_Hz

In [12]:
#ファイル名と周波数を分けて出力する関数
def divide_filename_and_Hz(filename_and_Hz: list[int, str]) -> tuple[list[int], list[str]]:
    Hz = []
    filename = []
    for row in filename_and_Hz:
      Hz.append(row[0])
      filename.append(row[1])

    return Hz, filename

In [13]:
#加速度データのCSVファイルから3軸加速度を取得する関数
def get_acceleration(filename: str) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    data = np.loadtxt(filename, delimiter=',', usecols=(2, 3, 4))
    return data[:, 0], data[:, 1], data[:, 2]

In [14]:
#静止区間を除去する関数
def remove_stationary_intervals(AccX: np.ndarray, AccY: np.ndarray, AccZ: np.ndarray) -> np.ndarray:
    # 各軸の加速度の平均を求める
    AvgAccX = np.mean(AccX)
    AvgAccY = np.mean(AccY)
    AvgAccZ = np.mean(AccZ)

    # 重力加速度の推定値=合成加速度の平均を求める
    AvgResultantAcc = np.sqrt(AvgAccX ** 2 + AvgAccY ** 2 + AvgAccZ ** 2)

    # 各時刻の合成加速度を求める
    ResultantAcc = np.sqrt(AccX ** 2 + AccY ** 2 + AccZ ** 2)

    # 各時刻の合成加速度から静止区間を除去する
    stationary_interval_mask = (ResultantAcc > AvgResultantAcc * LOWER_LIMIT) & (ResultantAcc < AvgResultantAcc * UPPER_LIMIT)
    stationary_intervals = np.split(ResultantAcc, np.where(np.diff(stationary_interval_mask) != 1)[0] + 1)
    ResultantAcc = np.concatenate([interval for interval in stationary_intervals if len(interval) < STATIONARY_INTERVALS])

    return ResultantAcc

In [15]:
def remove_stationary_intervals(AccX: np.ndarray, AccY: np.ndarray, AccZ: np.ndarray) -> np.ndarray:
    # 各軸の加速度の平均を求める
    AvgAccX = np.mean(AccX)
    AvgAccY = np.mean(AccY)
    AvgAccZ = np.mean(AccZ)

    # 重力加速度の推定値=合成加速度の平均を求める
    AvgResultantAcc = np.sqrt(AvgAccX ** 2 + AvgAccY ** 2 + AvgAccZ ** 2)

    # 各時刻の合成加速度を求める
    ResultantAcc = np.sqrt(AccX ** 2 + AccY ** 2 + AccZ ** 2)

    # 静止区間を特定するマスクを作成
    stationary_interval_mask = (ResultantAcc > AvgResultantAcc * LOWER_LIMIT) & (ResultantAcc < AvgResultantAcc * UPPER_LIMIT)

    # 静止区間を分割して配列に保存
    intervals = np.split(ResultantAcc, np.where(np.diff(stationary_interval_mask) != 1)[0] + 1)

    # 静止区間が一定以上続く場合には除去
    updated_intervals = []
    for interval in intervals:
        if len(interval) >= STATIONARY_INTERVALS:
            # 区間が静止区間の場合は除去する
            for i in range(len(interval) - STATIONARY_INTERVALS + 1):
                if np.all((interval[i:i+STATIONARY_INTERVALS] > AvgResultantAcc * LOWER_LIMIT) &
                          (interval[i:i+STATIONARY_INTERVALS] < AvgResultantAcc * UPPER_LIMIT)):
                    interval[i:i+STATIONARY_INTERVALS] = np.nan
            updated_intervals.append(interval[~np.isnan(interval)])
        else:
            updated_intervals.append(interval)

    # すべての区間を結合して最終結果を返す
    return np.concatenate(updated_intervals)

In [16]:
AccX, AccY, AccZ = get_acceleration("data/walk100Hz-0930-0756.csv")
ResultantAcc = remove_stationary_intervals(AccX, AccY, AccZ)
len(ResultantAcc)

216590

In [18]:
#連続する2サンプルの差分を取る関数
def differences_of_acceleration(ResultantAcc: np.ndarray) -> np.ndarray:
    return np.abs(np.diff(ResultantAcc)*100000)

In [9]:
#KLダイバージェンス関数 #引数として与える2つの分布は非負の値の集合でなければならないことに注意
def KL_divergence(a: np.ndarray, b: np.ndarray) -> float:
    min_value = min(min(a), min(b)) #a,bの最小値の小さい方
    max_value = max(max(a), max(b)) #a,bの最大値の大きい方

    #a,bのヒストグラムを作成し、同じ数のビンで区切る
    a_hist, _ = np.histogram(a, bins=BINS, range=(min_value, max_value))
    b_hist, _ = np.histogram(b, bins=BINS, range=(min_value, max_value))

    #正規化する(確率分布に変換する、合計を1にする)ために全合計で割る
    a_hist = (a_hist + EPSILON) / a_hist.sum()
    b_hist = (b_hist + EPSILON) / b_hist.sum()

    #KLダイバージェンスの値を返す
    return np.sum(a_hist * np.log(a_hist / b_hist + EPSILON))

In [10]:
#JSダイバージェンス関数 #引数として与える2つの分布は非負の値の集合でなければならないことに注意
def JS_divergence(a: list[float], b: list[float]) -> float:
    min_value = min(min(a), min(b)) #a,bの最小値の小さい方
    max_value = max(max(a), max(b)) #a,bの最大値の大きい方

    #a,bのヒストグラムを作成し、同じ数のビンで区切る
    a_hist, _ = np.histogram(a, bins=BINS, range=(min_value, max_value))
    b_hist, _ = np.histogram(b, bins=BINS, range=(min_value, max_value))

    #正規化する(確率分布に変換する、合計を1にする)ために全合計で割る
    a_hist = (a_hist + EPSILON) / a_hist.sum()
    b_hist = (b_hist + EPSILON) / b_hist.sum()

    #2つの分布の平均値を求める
    mean_hist = (a_hist + b_hist) / 2.0

    #平均とそれぞれの分布のKLダイバージェンスを算出
    kl_a = np.sum(a_hist * np.log(a_hist / mean_hist))
    kl_b = np.sum(b_hist * np.log(b_hist / mean_hist))

    #JSダイバージェンスの値を返す
    return (kl_a + kl_b) / 2.0

In [11]:
#データフレームの各行の中で2番目に小さい値が格納されている場所を調べる関数(最小値は同じ確率分布同士の0.0)
def get_index_and_columns_of_second_smallest(df: pd.DataFrame) -> list[str, str]:
    index_and_columns_of_second_smallest = []
    sorted_indices = np.argsort(df.values, axis=1)  # 各行を昇順に並べ替えたインデックスの行列を作成
    for i in range(len(df)):
        second_smallest_index = sorted_indices[i, 1]  # 各行の2番目に小さい値が格納されているカラムのインデックスを取得
        index_and_columns_of_second_smallest.append((df.index[i], df.columns[second_smallest_index]))
    return index_and_columns_of_second_smallest

In [12]:
#推定精度を算出する巻数
def calculate_accuracy(index_and_columns_of_second_smallest: list[np.ndarray]) -> float:
    # 全要素のうち、インデックスとカラムのラベル名が同じ要素のみがTrueとなる真偽値の配列を作成
    is_same = np.array([index_and_columns_of_second_smallest[i][0] == index_and_columns_of_second_smallest[i][1] for i in range(len(index_and_columns_of_second_smallest))])

    # 真偽値の配列の合計を計算し、その割合を算出することで、精度を求める
    accuracy = is_same.sum() / len(index_and_columns_of_second_smallest) * 100

    return accuracy

In [13]:
def main():
    args = sys.argv
    path = args[1]
    filename_and_Hz = get_filename_and_Hz(path)
    filename_and_Hz.sort(reverse=True)
    Hz, filename = divide_filename_and_Hz(filename_and_Hz)
    Hz = [str(hz) + "Hz" for hz in Hz]

    # AccX, AccY, AccZを一括で処理するために、辞書にまとめる
    Acc_dict = {}
    for i in filename:
        Acc_dict[i] = get_acceleration(path + i)
        Acc_dict[i] = remove_stationary_intervals(*Acc_dict[i])

    # ndarrayに変換して、差分計算とJS/KLダイバージェンスの計算を一括で行う
    acc_array_dict = {k: np.array(differences_of_acceleration(v)) for k, v in Acc_dict.items()}
    filename_array = np.array(filename)
    resultKLD = np.zeros((len(filename), len(filename)))
    resultJSD = np.zeros((len(filename), len(filename)))
    for i in range(len(filename)):
        for j in range(len(filename)):
            resultKLD[i, j] = KL_divergence(acc_array_dict[filename_array[i]], acc_array_dict[filename_array[j]])
            resultJSD[i, j] = JS_divergence(acc_array_dict[filename_array[i]], acc_array_dict[filename_array[j]])

    # DataFrameに変換して出力する
    df_KLD = pd.DataFrame(resultKLD, index=Hz, columns=Hz)
    display(df_KLD)
    print(f"KLダイバージェンスによる推定精度は{calculate_accuracy(get_index_and_columns_of_second_smallest(df_KLD))}%です")

    df_JSD = pd.DataFrame(resultJSD, index=Hz, columns=Hz)
    display(df_JSD)
    print(f"JSダイバージェンスによる推定精度は{calculate_accuracy(get_index_and_columns_of_second_smallest(df_JSD))}%です")


In [15]:
path = "data/"
filename_and_Hz = get_filename_and_Hz(path)
filename_and_Hz.sort(reverse=True)
Hz, filename = divide_filename_and_Hz(filename_and_Hz)
Hz = [str(hz) + "Hz" for hz in Hz]

# AccX, AccY, AccZを一括で処理するために、辞書にまとめる
Acc_dict = {}
for i in filename:
    Acc_dict[i] = get_acceleration(path + i)
    Acc_dict[i] = remove_stationary_intervals(*Acc_dict[i])

# ndarrayに変換して、差分計算とJS/KLダイバージェンスの計算を一括で行う
acc_array_dict = {k: np.array(differences_of_acceleration(v)) for k, v in Acc_dict.items()}
filename_array = np.array(filename)
resultKLD = np.zeros((len(filename), len(filename)))
resultJSD = np.zeros((len(filename), len(filename)))
for i in range(len(filename)):
    for j in range(len(filename)):
        resultKLD[i, j] = KL_divergence(acc_array_dict[filename_array[i]], acc_array_dict[filename_array[j]])
        resultJSD[i, j] = JS_divergence(acc_array_dict[filename_array[i]], acc_array_dict[filename_array[j]])

# DataFrameに変換して出力する
df_KLD = pd.DataFrame(resultKLD, index=Hz, columns=Hz)
display(df_KLD)
print(f"KLダイバージェンスによる推定精度は{calculate_accuracy(get_index_and_columns_of_second_smallest(df_KLD))}%です")

df_JSD = pd.DataFrame(resultJSD, index=Hz, columns=Hz)
display(df_JSD)
print(f"JSダイバージェンスによる推定精度は{calculate_accuracy(get_index_and_columns_of_second_smallest(df_JSD))}%です")

TypeError: unsupported operand type(s) for ** or pow(): 'list' and 'int'