Skip to content

Commit

Permalink
add event builders
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailChaikovskii committed Nov 1, 2023
1 parent dd10ce8 commit 808f14d
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 69 deletions.
48 changes: 24 additions & 24 deletions app/mcts_run_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
FinalPositionCriterion, GraspTimeCriterion, InstantContactingLinkCriterion,
InstantForceCriterion, InstantObjectCOGCriterion, SimulationReward,
TimeCriterion)
from rostok.criterion.simulation_flags import (EventContact,
EventContactTimeOut,
EventFlyingApart, EventGrasp,
EventSlipOut,
EventStopExternalForce)
from rostok.criterion.simulation_flags import (EventContactBuilder,
EventContactTimeOutBuilder,
EventFlyingApartBuilder, EventGraspBuilder,
EventSlipOutBuilder,
EventStopExternalForceBuilder)
from rostok.simulation_chrono.simulation_scenario import GraspScenario
from rostok.trajectory_optimizer.control_optimizer import (
CalculatorWithOptimizationDirect, TendonOptimizerCombinationForce)
Expand All @@ -27,42 +27,42 @@ def config_independent_torque(grasp_object_blueprint):
# simulation_manager.grasp_object_callback = lambda: creator.create_environment_body(
# grasp_object_blueprint)
simulation_manager.grasp_object_callback = grasp_object_blueprint
event_contact = EventContact()
simulation_manager.add_event(event_contact)
event_timeout = EventContactTimeOut(hp.FLAG_TIME_NO_CONTACT, event_contact)
simulation_manager.add_event(event_timeout)
event_flying_apart = EventFlyingApart(hp.FLAG_FLYING_APART)
simulation_manager.add_event(event_flying_apart)
event_slipout = EventSlipOut(hp.FLAG_TIME_SLIPOUT)
simulation_manager.add_event(event_slipout)
event_grasp = EventGrasp(
event_contact_builder = EventContactBuilder()
simulation_manager.add_event_builder(event_contact_builder)
event_timeout_builder = EventContactTimeOutBuilder(hp.FLAG_TIME_NO_CONTACT, event_contact_builder)
simulation_manager.add_event_builder(event_timeout_builder)
event_flying_apart_builder = EventFlyingApartBuilder(hp.FLAG_FLYING_APART)
simulation_manager.add_event_builder(event_flying_apart_builder)
event_slipout_builder = EventSlipOutBuilder(hp.FLAG_TIME_SLIPOUT)
simulation_manager.add_event_builder(event_slipout_builder)
event_grasp_builder = EventGraspBuilder(
grasp_limit_time=hp.GRASP_TIME,
contact_event=event_contact,
contact_event_builder=event_contact_builder,
verbosity=0,
)
simulation_manager.add_event(event_grasp)
event_stop_external_force = EventStopExternalForce(grasp_event=event_grasp,
simulation_manager.add_event_builder(event_grasp_builder)
event_stop_external_force = EventStopExternalForceBuilder(grasp_event_builder=event_grasp_builder,
force_test_time=hp.FORCE_TEST_TIME)
simulation_manager.add_event(event_stop_external_force)
simulation_manager.add_event_builder(event_stop_external_force)

#create criterion manager
simulation_rewarder = SimulationReward(verbosity=0)
#create criterions and add them to manager
simulation_rewarder.add_criterion(TimeCriterion(hp.GRASP_TIME, event_timeout, event_grasp),
simulation_rewarder.add_criterion(TimeCriterion(hp.GRASP_TIME, event_timeout_builder, event_grasp_builder),
hp.TIME_CRITERION_WEIGHT)
#simulation_rewarder.add_criterion(ForceCriterion(event_timeout), hp.FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantContactingLinkCriterion(event_grasp),
simulation_rewarder.add_criterion(InstantContactingLinkCriterion(event_grasp_builder),
hp.INSTANT_CONTACTING_LINK_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantForceCriterion(event_grasp),
simulation_rewarder.add_criterion(InstantForceCriterion(event_grasp_builder),
hp.INSTANT_FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantObjectCOGCriterion(event_grasp),
simulation_rewarder.add_criterion(InstantObjectCOGCriterion(event_grasp_builder),
hp.OBJECT_COG_CRITERION_WEIGHT)
n_steps = int(hp.GRASP_TIME / hp.TIME_STEP_SIMULATION)
print(n_steps)
simulation_rewarder.add_criterion(GraspTimeCriterion(event_grasp, n_steps),
simulation_rewarder.add_criterion(GraspTimeCriterion(event_grasp_builder, n_steps),
hp.GRASP_TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(
FinalPositionCriterion(hp.REFERENCE_DISTANCE, event_grasp, event_slipout),
FinalPositionCriterion(hp.REFERENCE_DISTANCE, event_grasp_builder, event_slipout_builder),
hp.FINAL_POSITION_CRITERION_WEIGHT)

control_optimizer = CalculatorWithOptimizationDirect(simulation_manager, simulation_rewarder,
Expand Down
78 changes: 43 additions & 35 deletions rostok/criterion/criterion_calculation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import numpy as np
from scipy.spatial import distance

from rostok.criterion.simulation_flags import (EventContactTimeOut, EventGrasp,
EventSlipOut)
from rostok.criterion.simulation_flags import (EventContactTimeOutBuilder, EventGraspBuilder,
EventSlipOutBuilder)
from rostok.simulation_chrono.simulation_utils import SimulationResult
from rostok.utils.json_encoder import RostokJSONEncoder

Expand Down Expand Up @@ -40,10 +40,10 @@ class TimeCriterion(Criterion):
event_grasp (EventGrasp): event of object grasping
"""

def __init__(self, time: float, event_timeout: EventContactTimeOut, event_grasp: EventGrasp):
def __init__(self, time: float, event_timeout_builder: EventContactTimeOutBuilder, event_grasp_builder: EventGraspBuilder):
self.max_simulation_time = time
self.event_timeout = event_timeout
self.event_grasp = event_grasp
self.event_timeout_builder = event_timeout_builder
self.event_grasp_builder = event_grasp_builder

def calculate_reward(self, simulation_output: SimulationResult):
"""Return 1 for all robots that can grasp the object.
Expand All @@ -53,10 +53,12 @@ def calculate_reward(self, simulation_output: SimulationResult):
Returns:
float: reward of the criterion
"""
if self.event_grasp.state:
event_container = simulation_output.event_container
event_grasp = self.event_grasp_builder.find_event(event_list=event_container)
if event_grasp.state:
return 1

if self.event_timeout.state:
event_timeout=self.event_timeout_builder.find_event(event_list=event_container)
if event_timeout.state:
return 0
else:
time = simulation_output.time
Expand All @@ -73,8 +75,8 @@ class ForceCriterion(Criterion):
event_timeout (EventContactTimeOut): event of contact time out
"""

def __init__(self, event_timeout: EventContactTimeOut):
self.event_timeout = event_timeout
def __init__(self, event_timeout_builder: EventContactTimeOutBuilder):
self.event_timeout_builder = event_timeout_builder

def calculate_reward(self, simulation_output: SimulationResult) -> float:
"""Return 0 if there is no contact. For every step where object is in contact with robot
Expand All @@ -84,8 +86,8 @@ def calculate_reward(self, simulation_output: SimulationResult) -> float:
Returns:
float: reward of the criterion
"""

if self.event_timeout.state:
event_timeout=self.event_timeout_builder.find_event(event_list=simulation_output.event_container)
if event_timeout.state:
return 0
else:
env_data = simulation_output.environment_final_ds
Expand Down Expand Up @@ -115,19 +117,20 @@ class InstantObjectCOGCriterion(Criterion):
event_grasp (EventGrasp): event of object grasping
"""

def __init__(self, grasp_event: EventGrasp):
self.grasp_event = grasp_event
def __init__(self, grasp_event_builder: EventGraspBuilder):
self.grasp_event_builder = grasp_event_builder

def calculate_reward(self, simulation_output: SimulationResult):
"""Calculate the reward based on distance between object COG and force centroid in the moment of grasp.
Returns:
float: reward of the criterion
"""
if self.grasp_event.state:
grasp_event = self.grasp_event_builder.find_event(event_list=simulation_output.event_container)
if grasp_event.state:
env_data = simulation_output.environment_final_ds
body_COG = env_data.get_data("COG")[0][self.grasp_event.step_n + 1]
body_outer_force_center = env_data.get_data("force_center")[0][self.grasp_event.step_n +
body_COG = env_data.get_data("COG")[0][grasp_event.step_n + 1]
body_outer_force_center = env_data.get_data("force_center")[0][grasp_event.step_n +
1]
if body_outer_force_center is np.nan:
return 0
Expand All @@ -144,18 +147,19 @@ class InstantForceCriterion(Criterion):
event_grasp (EventGrasp): event of object grasping
"""

def __init__(self, grasp_event: EventGrasp):
self.grasp_event = grasp_event
def __init__(self, grasp_event_builder: EventGraspBuilder):
self.grasp_event_builder = grasp_event_builder

def calculate_reward(self, simulation_output: SimulationResult):
"""Calculate std of force modules in the grasp moment and calculate reward using it.
Returns:
float: reward of the criterion
"""
if self.grasp_event.state:
grasp_event = self.grasp_event_builder.find_event(event_list=simulation_output.event_container)
if grasp_event.state:
env_data = simulation_output.environment_final_ds
body_contacts: np.ndarray = env_data.get_data("forces")[0][self.grasp_event.step_n + 1]
body_contacts: np.ndarray = env_data.get_data("forces")[0][grasp_event.step_n + 1]
if len(body_contacts) > 0:
forces = []
for force in body_contacts:
Expand All @@ -175,22 +179,23 @@ class InstantContactingLinkCriterion(Criterion):
event_grasp (EventGrasp): event of object grasping
"""

def __init__(self, grasp_event: EventGrasp):
self.grasp_event = grasp_event
def __init__(self, grasp_event_builder: EventGraspBuilder):
self.grasp_event_builder = grasp_event_builder

def calculate_reward(self, simulation_output: SimulationResult):
"""The reward is the fraction of the links that contacts with object in the grasp moment.
Returns:
float: reward of the criterion
"""
if self.grasp_event.state:
grasp_event = self.grasp_event_builder.find_event(event_list=simulation_output.event_container)
if grasp_event.state:
robot_data = simulation_output.robot_final_ds
robot_contacts = robot_data.get_data("n_contacts")
n_bodies = len(robot_contacts.keys())
contacting_bodies = 0
for _, contacts in robot_contacts.items():
if contacts[self.grasp_event.step_n + 1] > 0:
if contacts[grasp_event.step_n + 1] > 0:
contacting_bodies += 1

return contacting_bodies / n_bodies
Expand All @@ -206,8 +211,8 @@ class GraspTimeCriterion(Criterion):
total_steps (total_steps): the amount of the possible steps
"""

def __init__(self, grasp_event: EventGrasp, total_steps: int):
self.grasp_event = grasp_event
def __init__(self, grasp_event_builder: EventGraspBuilder, total_steps: int):
self.grasp_event_builder = grasp_event_builder
self.total_steps = total_steps

def calculate_reward(self, simulation_output: SimulationResult):
Expand All @@ -216,8 +221,9 @@ def calculate_reward(self, simulation_output: SimulationResult):
Returns:
float: reward of the criterion
"""
if self.grasp_event.state:
return (self.total_steps - self.grasp_event.step_n) / self.total_steps
grasp_event = self.grasp_event_builder.find_event(event_list=simulation_output.event_container)
if grasp_event.state:
return (self.total_steps - grasp_event.step_n) / self.total_steps
else:
return 0

Expand All @@ -231,21 +237,23 @@ class FinalPositionCriterion(Criterion):
slipout_event (EventSlipOut): event of object slip out
"""

def __init__(self, reference_distance: float, grasp_event: EventGrasp,
slipout_event: EventSlipOut):
def __init__(self, reference_distance: float, grasp_event_builder: EventGraspBuilder,
slipout_event_builder: EventSlipOutBuilder):
self.reference_distance = reference_distance
self.grasp_event = grasp_event
self.slipout_event = slipout_event
self.grasp_event_builder = grasp_event_builder
self.slipout_event_builder = slipout_event_builder

def calculate_reward(self, simulation_output: SimulationResult):
"""The reward is 1 - dist(position in the grasp moment, position in the final moment)/(reference distance)
Returns:
float: reward of the criterion
"""
if self.grasp_event.state and not self.slipout_event.state:
grasp_event = self.grasp_event_builder.find_event(event_list=simulation_output.event_container)
slipout_event = self.slipout_event_builder.find_event(event_list=simulation_output.event_container)
if grasp_event.state and not slipout_event.state:
env_data = simulation_output.environment_final_ds
grasp_pos = env_data.get_data("COG")[0][self.grasp_event.step_n + 1]
grasp_pos = env_data.get_data("COG")[0][grasp_event.step_n + 1]
final_pos = env_data.get_data("COG")[0][-1]
dist = distance.euclidean(grasp_pos, final_pos)
if dist <= self.reference_distance:
Expand Down
Loading

0 comments on commit 808f14d

Please sign in to comment.