# IDM model

In [38]:
import json
import numpy as np
import pygad

def idm_model_before_change(params, initial_conditions, time_steps, lead_vehicle_speeds, lead_vehicle_positions, vehicle_length):
    a_max, s_0, T, b = params
    v0 = 33.3  # m/s
    delta = 4

    velocities = [initial_conditions['v']]
    positions = [initial_conditions['s']]
    gaps = []
    accelerations = []

    for t in range(0, len(time_steps)-1):
        current_v = velocities[-1]
        current_s =  positions[-1]

        s_lead = lead_vehicle_positions[t]
        gap = s_lead - current_s - vehicle_length
        delta_v = current_v - lead_vehicle_speeds[t]
        
        s_star = s_0 + max(0,current_v * T + (current_v * delta_v) / (2 * np.sqrt(a_max * b)))
        a = a_max * (1 - (current_v / v0) ** delta - (s_star / gap) ** 2)

        v_next = current_v + a * (time_steps[t+1] - time_steps[t])
        s_next = current_s + current_v * (time_steps[t+1] - time_steps[t]) + 0.5 * a * (time_steps[t+1] - time_steps[t]) **2

        velocities.append(v_next)
        gaps.append(gap)
        positions.append(s_next)
        accelerations.append(a)
        
    accelerations.append(accelerations[-1])    
    gaps.append(lead_vehicle_positions[-1] - positions[-1] - vehicle_length)
    
    # 为变道后提供原始预测数据
    current_v = velocities[-1]
    current_s = positions[-1]
    s_lead = lead_vehicle_positions[-1]
    
    gap = s_lead - current_s - vehicle_length
    delta_v = current_v - lead_vehicle_speeds[-1]
    
    s_star = s_0 + max(0,current_v * T + (current_v * delta_v) / (2 * np.sqrt(a_max * b)))
    a = a_max * (1 - (current_v / v0) ** delta - (s_star / gap) ** 2)
    
    v_next = current_v + a * (time_steps[t+1] - time_steps[t])
    s_next = current_s + current_v * (time_steps[t+1] - time_steps[t]) + 0.5 * a * (time_steps[t+1] - time_steps[t]) **2
    
    return velocities, positions, gaps, accelerations, v_next, s_next

def idm_model_after_change(params, initial_conditions, time_steps, lead_vehicle_speeds, lead_vehicle_positions, vehicle_length):
    a_max, s_0, T, b = params
    v0 = 33.3  # m/s
    delta = 4

    velocities = [initial_conditions['v']]
    positions = [initial_conditions['s']]
    gaps = []
    accelerations = []

    for t in range(0, len(time_steps)-1):
        current_v = velocities[-1]
        current_s =  positions[-1]

        s_lead = lead_vehicle_positions[t]
        gap = s_lead - current_s - vehicle_length
        delta_v = current_v - lead_vehicle_speeds[t]
        
        s_star = s_0 + max(0,current_v * T + (current_v * delta_v) / (2 * np.sqrt(a_max * b)))
        a = a_max * (1 - (current_v / v0) ** delta - (s_star / gap) ** 2)

        v_next = current_v + a * (time_steps[t+1] - time_steps[t])
        s_next = current_s + current_v * (time_steps[t+1] - time_steps[t]) + 0.5 * a * (time_steps[t+1] - time_steps[t]) **2

        velocities.append(v_next)
        gaps.append(gap)
        positions.append(s_next)
        accelerations.append(a)
        
    accelerations.append(accelerations[-1])    
    gaps.append(lead_vehicle_positions[-1] - positions[-1] - vehicle_length)
    
    return velocities, positions, gaps, accelerations

# Evaluation function

In [39]:
def calculate_mse(real, predicted):
    return np.mean((np.array(real) - np.array(predicted)) ** 2)

# Fintness function

In [40]:
def fitness_function(ga_instance, solution, solution_idx):
    user_data = ga_instance.user_data
    before_timestamp, after_timestamp, before_preceding_length, after_preceding_length, focus_length, foucs_before_position, focus_before_velocity, focus_after_position, focus_after_velocity, init_condition_before_following, init_condition_after_following, real_value_before_following, real_value_after_following, before_preceding_position, before_preceding_velocity, after_preceding_position, after_preceding_velocity = user_data

    # 建模-变道前的两辆后车
    predicted_before_following_before_change_vel, predicted_before_following_before_change_pos, predicted_before_change_before_gap, predicted_before_following_before_change_acc, predicted_value_for_next_v_before_following, predicted_value_for_next_s_before_following = idm_model_before_change(solution, init_condition_before_following, before_timestamp, focus_before_velocity, foucs_before_position, focus_length)
    predicted_after_following_before_change_vel, predicted_after_following_before_change_pos, predicted_after_change_before_gap, predicted_after_following_before_change_acc, predicted_value_for_next_v_after_following, predicted_value_for_next_s_after_following = idm_model_before_change(solution, init_condition_after_following, before_timestamp, after_preceding_velocity, after_preceding_position, after_preceding_length)
    predicted_init_condition_before_following = {
        'v': predicted_value_for_next_v_before_following,
        's': predicted_value_for_next_s_before_following
    }
    predicted_init_condition_after_following = {
        'v': predicted_value_for_next_v_after_following,
        's': predicted_value_for_next_s_after_following
    }
    # 建模-变道后的两辆后车
    predicted_before_following_after_change_vel, predicted_before_following_after_change_pos, predicted_before_change_after_gap, predicted_before_following_after_change_acc = idm_model_after_change(solution, predicted_init_condition_before_following, after_timestamp, before_preceding_velocity, before_preceding_position, before_preceding_length)
    predicted_after_following_after_change_vel, predicted_after_following_after_change_pos, predicted_after_change_after_gap, predicted_after_following_after_change_acc = idm_model_after_change(solution, predicted_init_condition_after_following, after_timestamp, focus_after_velocity, focus_after_position, focus_length)
    # 还原速度单位
    same_length = len(predicted_before_following_before_change_vel)
    for i in range(same_length):
        predicted_before_following_before_change_vel[i] *= 2.237
        predicted_after_following_before_change_vel[i] *= 2.237
        predicted_before_following_after_change_vel[i] *= 2.237
        predicted_after_following_after_change_vel[i] *= 2.237

    # 计算四次建模的均方误差
    # 变道前的变道前后车
    before_following_before_change_mse_gap = calculate_mse(real_value_before_following['before_change_gap'], predicted_before_change_before_gap)
    before_following_before_change_mse_vel = calculate_mse(real_value_before_following['before_change_velocity'], predicted_before_following_before_change_vel)
    before_following_before_change_mse_acc = calculate_mse(real_value_before_following['before_change_acceleration'], predicted_before_following_before_change_acc)
    # 变道前的变道后后车
    after_following_before_change_mse_gap = calculate_mse(real_value_after_following['before_change_gap'],  predicted_after_change_before_gap)
    after_following_before_change_mse_vel = calculate_mse(real_value_after_following['before_change_velocity'], predicted_after_following_before_change_vel)
    after_following_before_change_mse_acc = calculate_mse(real_value_after_following['before_change_acceleration'], predicted_after_following_before_change_acc)
    # 变道后的变道前后车
    before_following_after_change_mse_gap = calculate_mse(real_value_before_following['after_change_gap'], predicted_before_change_after_gap)
    before_following_after_change_mse_vel = calculate_mse(real_value_before_following['after_change_velocity'], predicted_before_following_after_change_vel)
    before_following_after_change_mse_acc = calculate_mse(real_value_before_following['after_change_acceleration'], predicted_before_following_after_change_acc)
    # 变道后的变道后后车
    after_following_after_change_mse_gap = calculate_mse(real_value_after_following['after_change_gap'], predicted_after_change_after_gap)
    after_following_after_change_mse_vel = calculate_mse(real_value_after_following['after_change_velocity'], predicted_after_following_after_change_vel)
    after_following_after_change_mse_acc = calculate_mse(real_value_after_following['after_change_acceleration'], predicted_after_following_after_change_acc)
    # 均方误差简单加和（未来可能有更有的适应度评判方式）
    mse_total = before_following_before_change_mse_gap + before_following_before_change_mse_vel + before_following_before_change_mse_acc + after_following_before_change_mse_gap + after_following_before_change_mse_vel + after_following_before_change_mse_acc + before_following_after_change_mse_gap + before_following_after_change_mse_vel + before_following_after_change_mse_acc + after_following_after_change_mse_gap + after_following_after_change_mse_vel + after_following_after_change_mse_acc
    
    return -mse_total

# Optimize

In [41]:
def optimize_all_entries(idm_file_path, real_file_path, output_file):
    
    # 读取文件
    with open(idm_file_path, 'r') as idm_file, open(real_file_path, 'r') as real_file:
        idm_lines = idm_file.readlines()
        real_lines = real_file.readlines()

        idm_data = [json.loads(line.strip()) for line in idm_lines]
        real_data = [json.loads(line.strip()) for line in real_lines]

    # 优化结果列表
    optimized_results = []
    
    with open(output_file, 'w') as f:
        for idm_entry, real_entry in zip(idm_data, real_data):
            # 相关数据读取
            start_time = idm_entry['timestamp'][0]
            end_time = idm_entry['timestamp'][-1]
            before_change = idm_entry['before_change_timestamp']
            after_change = idm_entry['after_change_timestamp']
            total_timestamp = idm_entry['timestamp']
            vehicle_focus_position = idm_entry['focus_position']
            vehicle_focus_velocity = [speed * 0.44704 for speed in idm_entry['focus_velocity']]
            before_preceding_position = idm_entry['before_preceding_after_change_position']
            before_preceding_velocity = [speed * 0.44704 for speed in idm_entry['before_preceding_after_change_velocity']]
            after_preceding_position = idm_entry['after_preceding_before_change_position']
            after_preceding_velocity = [speed * 0.44704 for speed in idm_entry['after_preceding_before_change_velocity']]
            # 获取变道前后的总时间帧
            before_change_timestamp = []
            after_change_timestamp = []
            
            for ts in total_timestamp:
                if start_time <= ts <= before_change:
                    before_change_timestamp.append(ts)
                if after_change <= ts <= end_time:
                    after_change_timestamp.append(ts)
            
            # 获取三辆前车长度
            vehicle_length_before_preceding = idm_entry['v_before_preceding_length']
            vehicle_length_after_preceding = idm_entry['v_after_preceding_length']
            vehicle_length_focus = idm_entry['v_length']
            
            # 拆分变道车辆的速度与位置信息
            vehicle_focus_before_change_position = []
            vehicle_focus_before_change_velocity = []
            vehicle_focus_after_change_position = []
            vehicle_focus_after_change_velocity = []
            
            for i, ts in enumerate(total_timestamp):
                if start_time <= ts <= before_change:
                    vehicle_focus_before_change_position.append(vehicle_focus_position[i])
                    vehicle_focus_before_change_velocity.append(vehicle_focus_velocity[i])
                if after_change <= ts <= end_time:
                    vehicle_focus_after_change_position.append(vehicle_focus_position[i])
                    vehicle_focus_after_change_velocity.append(vehicle_focus_velocity[i])
            
            # 变道前与变道后后车的初始信息
            initial_conditions_before_following = {
                'v': idm_entry['before_following_init_velocity'] * 0.44704,
                's': idm_entry['before_following_init_position']
            }
            
            initial_conditions_after_following = {
                'v': idm_entry['after_following_init_velocity'] * 0.44704,
                's': idm_entry['after_following_init_position']
            }
            # 获取真实数据
            before_following_total_velocity = real_entry['before_following_velocities']
            after_following_total_velocity = real_entry['after_following_velocities']
            before_following_total_accelerations = real_entry['before_following_accelerations']
            after_following_total_accelerations = real_entry['after_following_accelerations']
            
            before_following_before_change_velocity = []
            before_following_after_change_velocity = []
            after_following_before_change_velocity = []            
            after_following_after_change_velocity = []            
            before_following_before_change_acceleration = []
            before_following_after_change_acceleration = []
            after_following_before_change_acceleration = []            
            after_following_after_change_acceleration = []
            
            half_length = len(before_following_total_velocity) / 2
            
            i = 0
            while i < half_length:
                before_following_before_change_velocity.append(before_following_total_velocity[i])
                after_following_before_change_velocity.append(after_following_total_velocity[i])
                before_following_before_change_acceleration.append(before_following_total_accelerations[i])
                after_following_before_change_acceleration.append(after_following_total_accelerations[i])
                i = i + 1
            
            while i < len(before_following_total_velocity):
                before_following_after_change_velocity.append(before_following_total_velocity[i])
                after_following_after_change_velocity.append(after_following_total_velocity[i])
                before_following_after_change_acceleration.append(before_following_total_accelerations[i])
                after_following_after_change_acceleration.append(after_following_total_accelerations[i])
                i = i + 1
            
            real_values_before_following = {
                'before_change_gap': real_entry['before_following_before_gap'],
                'after_change_gap': real_entry['before_following_after_gap'],
                'before_change_velocity': before_following_before_change_velocity,
                'after_change_velocity': before_following_after_change_velocity,
                'before_change_acceleration': before_following_before_change_acceleration,
                'after_change_acceleration': before_following_after_change_acceleration
            }
            real_values_after_following = {
                'before_change_gap': real_entry['after_following_before_gap'],
                'after_change_gap': real_entry['after_following_after_gap'],
                'before_change_velocity': after_following_before_change_velocity,
                'after_change_velocity': after_following_after_change_velocity,
                'before_change_acceleration': after_following_before_change_acceleration,
                'after_change_acceleration': after_following_after_change_acceleration
            }
            # 传递用于IDM建模的数据和用于计算误差的真实数据
            user_data = (before_change_timestamp, after_change_timestamp, vehicle_length_before_preceding, vehicle_length_after_preceding, vehicle_length_focus, vehicle_focus_before_change_position, vehicle_focus_before_change_velocity, vehicle_focus_after_change_position, vehicle_focus_after_change_velocity, initial_conditions_before_following, initial_conditions_after_following, real_values_before_following, real_values_after_following, before_preceding_position, before_preceding_velocity, after_preceding_position, after_preceding_velocity)
            gene_space = [
                {'low': 1.0, 'high': 3.0},  # a_max
                {'low': 0.5, 'high': 3.0},  # s_0
                {'low': 0.5, 'high': 3.0},  # T
                {'low': 1.5, 'high': 3.0}   # b
            ]

            ga_instance = pygad.GA(
                num_generations=50,
                num_parents_mating=4,
                fitness_func=fitness_function,
                sol_per_pop=8,
                num_genes=4,
                gene_space=gene_space,
                gene_type=float,
                parent_selection_type="sss",
                keep_parents=1,
                crossover_type="single_point",
                mutation_type="random",
                mutation_percent_genes=10
            )

            # 设置用户数据
            ga_instance.user_data = user_data

            ga_instance.run()

            solution, solution_fitness, solution_idx = ga_instance.best_solution()
            a_max, s_0, T, b = solution

            optimized_result = {
                'vehicle_id': idm_entry['vehicle_id'],
                'before_lane_id': idm_entry['before_lane_id'],
                'after_lane_id': idm_entry['after_lane_id'],
                'before_preceding_id': idm_entry['before_preceding_id'],
                'after_preceding_id': idm_entry['after_preceding_id'],
                'a_max': a_max,
                's_0': s_0,
                'T': T,
                'b': b,
                'mse': -solution_fitness
            }

            optimized_results.append(optimized_result)
            
            json.dump(optimized_result, f)
            f.write('\n')

            if len(optimized_results) % 10 == 0:
                print(f"Processed {len(optimized_results)} lane changes")

        print(f"Total optimized entries: {len(optimized_results)}")

# Operation function

In [42]:
# 文件路径
idm_file_path = 'trajectories-0400-0415-idm_modeling_data.json'
real_file_path = 'trajectories-0400-0415-real_values_data.json'
output_file = 'trajectories-0400-0415-optimized_idm_parameters.json'

# 优化所有数据
optimize_all_entries(idm_file_path, real_file_path, output_file)

  s_star = s_0 + max(0,current_v * T + (current_v * delta_v) / (2 * np.sqrt(a_max * b)))
  a = a_max * (1 - (current_v / v0) ** delta - (s_star / gap) ** 2)
  s_star = s_0 + max(0,current_v * T + (current_v * delta_v) / (2 * np.sqrt(a_max * b)))
  return np.mean((np.array(real) - np.array(predicted)) ** 2)
  s_star = s_0 + max(0,current_v * T + (current_v * delta_v) / (2 * np.sqrt(a_max * b)))
  a = a_max * (1 - (current_v / v0) ** delta - (s_star / gap) ** 2)
  s_star = s_0 + max(0,current_v * T + (current_v * delta_v) / (2 * np.sqrt(a_max * b)))
  s_star = s_0 + max(0,current_v * T + (current_v * delta_v) / (2 * np.sqrt(a_max * b)))
  a = a_max * (1 - (current_v / v0) ** delta - (s_star / gap) ** 2)


Processed 10 lane changes


TypeError: unsupported operand type(s) for *: 'NoneType' and 'float'