In [1]:
%cd ..

import random
import time
import math
import collections
import weakref

import carla

from agents.navigation.behavior_agent import BehaviorAgent


client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
traffic_manager = client.get_trafficmanager(8000)

# Retrieve the world that is currently running
world = client.load_world('Town01')

origin_settings = world.get_settings()

# set sync mode
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.1
world.apply_settings(settings)

d:\PythonWorkspace\Defensive_Driving


1801768

In [2]:
class CollisionSensor(object):
    def __init__(self, parent_actor):
        self.sensor = None
        self.history = []
        self._parent = parent_actor
        world = self._parent.get_world()
        bp = world.get_blueprint_library().find('sensor.other.collision')
        self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
        # We need to pass the lambda a weak reference to self to avoid circular
        # reference.
        weak_self = weakref.ref(self)
        self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))

    def get_collision_history(self) -> dict:
        history = collections.defaultdict(int)
        for frame, intensity in self.history:
            history[frame] += intensity
        return history
    
    def is_collided(self) -> bool:
        return self.history != []

    @staticmethod
    def _on_collision(weak_self, event):
        self = weak_self()
        if not self:
            return
        impulse = event.normal_impulse
        intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2)
        self.history.append((event.frame, intensity))
        if len(self.history) > 4000:
            self.history.pop(0)

In [3]:
 # read all valid spawn points
all_default_spawn = world.get_map().get_spawn_points()
print(len(all_default_spawn))
# randomly choose one as the start point
ego_spawn_point = all_default_spawn[190]
ego_destination = all_default_spawn[118]
enemy_spawn_point = all_default_spawn[109]
enemy_destination = all_default_spawn[118]

255


In [4]:
traffic_lights = world.get_actors().filter('traffic.traffic_light')
for tl in traffic_lights:
    tl.set_state(carla.TrafficLightState.Off)
    tl.freeze(True)

In [5]:
traffic_manager.set_global_distance_to_leading_vehicle(0)
traffic_manager.global_percentage_speed_difference(100)

In [10]:
try:
    blueprint_library = world.get_blueprint_library()

    # create the blueprint library
    ego_vehicle_bp = blueprint_library.find('vehicle.audi.a2')
    ego_vehicle_bp.set_attribute('color', '255, 0, 0')
    truck_bp = blueprint_library.find('vehicle.audi.a2')
    truck_bp.set_attribute('color', '0, 0, 255')

    # spawn the vehicle
    ego_vehicle = world.spawn_actor(ego_vehicle_bp, ego_spawn_point)
    enemy_vehicle = world.spawn_actor(truck_bp, enemy_spawn_point)

    # we need to tick the world once to let the client update the spawn position
    world.tick()

    # create the behavior agent
    ego_agent = BehaviorAgent(ego_vehicle, behavior='cautious')
    enemy_agent = BehaviorAgent(enemy_vehicle, behavior='aggressive')

    ego_collision_sensor = CollisionSensor(ego_vehicle)


    # generate the route
    ego_agent.set_destination(ego_destination.location)
    enemy_agent.set_destination(enemy_destination.location)

    while True:
        time.sleep(0.05)
        # agent.run_step()

        world.tick()
        
        if ego_agent.done() and enemy_agent.done() or ego_collision_sensor.is_collided():
            print('======== Success, Arrivied at Target Point!')
            break
            
        # top view
        spectator = world.get_spectator()
        transform = enemy_vehicle.get_transform()
        spectator.set_transform(carla.Transform(transform.location + carla.Location(z=40),
                                                carla.Rotation(pitch=-90)))

        ego_agent.get_local_planner().set_speed(ego_vehicle.get_speed_limit())
        ego_vehicle.apply_control(ego_agent.run_step(debug=True))
        enemy_agent.get_local_planner().set_speed(enemy_vehicle.get_speed_limit())
        enemy_vehicle.apply_control(enemy_agent.run_step(debug=True))

finally:
    world.apply_settings(origin_settings)
    ego_vehicle.destroy()
    enemy_vehicle.destroy()



: 

In [7]:
# ego_vehicle.destroy()
# enemy_vehicle.destroy()