In [5]:
import os
import sys
import itertools
import collections

from joblib import Parallel,delayed
import random
import numpy as np
from matplotlib import pyplot as plt 
from matplotlib.ticker import MaxNLocator

sys.path.append(os.path.abspath('.'))
from common_utils.fileio import load_json, save_as_json, load_yaml
from common_utils.preserve_lib import TOWN_OBJECTS, SCENARIO_AGENTS

# Basic Functions

In [None]:
def deep_update_dict(d, u):
    """d: old dict
    u: new dict"""
    for k, v in u.items():
        if isinstance(v, collections.abc.Mapping):
            d[k] = deep_update_dict(d.get(k, {}), v)
        else:
            d[k] = v
    return d

In [6]:
def get_agents_in_scenario(scenario_root, sep=True):
    cav_list = []
    rsu_list = []
    for agent in os.listdir(scenario_root):
        if agent.startswith("cav"):
            cav_list.append(int(agent.split("cav_")[1]))
        elif agent.startswith("rsu"):
            rsu_list.append(int(agent.split("rsu_")[1]))

    if sep:
        return cav_list,rsu_list
    else:
        return cav_list+rsu_list

In [7]:
def draw_num_conn(conn_info, scenario_name, target_agent_list=None):
    plt.figure(figsize=(15,5))
    linestyle = "-"
    max_conn = 0
    for k,v in sorted(conn_info.items(), key=lambda v:v[0]):
        if target_agent_list is not None and k not in target_agent_list:
            continue
        v = np.array(v)
        v = v[v>=0]
        max_conn = max(max_conn, np.max(v))
        plt.plot(v, linestyle=linestyle, label=k)

    plt.ylim(-1, max_conn+1)
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.gca().yaxis.set_major_locator(MaxNLocator(integer=True))
    plt.title("Conn agents at each frame for {0} (n={1})".format(scenario_name, len(conn_info)))
    plt.xlabel("#frames")
    plt.ylabel("#conn")
    thr = 10
    if (target_agent_list is None and len(conn_info)<=thr) or\
        (target_agent_list is not None and len(target_agent_list)<=thr):
        plt.legend(bbox_to_anchor=(1.08,0), loc='lower right', borderaxespad=0)
    plt.show()

In [8]:
def check_movement(ego_candidate_list, scenario_name):
    ego_list = []
    for cav in ego_candidate_list:
        if cav in SCENARIO_AGENTS[scenario_name]["recommended_ego_cav_list"]:
            ego_list.append(cav)
    ego_list.sort()
    return ego_list

def count_zero_conn(conn_list):
    """
    
    Parameters
    ----------
    conn_list: List|ndarray
        Number of connected agents at each frame, e.g., [2,5,6,...]

    Returns
    -------
    flag: bool
        True if the agent can be an ego.
    """
    conn_list = np.array(conn_list)
    conn_list = conn_list[conn_list>=0]  # remove invalid frames
    n = np.sum(conn_list == 0)
    return n,len(conn_list)

def check_conn_for_candidates(conn_info, thr_ratio=0.5):
    """If the time period when the agent has no connection with other agents 
    takes up more than `thr_ratio` for the whole sequence, the agent is not 
    recommended to be ego when training.

    Paramters
    ---------
    conn_info: dict
        E.g.: {
            "agent_id": [3,4,5,...], # number of connected agents at each frame
        }
    
    thr_ratio: float
        The ratio to determine whether an agent can be an ego.

    Returns
    -------
    ego_list: List[int]
        Ids of agents that can be ego.
    zero_conn: dict
        CAV and its zero connections.
    """
    assert thr_ratio>=0.0 and thr_ratio<=1.0
    
    ego_list = []
    zero_conn = {}
    for cav,conn_list in conn_info.items():
        n_zero,n = count_zero_conn(conn_list)
        r = n_zero/n
        if r<thr_ratio:
            ego_list.append(cav)
        if n_zero>0:
            zero_conn[cav] = n_zero
    return ego_list,zero_conn

# Get Connections
Count connections for each agent at each frame 

In [None]:
def read_conn_from_yaml(yaml_path, agent, timestamp):
    if not os.path.exists(yaml_path):
        return {}
    data = load_yaml(yaml_path)
    return {timestamp: {
                agent: data['conn_agents']
            }}

def get_conn_detail(dataset_root, save_path=None):
    """Get conn info of the dataset to a JSON file
    to facilitate ego selection.

    Parameters
    ----------
    dataset_root: str
        root dir.

    save_path: str
        path to save JSON.

    Returns
    -------
    all_conn_detail: dict
        Format: {"map_name": {
            "timestamp": {
                "cav_208": [208, 122],
                "rsu_222": [22, 10], 
                ...
            }
        }}
    """
    all_conn_detail = {}
    for_test = False
    for map_name in os.listdir(dataset_root):
        agent_list = [v for v in os.listdir(os.path.join(dataset_root, map_name))
                      if v.startswith("cav") or v.startswith("rsu")]
        timestamp_list = [v.split(".yaml")[0] for v in os.listdir(os.path.join(dataset_root, map_name, "map"))]
        if for_test:
            agent_list = agent_list[:3]
            timestamp_list = timestamp_list[:3]
        res = Parallel(n_jobs=8, verbose=1, backend="multiprocessing")(delayed(read_conn_from_yaml)
                                                                       (os.path.join(dataset_root, map_name, agent, timestamp+".yaml"), agent, timestamp) for 
                                                                       agent,timestamp in itertools.product(agent_list, timestamp_list))
        all_conn_detail[map_name] = {}
        for item in res:
            deep_update_dict(all_conn_detail[map_name], item)
    # save
    if save_path is not None:
        save_as_json(all_conn_detail, save_path)
    return all_conn_detail

In [None]:
%%time
if __name__ == '__main__':
    # save detailed conn at each frame
    # this process may take tens of minutes
    root = "root of Multi-V2X"
    save_path = "./results/conn_detail.json"
    conn_detail = get_conn_detail(root, save_path)

# Sub-dataset Generation
By masking some equipped cars as normal vehicles, sub-dataset with specific CAV penetration rate can be generated.

In [9]:
def random_choose_cav(conn_detail, town_name, dataset_root, pr, seed=None, verbose=0):
    """Randomly choose CAV according to connection info and given 
    `pr` (penetration rate).

    Parameters
    ----------
    conn_detail: dict
        Connection detail for the whole dataset

    cav_candidate_list: list[int]
        For example, [338, 256]

    pr: [0,1]
        Expected penetration rate

    seed: int
        Random seed

    Returns
    -------
    conn_info: dict
        Each selected CAV's #conn at each frame. e.g.: {
            "cav_id": [3,3,4,...] # num of conn at each frame
        }

    Notes
    -----
    1. Some CAV may appear or disappear at certain monments.
    """
    assert pr>0.0 and pr<=1.0

    num_veh,num_cav = TOWN_OBJECTS[town_name]["num_veh"],TOWN_OBJECTS[town_name]["num_cav"]
    num_expected_cav = min(int(np.floor(num_veh*pr)), num_cav)
    cav_candidate_list,rsu_list = get_agents_in_scenario(os.path.join(dataset_root, town_name))
    random.seed(seed)
    cav_list = random.sample(cav_candidate_list, num_expected_cav)
    cav_list.sort()

    # structure: {cav_id: [2,1,4]}, i.e., num of connections at each timestamp
    # -1 denotes the cav doesn't exist at that time
    conn_info = {cav: [] for cav in cav_list}
    
    possible_agent_set = set(cav_list + rsu_list)
    for timestamp,cav_conn in sorted(conn_detail[town_name].items(), key = lambda v:v[0]):
        for cav in cav_list:
            if "cav_" + str(cav) in cav_conn:
                conn_agents = cav_conn["cav_"+str(cav)]
                n = len(possible_agent_set & set(conn_agents))
                n = n-1 # remove ego
                conn_info[cav].append(n)
            else:
                conn_info[cav].append(-1)
                if verbose == 2:
                    print("Warning: agent {0} not in conn_detail at {1}".format(cav, timestamp))
    
    # summary
    num,den = 0,0
    min_conn,max_conn = 1000,0
    for v in conn_info.values():
        v = np.array(v)
        indices = v>=0
        min_conn = min(min_conn, np.min(v[indices]))
        max_conn = max(max_conn, np.max(v))
        num += np.sum(v[indices])
        den += np.sum(indices)
    if verbose == 1:
        print("In map {0}, {1} out of {2} are chosen as CAVs, actual penetration rate {3:.3f}({4}/{5})".format(
            town_name, len(cav_list), len(cav_candidate_list), len(cav_list)/num_veh, len(cav_list), num_veh
        ))
        print(len(cav_list), "CAV:", cav_list)
        print("#conn: min: {0}, max: {1}, avg: {2}".format(min_conn, max_conn, num/den))
    
    return conn_info

In [10]:
def generate_ego_list_for_various_pr(json_path, dataset_root, pr_list=[0.1], thr_conn=0.3, max_num_of_samples=12000, save_path=None):
    """Generate ego list for various CAV peneratration rate.
    Steps:
        1. Randomly choose part of cars with sensors as CAV
        2. Select ego from current CAV list, the ego should meet:
            a. it's moving at most of the time
            b. it can communicate with other agents at most of the time
    
    Parameters
    ----------
    json_path: str
        Path for conn detail.
    dataset_root: str
        root for dataset.
    pr_list: list[float]
        Expected CAV penetration rates.
    thr_conn: float
        If the time period when an agent lose connection with other agents 
        takes up more than `thr_conn`, the agent is recommended to be non-ego
        agent.
    max_num_of_samples : int
        The maximum number of samples counted by egos.
    save_path: str
        Path to save result.
    
    Returns
    --------
    pr_config:
        Agent config list for each given CAV pepetration rate (pr), e.g.: 
        {
            "0.1": [
                {"scenario_name":xxx, "seed": 1, "cav_list": [], "ego_list": [], "rsu_list": []},
                ...
            ],
            ...
        }

    Notes
    -----
    1. After above filterings, a selected ego may still lose connection with 
    other agents at some frames, which requires consideration of historic 
    observations.
    """
    seed_list = [42, 43, 44, 45, 46, 47, 48, 49, 50, 51]
    conn_detail = load_json(json_path)

    # To ensure every town can be visited and reduce redundancy, 
    # in each town, only part of CAVs are taken as ego when training.
    # E.g., 100 CAVs in a town, only 15 of them are taken as ego car.
    # The number can be varied depending on penetration rate and wanted training size.
    thr_ego_in_cav = 0.15  
    
    pr_to_config_list = {}
    
    for pr in pr_list:
        agent_config_list = []
        num_of_samples = 0
        has_reached_max_num = False
        max_conn = 0
        total_ego = 0

        # for each seed, go through all towns to increase diversity
        for i,seed in enumerate(seed_list):
            for scenario_name in os.listdir(dataset_root):
                conn_info = random_choose_cav(conn_detail, scenario_name, dataset_root, pr, seed, verbose=0)
                
                # check: only car that always connect with others and moves
                # can be ego
                moving_ego_list = check_movement(conn_info.keys(), scenario_name)
                connectable_ego_list,zero_conn = check_conn_for_candidates(conn_info, thr_conn)
                ego_list = list(set(moving_ego_list) & set(connectable_ego_list))

                # for each scenario, only part of CAVs can be ego at most
                n = int(len(conn_info)*thr_ego_in_cav)
                if n>0:
                    n = min(n, len(ego_list))
                random.seed(seed)
                ego_list = random.sample(ego_list, n)

                # limit total samples counted by egos
                used_ego_list = []
                scenario_root = os.path.join(dataset_root, scenario_name)
                for ego_id in ego_list:
                    ego_root = os.path.join(scenario_root, "cav_" + str(ego_id))
                    num_of_ego_samples = len([x for x in os.listdir(ego_root)
                                              if x.endswith(".pcd")])
                    if num_of_samples + num_of_ego_samples > max_num_of_samples:
                        has_reached_max_num = True
                        break
                    used_ego_list.append(ego_id)
                    num_of_samples += num_of_ego_samples

                _,rsu_list = get_agents_in_scenario(scenario_root)
                
                agent_config_list.append({
                    "scenario_name": scenario_name,
                    "seed": seed,
                    "cav_list": list(conn_info.keys()),
                    "ego_list": used_ego_list,
                    "rsu_list": rsu_list
                })

                # the maximum number of connections
                if len(used_ego_list)>0:
                    max_conn = max(max_conn, np.max([np.max(v) 
                                                     for k,v in conn_info.items() 
                                                     if k in used_ego_list]))
                    total_ego += len(ego_list)

                # the number of samples is big enough
                if has_reached_max_num:
                    break

            if has_reached_max_num:
                break

        pr_to_config_list[pr] = agent_config_list
        # print summary
        num_of_seeds = len(set([el['seed'] for el in agent_config_list]))
        num_of_towns = len(set([el['scenario_name'] for el in agent_config_list]))
        print("pr={0}, max_conn={1}, total ego={2}, total samples={3}, used seeds={4}, used towns={5}".format(
            pr, max_conn, total_ego, num_of_samples, num_of_seeds, num_of_towns))
    
    if save_path is not None:
        save_as_json(pr_to_config_list, save_path)

    return pr_to_config_list

In [11]:
%%time
if __name__ == '__main__':
    json_path = "./results/conn_detail.json"
    dataset_root = "root of Multi-V2X"
    save_path = "./results/pr_config_list_15k.json"
    pr_list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
    res = generate_ego_list_for_various_pr(json_path, 
        dataset_root, 
        pr_list, 
        thr_conn=0.3, 
        max_num_of_samples=15000,
        save_path=save_path)

pr=0.1, max_conn=8, total ego=48, total samples=14943, used seeds=7, used towns=6
pr=0.2, max_conn=13, total ego=51, total samples=14853, used seeds=4, used towns=6
pr=0.3, max_conn=15, total ego=49, total samples=14816, used seeds=3, used towns=6
pr=0.4, max_conn=19, total ego=53, total samples=14934, used seeds=2, used towns=6
pr=0.5, max_conn=20, total ego=48, total samples=14856, used seeds=2, used towns=6
pr=0.6, max_conn=23, total ego=50, total samples=14731, used seeds=1, used towns=6
CPU times: user 3.09 s, sys: 323 ms, total: 3.41 s
Wall time: 7.82 s
