In [3]:
class RawData:
    def __init__(
        self,
        env_name,
        brain_name,
        search_seed,
        env_seed,
        env_sdf,
        goal,
        robot_urdf,
        score,
    ):
        self.env_name = env_name
        self.brain_name = brain_name
        self.search_seed = search_seed
        self.env_seed = env_seed
        self.env_sdf = env_sdf
        self.goal = goal
        self.robot_urdf = robot_urdf
        self.score = score

    def __str__(self):
        return (
            f"Data(\n"
            f"  env_name={self.env_name},\n"
            f"  brain_name={self.brain_name},\n"
            f"  search_seed={self.search_seed},\n"
            f"  env_seed={self.env_seed},\n"
            f"  env_sdf={self.env_sdf[0:50]},\n"
            f"  goal={self.goal},\n"
            f"  robot_urdf={self.robot_urdf[0:50]},\n"
            f"  score={self.score}\n"
            ")"
        )

# read Raw Data

In [2]:
import os
import re

data_dir = "../final/data"
env_pattern = r"terrain\d+"
seed_list = ["seed1234", "seed2345"] 
brain_type = "ea"

fitness_file = "best_fitness.txt"
body_file = "body.urdf"
brain_file = "brain-100.nndf"
world_file = "world.sdf"
            

In [3]:
def read_file(file_path):
    with open(file_path,"r") as f:
        file_str = f.read()
    return file_str

In [4]:
raw_data_dirs = []
raw_data_list = []
for e in os.listdir(data_dir):
    env_path = os.path.join(data_dir,e)
    match = re.match(r"[A-Za-z]+(\d+)", e)
    if match:
        env_seed = int(match.group(1))
    if os.path.isdir(env_path) and re.match(env_pattern,e):
        for s in os.listdir(env_path):
            seed_path = os.path.join(env_path,s)
            if os.path.isdir(seed_path) and (s in seed_list):
                for b in os.listdir(seed_path):
                    if re.match(r"body\d+",b):
                        body_path = os.path.join(seed_path,b)
                        raw_data_dirs.append(body_path)
                        raw_data = RawData(
                            env_name=e,
                            brain_name=brain_type,
                            search_seed=s,
                            env_seed=env_seed,
                            env_sdf=read_file(os.path.join(env_path, world_file)),
                            goal=(100, 0, 0),
                            robot_urdf=read_file(os.path.join(body_path, body_file)),
                            score=(float(read_file(os.path.join(body_path, fitness_file))), 0 , 0),
                        )
                        raw_data_list.append(raw_data)

In [9]:
str(raw_data_list[120])

'Data(\n  env_name=terrain36199,\n  brain_name=ea,\n  search_seed=seed2345,\n  env_seed=36199,\n  env_sdf=<sdf>\n    <model name="terrain0">\n        <pose>1.,\n  goal=(100, 0, 0),\n  robot_urdf=<robot name = "robot">\n    <link name="link1">\n   ,\n  score=(0.6332962657929321, 0, 0)\n)'

# save data

In [8]:
from datetime import datetime
import pickle

now = datetime.now()
formatted_now = now.strftime("%y%m%d%H%M")
with open(f"./data/rawdata_{formatted_now}.pkl", "wb") as file:
    pickle.dump(raw_data_list, file)

# read data

In [4]:
import pickle
def read_data(file_path):
    with open(file_path, "rb") as f:
        loaded_data = pickle.load(f)
    return loaded_data

In [5]:
raw_data= read_data("./data/rawdata_2304040613.pkl")

In [6]:
str(raw_data[0])

'Data(\n  env_name=terrain36199,\n  brain_name=ea,\n  search_seed=seed1234,\n  env_seed=36199,\n  env_sdf=<sdf>\n    <model name="terrain0">\n        <pose>1.,\n  goal=(100, 0, 0),\n  robot_urdf=<robot name = "robot">\n    <link name="link1">\n   ,\n  score=(0.24942633879748496, 0, 0)\n)'

# transform data

In [1]:
def convert_urdf_to_sequence(urdf_string):
    root = ET.fromstring(urdf_string)
    # Extract the desired information
    sequence = []
    for element in root:
        if element.tag == "joint":
            parent_name = element.find("parent").attrib["link"]
            node_name = element.find("child").attrib["link"]
            joint_axis = element.find("axis").attrib["xyz"]
            joint_type = element.attrib["type"]
            sequence.append((parent_name, node_name, joint_axis, joint_type))
        elif element.tag == "link":
            node_name = element.attrib["name"]
            # Extract the link type and size from the visual geometry
            visual_geometry = element.find("visual/geometry")
            link_type = None
            link_size = None
            if visual_geometry is not None:
                link_type = list(visual_geometry)[0].tag
                if link_type == "sphere":
                    radius = float(visual_geometry.find("sphere").attrib["radius"])
                    link_size = (radius,radius,radius)
                elif link_type == "box":
                    size_str = visual_geometry.find("box").attrib["size"]
                    link_size = tuple([float(s) for s in size_str.split()])
                elif link_type == "cylinder":
                    length = float(visual_geometry.find("cylinder").attrib["length"])
                    radius = float(visual_geometry.find("cylinder").attrib["radius"])
                    link_size = (radius,radius,length)

            # Extract the sensor tag (assuming it's stored in the material name attribute)
            sensor_color = element.find("visual/material").attrib["name"] if element.find("visual/material") is not None else "unknown"
            sensor_tag = True if "sensored" in sensor_color else False
            sequence.append((node_name, link_type, link_size, sensor_tag))
    return sequence

In [2]:
def convert_sdf_to_gridmap(sdf_string, return_submap=False):
    # Parse the SDF string
    root = ET.fromstring(sdf_string)
    # Create a grid map representation
    grid_resolution = 1  # Adjust as needed
    grid_size = [10, 10]  # Adjust as needed (this should be large enough to cover the entire space)
    grid_map = np.zeros(grid_size, dtype=np.double) #grid_map = np.zeros(grid_size, dtype=np.uint8)
    # Iterate over each model in the SDF file
    for model in root.findall(".//model"):
        # Extract box dimensions and pose
        box_size = model.find(".//box/size").text.split()
        box_size = [float(dim) for dim in box_size]
        box_pose = model.find(".//pose").text.split()
        box_position = [float(coord) for coord in box_pose[:3]]
        box_rotation = [float(angle) for angle in box_pose[3:]]
        # Fill in the grid cells corresponding to the box
        offset = [int(coord / grid_resolution + size / 2) for (coord, size) in zip(box_position[0:2],grid_size)]
        for i in range(offset[0], min(grid_size[0], offset[0] + int(box_size[0] / grid_resolution))):
            for j in range(offset[1], min(grid_size[1], offset[1] + int(box_size[1] / grid_resolution))):
                grid_map[i, j] = box_size[2] #1
    if return_submap:
        return grid_map[6:10,3:7]
    else:
        return grid_map

In [5]:
import numpy as np
import xml.etree.ElementTree as ET

from src.utils.data_utils import PreprocessedData as PreprocessedData
from src.utils.data_utils import read_data as read_data
from src.utils.data_utils import save_data as save_data

class DataTransformer:
    def __init__(self):
        pass
    
    def generate_sequence(self, data_file, save_dir="./data/preprocessed", subset=None):   
        raw_data = read_data(data_file)
        processed_data = []
        for single_data in raw_data[0:subset]:
            sdf_sequence = convert_sdf_to_gridmap(single_data.env_sdf, return_submap=True)
            goal_sequence = single_data.goal
            urdf_sequence = convert_urdf_to_sequence(single_data.robot_urdf)
            score_sequence = single_data.score
            processed_data.append(PreprocessedData(sdf_sequence, goal_sequence, urdf_sequence, score_sequence))
        
        file_path = save_data(save_dir, processed_data)
        return file_path
        

In [7]:
data_tf = DataTransformer()
prepcossed_file = data_tf.generate_sequence("./data/raw/rawdata_2304041648.pkl",subset=10)

In [14]:
preprocessed_data = read_data(prepcossed_file)

# test

In [3]:
from src.utils.data_utils import read_data as read_data


In [4]:
preprocess_file = "./data/preprocessed/data_2304041728.pkl"

In [13]:
preprocessed_data = read_data(preprocess_file)

In [15]:
str(preprocessed_data[0])

"grid map: [[0.11291883 0.17559345 0.18491636 0.01630437]\n [0.14999127 0.0477891  0.00673156 0.16555812]\n [0.1934127  0.17415358 0.13729455 0.04104825]\n [0.04599994 0.04189273 0.18438424 0.12505994]], goal: (100, 0, 0), urdf: [('link1', 'sphere', (0.07622079994336455, 0.07622079994336455, 0.07622079994336455), False), ('link2', 'cylinder', (0.09602669424199578, 0.09602669424199578, 0.21849463163998928), True), ('link3', 'cylinder', (0.08972565871825965, 0.08972565871825965, 0.3602986477224373), True), ('link1', 'link2', '0 1 0', 'spherical'), ('link2', 'link3', '0 1 0', 'revolute')], score: (0.24942633879748496, 0, 0)"