In [1]:
import pandas as pd
from scipy.signal import savgol_filter
from datetime import timedelta
import glob

In [2]:
class PARAMS:
    """
    Namespace of configuration constants used by the preprocessing pipeline.

    The values are plain class attributes, evaluated once when the class body
    runs (this includes the ``glob`` calls that discover the input files).
    The nested classes group the settings:

    * ``DATASET``    - input/output locations, column names, cropping duration.
    * ``PREPROCESS`` - lag count, renamed column prefixes, filter, resampling.
    """

    class DATASET:
        """
        Dataset-related settings: raw file locations, scaling factor, the
        motor index used for cropping, source column names and the output path.
        """

        # Directories holding the raw action and state CSV files
        action_path = "datasets/raw/rl2pd"
        state_path = "datasets/raw/pd2rl"

        # Multiplier applied to the whole action DataFrame after loading
        action_scale = 1

        # Motor index; get_crop_date() uses the column "action<motor_id>"
        # to locate the first non-zero command
        motor_id = 3

        # Length of the window kept after the crop start (120 seconds)
        duration = timedelta(seconds=120)

        # All action and state CSV files. glob.glob() yields paths in
        # arbitrary (filesystem-dependent) order, and the two lists are later
        # paired position-by-position, so both are sorted to make the pairing
        # deterministic.
        # NOTE(review): this assumes matching recordings sort to the same
        # position in both directories - confirm the file naming scheme.
        action_files = sorted(glob.glob(f"{action_path}/*.csv"))
        state_files = sorted(glob.glob(f"{state_path}/*.csv"))

        # Substrings identifying the raw columns in the CSV headers
        action_col_name = "action"  # action columns
        qpos_col_name = "rbt_state_q"  # position columns
        qvel_col_name = "rbt_state_dq"  # velocity columns

        # Destination CSV for the processed dataset
        DUMP_FILE = "datasets/generated/test.csv"

    class PREPROCESS:
        """
        Preprocessing settings: number of shifted copies to generate, the
        replacement column prefixes, the denoising filter and the resampling
        period.
        """

        # Number of shifted copies produced by generate_lags()
        NUM_LAGS = 7

        # Replacement prefixes written by rename()
        qpos_name = "qpos_motor"  # replaces DATASET.qpos_col_name
        qvel_name = "qvel_motor"  # replaces DATASET.qvel_col_name
        qact_name = "qact_motor"  # replaces DATASET.action_col_name

        # Callable applied to every position/velocity column by denoize().
        # The default is the identity, i.e. no filtering. Example of a
        # Savitzky-Golay alternative (requires `from functools import partial`):
        # FILTER = partial(savgol_filter, window_length=200, polyorder=3)
        FILTER = lambda x: x  # No filtering applied (identity function)

        # Period passed to DataFrame.resample() (1 ms bins)
        resample = "1ms"


In [3]:
def open_df(file_path: str) -> pd.DataFrame:
    """
    Load a CSV file and index it by its 'time' column.

    The 'time' column is interpreted as nanoseconds since the Unix epoch,
    converted to datetimes and moved from the columns into the index.

    Parameters:
        file_path (str): Path to the CSV file; it must contain a 'time' column.

    Returns:
        pd.DataFrame: The file contents indexed by the converted 'time' values.
    """
    raw = pd.read_csv(file_path)
    timestamps = pd.to_datetime(raw["time"], unit="ns")
    return raw.assign(time=timestamps).set_index("time")


def get_crop_date(df: pd.DataFrame) -> pd.Timestamp:
    """
    Return the index label of the first row whose selected action is non-zero.

    The inspected column is "action<motor_id>", with the motor index taken
    from PARAMS.DATASET.motor_id.

    Parameters:
        df (pd.DataFrame): Action data indexed by time.

    Returns:
        pd.Timestamp: Index label at the first non-zero action value. If the
        column contains no non-zero value, argmax yields position 0 and the
        first index label is returned.
    """
    column = f"action{PARAMS.DATASET.motor_id}"
    is_active = df[column].ne(0)
    # argmax on a boolean mask gives the position of the first True
    first_active_pos = is_active.argmax()
    return df.index[first_active_pos]


def generate_lags(df: pd.DataFrame, num_lags: int = PARAMS.PREPROCESS.NUM_LAGS) -> pd.DataFrame:
    """
    Build a wide DataFrame made of shifted copies of every column.

    For each k in 0 .. num_lags - 1 the frame is shifted by -k rows, so row t
    of the "_lag<k>" columns holds the value found at row t + k of the input.
    The trailing rows of the shifted copies therefore contain NaN.

    Parameters:
        df (pd.DataFrame): Input DataFrame.
        num_lags (int, optional): Number of shifted copies to produce.
            Default is PARAMS.PREPROCESS.NUM_LAGS.

    Returns:
        pd.DataFrame: Column-wise concatenation of the shifted copies, each
        column suffixed with "_lag<k>".
    """
    base = df[df.columns]
    shifted_frames = [
        base.shift(-offset).add_suffix(f"_lag{offset}")
        for offset in range(num_lags)
    ]
    # One concat over the collected frames rather than repeated column inserts
    return pd.concat(shifted_frames, axis=1)


def rename(df: pd.DataFrame) -> pd.DataFrame:
    """
    Rewrite column labels using the substring mappings defined in PARAMS.

    Each label has the raw action, position and velocity substrings replaced
    (in that order) by their PARAMS.PREPROCESS counterparts. The columns are
    replaced on the DataFrame that was passed in, which is also returned.

    Parameters:
        df (pd.DataFrame): Input DataFrame; its columns are modified in place.

    Returns:
        pd.DataFrame: The same DataFrame object, with renamed columns.
    """
    # Applied sequentially, in this order, to every label
    substitutions = (
        (PARAMS.DATASET.action_col_name, PARAMS.PREPROCESS.qact_name),
        (PARAMS.DATASET.qpos_col_name, PARAMS.PREPROCESS.qpos_name),
        (PARAMS.DATASET.qvel_col_name, PARAMS.PREPROCESS.qvel_name),
    )
    renamed = []
    for label in df.columns:
        for old, new in substitutions:
            label = label.replace(old, new)
        renamed.append(label)
    df.columns = renamed
    return df


def denoize(df: pd.DataFrame) -> pd.DataFrame:
    """
    Run PARAMS.PREPROCESS.FILTER over the position and velocity columns.

    A column is filtered when its name contains the renamed position or
    velocity prefix; all other columns are left untouched. The filtered
    values are written back into the DataFrame that was passed in.

    Parameters:
        df (pd.DataFrame): Input DataFrame; matching columns are overwritten.

    Returns:
        pd.DataFrame: The same DataFrame object, with filtered columns.
    """
    targets = (PARAMS.PREPROCESS.qpos_name, PARAMS.PREPROCESS.qvel_name)
    smooth = PARAMS.PREPROCESS.FILTER
    for name in df.columns:
        if any(tag in name for tag in targets):
            df[name] = smooth(df[name])
    return df


def save_df(df: pd.DataFrame) -> None:
    """
    Write the DataFrame to the CSV file named by PARAMS.DATASET.DUMP_FILE.

    The parent directory of the target file is created when missing, because
    DataFrame.to_csv does not create directories and would otherwise fail.
    The index is not written.

    Parameters:
        df (pd.DataFrame): DataFrame to be saved.

    Returns:
        None
    """
    # Local import keeps this function self-contained within the notebook
    from pathlib import Path

    target = Path(PARAMS.DATASET.DUMP_FILE)
    target.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(target, index=False)


In [4]:
# Build one processed DataFrame per (action file, state file) pair, then
# stack them into `final_data`.
data_list = []
# glob results come in arbitrary order; sort both lists so that the i-th
# action file is deterministically paired with the i-th state file.
# NOTE(review): assumes matching recordings sort to the same position in both
# directories - confirm the file naming scheme.
for action_file, state_file in zip(
    sorted(PARAMS.DATASET.action_files), sorted(PARAMS.DATASET.state_files)
):
    actions = open_df(action_file) * PARAMS.DATASET.action_scale
    states = open_df(state_file)

    # Keep the window starting at the first non-zero action of the chosen motor
    crop_date_start = get_crop_date(actions)
    crop_date_end = crop_date_start + PARAMS.DATASET.duration

    # Align actions and states on the union of their timestamps, crop, then
    # fill the gaps left by the alignment and drop rows that stay incomplete.
    # Non-inplace calls avoid mutating a slice of the concatenated frame.
    data = pd.concat([actions, states], axis=1)
    data = data.loc[crop_date_start:crop_date_end].interpolate().dropna()

    # Resample onto the configured period and fill empty bins
    data = data.resample(PARAMS.PREPROCESS.resample).mean().interpolate()

    rename(data)  # renames the columns of `data` in place
    data_raw = data.copy()  # snapshot before filtering
    data = denoize(data)
    data_denoized = data.copy()  # snapshot after filtering

    # Shifted copies introduce NaN in the trailing rows; drop those rows
    data = generate_lags(data).dropna().reset_index()

    # Express time as seconds elapsed since the first retained sample;
    # .iloc[0] is positional and does not depend on the index labels
    data["time"] = (data["time"] - data["time"].iloc[0]).dt.total_seconds()
    data_list.append(data)
final_data = pd.concat(data_list, ignore_index=True)

In [5]:
# Persist the dataset without the elapsed-time column
save_df(final_data.drop("time", axis=1))
# Notebook preview of the first five rows (time column still included)
final_data.head(5)

Unnamed: 0,time,qact_motor0_lag0,qact_motor1_lag0,qact_motor2_lag0,qact_motor3_lag0,qact_motor4_lag0,qact_motor5_lag0,qact_motor6_lag0,qact_motor7_lag0,qact_motor8_lag0,...,qpos_motor7_lag6,qvel_motor7_lag6,qpos_motor8_lag6,qvel_motor8_lag6,qpos_motor9_lag6,qvel_motor9_lag6,qpos_motor10_lag6,qvel_motor10_lag6,qpos_motor11_lag6,qvel_motor11_lag6
0,0.0,-0.013054,-0.013054,-0.013054,-0.013054,-0.013054,-0.013054,-0.013054,-0.013054,-0.013054,...,-0.050265,0.0,-3.100124,-0.004712,2.461752,0.0,0.028903,0.0,0.0,0.003142
1,0.001,-0.014121,-0.014121,-0.014121,-0.014121,-0.014121,-0.014121,-0.014121,-0.014121,-0.014121,...,-0.050265,0.0,-3.100124,0.001571,2.461752,0.0,0.028903,0.0,0.0,0.0
2,0.002,-0.015189,-0.015189,-0.015189,-0.015189,-0.015189,-0.015189,-0.015189,-0.015189,-0.015189,...,-0.050265,-0.004712,-3.100124,0.0,2.461752,0.0,0.028903,0.0,0.0,-0.003142
3,0.003,-0.016256,-0.016256,-0.016256,-0.016256,-0.016256,-0.016256,-0.016256,-0.016256,-0.016256,...,-0.050265,-0.002356,-3.100124,0.0,2.461752,0.0,0.028903,0.0,0.0,-0.002356
4,0.004,-0.01679,-0.01679,-0.01679,-0.01679,-0.01679,-0.01679,-0.01679,-0.01679,-0.01679,...,-0.050265,0.0,-3.100124,0.0,2.461752,0.0,0.028903,0.0,0.0,-0.001571
