In [1]:
import bagpy
from bagpy import bagreader
import pandas as pd

Python older than 3.7 detected. 


In [2]:
def filter_mgs(msg_csv, pattern):
    """Write a copy of a CSV file that keeps only the rows containing ``pattern``.

    The first line of ``msg_csv`` (the header) is always kept. Every later line
    is kept when ``pattern`` occurs anywhere in its raw text, including at the
    very start of the line. Matching is a plain substring test on the whole
    line, not a per-column comparison, so an empty pattern keeps every line.

    Args:
        msg_csv: Path of the CSV file to read.
        pattern: Substring a data line must contain to be kept.

    Returns:
        Path of the filtered file, which is ``msg_csv + "_filtered.csv"``.
    """
    # Context managers close both handles even if reading or writing fails.
    with open(msg_csv, "r") as src:
        lines = src.readlines()

    # lines[:1] is the header when present and an empty list for an empty file.
    # `in` is used instead of `find(...) > 0`, which missed matches at index 0.
    kept = lines[:1] + [line for line in lines[1:] if pattern in line]

    out_csv = msg_csv + "_filtered.csv"
    with open(out_csv, "w") as dst:
        dst.write("".join(kept))
    return out_csv

In [3]:
def load_bag_topic_to_df(bag, topic_name, filter=False, filter_pattern=""):
    """Load one topic of a bag into a pandas DataFrame.

    Args:
        bag: Object exposing ``message_by_topic(topic_name)``; its return value
            is handed to ``pd.read_csv`` (or to ``filter_mgs`` first).
        topic_name: Name of the topic to export.
        filter: When truthy, the exported CSV is passed through ``filter_mgs``
            before it is read.
        filter_pattern: Pattern forwarded to ``filter_mgs``; only used when
            ``filter`` is truthy.

    Returns:
        The DataFrame produced by ``pd.read_csv``.
    """
    exported_csv = bag.message_by_topic(topic_name)
    # Read the filtered copy when filtering is requested, else the raw export.
    csv_to_load = filter_mgs(exported_csv, filter_pattern) if filter else exported_csv
    return pd.read_csv(csv_to_load)

In [4]:
def extract_bag_to_data(bag_name):
    """Build one table of magnetic-sensor and joint-position samples from a bag.

    Three topics are loaded from the bag: the external joint states (rows
    containing ``"shoulder_left"``), the magnetic sensor (rows containing
    ``"0,b"``) and ``/tracking_loss`` (unfiltered). Each magnetic-sensor row is
    joined to the joint-state row and then the tracking-loss row with the
    nearest ``Time``. Rows whose ``data`` column is not ``False`` are dropped.

    Args:
        bag_name: Path of the bag file, passed to ``bagreader``.

    Returns:
        A new DataFrame with ``Time``, ``x_0``..``x_3``, ``y_0``..``y_3``,
        ``z_0``..``z_3`` and ``position_0``..``position_2``, in that order.
    """
    bag = bagreader(bag_name)

    joint_states = load_bag_topic_to_df(
        bag, "/roboy/pinky/sensing/external_joint_states", True, "shoulder_left"
    )
    magnetic = load_bag_topic_to_df(
        bag, "/roboy/pinky/middleware/MagneticSensor", True, "0,b"
    )
    tracking_loss = load_bag_topic_to_df(bag, "/tracking_loss")

    # The magnetic samples are the left side of both joins, so they set the rows.
    merged = pd.merge_asof(magnetic, joint_states, on="Time", direction="nearest")
    merged = pd.merge_asof(merged, tracking_loss, on="Time", direction="nearest")

    # Element-wise comparison on the Series; presumably `data` is the boolean
    # payload of /tracking_loss -- TODO confirm against the message definition.
    tracked = merged.loc[merged['data'] == False]

    sensor_columns = [f'{axis}_{idx}' for axis in ['x','y','z'] for idx in range(4)]
    position_columns = [f'position_{idx}' for idx in range(3)]

    return tracked[['Time'] + sensor_columns + position_columns].copy()

In [5]:
# Recorded bag files to convert; paired by position with the paths in csv_outs.
bag_names = [
    "./bags_data/shoulder_left_5_tendon_aug_11_elbow_both_axises.bag",
    "./bags_data/shoulder_left_6_tendon_aug_11_elbow_both_axises.bag",
    "./bags_data/shoulder_left_7_tendon_aug_11_elbow_both_axises.bag",
    "./bags_data/shoulder_left_9_tendon_aug_11_animus.bag",
    "./bags_data/shoulder_left_10_tendon_aug_11_animus.bag",
]

# Destination CSV for each bag above, in the same order.
csv_outs = [
    "./training_data/shoulder_left_5.csv",
    "./training_data/shoulder_left_6.csv",
    "./training_data/shoulder_left_7.csv",
    "./training_data/shoulder_left_9.csv",
    "./training_data/shoulder_left_10.csv",
]

# zip() stops at the shorter list, so a mismatch would silently skip recordings.
assert len(bag_names) == len(csv_outs), "bag_names and csv_outs must have the same length"

# Convert every bag and write the resulting table without the index column.
for bag_name, out in zip(bag_names, csv_outs):
    df_final = extract_bag_to_data(bag_name)
    df_final.to_csv(out, index=False)

[INFO]  Data folder ./bags_data/shoulder_left_5_tendon_aug_11_elbow_both_axises already exists. Not creating.
[INFO]  Data folder ./bags_data/shoulder_left_6_tendon_aug_11_elbow_both_axises already exists. Not creating.
[INFO]  Data folder ./bags_data/shoulder_left_7_tendon_aug_11_elbow_both_axises already exists. Not creating.
[INFO]  Data folder ./bags_data/shoulder_left_9_tendon_aug_11_animus already exists. Not creating.
[INFO]  Data folder ./bags_data/shoulder_left_10_tendon_aug_11_animus already exists. Not creating.
