# Final Simulation of Singular Routes

In [None]:
import main_1 as m
import models.road_network.create_graph as cg
import models.vehicle_models.battery_deg as bd
import scipy.stats as stats

import json
from pprint import pprint
import matplotlib.pyplot as plt
import simulation.simulate_routes as sr
import datetime
import os
import numpy as np


In [None]:
road_network_file, road_df, static_data, vehicle_data, battery_data, map_data, weights = m.import_data()

In [None]:
OCV = battery_data["OCV"]
capacity = battery_data["Capacity"]
R_int = battery_data["R_internal"]
motor_eff = vehicle_data["motor_eff"]
graph = cg.create_osmnx_compatible_graph(road_network_file, debug = False)

In [None]:
# m.create_new_test_set(road_df)

In [None]:
with open("test_data/test_route_set.json", "r") as file:
        test_routes_dict = json.load(file)

## Simulate a Set of Routes

In [None]:
def simulate(results_log_file,results_filename, test_set_data ):
    simulation_results = {}

    start_time = datetime.datetime.now()
    start_time_str = start_time.strftime("%B %d, %Y at %I:%M:%S %p")

    with open(results_log_file, 'w') as log_file:
        log_file.write(f"{'='*50}\n")
        log_file.write(f"SIMULATION STARTED: {start_time_str}\n")
        log_file.write(f"Output file: {os.path.abspath(results_filename)}\n")
        log_file.write(f"Test set: test_set1\n")
        log_file.write(f"Routes to process: {len(test_set_data)}\n")
        log_file.write(f"{'='*50}\n\n")

# Use enumerate instead of manual counter
    for i, points in enumerate(test_set_data, 1):
        print(f"Processing route {i}/{len(test_set_data)}: {points[0]} to {points[1]}")
        
        # Initialize dictionary entry for this simulation
        simulation_results[f'sim{i}'] = {
            'start_point': points[0],
            'end_point': points[1],
            'optimised': {},
            'distance': {}
        }
        
        try:
            # Calculate optimised route
            route_output_optimised = sr.find_route(
                map_data, road_df, graph, points[0], points[1], 
                weights, plot=False, weights_type='objective'
            )
            
            # Store optimised route data
            opt_results = sr.return_route_data_complex(
                route_output_optimised, vehicle_data, static_data, 
                motor_eff, battery_data
            )
            
            simulation_results[f'sim{i}']['optimised'] = {
                'total_distance': opt_results[0],
                'total_consumption': opt_results[1],
                'total_climb': opt_results[2],
                'detailed_results': opt_results[3],
                'current_list': opt_results[4],
                'climbs': opt_results[5],
                'distances': opt_results[6],
                'consumptions': opt_results[7]
            }
            
            # Calculate distance-based route
            route_output_distance = sr.find_route(
                map_data, road_df, graph, points[0], points[1], 
                weights, plot=False, weights_type='distance'
            )
            
            # Store distance route data
            dist_results = sr.return_route_data_complex(
                route_output_distance, vehicle_data, static_data, 
                motor_eff, battery_data
            )
            
            simulation_results[f'sim{i}']['distance'] = {
                'total_distance': dist_results[0],
                'total_consumption': dist_results[1],
                'total_climb': dist_results[2],
                'detailed_results': dist_results[3],
                'current_list': dist_results[4],
                'climbs': dist_results[5],
                'distances': dist_results[6],
                'consumptions': dist_results[7]
            }
            
            print(f"Route {i}/{len(test_set_data)} complete")
            
        except Exception as e:
            print(f"Error on route {points}: {str(e)}")
            simulation_results[f'sim{i}']['error'] = str(e)

    class CustomEncoder(json.JSONEncoder):
        def default(self, obj):
            # Handle various non-serializable types
            if isinstance(obj, (np.bool_, bool)) or str(type(obj)) == "<class 'bool'>":
                return bool(obj)
            elif isinstance(obj, np.ndarray):
                return obj.tolist()
            elif isinstance(obj, np.integer):
                return int(obj)
            elif isinstance(obj, np.floating):
                return float(obj)
            elif hasattr(obj, 'tolist'):
                return obj.tolist()
            elif hasattr(obj, '__dict__'):
                return obj.__dict__
            return super().default(obj)

    # Use this encoder
    with open(results_filename, "w") as file:
        json.dump(simulation_results, file, indent=4, cls=CustomEncoder)

    end_time = datetime.datetime.now()
    end_time_str = end_time.strftime("%B %d, %Y at %I:%M:%S %p")
    total_duration = end_time - start_time

    with open(results_log_file, 'a') as log_file:
        log_file.write(f"\n{'='*50}\n")
        log_file.write(f"SIMULATION COMPLETED: {end_time_str}\n")
        log_file.write(f"Total duration: {total_duration}\n")
        log_file.write(f"Routes processed: {len(test_set_data)}\n")
        log_file.write(f"Results saved to: {os.path.abspath(results_filename)}\n")
        log_file.write(f"{'='*50}\n\n")


In [None]:
test_set_data = test_routes_dict['test_set5']
results_filename = "results/singular_routes_sim/simulation_test_set5.json"
results_log_file = "./results/singular_routes_sim/simulation_log5.txt"

simulate(results_log_file, results_filename, test_set_data)

## Collect Consumptions for a Set

In [None]:
def collect_consumptions(results_file: str):
    with open(results_file, "r") as file:
        sim_results = json.load(file)

    consumption_dict = {}
    consumptions_distance = []
    consumptions_optimised = []
    for key, values in sim_results.items():
        if f'{key}' not in consumption_dict:
            consumption_dict[f'{key}'] = {}

        found_consumption = False
        for test, test_data in values.items():
            if 'distance' in test:
                for id, item in test_data.items():
                    consumption_dict[f'{key}'][f'{test}'] = test_data['total_consumption']
                    consumptions_distance.append(test_data['total_consumption'])
                    found_consumption = True
                    break
            elif 'optimised' in test:
                for id, item in test_data.items():
                    consumption_dict[f'{key}'][f'{test}'] = test_data['total_consumption']
                    consumptions_optimised.append(test_data['total_consumption'])
                    found_consumption = True
                    break
    if not found_consumption:
        pass
    return consumptions_distance, consumptions_optimised

# cap_loss_distance, cap_loss_optimised = collect_consumptions('results/singular_routes_sim/simulation_test_set1.json')
# print(len(cap_loss_distance))

In [None]:
def collect_ageing(results_file: str):
    with open(results_file, "r") as file:
        sim_results = json.load(file)
    
    results_dict = {}
    cap_loss_optimised = []
    cap_loss_distance = []
    for key, values in sim_results.items():
        # print(key)
        results_dict[key] = {}  # No need for f-string here
        
        for test, test_data in values.items():
            if 'distance' in test:
                for id, item in test_data.items():
                    # Initialize the test dictionary first
                    results_dict[key][test] = {}  # No need for f-string here
                    
                    # Calculate capacity loss using your existing function
                    capacity_loss = bd.route_analysis(
                        test_data['detailed_results'], 
                        test_data['current_list'], 
                        test_data['consumptions'],
                        OCV, 
                        capacity
                    )
                    # Store just the capacity loss result
                    cap_loss_distance.append(capacity_loss)
                    results_dict[key][test]['capacity_loss'] = capacity_loss
                    break
            if 'optimised' in test:
                for id, item in test_data.items():

                    # Initialize the test dictionary first
                    results_dict[key][test] = {}  # No need for f-string here
                    
                    # Calculate capacity loss using your existing function
                    capacity_loss = bd.route_analysis(
                        test_data['detailed_results'], 
                        test_data['current_list'], 
                        test_data['consumptions'],
                        OCV, 
                        capacity
                    )
                    # Store just the capacity loss result
                    cap_loss_optimised.append(capacity_loss)
                    results_dict[key][test]['capacity_loss'] = capacity_loss
                    break
            

                
    return cap_loss_distance, cap_loss_optimised


In [None]:
def plot_percentage_difference(normal_list: list, optimised_list: list, type:str, title_prefix: str = "", 
                               equal_threshold: float = 0.001, results_file=None):
    # Convert lists to numpy arrays
    normal = np.array(normal_list)
    optimised = np.array(optimised_list)
    
    # Calculate percentage difference: (optimised - normal) / normal * 100
    # Using np.divide with 'out' parameter to handle division by zero
    percent_diff = np.zeros_like(normal, dtype=float)
    np.divide((optimised - normal), normal, out=percent_diff, where=normal!=0)
    percent_diff = percent_diff * 100  # Convert to percentage
    
    # Create figure
    fig, ax = plt.subplots(figsize=(14, 4))
    
    # Set title
    if type == 'consumption':
        if title_prefix:
            title = f"{title_prefix} - Energy Consumption Percentage Comparison"
        else:
            title = "Energy Consumption Percentage Comparison"
    elif type == 'degradation':
        if title_prefix:
            title = f"{title_prefix} - Capacity Loss Percentage Comparison"
        else:
            title = "Capacity Loss Percentage Comparison"
    
    fig.suptitle(title, fontsize=18, fontweight='bold')
    
    # Plot the percentage difference
    x = np.arange(len(normal))
    ax.plot(x, percent_diff, '.-', color='teal', linewidth=2, markersize=8)
    ax.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    
    # Set labels
    ax.set_xlabel('Route Index', fontsize=16)
    ax.set_ylabel('Percentage Difference (%)', fontsize=16)
    
    # Set x-ticks for better readability
    ax.set_xticks(x[::10])
    
    # Add grid
    ax.grid(True, linestyle='--', alpha=0.7)
    
    # Identify where values are "the same" (based on threshold)
    is_better = percent_diff < -equal_threshold
    is_worse = percent_diff > equal_threshold
    is_same = (~is_better) & (~is_worse)
    
    # Color-code the differences
    if type == 'consumption':
        ax.fill_between(x, percent_diff, 0, where=is_better, color='green', alpha=0.3, 
                       label='Optimised Reduces Consumption')
        ax.fill_between(x, percent_diff, 0, where=is_worse, color='red', alpha=0.3, 
                       label='Non-Optimised Reduces Consumption')
        ax.fill_between(x, percent_diff, 0, where=is_same, color='blue', alpha=0.2, 
                       label='Equal Consumption (±{:.2f}%)'.format(equal_threshold*100))
    if type == 'degradation':
        ax.fill_between(x, percent_diff, 0, where=is_better, color='green', alpha=0.3, 
                       label='Optimised Reduces Capacity Loss')
        ax.fill_between(x, percent_diff, 0, where=is_worse, color='red', alpha=0.3, 
                       label='Non-Optimised Reduces Capacity Loss')  
        ax.fill_between(x, percent_diff, 0, where=is_same, color='blue', alpha=0.2, 
                       label='Equal Capacity Loss (±{:.2f}%)'.format(equal_threshold*100))
    
    # Add legend
    ax.legend(fontsize=14)
    
    # Add summary statistics as text
    average_percent = np.mean(percent_diff)
    median_percent = np.median(percent_diff)
    percent_improved = (is_better).sum() / len(percent_diff) * 100
    percent_same = (is_same).sum() / len(percent_diff) * 100
    percent_worse = (is_worse).sum() / len(percent_diff) * 100
    std_dev = np.std(percent_diff)
    
    # Coefficient of Variation (CV)
    # We use the absolute mean since percentage differences can be around zero
    # making CV meaningless with a denominator close to zero
    cv = std_dev / abs(average_percent) if average_percent != 0 else np.nan
    
    # Statistical significance (paired t-test)
    t_stat, p_value = stats.ttest_rel(optimised, normal)
    
    # 95% Confidence Interval for mean percentage difference
    # Using t-distribution for small samples
    n = len(percent_diff)
    sem = std_dev / np.sqrt(n)  # Standard error of the mean
    t_critical = stats.t.ppf(0.975, n-1)  # 95% confidence (two-tailed)
    ci_lower = average_percent - t_critical * sem
    ci_upper = average_percent + t_critical * sem
    
    stats_text = (f"Average percentage difference: {average_percent:.2f}%\n"
                 f"Median percentage difference: {median_percent:.2f}%\n"
                 f"Routes improved: {percent_improved:.1f}%\n"
                 f"Routes the same: {percent_same:.1f}%\n"
                 f"Routes worsened: {percent_worse:.1f}%\n"
                 f"Standard deviation: {std_dev:.2f}%\n"
                 f"Coefficient of variation: {cv:.2f}\n"
                 f"95% Confidence interval: [{ci_lower:.2f}%, {ci_upper:.2f}%]\n"
                 f"p-value (paired t-test): {p_value:.6f} {'(significant)' if p_value < 0.05 else '(not significant)'}")
    
    print(stats_text)
    
    # Write results to file if provided
    if results_file:
        with open(results_file, 'a') as f:
            if type == 'degradation':
                f.write(f"capacity results\n")
            else:
                f.write(f"consumption results\n")
            f.write(stats_text + "\n\n")
    
    plt.tight_layout()
    plt.show()
    
    # Return the percentages for potential further use
    return {
        "improved": percent_improved,
        "same": percent_same,
        "worse": percent_worse
    }


In [None]:
def analyse_simulation_cap_loss(sim, results_file):
    if '1' in str(sim):
        title = 'Simulation 1'
    if '2' in str(sim):
        title = 'Simulation 2'
    if '3' in str(sim):
        title = 'Simulation 3'      
    if '4' in str(sim):
        title = 'Simulation 4'      
    if '5' in str(sim):
        title = 'Simulation 5'   
    
    # Write simulation header to file
    with open(results_file, 'a') as f:
        f.write(f"{title}:\n")
                
    cap_loss_distance, cap_loss_optimised = collect_ageing(sim)

    # Then plot using the filtered data
    plot_percentage_difference(cap_loss_distance, cap_loss_optimised, 'degradation', title, results_file=results_file)


In [None]:
def analyse_simulation_consumption(sim, results_file):
    if '1' in str(sim):
        title = 'Simulation 1'
    if '2' in str(sim):
        title = 'Simulation 2'
    if '3' in str(sim):
        title = 'Simulation 3'      
    if '4' in str(sim):
        title = 'Simulation 4'      
    if '5' in str(sim):
        title = 'Simulation 5'   
                
    consumption_distance, consumption_optimised = collect_consumptions(sim)

    # Then plot using the filtered data
    plot_percentage_difference(consumption_distance, consumption_optimised, 'consumption', title, results_file=results_file)


In [None]:
results_file = "route_simulation_results.txt"
with open(results_file, 'w') as f:
    f.write("Simulation Results Summary\n")
    f.write("=========================\n\n")

simulation1 = "results/singular_routes_sim/simulation_test_set1.json"
simulation2 = "results/singular_routes_sim/simulation_test_set2.json"
simulation3 = "results/singular_routes_sim/simulation_test_set3.json"
simulation4 = "results/singular_routes_sim/simulation_test_set4.json"
simulation5 = "results/singular_routes_sim/simulation_test_set5.json"
sims = [simulation1, simulation2, simulation3, simulation4, simulation5]

for sim in sims:
    analyse_simulation_cap_loss(sim, results_file)
    analyse_simulation_consumption(sim, results_file)