In [None]:
!pip install filterpy
from google.colab import drive
drive.mount('/content/drive')
import os
from collections import defaultdict
import csv
import numpy as np
from scipy.spatial.distance import euclidean
from filterpy.kalman import KalmanFilter
from filterpy.kalman import ExtendedKalmanFilter
import math
from typing import Optional
import pandas as pd
import json
import math

Collecting filterpy
  Downloading filterpy-1.4.5.zip (177 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/178.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: filterpy
  Building wheel for filterpy (setup.py) ... [?25l[?25hdone
  Created wheel for filterpy: filename=filterpy-1.4.5-py3-none-any.whl size=110458 sha256=c7351db7137861048bdb648ee14de0e738b6acb20ccab9c12b1db38abc2dcbfb
  Stored in directory: /root/.cache/pip/wheels/0f/0c/ea/218f266af4ad626897562199fbbcba521b8497303200186102
Successfully built filterpy
Installing collected packages: filterpy
Successfully installed filterpy-1.4.5
Mounted at /content/drive


In [None]:
class VehicleKalmanFilter:
    def __init__(self, initial_position, initial_velocity):
        self.kf = KalmanFilter(dim_x=4, dim_z=2)  # State: [x, y, vx, vy]
        self.kf.x = np.array([initial_position[0], initial_position[1],
                              initial_velocity[0], initial_velocity[1]])
        self.kf.F = np.array([[1, 0, 0, 0],
                              [0, 1, 0, 0],
                              [0, 0, 1, 0],
                              [0, 0, 0, 1]])
        self.kf.H = np.array([[1, 0, 0, 0],
                              [0, 1, 0, 0]])
        self.kf.P *= 1000.
        self.kf.R = np.eye(2) * 10
        self.kf.Q = np.eye(4) * 0.1

    def predict(self, time_diff):
      self.kf.F = np.array([
          [1, 0, time_diff, 0],
          [0, 1, 0, time_diff],
          [0, 0, 1, 0],
          [0, 0, 0, 1]
      ])
      # compute Q
      dt = time_diff
      q = 0.1
      self.kf.Q = np.array([
          [dt**4/4*q, 0, dt**3/2*q, 0],
          [0, dt**4/4*q, 0, dt**3/2*q],
          [dt**3/2*q, 0, dt**2*q, 0],
          [0, dt**3/2*q, 0, dt**2*q]
      ])
      self.kf.predict()
      return self.get_state()

    def update(self, position, velocity=None):
        self.kf.update(position)
        if velocity is not None:
            self.kf.x[2:] = velocity

    def get_state(self):
        return self.kf.x[:2], self.kf.x[2:]

In [None]:
# Load the JSON file
files = os.listdir('/content/drive/My Drive/Thesis_data/Data/')
junction_file = "/content/drive/My Drive/Thesis_data/Data/" + files[0]
with open(junction_file, 'r') as file:
    junctions_data = json.load(file)

# Function to find connection (with automatic prefix extraction for fuzzy matching)
def find_connection(from_lane, to_lane):
    # Function to extract the road prefix from the lane ID (removing the lane number suffix)
    def extract_road_prefix(lane_id):
        return lane_id.split('_')[0]

    # Extract the road prefixes
    from_road_prefix = extract_road_prefix(from_lane)
    to_road_prefix = extract_road_prefix(to_lane)

    # Search for a fuzzy connection
    for junction in junctions_data:
        if 'lane_connections' in junction:
            lane_connections = junction['lane_connections']
            # Search for all connections from lanes matching the from_road_prefix
            for lane in lane_connections:
                if from_road_prefix in lane:  # Matches the from_lane's road prefix
                    for connection in lane_connections[lane]:
                        if to_road_prefix in connection['to']:  # Matches the to_lane's road prefix
                            return True

    return False

# Load Eavesdropper ID
eavesdropper_id_file = "/content/drive/My Drive/Thesis_data/Data/" + files[1]
print(eavesdropper_id_file)
data = pd.read_csv(eavesdropper_id_file)
eavesdropper_on_mixzone_list = data['EavesdropperID'].tolist()
def check_mixzoneeavesdropper(id):
    ID = int(id)
    return ID in eavesdropper_on_mixzone_list
def get_eavesdropper_coordinates(eavesdropper_id):
    # Filter the dataset for the given EavesdropperID
    eavesdropper_data = data[data['EavesdropperID'] == int(eavesdropper_id)]

    # Check if EavesdropperID exists in the dataset
    if not eavesdropper_data.empty:
        x, y = eavesdropper_data.iloc[0]['X'], eavesdropper_data.iloc[0]['Y']
        return float(x), float(y)
    else:
        return None, None


/content/drive/My Drive/Thesis_data/Data/Eavesdropper_On_MixZone_Info_plus9.csv


In [None]:
class Tracker:
  def __init__(self, tracker_mode=0):
    # tracker_mode
    # 0 = position only (Baseline)
    # 1 = PCAID Tracker
    # 2 = RIPD Tracker

    # Initialize analyzer parameters
    self.exit_threshold = 15
    self.tracker_mode = tracker_mode

    # Statistics for evaluation
    self.vehicle_to_pseudonyms = defaultdict(list) # Real vehicle ID to pseudonym list mapping
    self.pseudonym_to_vehicleID = {}  # Pseudonym to real vehicle ID mapping (constructed during data processing)
    self.pseudonym_change_count = 0
    self.vehicle_count = 0

    # For tracking
    self.current_second = None
    self.observed_vehicle_count = 0
    self.disappear_pseudonym_last_seen = defaultdict(dict)
    self.pseudonym_link_list = defaultdict(list)
    self.pseudonym_link_switch_PCAID_list = defaultdict(list)
    self.pseudonym_to_observed_vehicle = {}
    self.observed_pseudonym_changes = 0  # Number of detected pseudonym changes
    self.current_vehicles = defaultdict(dict)  # Store information of currently active vehicles
    self.link_success_count = 0
    self.link_success_count_normal = 0
    self.link_success_count_mixzone = 0
    self.vehicle_pcaid_dict = {}

    self.pseudonym_change_events = []  # Store the time and vehicle ID of the pseudonyms switching event.

  def process_beacons(self, filename):
    previous_second_beacons = []
    current_second_beacons = []


    with open(filename, 'r') as file:
      reader = csv.DictReader(file)
      for row in reader:
        if int(row["Encrypted"]) == 1:
          self.pseudonym_change_events.append((timestamp, vehicle_id,row["PositionX"],row["PositionY"],row["PCAID"]))
          if row["VehicleID"] in self.vehicle_pcaid_dict.keys():
            self.vehicle_pcaid_dict[row["VehicleID"]].add(row["PCAID"])
            # if(len(self.vehicle_pcaid_dict[row["VehicleID"]]) == 2):
            #   print(row["VehicleID"])
            #   print(self.vehicle_pcaid_dict[row["VehicleID"]])
            #   break
          else:
             pass

        # Add real vehicles and pseudonyms
        if int(row["Encrypted"]) == 0:
          vehicle_id = row["VehicleID"]
          pseudonym = row["Pseudonym"]
          if vehicle_id not in self.vehicle_to_pseudonyms.keys():
            self.vehicle_to_pseudonyms[vehicle_id].append(pseudonym)
            self.vehicle_count += 1
            self.vehicle_pcaid_dict[vehicle_id] = set()
            self.vehicle_pcaid_dict[vehicle_id].add(row["PCAID"])
          else:
            if pseudonym not in self.vehicle_to_pseudonyms[vehicle_id]:
              self.vehicle_to_pseudonyms[vehicle_id].append(pseudonym)
              self.pseudonym_change_count += 1
              timestamp = float(row['Timestamp'])
              self.pseudonym_change_events.append((timestamp, vehicle_id,row["PositionX"],row["PositionY"],row["PCAID"]))
              self.vehicle_pcaid_dict[vehicle_id].add(row["PCAID"])
              if(len(self.vehicle_pcaid_dict[row["VehicleID"]]) > 2):
                print(row["VehicleID"])
                print(self.vehicle_pcaid_dict[row["VehicleID"]])
                print(row["PCAID"])
                print(self.current_second)
                break



          self.pseudonym_to_vehicleID[pseudonym] = vehicle_id

          timestamp = float(row['Timestamp'])
          second = int(timestamp)

          if self.current_second == None:
            self.current_second = second

          if self.current_second == second:
            current_second_beacons.append(row)
          else:
            self.process_two_seconds(previous_second_beacons, current_second_beacons)
            previous_second_beacons = current_second_beacons
            current_second_beacons = []
            current_second_beacons.append(row)
            self.current_second = second


    # if previous_second_beacons and current_second_beacons:
    #   self.process_two_seconds(previous_second_beacons, current_second_beacons)

  def process_two_seconds(self, previous_beacons, current_beacons):
    # Process two seconds of beacon data
    previous_pseudonyms = set(beacon['Pseudonym'] for beacon in previous_beacons)
    current_pseudonyms = set(beacon['Pseudonym'] for beacon in current_beacons)

    # Find the missing and newly appearing pseudonyms
    disappeared_pseudonyms = previous_pseudonyms - current_pseudonyms
    new_pseudonyms = current_pseudonyms - previous_pseudonyms

    # Continuously appearing pseudonyms
    common_pseudonyms = current_pseudonyms & previous_pseudonyms

    new_pseudonyms_beacons = []
    common_pseudonyms_beacons = []
    connected_beacons = []
    connected_pseudonyms = []
    old_pseudonyms = []
    exit_pseudonyms = []
    delayed_pseudonyms = []

    for pseudonym in disappeared_pseudonyms:
      self.handle_disappeared_pseudonym(pseudonym)


    # Scan the vanished vehicle
    if self.disappear_pseudonym_last_seen:
      for old_pseudonym in self.disappear_pseudonym_last_seen.keys():
        if old_pseudonym in new_pseudonyms:
          new_pseudonyms.remove(old_pseudonym)
          common_pseudonyms.add(old_pseudonym)
          delayed_pseudonyms.append(old_pseudonym)
        else:
          if check_mixzoneeavesdropper(self.disappear_pseudonym_last_seen[old_pseudonym]["eavesdropperid"]):
            if self.current_second - self.disappear_pseudonym_last_seen[old_pseudonym]["time"] >= 120:
              exit_pseudonyms.append(old_pseudonym)
          else:
            if self.current_second - self.disappear_pseudonym_last_seen[old_pseudonym]["time"] >= self.exit_threshold:
               exit_pseudonyms.append(old_pseudonym)

    for exit_pseudonym in exit_pseudonyms:
      self.disappear_pseudonym_last_seen.pop(exit_pseudonym)


    for beacon in current_beacons:
      pseudonym = beacon['Pseudonym']
      if pseudonym in new_pseudonyms:
        new_pseudonyms_beacons.append(beacon)
      elif pseudonym in common_pseudonyms:
        common_pseudonyms_beacons.append(beacon)

    for pseudonym in disappeared_pseudonyms:
      if check_mixzoneeavesdropper(self.disappear_pseudonym_last_seen[pseudonym]["eavesdropperid"]):
        eavesdropperX, eavesdropperY = get_eavesdropper_coordinates(self.disappear_pseudonym_last_seen[pseudonym]["eavesdropperid"])
        eavesdropperPos = np.array([eavesdropperX, eavesdropperY])
        distance_diff = euclidean(eavesdropperPos, self.disappear_pseudonym_last_seen[pseudonym]["position"])
        if distance_diff <= 60:
          linked_flag, result, pseudonym_new = self.try_link_pseudonym_inMixZone(new_pseudonyms_beacons, pseudonym)
        else:
          linked_flag, result, pseudonym_new = self.try_link_pseudonym(new_pseudonyms_beacons, pseudonym)
      else:
        linked_flag, result, pseudonym_new = self.try_link_pseudonym(new_pseudonyms_beacons, pseudonym)
      if linked_flag:
          self.observed_pseudonym_changes += 1
          connected_pseudonyms.append(pseudonym_new)
          connected_beacons.append(result)
          new_pseudonyms = [item for item in new_pseudonyms if item not in connected_pseudonyms]
          new_pseudonyms_beacons = [item for item in new_pseudonyms_beacons if item not in connected_beacons]
          old_pseudonyms.append(pseudonym)
          linked_flag = False


    if self.disappear_pseudonym_last_seen:
      for old_pseudonym in self.disappear_pseudonym_last_seen.keys():
        if old_pseudonym not in old_pseudonyms:
          if check_mixzoneeavesdropper(self.disappear_pseudonym_last_seen[old_pseudonym]["eavesdropperid"]):
            eavesdropperX, eavesdropperY = get_eavesdropper_coordinates(self.disappear_pseudonym_last_seen[old_pseudonym]["eavesdropperid"])
            eavesdropperPos = np.array([eavesdropperX, eavesdropperY])
            distance_diff = euclidean(eavesdropperPos, self.disappear_pseudonym_last_seen[old_pseudonym]["position"])
            if distance_diff <= 60:
              linked_flag, result, pseudonym_new = self.try_link_pseudonym_inMixZone(new_pseudonyms_beacons, old_pseudonym)
            else:
              linked_flag, result, pseudonym_new = self.try_link_pseudonym(new_pseudonyms_beacons, old_pseudonym)
          else:
            linked_flag, result, pseudonym_new = self.try_link_pseudonym(new_pseudonyms_beacons, old_pseudonym)
          if linked_flag:
            self.observed_pseudonym_changes += 1
            connected_beacons.append(result)
            connected_pseudonyms.append(pseudonym_new)
            old_pseudonyms.append(old_pseudonym)
            new_pseudonyms = [item for item in new_pseudonyms if item not in connected_pseudonyms]
            new_pseudonyms_beacons = [item for item in new_pseudonyms_beacons if item not in connected_beacons]
            linked_flag = False



    for i in range(len(connected_pseudonyms)):
      self.current_vehicles[connected_pseudonyms[i]].update({"kalman_filter":self.disappear_pseudonym_last_seen[old_pseudonyms[i]]["kalman_filter"]})

    for pseudonym in delayed_pseudonyms:
      self.current_vehicles[pseudonym].update({"kalman_filter":self.disappear_pseudonym_last_seen[pseudonym]["kalman_filter"]})

    # Handle connected pseudonyms
    for i in range(len(connected_beacons)):
      #new_pseudonyms.remove(connected_beacons[i]["Pseudonym"])
      common_pseudonyms_beacons.append(connected_beacons[i])
      if old_pseudonyms[i] in self.disappear_pseudonym_last_seen.keys():
        del self.disappear_pseudonym_last_seen[old_pseudonyms[i]]
      for key in self.pseudonym_link_list.keys():
        if old_pseudonyms[i] in self.pseudonym_link_list[key]:
           self.pseudonym_link_list[key].append(connected_pseudonyms[i])

    if len(new_pseudonyms_beacons) != 0:
      self.add_new_vehicle(new_pseudonyms_beacons)


    # Process the current beacon
    if len(common_pseudonyms_beacons) != 0:
      self.update_beacon(common_pseudonyms_beacons)

  def add_new_vehicle(self, beacons):
    for beacon in beacons:
      pseudonym = beacon['Pseudonym']
      pcaid = beacon['PCAID']
      position = np.array([float(beacon['PositionX']), float(beacon['PositionY'])])
      speed = np.array([float(beacon['SpeedX']), float(beacon['SpeedY'])])
      eavesdropperid = beacon["EavesdropperId"]
      laneid = beacon["LaneID"]
      self.observed_vehicle_count += 1
      self.pseudonym_link_list[self.observed_vehicle_count].append(beacon["Pseudonym"])
      if self.tracker_mode == 2:
        self.pseudonym_link_switch_PCAID_list[pseudonym].append(pcaid)
      kf = VehicleKalmanFilter(position, speed)
      self.current_vehicles[pseudonym].update({"VehicleID":beacon["VehicleID"],"position":position, "speed":speed, "kalman_filter":kf, "pcaid":pcaid, "eavesdropperid":eavesdropperid,"LaneID":laneid})



  def update_beacon(self, beacons):
    # Process single beacon data
    for beacon in beacons:
      pseudonym = beacon['Pseudonym']
      pcaid = beacon['PCAID']
      position = np.array([float(beacon['PositionX']), float(beacon['PositionY'])])
      speed = np.array([float(beacon['SpeedX']), float(beacon['SpeedY'])])
      eavesdropperid = beacon["EavesdropperId"]
      laneid = beacon["LaneID"]
      kf = self.current_vehicles[pseudonym]["kalman_filter"]
      kf.update(position, speed)
      self.current_vehicles[pseudonym].update({"VehicleID":beacon["VehicleID"],"position":position, "speed":speed, "kalman_filter":kf, "pcaid":pcaid, "eavesdropperid":eavesdropperid,"LaneID":laneid})



  def handle_disappeared_pseudonym(self, pseudonym):
    # Handle missing pseudonyms

    if pseudonym in self.current_vehicles:
      vehicle = self.current_vehicles[pseudonym]
      kf = vehicle["kalman_filter"]
      self.disappear_pseudonym_last_seen[pseudonym].update({
          "VehicleID":vehicle["VehicleID"],
          "kalman_filter": kf,
          "position":vehicle["position"],
          "speed":vehicle["speed"],
          "pcaid":vehicle["pcaid"],
          "eavesdropperid":vehicle["eavesdropperid"],
          "LaneID":vehicle["LaneID"],
          "time":self.current_second})

      if pseudonym in self.current_vehicles:
        self.current_vehicles.pop(pseudonym)

  def try_link_pseudonym(self, new_beacons, old_pseudonym):
    best_score_pseudonym = ""
    best_score = 0
    best_score_beacon = None
    best_time_diff = None
    best_pos = None
    best_pred_pos = None
    best_speed = None
    best_pred_speed = None

    old_pcaid = self.disappear_pseudonym_last_seen[old_pseudonym]["pcaid"]

    kf = self.disappear_pseudonym_last_seen[old_pseudonym]['kalman_filter']
    time_diff = self.current_second - self.disappear_pseudonym_last_seen[old_pseudonym]['time']
    if time_diff == 0:
      predicted_position = self.disappear_pseudonym_last_seen[old_pseudonym]["position"]
      predicted_speed = self.disappear_pseudonym_last_seen[old_pseudonym]["speed"]
    else:
      kf.predict(time_diff)
      predicted_position, predicted_speed = kf.get_state()
    if self.tracker_mode == 1 or self.tracker_mode == 2:
      for beacon in new_beacons:
        pseudonym = beacon['Pseudonym']
        new_pcaid = beacon['PCAID']

        if self.tracker_mode == 1:
          if new_pcaid != old_pcaid:
            continue
        elif self.tracker_mode == 2:
          if new_pcaid not in self.pseudonym_link_switch_PCAID_list[old_pseudonym]:
            continue
          else:
            pass

        # Calculate position and velocity differences
        position = np.array([float(beacon['PositionX']), float(beacon['PositionY'])])
        speed = np.array([float(beacon['SpeedX']), float(beacon['SpeedY'])])
        distance_diff = euclidean(predicted_position, position)
        speed_diff = euclidean(predicted_speed, speed)
        score = self.calculate_score(distance_diff, speed_diff)
        if score >70:
          if score > best_score:
            best_score = score
            best_score_pseudonym = pseudonym
            best_score_beacon = beacon
            best_time_diff = time_diff
            best_pos = position
            best_pred_pos = predicted_position
            best_speed = speed
            best_pred_speed = predicted_speed

      if best_score_pseudonym != "":
        if self.tracker_mode == 1:
          if self.disappear_pseudonym_last_seen[old_pseudonym]["VehicleID"] == best_score_beacon["VehicleID"]:
            self.link_success_count += 1
            self.link_success_count_normal += 1
          return True, best_score_beacon, best_score_pseudonym
        elif self.tracker_mode == 2:
          if self.disappear_pseudonym_last_seen[old_pseudonym]["VehicleID"] == best_score_beacon["VehicleID"]:
            self.link_success_count += 1
            self.link_success_count_normal += 1
          self.pseudonym_link_switch_PCAID_list[best_score_pseudonym] = self.pseudonym_link_switch_PCAID_list[old_pseudonym]
          del self.pseudonym_link_switch_PCAID_list[old_pseudonym]
          if best_score_beacon["PCAID"] not in self.pseudonym_link_switch_PCAID_list[best_score_pseudonym]:
            self.pseudonym_link_switch_PCAID_list[best_score_pseudonym].append(best_score_beacon["PCAID"])
          return True, best_score_beacon, best_score_pseudonym
        else:
          return False, None, None
      else:
        return False, None, None

    for beacon in new_beacons:
      pseudonym = beacon['Pseudonym']
      new_pcaid = beacon['PCAID']
      position = np.array([float(beacon['PositionX']), float(beacon['PositionY'])])
      speed = np.array([float(beacon['SpeedX']), float(beacon['SpeedY'])])
      distance_diff = euclidean(predicted_position, position)
      speed_diff = euclidean(predicted_speed, speed)
      score = self.calculate_score(distance_diff, speed_diff)
      if score > 60:
        if score > best_score:
          best_score = score
          best_score_pseudonym = pseudonym
          best_score_beacon = beacon
    if best_score_pseudonym != "":
      return True, best_score_beacon, best_score_pseudonym
    return False, None, None

  def try_link_pseudonym_inMixZone(self, new_beacons, old_pseudonym):

    eavesdropperX, eavesdropperY = get_eavesdropper_coordinates(self.disappear_pseudonym_last_seen[old_pseudonym]["eavesdropperid"])
    eavesdropperPos = np.array([eavesdropperX, eavesdropperY])
    old_pcaid = self.disappear_pseudonym_last_seen[old_pseudonym]["pcaid"]

    best_pseudonym = ""
    best_beacon = None

    if self.tracker_mode == 1 or self.tracker_mode == 2:
      for beacon in new_beacons:
        pseudonym = beacon['Pseudonym']
        new_pcaid = beacon['PCAID']

        if self.tracker_mode == 1:
          if new_pcaid != old_pcaid:
            continue
        elif self.tracker_mode == 2:
          if new_pcaid not in self.pseudonym_link_switch_PCAID_list[old_pseudonym]:
            continue
          else:
            pass

        position = np.array([float(beacon['PositionX']), float(beacon['PositionY'])])
        distance_diff = euclidean(eavesdropperPos, position)
        if distance_diff < 300 and find_connection(self.disappear_pseudonym_last_seen[old_pseudonym]["LaneID"], beacon["LaneID"]):
            best_pseudonym = pseudonym
            best_beacon = beacon
      if best_pseudonym != "":
        if self.tracker_mode == 1:
          if self.disappear_pseudonym_last_seen[old_pseudonym]["VehicleID"] == best_beacon["VehicleID"]:
            self.link_success_count += 1
            self.link_success_count_mixzone += 1
          return True, best_beacon, best_pseudonym
        elif self.tracker_mode == 2:
          if self.disappear_pseudonym_last_seen[old_pseudonym]["VehicleID"] == best_beacon["VehicleID"]:
            self.link_success_count += 1
            self.link_success_count_normal += 1
          self.pseudonym_link_switch_PCAID_list[best_pseudonym] = self.pseudonym_link_switch_PCAID_list[old_pseudonym]
          del self.pseudonym_link_switch_PCAID_list[old_pseudonym]
          if best_beacon["PCAID"] not in self.pseudonym_link_switch_PCAID_list[best_pseudonym]:
            self.pseudonym_link_switch_PCAID_list[best_pseudonym].append(best_beacon["PCAID"])
          return True, best_beacon, best_pseudonym
        else:
          return False, None, None
      else:
        return False, None, None

    for beacon in new_beacons:
        pseudonym = beacon['Pseudonym']
        new_pcaid = beacon['PCAID']
        position = np.array([float(beacon['PositionX']), float(beacon['PositionY'])])
        distance_diff = euclidean(eavesdropperPos, position)
        if distance_diff < 300 and find_connection(self.disappear_pseudonym_last_seen[old_pseudonym]["LaneID"], beacon["LaneID"]):
            best_score_pseudonym = pseudonym
            best_score_beacon = beacon
    if best_score_pseudonym != "":
      if self.disappear_pseudonym_last_seen[old_pseudonym]["VehicleID"] == best_score_beacon["VehicleID"]:
        self.link_success_count += 1
      return True, best_score_beacon, best_score_pseudonym
    return False, None, None


  def calculate_score(self, distance, speed_diff):
    position_normalized_score = np.clip(1 - distance / (100), 0, 1)
    speed_normalized_score = np.clip(1 - speed_diff / (100), 0, 1)
    base_score = position_normalized_score * 70 + speed_normalized_score * 30
    return base_score


  def evaluate (self, filename):
    self.process_beacons(filename)
    true_vehicle_count = self.vehicle_count # Total number of real vehicles
    observed_vehicle_count = len(self.pseudonym_link_list) # Total number of vehicles observed
    true_pseudonym_change_count = self.pseudonym_change_count # Real pseudonym Switch Count
    observed_pseudonym_change_count = self.observed_pseudonym_changes # Number of observed pseudonym switches

    coverage_results, average_coverage, single_pseudonym_vehicle_count = self.calculate_coverage()

    # Return evaluation results and coverage information
    return {
            "true_vehicle_count": true_vehicle_count,
            "observed_vehicle_count": observed_vehicle_count,
            "true_pseudonym_change_count": true_pseudonym_change_count,
            "observed_pseudonym_change_count": observed_pseudonym_change_count,
            "pseudonym_change_linked_count": self.link_success_count,
            "pseudonym_change_linked_count_rate": self.link_success_count / true_pseudonym_change_count,
            "average_coverage": average_coverage,  # Average tracking coverage of multi-pseudonym vehicles
            "single_pseudonym_vehicle_count": single_pseudonym_vehicle_count,  # The number of vehicles with only one pseudonym.
            "coverage_results": coverage_results,  # Details of each vehicle's coverage
            "link_list":self.pseudonym_link_list,
            "normal_link_success":self.link_success_count_normal,
            "mix_link_success":self.link_success_count_mixzone
        }

  def calculate_coverage(self):
    for observed_vehicle_id, pseudonym_list in self.pseudonym_link_list.items():
      for pseudonym in pseudonym_list:
         self.pseudonym_to_observed_vehicle[pseudonym] = observed_vehicle_id

    coverage_results = {}  # Store the coverage of each real vehicle
    total_coverage = 0.0  # Total coverage, used for calculating the average
    multi_pseudonym_vehicle_count = 0  # The number of vehicles with multiple pseudonyms
    single_pseudonym_vehicle_count = 0  # The number of vehicles with only one pseudonym.
    # Traverse each real vehicle
    for vehicle_id, true_pseudonym_list in self.vehicle_to_pseudonyms.items():
      total_pseudonyms = len(true_pseudonym_list)
      if total_pseudonyms > 1:
          # Only consider vehicles with multiple pseudonyms
        multi_pseudonym_vehicle_count += 1
        observed_vehicle_counts = {}  # The number of pseudonym contained in the storage observation vehicle

         # Statistical distribution of the pseudonym of the actual vehicle in the observed vehicles
        for pseudonym in true_pseudonym_list:
            observed_vehicle_id = self.pseudonym_to_observed_vehicle.get(pseudonym)
            if observed_vehicle_id is not None:
                if observed_vehicle_id in observed_vehicle_counts:
                    observed_vehicle_counts[observed_vehicle_id] += 1
                else:
                    observed_vehicle_counts[observed_vehicle_id] = 1
        if observed_vehicle_counts:
            # Find the observed vehicle containing the most pseudonyms of the vehicle (main observed vehicle)
            main_observed_vehicle_id = max(observed_vehicle_counts, key=observed_vehicle_counts.get)
            correct_pseudonyms = observed_vehicle_counts[main_observed_vehicle_id]
            # The total number of pseudonym observed in the main vehicles
            total_observed_pseudonyms_in_main_vehicle = len(self.pseudonym_link_list[main_observed_vehicle_id])
            incorrect_pseudonyms_in_main_vehicle = total_observed_pseudonyms_in_main_vehicle - correct_pseudonyms

             # Calculate the coverage rate, considering the impact of error allocation
            coverage = (correct_pseudonyms / total_pseudonyms) * (correct_pseudonyms / total_observed_pseudonyms_in_main_vehicle)


            # Cumulative coverage
            total_coverage += coverage


            # Record the results
            coverage_results[vehicle_id] = {
              "coverage": coverage,
              "correct_pseudonyms": correct_pseudonyms,
              "total_pseudonyms": total_pseudonyms,
              "incorrect_pseudonyms_in_main_vehicle": incorrect_pseudonyms_in_main_vehicle,
              "main_observed_vehicle_id": main_observed_vehicle_id
            }
        else:
            coverage_results[vehicle_id] = {
                "coverage": 0.0,
                "correct_pseudonyms": 0,
                "total_pseudonyms": total_pseudonyms,
                "incorrect_pseudonyms_in_main_vehicle": 0,
                "main_observed_vehicle_id": None
            }
      else:
        single_pseudonym_vehicle_count += 1
        coverage_results[vehicle_id] = {
            "coverage": None,
            "correct_pseudonyms": None,
            "total_pseudonyms": total_pseudonyms,
            "incorrect_pseudonyms_in_main_vehicle": None,
            "main_observed_vehicle_id": None
        }
    # Calculate average coverage
    if multi_pseudonym_vehicle_count > 0:
      average_coverage = total_coverage / multi_pseudonym_vehicle_count
    else:
      average_coverage = 0.0

    return coverage_results, average_coverage, single_pseudonym_vehicle_count




In [None]:
file_name_Base = "/content/drive/My Drive/Thesis_data/Data/3PCA/Baseline_1600To1700_PeriodicalPC_3PCA_40%.csv"
print(file_name_Base)
tracker = Tracker(1)
results_Baseline = dict(tracker.evaluate(file_name_Base))

with open('/content/drive/My Drive/tmp/Baseline_40%_3PCA_result.json', 'w') as json_file:
    json.dump(results_Baseline, json_file, indent=4)


/content/drive/My Drive/Thesis_data/Data/3PCA/Baseline_1600To1700_PeriodicalPC_3PCA_40%.csv


In [None]:
file_name_SR = "/content/drive/My Drive/Thesis_data/Data/5PCA/RIPD_1600To1700_PeriodicalPC_5PCA_40%.csv"
print(file_name_SR)
tracker = Tracker(2)
results_SR = dict(tracker.evaluate(file_name_SR))

with open('/content/drive/My Drive/tmp/RIPD_40%_5PCA_result.json', 'w') as json_file:
    json.dump(results_SR, json_file, indent=4)

/content/drive/My Drive/Thesis_data/Data/5PCA/RIPD_1600To1700_PeriodicalPC_5PCA_40%.csv
