In [None]:
import os 
import numpy as np
import pandas as pd
from scipy.signal import welch
import matplotlib.pyplot as plt
import seaborn as sns

from scipy.integrate import cumtrapz
import matplotlib.pyplot as plt

In [None]:
import os
import numpy as np
import pandas as pd
from scipy.signal import welch

def calculate_fft_metrics(df, fs, nperseg=500):
    fft_metrics = {}
    for axis in ['acc_x', 'acc_y', 'acc_z']:
        if axis in df.columns:
            valid_data = df[axis].dropna()
            if not valid_data.empty:
                nps = min(nperseg, len(valid_data))
                f, Pxx = welch(valid_data, fs=fs, nperseg=nps)
                dominant_freq = f[np.argmax(Pxx)]
                fft_metrics[f'{axis}_freq'] = f
                fft_metrics[f'{axis}_welch'] = Pxx
                fft_metrics[f'{axis}_dominant_freq'] = dominant_freq
                fft_metrics[f'{axis}_freq_energy'] = np.sum(Pxx)
    return fft_metrics

def process_fft(df, mode, nperseg=500):
    results = []
    for env_id, _df in df.groupby('env_id'):
        try:
            _df = _df.sort_values('time').copy()
            _df['time'] = pd.to_numeric(_df['time'], errors='coerce')
            _df = _df.dropna(subset=['time'])
            dt = _df['time'].diff().median()
            if pd.isna(dt) or dt == 0:
                print(f"[WARN] env_id={env_id}: invalid dt -> skip")
                continue
            fs = 1.0 / dt

            metrics = calculate_fft_metrics(_df, fs=fs, nperseg=nperseg)
            metrics.update({
                'env_id': env_id,                 
                'mode': mode,
                'duration': _df['time'].iloc[-1] - _df['time'].iloc[0],
                'n_samples': len(_df),
                'fs': fs,
            })
            results.append(metrics)
        except Exception as e:
            print(f"[ERROR] env_id={env_id}: {e}")

    return pd.DataFrame(results)


In [None]:
wheeled_mode_pos_df = pd.read_csv("teslabot_pos_100env_wheeled_mode.csv")
walking_mode_pos_df = pd.read_csv("teslabot_pos_100env_walking_mode.csv")

In [None]:
import numpy as np
import pandas as pd

def add_vel_acc(df: pd.DataFrame) -> pd.DataFrame:
    def _per_env(g):
        g = g.sort_values('time_s').copy()
        t = g['time_s'].to_numpy()

        for axis in ['x', 'y', 'z']:
            p = g[f'pos_{axis}'].to_numpy()
            v = np.gradient(p, t)     
            a = np.gradient(v, t)     

            g[f'vel_{axis}'] = v
            g[f'acc_{axis}'] = a
        return g

    return df.groupby('env_id', group_keys=False).apply(_per_env)

walking_mode_pos_df = add_vel_acc(walking_mode_pos_df)
wheeled_mode_pos_df = add_vel_acc(wheeled_mode_pos_df)

In [None]:
walking_mode_df = walking_mode_pos_df
wheeled_mode_df = wheeled_mode_pos_df

In [None]:
walking_mode_df = walking_mode_df.rename(columns={'time_s': 'time'})
wheeled_mode_df = wheeled_mode_df.rename(columns={'time_s': 'time'})

In [None]:
walking_fft_met = process_fft(walking_mode_df, mode='walking_mode', nperseg=500)
wheeled_fft_met = process_fft(wheeled_mode_df, mode='wheeled_mode', nperseg=500)


In [None]:
import numpy as np
import matplotlib.pyplot as plt

def mean_psd_on_common_grid(fft_met, axis='acc_x', fmax=None, n_grid=1024):
    f_list   = fft_met[f'{axis}_freq']
    psd_list = fft_met[f'{axis}_welch']

    # pandas.Series なら values で取り出す
    if hasattr(f_list, 'values'):
        f_list   = f_list.values
        psd_list = psd_list.values

    f_max_all = min(f[-1] for f in f_list)
    if fmax is not None:
        f_max_all = min(f_max_all, fmax)
    f_ref = np.linspace(0, f_max_all, n_grid)

    psd_stack = np.vstack([
        np.interp(f_ref, f, psd) for f, psd in zip(f_list, psd_list)
    ])

    psd_mean = psd_stack.mean(axis=0)
    return f_ref, psd_mean

for _mode, fft_met in [('walking_mode', walking_fft_met), ('wheeled_mode', wheeled_fft_met)]:
    for axis in ['acc_x', 'acc_y', 'acc_z']:
        f_ref, psd_mean = mean_psd_on_common_grid(fft_met, axis=axis, fmax=10.0, n_grid=1024)

        plt.figure(figsize=(4, 3))
        plt.plot(f_ref, psd_mean)
        plt.ylabel('Power spectral density')
        plt.xlabel('Frequency [Hz]')
        plt.title(f'{axis[-1]} ({_mode})')
        plt.tight_layout()
        plt.savefig(f'psd_{_mode}_{axis[-1]}.png')
        plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

def mean_psd_on_common_grid(fft_met, axis="acc_x", fmax=10.0, n_grid=1024):
    f_list   = fft_met[f"{axis}_freq"].values
    psd_list = fft_met[f"{axis}_welch"].values

    f_max_all = min(min(f[-1] for f in f_list), fmax)
    f_ref = np.linspace(0.0, f_max_all, n_grid)

    psd_interp = np.vstack([
        np.interp(f_ref, f, psd, left=0.0, right=0.0)
        for f, psd in zip(f_list, psd_list)
    ])
    return f_ref, psd_interp.mean(axis=0)

modes = {
    "Walking":  (walking_fft_met, "tab:red"),
    "Wheeled":  (wheeled_fft_met, "tab:blue"),
}

for axis in ["acc_x", "acc_y", "acc_z"]:
    plt.figure(figsize=(5, 3))

    for mode_name, (df_fft, color) in modes.items():
        if f"{axis}_freq" not in df_fft.columns:
            continue

        f_ref, psd_mean = mean_psd_on_common_grid(
            df_fft, axis=axis, fmax=10.0, n_grid=1024
        )

        plt.plot(
            f_ref, psd_mean,
            label=mode_name,
            color=color,
            linewidth=1.8
        )

    axis_tag = axis[-1].upper()
    plt.xlabel("Frequency [Hz]")
    plt.ylabel("PSD [$(m/s^2)^2/Hz$]")
    plt.title(f"Mean PSD ({axis_tag}‑axis)")
    plt.legend(loc="upper right")
    plt.grid(True, ls="--", lw=0.4, alpha=0.6)  
    plt.tight_layout()
    plt.savefig(f"psd_axis{axis_tag}.png", dpi=200)
    plt.show()
