In [87]:
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import seaborn as sns
import matplotlib.pyplot as plt
import sqlite3
import plotly.io as pio

In [88]:
def map_tech_to_mode(tech):
    class_mapping = {
    'T_HDV_AJ': 'Air jet',
    'T_HDV_B': 'Bus',
    'T_HDV_R': 'Rail',
    'T_HDV_T': 'HD Truck',
    'T_HDV_W': 'Marine vessel',
    'T_MDV_T': 'MD Truck',
    'T_LDV_C_': 'LD Car',
    'T_LDV_LT': 'LD Truck',
    'T_LDV_M': 'Motorcycle',
    'T_IMP_': 'Fuel use',
    'H2_COMP_100_700': 'Fuel use'
    }

    for prefix, class_name in class_mapping.items():
        if tech.startswith(prefix):
            return class_name
    return 'Other'

def map_tech_to_fuel(tech):
    carrier_mapping = {
        'BEV': 'Battery electric',
        'GSL': 'Gasoline',
        'DSL': 'Diesel',
        'CNG': 'Compressed NG',
        'LNG': 'Liquified NG',
        'JTF': 'Jet Fuel',
        'SPK': 'Synth. Jet Fuel',
        'HFO': 'Heavy Fuel Oil',
        'MDO': 'Marine Diesel Oil',
        'ELC': 'Electricity',
        'ETH': 'Ethanol',	
        'RDSL': 'Ren. Diesel',
    }
    # Order matters 
    if 'PHEV35' in tech:
        return 'PHEV (35-mile AER)'
    if 'PHEV50' in tech:
        return 'PHEV (50-mile AER)'
    if 'PHEV' in tech:
        return 'Plug-in hybrid'
    if 'BEV150' in tech:
        return 'BEV (150-mile AER)'
    if 'BEV200' in tech:
        return 'BEV (200-mile AER)'
    if 'BEV300' in tech:
        return 'BEV (300-mile AER)'
    if 'BEV400' in tech:
        return 'BEV (400-mile AER)'
    if 'FC' in tech:
        return 'Fuel-cell electric'
    if 'HEV' in tech:
        return 'Hybrid'
    if 'H2' in tech:
        return 'Hydrogen'
    if 'BEV_CHRG' in tech:
        return 'LD BEV charger'
    if 'CHRG' in tech:
        return 'Other charger'
    for prefix, carrier in carrier_mapping.items():
        if prefix in tech:
            return carrier 
    return 'Other'

In [89]:
def import_db_capacity_data(db_variant='lifetimes', scenario='trn_lifetimes', path='C:/Users/rashi/ESM_databases/temoa/data_files/'):
    db_file = path + f'canoe_on_12d_{db_variant}.sqlite'
    conn = sqlite3.connect(db_file)

    # filter db tables
    query_net_capacity = f"SELECT * FROM OutputNetCapacity WHERE sector = 'Transport' AND scenario = '{scenario}'"
    query_built_capacity = f"SELECT * FROM OutputBuiltCapacity WHERE sector = 'Transport' AND scenario = '{scenario}'"
    query_existing_capacity = "SELECT * FROM ExistingCapacity WHERE tech like 'T_%'"

    net_cap = pd.read_sql_query(query_net_capacity, conn).drop(columns=['region', 'sector'])
    new_cap = pd.read_sql_query(query_built_capacity, conn).drop(columns=['region', 'sector'])
    ex_cap =  pd.read_sql_query(query_existing_capacity, conn)[['tech', 'vintage', 'capacity', 'units']]
    conn.close()

    # label mode fuel and lifetime classes
    net_cap['mode'] = net_cap['tech'].apply(map_tech_to_mode)
    new_cap['mode'] = new_cap['tech'].apply(map_tech_to_mode)
    ex_cap['mode'] = ex_cap['tech'].apply(map_tech_to_mode)

    net_cap['fuel'] = net_cap['tech'].apply(map_tech_to_fuel)
    new_cap['fuel'] = new_cap['tech'].apply(map_tech_to_fuel)
    ex_cap['fuel'] = ex_cap['tech'].apply(map_tech_to_fuel)

    # extract the last two digits (if present), else NaN - fill NaN with 50 to denote median parent class
    percentile = net_cap['tech'].str.extract(r'_S(\d{2})$')[0]
    net_cap['life'] = 'Survival P' + percentile.fillna('50')
    percentile = new_cap['tech'].str.extract(r'_S(\d{2})$')[0]
    new_cap['life'] = 'Survival P' + percentile.fillna('50')
    percentile = ex_cap['tech'].str.extract(r'_S(\d{2})$')[0]
    ex_cap['life'] = 'Survival P' + percentile.fillna('50')

    # prepare capacity dfs
    net_cap_group = net_cap.groupby(['mode', 'fuel', 'life', 'period'], as_index=False).sum('capacity').drop(columns='vintage').rename(columns={'period': 'vintage'})
    ex_cap_group = ex_cap.copy()
    ex_cap_group['vintage'] = 2021
    ex_cap_group = ex_cap_group.groupby(['mode', 'fuel', 'life', 'vintage', 'units'], as_index=False).sum('capacity')
    new_cap_group = new_cap.groupby(['mode', 'fuel', 'life', 'vintage'], as_index=False).sum('capacity')

    # brand capacity types and merge, filling empty values with 0
    net_cap_group = net_cap_group[['mode', 'fuel', 'life', 'vintage', 'capacity']].rename(columns={'capacity': 'net capacity'})
    new_cap_group = new_cap_group[['mode', 'fuel', 'life', 'vintage', 'capacity']].rename(columns={'capacity': 'new capacity'})
    ex_cap_group = ex_cap_group[['mode', 'fuel', 'life', 'vintage', 'capacity']].rename(columns={'capacity': 'ex capacity'})
    merged_cap = net_cap_group.merge(new_cap_group, on=['mode', 'fuel', 'life', 'vintage'], how='left').merge(ex_cap_group, on=['mode', 'fuel', 'life', 'vintage'], how='left').fillna(0.)

    # sort and calculate retired capacity
    merged_cap = merged_cap.sort_values(by=['mode', 'fuel', 'life', 'vintage'])
    merged_cap['retired cap'] = 0.

    for _, group in merged_cap.groupby(['mode', 'fuel', 'life']):
        for i in range(1, len(group)):
            current_idx = group.index[i]
            previous_idx = group.index[i-1]

            # calculate retired capacity as netcap_i + newcap_i - netcap_{i-1}, where excap_i = netcap_{i-1}
            merged_cap.loc[current_idx, 'retired cap'] = (
                merged_cap.loc[previous_idx, 'net capacity'] + 
                merged_cap.loc[current_idx, 'new capacity'] - 
                merged_cap.loc[current_idx, 'net capacity']
            )

    for _, group in merged_cap.groupby(['mode', 'fuel', 'life']):
        for i in range(1, len(group)):
            current_idx = group.index[i]
            previous_idx = group.index[i-1]

            # fill the "ex capacity" for years after 2021 using the "net capacity" from the previous year
            merged_cap.loc[current_idx, 'ex capacity'] = merged_cap.loc[previous_idx, 'net capacity']

    merged_cap['retired cap'] = -merged_cap['retired cap']  # negative values for retired capacity
    merged_cap = merged_cap.round(2)
    return merged_cap

In [90]:
import_db_capacity_data()

Unnamed: 0,mode,fuel,life,vintage,net capacity,new capacity,ex capacity,retired cap
0,Air jet,Other,Survival P50,2021,70.23,1.39,67.90,-0.00
1,Air jet,Other,Survival P50,2021,70.23,1.39,70.23,-1.39
2,Air jet,Other,Survival P50,2025,80.89,10.66,70.23,-0.00
3,Air jet,Other,Survival P50,2030,89.21,19.57,80.89,-11.25
4,Air jet,Other,Survival P50,2035,97.07,18.38,89.21,-10.52
...,...,...,...,...,...,...,...,...
688,Rail,Liquified NG,Survival P50,2030,0.00,0.00,0.00,-0.00
689,Rail,Liquified NG,Survival P50,2035,0.00,0.00,0.00,-0.00
690,Rail,Liquified NG,Survival P50,2040,0.00,0.00,0.00,-0.00
691,Rail,Liquified NG,Survival P50,2045,0.00,0.00,0.00,-0.00


In [91]:
color_fuel_map = {
    'Gasoline': 'red',
    'Diesel': 'brown',
    'Compressed NG': 'tomato',
    'Hybrid': 'darkorange',
    'PHEV (35-mile AER)': 'dodgerblue',
    'PHEV (50-mile AER)': 'darkblue',
    'BEV (150-mile AER)': 'limegreen',
    'BEV (200-mile AER)': 'seagreen',
    'BEV (300-mile AER)': 'olive',
    'BEV (400-mile AER)': 'darkgreen',
    'Plug-in hybrid': 'blue',
    'Battery electric': 'green',
    'Fuel-cell electric': 'mediumvioletred'
}

In [92]:
def plot_capacity_flow(db_variant='lifetimes', scenario='trn_lifetimes', lifetime_classes=None, scenario_title=None,
                        modes=['LD Car', 'LD Truck', 'MD Truck', 'HD Truck'], color_map=color_fuel_map):
    df = import_db_capacity_data(db_variant=db_variant, scenario=scenario)
    df = df.melt(id_vars=['mode', 'fuel', 'life', 'vintage'],
                       value_vars=['net capacity', 'ex capacity', 'new capacity', 'retired cap'],
                       var_name='cap type', value_name='capacity')

    # df['capacity'] = df['capacity'] / 1E3     # convert to million units
    df['vintage'] = df['vintage'].astype('str')
    df_filtered = df[(abs(df['capacity']) > 1e-3) & (df['mode'].isin(modes)) & (df['cap type'] != 'ex capacity')].reset_index(drop=True)
    if lifetime_classes is None:
        df_filtered = df_filtered.groupby(['mode', 'fuel', 'vintage', 'cap type'], as_index=False).sum('capacity')
    else:
        df_filtered = df_filtered.groupby(['mode', 'life', 'vintage', 'cap type'], as_index=False).sum('capacity')

    
    fig = px.bar(df_filtered, x='cap type', y='capacity', color='fuel' if lifetime_classes is None else 'life', 
            pattern_shape='cap type', pattern_shape_sequence=["", ".", "/"],
            facet_col='vintage', facet_col_spacing=2E-2, 
            facet_row='mode', facet_row_spacing=1.5E-2,
            category_orders={"cap type": ["net capacity", "new capacity", "retired cap"],
                             'vintage': sorted(df['vintage'].unique()),
                             "fuel": list(color_map.keys()) if lifetime_classes is None else [],
                             'life': sorted(df['life'].unique()) if lifetime_classes is not None else [],
                             'mode': modes},
            template='plotly_white', orientation='v', 
            color_discrete_map = color_map if lifetime_classes is None else {},
            color_discrete_sequence=px.colors.qualitative.Bold if lifetime_classes is not None else [],
            text_auto='.2s', width=1200, height=900
            )

    fig.update_layout(
        margin=dict(
            t=65, b=30),
        title=dict(
            text=f'<b>Vehicle fleet stock and flow in ON by vehicle class ({scenario_title})</b>',
            x=0.5, y=0.98, xanchor='center', yanchor='top'),
        yaxis_title_standoff=0,
        legend_title=dict(
            text='<b>Fuel/powertrain type</b>' if lifetime_classes is None else '<b>Lifetime class</b>', 
            font=dict(size=16)),
        bargap=0.1,
        legend=dict(
            traceorder='grouped', orientation='v', yanchor='top', y=0.8, xanchor='center', x=1.15),
        font=dict(
            size=15)
        )

    fig.for_each_trace(lambda trace: trace.update(textfont=dict(size=11)))
    fig.for_each_xaxis(lambda axis: axis.update(title_text='', showticklabels=False))
    fig.for_each_yaxis(lambda axis: axis.update(title_text=''))

    unique_xdomains = sorted(set(fig.layout[axis].domain[0] for axis in fig.layout if axis.startswith('yaxis')))
    n_vintages = df['vintage'].nunique()
    for row_i, _ in enumerate(unique_xdomains, start=1):
        anchor_i = (row_i - 1) * n_vintages + 1   # left-most col in that row
        match_anchor = 'y' if row_i == 1 else f'y{anchor_i}'
        fig.update_yaxes(matches=match_anchor, row=row_i, col=None, nticks=8, zeroline=True, zerolinecolor='black', tickformat='~s')
    
    for annotation in fig.layout.annotations:           # Fix facet cols and facet row annotations
        if 'mode' in annotation.text:
            annotation.text = f'<b>{annotation.text.split('=')[1]}</b>'
            annotation.font.size = 16
            annotation.x = 1.01
            annotation.xanchor = 'center'
        else:
            annotation.text = f'<b>{annotation.text.split('=')[1]}</b>'
            annotation.font.size = 16
            annotation.y = 0
            annotation.yanchor = 'top' 

    shown_legends = set()
    for trace in fig.data:                              # Show only one legend for each fuel type
        trace.name = trace.name.split(",")[0]
        if trace.name not in shown_legends:
            trace.showlegend = True
            shown_legends.add(trace.name)
        else:
            trace.showlegend = False
    
    # enforce fuel/life order in legend
    if lifetime_classes is None:
        _order = list(color_map.keys())
    else: 
        _order = sorted(df['life'].unique())
    fig.data = tuple(
        sorted(fig.data, key=lambda t: (_order.index(t.name) if t.name in _order else len(_order)))
    )

    # add capacity type legends separated by a blank entry
    fig.add_trace(go.Bar(
        x=[None], y=[None],
        showlegend=True,
        legendgroup='Blank',
        legendgrouptitle=None,
        name='',
        marker_color='rgba(0,0,0,0)'
    ))

    for i, (name, pat) in enumerate([
        ("Net capacity",   ""),
        ("New capacity",   "."),
        ("Retired capacity", "/"),
    ]):
        fig.add_trace(go.Bar(
            x=[None], y=[None],
            name=name,
            showlegend=True,
            legendgroup='Capacity type',
            legendgrouptitle=dict(text="<b>Capacity type</b>", font=dict(size=16)) if i == 0 else None,
            marker=dict(
                color='rgba(0,0,0,0)',
                line=dict(color='black', width=1),
                pattern_shape=pat
            )
        ))

    fig.add_annotation(
        text='<b>Fleet capacity (k vehicles)</b>',
        x=-0.08, y=0.5, xref="paper", yref="paper", showarrow=False, textangle=-90, font=dict(size=17))
        
    fig.show()

In [117]:
def plot_capacity_flow(db_variant='lifetimes', scenario='trn_lifetimes', lifetime_classes=None, scenario_title=None,
                        modes=['LD Car', 'LD Truck', 'MD Truck', 'HD Truck'],
                        cap_types=['net capacity', 'new capacity', 'retired cap'],
                        color_map=color_fuel_map):
    df = import_db_capacity_data(db_variant=db_variant, scenario=scenario)
    df = df.melt(id_vars=['mode', 'fuel', 'life', 'vintage'],
                       value_vars=['net capacity', 'ex capacity', 'new capacity', 'retired cap'],
                       var_name='cap type', value_name='capacity')

    df['capacity'] = df['capacity'] * 1E3     # convert to normal units
    df['vintage'] = df['vintage'].astype('str')
    df_filtered = df[(abs(df['capacity']) > 1e-3) & (df['mode'].isin(modes)) & (df['cap type'].isin(cap_types))].reset_index(drop=True)
    df_filtered = df_filtered[df_filtered['vintage'] != '2021'].copy()
    if lifetime_classes is None:
        df_filtered = df_filtered.groupby(['mode', 'fuel', 'vintage', 'cap type'], as_index=False).sum('capacity')
    else:
        df_filtered = df_filtered.groupby(['mode', 'life', 'vintage', 'cap type'], as_index=False).sum('capacity')

    
    fig = px.bar(df_filtered, x='cap type', y='capacity', color='fuel' if lifetime_classes is None else 'life', 
            pattern_shape='cap type', 
            pattern_shape_sequence=["", ".", "/"] if len(cap_types) == 3 else ["", "/"],
            facet_col='vintage', facet_col_spacing=2E-2, 
            facet_row='mode', facet_row_spacing=0.04,
            category_orders={"cap type": cap_types,
                             'vintage': sorted(df_filtered['vintage'].unique()),
                             "fuel": list(color_map.keys()) if lifetime_classes is None else [],
                             'life': sorted(df_filtered['life'].unique()) if lifetime_classes is not None else [],
                             'mode': modes},
            template='plotly_white', orientation='v', 
            color_discrete_map = color_map if lifetime_classes is None else {},
            color_discrete_sequence=px.colors.qualitative.Bold if lifetime_classes is not None else [],
            text_auto='.2s', width=1200, height=450
            )

    fig.update_layout(
        margin=dict(
            t=65, b=30),
        title=dict(
            text=f'<b>Vehicle fleet stock and flow in ON by vehicle class ({scenario_title})</b>',
            x=0.08, y=0.94, xanchor='left', yanchor='top'),
        yaxis_title_standoff=0,
        legend_title=dict(
            text='<b>Fuel/powertrain type</b>' if lifetime_classes is None else '<b>Lifetime class</b>', 
            font=dict(size=16)),
        bargap=0.1,
        legend=dict(
            traceorder='grouped', orientation='v', yanchor='top', y=1.0, xanchor='center', x=1.15),
        font=dict(
            size=15)
        )

    fig.for_each_trace(lambda trace: trace.update(textfont=dict(size=11)))
    fig.for_each_xaxis(lambda axis: axis.update(title_text='', showticklabels=False))
    fig.for_each_yaxis(lambda axis: axis.update(title_text=''))

    unique_xdomains = sorted(set(fig.layout[axis].domain[0] for axis in fig.layout if axis.startswith('yaxis')))
    n_vintages = df['vintage'].nunique()
    for row_i, _ in enumerate(unique_xdomains, start=1):
        anchor_i = (row_i - 1) * n_vintages + 1   # left-most col in that row
        match_anchor = 'y' if row_i == 1 else f'y{anchor_i}'
        fig.update_yaxes(matches=match_anchor, row=row_i, col=None, nticks=8, zeroline=True, zerolinecolor='black', tickformat='~s')
    
    for annotation in fig.layout.annotations:           # Fix facet cols and facet row annotations
        if 'mode' in annotation.text:
            annotation.text = f'<b>{annotation.text.split('=')[1]}</b>'
            annotation.font.size = 16
            annotation.x = 1.01
            annotation.xanchor = 'center'
        else:
            annotation.text = f'<b>{annotation.text.split('=')[1]}</b>'
            annotation.font.size = 16
            annotation.y = 0
            annotation.yanchor = 'top' 

    shown_legends = set()
    for trace in fig.data:                              # Show only one legend for each fuel type
        trace.name = trace.name.split(",")[0]
        if trace.name not in shown_legends:
            trace.showlegend = True
            shown_legends.add(trace.name)
        else:
            trace.showlegend = False
    
    # enforce fuel/life order in legend
    if lifetime_classes is None:
        _order = list(color_map.keys())
    else: 
        _order = sorted(df['life'].unique())
    fig.data = tuple(
        sorted(fig.data, key=lambda t: (_order.index(t.name) if t.name in _order else len(_order)))
    )

    # add capacity type legends separated by a blank entry
    fig.add_trace(go.Bar(
        x=[None], y=[None],
        showlegend=True,
        legendgroup='Blank',
        legendgrouptitle=None,
        name='',
        marker_color='rgba(0,0,0,0)'
    ))

    for i, (name, pat) in enumerate([
        # ("Net capacity",   ""),
        ("New capacity",   ""),
        ("Retired capacity", "/"),
    ]):
        fig.add_trace(go.Bar(
            x=[None], y=[None],
            name=name,
            showlegend=True,
            legendgroup='Capacity type',
            legendgrouptitle=dict(text="<b>Capacity type</b>", font=dict(size=16)) if i == 0 else None,
            marker=dict(
                color='rgba(0,0,0,0)',
                line=dict(color='black', width=1),
                pattern_shape=pat
            )
        ))

    fig.add_annotation(
        text='<b>Fleet capacity (vehicle units)</b>',
        x=-0.08, y=0.5, xref="paper", yref="paper", showarrow=False, textangle=-90, font=dict(size=17))

    fig.show()
    fig.write_image("vanilla4.svg", scale=1, engine='kaleido')

In [118]:
fig = plot_capacity_flow(db_variant='vanilla4', 
                   scenario='vanilla4_cftnorm',
                   modes=['LD Truck', 'HD Truck'],
                   cap_types=['new capacity', 'retired cap'], 
                #    lifetime_classes='yes',
                   scenario_title='vanilla model')

In [119]:
plot_capacity_flow(db_variant='life_3', 
                   scenario='life_3',
                   modes=['LD Truck', 'HD Truck'],
                   cap_types=['new capacity', 'retired cap'], 
                #    lifetime_classes='yes',
                   scenario_title='3 lifetime classes')

In [116]:
plot_capacity_flow(db_variant='life_7', 
                   scenario='life_7',
                   modes=['LD Truck', 'HD Truck'],
                   cap_types=['new capacity', 'retired cap'], 
                #    lifetime_classes='yes',
                   scenario_title='7 lifetime classes')