In [1]:
import rosbag
import matplotlib.pyplot as plt
import numpy as np
from ipywidgets import interact, IntSlider
import ipywidgets as widgets

读取bag文件中的数据，并把数据保存到列表中

In [2]:
bag_file = '/home/xzh/ros1/BPws/my_bag.bag'
bag_data = rosbag.Bag(bag_file, "r")

info = bag_data.get_type_and_topic_info()
print(info)

topic_name1 = "/record/agent_0/ego_behavior_vis"
topic_name2 = "/record/agent_0/ego_vehicle_status"
topic_name3 = "/record/agent_0/forward_trajs"
topic_name4 = "/vis/agent_0/local_lanes_vis"

# 记录每个时间步的behavior
behavior_record = []
behavior_record_time = []
perception_data = bag_data.read_messages(topic_name1)
for topic, msg, t in perception_data:
    if msg is not None:
        behavior_record.append(msg.Behavior)
        behavior_record_time.append(t)

# 记录每个时间步的车辆状态，包括位置和其他的一些信息
ego_vehicle_position = []
ego_vehicle_status = [] # 0 angle,1 curvature,2 velocity,3 acceleration,4 steer
perception_data = bag_data.read_messages(topic_name2)
for topic, msg, t in perception_data:
    if msg is not None:
        ego_vehicle_position.append([msg.state.vec_position.x,msg.state.vec_position.y])
        ego_vehicle_status.append([msg.state.angle,msg.state.curvature,msg.state.velocity,msg.state.acceleration,msg.state.steer])

# 记录每个时间步所模拟的所有轨迹
forward_trajs_record = [] # 记录所有时刻的所有轨迹
perception_data = bag_data.read_messages(topic_name3)
for topic, msg, t in perception_data:
    if msg is not None:
        current_time_trajs = [] # 记录该时刻的所有轨迹
        # 遍历该时刻的每一条轨迹
        for traj in msg.forward_trajs_record:
            tra_record = [] # 记录该条轨迹上的所有点
            # 遍历该轨迹上的每一个点
            for point in traj.forward_trajs:
                tra_record.append([point.x, point.y])
            current_time_trajs.append(tra_record)
        forward_trajs_record.append(current_time_trajs)

Local_lanes_record = []
perception_data = bag_data.read_messages(topic_name4)
for topic, msg, t in perception_data:
    if msg is not None:
        Local_lanes_record_current = []
        for lane in msg.markers:
            Local_lane = []
            for point in lane.points:
                Local_lane.append([point.x, point.y])
            Local_lanes_record_current.append(Local_lane)
        Local_lanes_record.append(Local_lanes_record_current)
        
bag_data.close()

TypesAndTopicsTuple(msg_types={'vehicle_msgs/BehaviorString': '556d65a784e9d97c771304fd472b8e34', 'vehicle_msgs/ForwardTrajsRecord': '9eb8e2e3dc73c046a31c0fe2d9981308', 'vehicle_msgs/Vehicle': '0372a2fa6275905a037fbc1ca20c1ad6', 'visualization_msgs/MarkerArray': 'd155b9ce5188fbaf89745847fd5882d7'}, topics={'/record/agent_0/ego_behavior_vis': TopicTuple(msg_type='vehicle_msgs/BehaviorString', message_count=373, connections=1, frequency=19.936136738486695), '/record/agent_0/ego_vehicle_status': TopicTuple(msg_type='vehicle_msgs/Vehicle', message_count=373, connections=1, frequency=19.967123757203282), '/record/agent_0/forward_trajs': TopicTuple(msg_type='vehicle_msgs/ForwardTrajsRecord', message_count=373, connections=1, frequency=19.940591423409717), '/vis/agent_0/local_lanes_vis': TopicTuple(msg_type='visualization_msgs/MarkerArray', message_count=373, connections=1, frequency=19.95657831818301)})


绘图函数——绘制车道

In [11]:
# 定义绘图函数
def plot_local_lanes(time_step):
    lanes = Local_lanes_record[time_step]
    for lane in lanes:
        lane = np.array(lane)
        if(lane.ndim != 2):
            print("lane shape is not 2,continue")
            continue
        plt.plot(lane[:, 0], lane[:, 1])
    plt.grid(True)
    plt.show()

车道线计算

In [35]:
def compute_normals(trajectory):
    # 计算法向量
    delta = np.diff(trajectory, axis=0)
    normals = np.zeros_like(delta)
    normals[:, 0] = -delta[:, 1]
    normals[:, 1] = delta[:, 0]
    
    # 归一化法向量
    norms = np.linalg.norm(normals, axis=1)
    normals = normals / norms[:, np.newaxis]
    
    # 在起点和终点添加法向量（简单复制第一个和最后一个法向量）
    normals = np.vstack([normals, normals[-1]])
    
    return normals

def generate_lane_boundaries(trajectory, lane_width):
    # 计算法向量
    normals = compute_normals(trajectory)
    
    # 计算左边和右边的边界点
    left_boundary = trajectory + lane_width / 2 * normals
    right_boundary = trajectory - lane_width / 2 * normals
    
    return left_boundary, right_boundary

绘制长方形来代表车辆位置

In [47]:
def rotate_point(point, angle, center):
    # 将点绕中心旋转一定角度
    angle_rad = np.deg2rad(angle)
    rotation_matrix = np.array([
        [np.cos(angle_rad), -np.sin(angle_rad)],
        [np.sin(angle_rad), np.cos(angle_rad)]
    ])
    rotated_point = rotation_matrix.dot(point - center) + center
    return rotated_point

def get_rectangle_points(center, angle, length=4.88, width=1.9):
    # 计算未旋转的长方形的四个顶点相对于中心点的坐标
    half_length = length / 2
    half_width = width / 2
    points = np.array([
        [-half_length, -half_width],
        [half_length, -half_width],
        [half_length, half_width],
        [-half_length, half_width]
    ])

        # 旋转每个点
    rotated_points = np.array([rotate_point(point, angle, [0, 0]) for point in points])
    
    # 将旋转后的点平移到中心点
    rotated_points += center
    
    return rotated_points

def plot_rectangle(center, angle, length=4.88, width=1.9):
    # angle 是以度为单位的，当 angle = 0 时，长方形的长边平行于 x 轴
    # angle > 0 时，长方形绕中心点逆时针旋转指定的角度
    # 获取长方形的四个顶点
    rectangle_points = get_rectangle_points(center,angle,length,width)
    
    # 闭合长方形
    rectangle_points = np.vstack([rectangle_points, rectangle_points[0]])

    return rectangle_points

绘图函数——绘制车道线和中心线，绘制车辆，并且根据车辆位置缩放

In [67]:
# 定义绘图函数
def plot_local_lanes2(time_step):
    lanes = Local_lanes_record[time_step]
    vehicle_position = ego_vehicle_position[time_step]
    vehicle_status = ego_vehicle_status[time_step]
    # 绘制车道
    for lane in lanes:
        lane = np.array(lane)
        if(lane.ndim != 2):
            # print("lane shape is not 2,continue")
            continue
        plt.plot(lane[:, 0], lane[:, 1],color='grey',linestyle = '--',linewidth = 0.5)
        left_boundary, right_boundary = generate_lane_boundaries(lane, 3.5)
        plt.plot(left_boundary[:, 0], left_boundary[:, 1],color='grey',linewidth = 0.5)
        plt.plot(right_boundary[:, 0], right_boundary[:, 1],color='grey',linewidth = 0.5)
    # 绘制车辆位置
    rectangle_points = plot_rectangle(vehicle_position, np.degrees(vehicle_status[0]))
    plt.fill(rectangle_points[:, 0], rectangle_points[:, 1],color='blue')
    
    window_size = 10
    plt.xlim(vehicle_position[0]-window_size, vehicle_position[0]+window_size)  # 限制 x 轴范围
    plt.ylim(vehicle_position[1]-window_size, vehicle_position[1]+window_size)  # 限制 y 轴范围
    plt.show()

In [None]:
plot_local_lanes2(370)

绘图函数3——在2的基础上，绘制车辆的前向模拟轨迹

In [95]:
# 定义绘图函数
def plot_local_lanes3(time_step):
    lanes = Local_lanes_record[time_step]
    vehicle_position = ego_vehicle_position[time_step]
    vehicle_status = ego_vehicle_status[time_step]
    forward_trajs = forward_trajs_record[time_step]
    # 绘制车道
    for lane in lanes:
        lane = np.array(lane)
        if(lane.ndim != 2):
            # print("lane shape is not 2,continue")
            continue
        # plt.plot(lane[:, 0], lane[:, 1],color='grey',linestyle = '--',linewidth = 0.5)
        left_boundary, right_boundary = generate_lane_boundaries(lane, 3.5)
        plt.plot(left_boundary[:, 0], left_boundary[:, 1],color='black',linewidth = 1)
        plt.plot(right_boundary[:, 0], right_boundary[:, 1],color='black',linewidth = 1)
    # 绘制车辆位置
    rectangle_points = plot_rectangle(vehicle_position, np.degrees(vehicle_status[0]))
    plt.fill(rectangle_points[:, 0], rectangle_points[:, 1],color='blue')

    for traj in forward_trajs:
        traj = np.array(traj)
        if(traj.ndim != 2):
            # print("traj shape is not 2,continue")
            continue        
        plt.plot(traj[:, 0], traj[:, 1],color='red',linestyle = '-',linewidth = 0.5)
    
    window_size = 40
    plt.xlim(vehicle_position[0]-window_size+20, vehicle_position[0]+window_size+20)  # 限制 x 轴范围
    plt.ylim(vehicle_position[1]-window_size, vehicle_position[1]+window_size)  # 限制 y 轴范围
    plt.show()

In [96]:
# 创建频率滑动条
time_step_slider = IntSlider(value=0, min=0, max=len(Local_lanes_record)-1, step=1, description='time_step: ')

# 使用 interact 创建交互式绘图
interact(plot_local_lanes3, time_step=time_step_slider)

interactive(children=(IntSlider(value=0, description='time_step: ', max=372), Output()), _dom_classes=('widget…

<function __main__.plot_local_lanes3(time_step)>