Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CARLA Operator #279

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions data_gatherer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import erdos

import pylot.flags
import pylot.component_creator
import pylot.operator_creator
import pylot.simulation.utils
import pylot.utils
Expand Down Expand Up @@ -61,10 +62,8 @@ def main(argv):
ground_stop_signs_stream,
vehicle_id_stream,
open_drive_stream,
global_trajectory_stream,
) = pylot.operator_creator.add_simulator_bridge(
control_loop_stream, release_sensor_stream,
pipeline_finish_notify_stream)
) = pylot.component_creator.add_simulator(control_loop_stream,
pipeline_finish_notify_stream)

# Add sensors.
rgb_camera_setup = RGBCameraSetup('center_camera',
Expand Down
24 changes: 20 additions & 4 deletions pylot.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def driver():
pipeline_finish_notify_stream = erdos.IngestStream()
notify_streams = []

# Create operator that bridges between pipeline and the simulator.
# Create operators that bridge between pipeline and the simulator.
(
pose_stream,
pose_stream_for_control,
Expand All @@ -44,13 +44,29 @@ def driver():
ground_stop_signs_stream,
vehicle_id_stream,
open_drive_stream,
global_trajectory_stream,
) = pylot.operator_creator.add_simulator_bridge(
) = pylot.component_creator.add_simulator(
control_loop_stream,
release_sensor_stream,
pipeline_finish_notify_stream,
)

# (
# pose_stream,
# pose_stream_for_control,
# ground_traffic_lights_stream,
# ground_obstacles_stream,
# ground_speed_limit_signs_stream,
# ground_stop_signs_stream,
# vehicle_id_stream,
# open_drive_stream,
# ) = pylot.operator_creator.add_simulator_bridge_old(
# control_loop_stream,
# release_sensor_stream,
# pipeline_finish_notify_stream,
# )

global_trajectory_stream = erdos.IngestStream()
streams_to_send_top_on.append(global_trajectory_stream)

# Add sensors.
center_camera_setup = RGBCameraSetup('center_camera',
FLAGS.camera_image_width,
Expand Down
56 changes: 56 additions & 0 deletions pylot/component_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,62 @@
logger = logging.getLogger(__name__)


def add_simulator(control_stream, pipeline_finish_notify_stream):
"""Adds operators to interface with and retrieve ground truth information
from the simulator.

Args:
control_stream: Control commands to actuate the ego vehicle.
pipeline_finish_notify_stream: Sends watermarks when the pipeline
completes execution. Used in pseudo-async mode.

Returns:
pose_stream: Sends the ego vehicle's pose at the frequency provided by
``--simulator_localization_frequency``.
pose_stream_for_control: Sends the ego vehicle's pose at the frequency
provided by ``--simulator_control_frequency``.
ground_traffic_lights_stream: Sends the state of all traffic lights.
ground_obstacles_stream: Sends the locations and bounding boxes of all
vehicles and pedestrians.
ground_speed_limit_signs_stream: Sends the locations and values of all
speed limit signs.
ground_stop_signs_stream: Sends the locations and values of all stop
signs.
vehicle_id_stream: Sends the ID of the ego vehicle, followed by a top
watermark.
open_drive_stream: Sends the map in OpenDRIVE format, followed by a top
watermark.
"""
vehicle_id_stream = pylot.operator_creator.add_simulator_bridge(
control_stream, pipeline_finish_notify_stream)
pose_stream = pylot.operator_creator.add_pose(
vehicle_id_stream, FLAGS.simulator_localization_frequency)
pose_stream_for_control = pylot.operator_creator.add_pose(
vehicle_id_stream, FLAGS.simulator_control_frequency,
'pose_for_control')
ground_traffic_lights_stream = (
pylot.operator_creator.add_simulator_traffic_lights(vehicle_id_stream))
ground_obstacles_stream = pylot.operator_creator.add_simulator_obstacles(
vehicle_id_stream)
ground_speed_limit_signs_stream = (
pylot.operator_creator.add_simulator_speed_limit_signs(
vehicle_id_stream))
ground_stop_signs_stream = pylot.operator_creator.add_simulator_stop_signs(
vehicle_id_stream)
open_drive_stream = pylot.operator_creator.add_simulator_open_drive()

return (
pose_stream,
pose_stream_for_control,
ground_traffic_lights_stream,
ground_obstacles_stream,
ground_speed_limit_signs_stream,
ground_stop_signs_stream,
vehicle_id_stream,
open_drive_stream,
)


def add_obstacle_detection(center_camera_stream,
center_camera_setup=None,
pose_stream=None,
Expand Down
145 changes: 145 additions & 0 deletions pylot/drivers/carla_base_gnss_driver_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""This module implements an operator which executes a callback at a the
provided frequency.

The operator attaches a GNSS sensor to the ego vehicle, receives GNSS
measurements from the simulator, and invokes the user-defined callback.
"""
import threading
from abc import abstractmethod

import carla

import erdos

from pylot.drivers.sensor_setup import GNSSSetup
from pylot.simulation.utils import get_vehicle_handle, get_world, \
set_simulation_mode


class CarlaBaseGNSSDriverOperator(erdos.Operator):
"""Invokes ``process_gnss`` at the provided frequency.

This operator attaches to a vehicle at the required position with respect
to the vehicle, registers callback functions to execute a ``process_gnss``
at the provided frequency.

Attributes:
_vehicle_id_stream (ReadStream): Stream on which the operator receives
the id of the ego vehicle. The ID is used to get a simulator handle
to the vehicle.
_output_stream (WriteStream): Stream on which the operator sends
messages.
_flags: Object to be used to access absl flags.
_logger (logging.Logger): Used to log information.
_gnss_setup (GNSSSetup): Setup of the GNSS sensor.
_frequency (float): Rate at which the callback is invoked.
_vehicle (Optional[carla.Vehicle]): Handle to the CARLA vehicle.
Initialized once the vehicle ID is received.
_world (Optional[carla.World]): Handle to the CARLA world. Initialized
once the vehicle ID is received.
_gnss (Optional[carla.Actor]): Handle to the CARLA GNSS sensor.
Initialized once the vehicle ID is received.
_log (threading.Lock): used to ensure that only 1 GNSS reading is
processed at a time.


Args:
vehicle_id_stream: Stream on which the operator receives the id of the
ego vehicle. The ID is used to get a simulator handle to the
vehicle.
output_stream: Stream on which the operator sends messages.
gnss_setup: Setup of the GNSS sensor.
frequency: Rate at which the pose is published, in Hertz. Set to -1 to
invoke on every simulator tick.
flags: Object used to access absl flags.
"""
def __init__(self, vehicle_id_stream: erdos.ReadStream,
output_stream: erdos.WriteStream, gnss_setup: GNSSSetup,
frequency: float, flags):
# Save the streams.
self._vehicle_id_stream = vehicle_id_stream
self._output_stream = output_stream

# Save the flags and initialize logging.
self._flags = flags
self._logger = erdos.utils.setup_logging(self.config.name,
self.config.log_file_name)

# Save the setup, the vehicle, the world, and the sensor.
self._gnss_setup = gnss_setup
self._frequency = frequency
self._vehicle = None
self._world = None
self._gnss = None
self._lock = threading.Lock()

@staticmethod
def connect(ground_vehicle_id_stream):
gnss_stream = erdos.WriteStream()
return [gnss_stream]

@abstractmethod
def process_gnss(self, timestamp: erdos.Timestamp,
gnss_msg: carla.GnssMeasurement):
"""Invoked when a GNSS measurement is received from the simulator.

Note:
Only 1 invocation of this callback will run at a time.
"""
raise NotImplementedError

def on_gnss(self, gnss_msg: carla.GnssMeasurement):
"""Invoked when a GNSS measurement is received from the simulator.
"""
game_time = int(gnss_msg.timestamp * 1000)
timestamp = erdos.Timestamp(coordinates=[game_time])
with erdos.profile(self.config.name + '.process_gnss',
self,
event_data={'timestamp': str(timestamp)}):
with self._lock:
self.process_gnss(timestamp, gnss_msg)

def run(self):
# Read the vehicle ID from the vehicle ID stream.
vehicle_id = self._vehicle_id_stream.read().data
self._logger.debug("received the vehicle id: {}".format(vehicle_id))

# Connect to the world.
_, self._world = get_world(self._flags.simulator_host,
self._flags.simulator_port,
self._flags.simulator_timeout)
set_simulation_mode(self._world, self._flags)

# Retrieve the vehicle and install the GNSS sensor.
self._vehicle = get_vehicle_handle(self._world, vehicle_id)
gnss_blueprint = self._world.get_blueprint_library().find(
'sensor.other.gnss')

# Set the noise and bias parameters.
gnss_blueprint.set_attribute('noise_alt_stddev',
str(self._flags.gnss_noise_stddev_alt))
gnss_blueprint.set_attribute('noise_lat_stddev',
str(self._flags.gnss_noise_stddev_lat))
gnss_blueprint.set_attribute('noise_lon_stddev',
str(self._flags.gnss_noise_stddev_lon))
gnss_blueprint.set_attribute('noise_alt_bias',
str(self._flags.gnss_bias_alt))
gnss_blueprint.set_attribute('noise_lat_bias',
str(self._flags.gnss_bias_lat))
gnss_blueprint.set_attribute('noise_lon_bias',
str(self._flags.gnss_bias_lon))

if self._frequency == -1:
gnss_blueprint.set_attribute('sensor_tick', '0.0')
else:
gnss_blueprint.set_attribute('sensor_tick',
str(1.0 / self._frequency))
transform = self._gnss_setup.get_transform().as_simulator_transform()
self._logger.debug("Spawning a GNSS sensor: {}".format(
self._gnss_setup))
self._gnss = self._world.spawn_actor(gnss_blueprint,
transform,
attach_to=self._vehicle)

# Register the callback on the GNSS sensor.
self._gnss.listen(self.on_gnss)
Loading