In [1]:
import csv
import os
import numpy as np
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

In [2]:
# 1. Extract roads' static features.

# Sumo's network file contains roads' static information.
# Capture the .xml file and save into dictionary to map road ID -> static features.
def extract_static_features(xml_file):
    tree = ET.parse(xml_file)
    root = tree.getroot()
    # Initialize dictionary.
    edge_features = []
    
    for edge in root.findall('edge'):
        edge_id = edge.get('id')
        lanes = edge.findall('lane')
        num_lanes = len(lanes)
        avg_speed = sum(float(lane.get('speed')) for lane in lanes) / num_lanes
        avg_length = sum(float(lane.get('length')) for lane in lanes) / num_lanes
        # Map road ID -> static features.
        edge_features.append((edge_id, num_lanes, avg_speed, avg_length))
    
    return edge_features

# Define data path.
path = os.path.abspath('./../SUMO_Simulation') 
xml_file = os.path.join(path,'test.net.xml') 
print('network path is:', xml_file)
static_features = extract_static_features(xml_file)


# Save roads' static information into .csv file.
def save_to_csv(data, filename):
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Edge ID', 'Number of Lanes', 'Average Speed', 'Average Length'])
        writer.writerows(data)

save_to_csv(static_features, 'edge_2_static_features_dict.csv')

network path is: /Users/xuzizhuo/Desktop/Main Folder/My_works/routeSys_Github/Traffic_Simulation_Data_Generation_for_Model_Training/SUMO_Simulation/test.net.xml


In [7]:
# 2. Capture road's connection relationships.

# Sumo's network file also contains roads' connection relationships.
# E.g. road 'A' connect to 'B' by going  straight.
def extract_connection_features(xml_file):
    tree = ET.parse(xml_file)
    root = tree.getroot()
    #  Initialize dictionary.
    connection_dict = {}
    
    for connection in root.findall('connection'):
        from_edge = connection.get('from')
        to_edge = connection.get('to')
        turning_direction = connection.get('dir')
        # Map a road to all its connected roads with different turning directions.
        connection_dict[from_edge] = (to_edge, turning_direction)
    
    return connection_dict

connection_features = extract_connection_features(xml_file)
# print(connection_features)

In [10]:
# 3. Road classification.
# Classify all roads into 'N' categories, and generate "Manual Allocation Data" based on each categroy.

# Load road's static information.
def load_static_features_from_csv(filename):
    # Initialize dictionary.
    static_features = []
    
    with open(filename, 'r') as csvfile:
        reader = csv.reader(csvfile)
        next(reader) # Skip header
        for row in reader:
            static_features.append((row[0], int(row[1]), float(row[2]), float(row[3])))
            
    return static_features

# Feature standardization
def standardize_data(features):
    scaler = StandardScaler()
    standardized_features = scaler.fit_transform(features)
    
    return standardized_features, scaler

# Select pre-defined number of roads from each category 
def select_representative_edges(edges, labels, selection_dict):
    representative_edges = {}
    unique_labels = np.unique(labels)
    
    for label in unique_labels:
        indices = np.where(labels == label)[0]
        distances = np.linalg.norm(features[indices] - kmeans.cluster_centers_[label], axis=1)
        sorted_indices = indices[np.argsort(distances)]
        num_to_select = selection_dict.get(label, 1)  # 默认选择一个
        
        if len(sorted_indices) < num_to_select:
            print(f"Warning: Cluster {label} has fewer samples ({len(sorted_indices)}) than the required number ({num_to_select})")
        
        representative_edges[label] = [edges[idx] for idx in sorted_indices[:num_to_select]]
    
    return representative_edges

# Elbow-plot to find pre-defined clusters
def find_optimal_clusters(data, max_clusters):
    sse = []
    
    for k in range(1, max_clusters+1):
        kmeans = KMeans(n_clusters=k)
        kmeans.fit(data)
        sse.append(kmeans.inertia_)  # 计算误差平方和
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, max_clusters+1), sse, marker='o')
    plt.xlabel('Number of clusters')
    plt.ylabel('SSE')
    plt.title('Elbow Method')
    plt.show()

    
# Load dictionary to map road ID to static features
static_features = load_static_features_from_csv('edge_2_static_features_dict.csv')
# Select features
features = np.array([list(edge[1:]) for edge in static_features])

# Data standardization
standardized_features, scaler = standardize_data(features)

# KMeans to classify roads
kmeans = KMeans(n_clusters=100, random_state=42)
labels = kmeans.fit_predict(standardized_features)

# Select pre-defined number of roads from each category
num = 5   # Assume there are 100 clusters and we select 5 each, we generate route data based on these 500 roads
selection_dict = {label: num for label in np.unique(labels)}
representative_edges = select_representative_edges(static_features, labels, selection_dict)
# Print selected roads' static features
count = 0
for label, edges in representative_edges.items():
    for edge in edges:
        print(f'Cluster {label}: Edge ID: {edge[0]}, Num Lanes: {edge[1]}, Avg Speed: {round(edge[2])}, Avg Length: {round(edge[3],2)}')
        count += 1
print(f"count: {count}")

# Elbow-plot
find_optimal_clusters(standardized_features, 100)

# Print selected road IDs
formatted_edge_ids = [edge[0] for edges in representative_edges.values() for edge in edges]
print("edge_ids =", formatted_edge_ids)


  super()._check_params_vs_input(X, default_n_init=10)


Cluster 0: Edge ID: 194933372#10, Num Lanes: 2, Avg Speed: 11, Avg Length: 0.43
Cluster 0: Edge ID: 194933372#5, Num Lanes: 2, Avg Speed: 11, Avg Length: 0.7
Cluster 0: Edge ID: 575746067#2, Num Lanes: 2, Avg Speed: 11, Avg Length: 0.76
Cluster 0: Edge ID: 195743166#1, Num Lanes: 2, Avg Speed: 11, Avg Length: 0.76
Cluster 0: Edge ID: 981182867#1, Num Lanes: 2, Avg Speed: 11, Avg Length: 0.81
Cluster 1: Edge ID: 1117867041#1, Num Lanes: 1, Avg Speed: 28, Avg Length: 2.17
Cluster 1: Edge ID: -486583092, Num Lanes: 1, Avg Speed: 28, Avg Length: 2.31
Cluster 1: Edge ID: 486583092, Num Lanes: 1, Avg Speed: 28, Avg Length: 2.31
Cluster 1: Edge ID: 324118557#1, Num Lanes: 1, Avg Speed: 28, Avg Length: 2.44
Cluster 1: Edge ID: 46613661#2, Num Lanes: 1, Avg Speed: 28, Avg Length: 2.8
Cluster 2: Edge ID: 5673054#1, Num Lanes: 1, Avg Speed: 14, Avg Length: 88.76
Cluster 2: Edge ID: -5673054#1, Num Lanes: 1, Avg Speed: 14, Avg Length: 88.76
Cluster 2: Edge ID: 846362785#2, Num Lanes: 1, Avg Speed:

In [12]:
# 4. Save selected road IDs into .txt file
with open('classfied_edge_ids.txt', 'w') as file:
    for edge_id in formatted_edge_ids:
        file.write(edge_id + '\n')
print("Road IDs are saved in 'classfied_edge_ids.txt'.")


Edge IDs 已成功保存到 'classfied_edge_ids.txt'.
