In [1]:
import os
import numpy as np
import pandas as pd

In [2]:
# Directory that is listed for the raw .XLSX emission-factor workbooks (relative path).
raw_data_path = r'../../../../data/raw/freight_emission/'
# Directory the processed ';'-separated .csv.gz tables are written to (relative path).
processed_data_path = r'../../../../data/intermediate/test/freight_emission/'

In [3]:
# os.listdir returns entries in arbitrary order; sort them so that positional
# access such as filenames[-2] is reproducible across runs and machines.
filenames = sorted(name for name in os.listdir(raw_data_path) if '.XLSX' in name)
filenames

# Check the consistency of the emission factors (EF)

In [32]:
def check_and_fill_warm_emission_factors(df, vehicle_type='eBike', field_name='V_100%'):
    """Append placeholder rows for (Component, TrafficSit) pairs missing for one vehicle type.

    Every combination of the components and traffic situations that occur
    anywhere in ``df`` is checked against the rows of ``vehicle_type``. For each
    combination the vehicle type lacks, a copy of the vehicle's first row is
    appended with ``Component`` and ``TrafficSit`` set to that combination.
    In the copied row, every column from ``field_name`` to the last column is
    overwritten: ``'EFA'`` receives the mean ``EFA`` of the vehicle's existing
    rows for that component (NaN when there are none) and all other columns
    receive 0.

    Args:
        df: Table with at least the columns 'Subsegment', 'Component',
            'TrafficSit' and ``field_name``.
        vehicle_type: Value of 'Subsegment' whose records are completed.
        field_name: First column (in column order) that is overwritten in
            the added rows.

    Returns:
        ``df`` itself when nothing is missing, otherwise a new DataFrame with
        the added rows appended and a fresh integer index.

    Raises:
        ValueError: If ``field_name`` is not a column of ``df`` or if ``df``
            has no rows for ``vehicle_type``.
    """
    sub_df = df[df['Subsegment'] == vehicle_type]

    full_ef = df['Component'].unique()
    full_traffic_sit = df['TrafficSit'].unique()

    full_fields_list = df.columns.tolist()
    post_fields_list = full_fields_list[full_fields_list.index(field_name):]

    if sub_df.empty:
        raise ValueError(f"No records found for vehicle type '{vehicle_type}'")
    template_record = sub_df.iloc[0].to_dict()

    # The average is written to the 'EFA' column, so it must be computed from
    # that column by name. The positional lookup is kept only as a fallback so
    # the printed average is still defined when there is no 'EFA' fill column.
    if 'EFA' in post_fields_list:
        avg_field = 'EFA'
    elif len(post_fields_list) > 1:
        avg_field = post_fields_list[1]
    else:
        avg_field = None

    # create new records
    new_records = []
    for ef in full_ef:
        ef_rows = sub_df[sub_df['Component'] == ef]
        avg_ef_value = ef_rows[avg_field].mean() if avg_field is not None else float('nan')
        print(f"Average value for {ef} is {avg_ef_value}")
        for ts in full_traffic_sit:
            if (ef_rows['TrafficSit'] == ts).any():
                continue

            new_record = dict(template_record)
            new_record['Component'] = ef
            new_record['TrafficSit'] = ts
            for field in post_fields_list:
                # TODO: This is a hack, need to find a better way to fill the values
                new_record[field] = avg_ef_value if field == 'EFA' else 0
            new_records.append(new_record)

    # Add new records to the dataframe
    if new_records:
        df = pd.concat([df, pd.DataFrame(new_records)], ignore_index=True)

    return df


In [5]:
def check_and_fill_cold_emission_factors(df, vehicle_type='LCV petrol N1-III Euro-6c', field_name='%OfSubsegment'):
    """Append rows for (Component, AmbientCondPattern) pairs missing for one vehicle type.

    All components and ambient-condition patterns present in ``df`` are paired
    up; for each pair that ``vehicle_type`` has no row for, a copy of the
    vehicle's first row is appended with the pair filled in. In the copy, every
    column from ``field_name`` through the last column is set to the mean of
    the ``field_name`` column over the vehicle's existing rows of that
    component (NaN when there are none).

    NOTE(review): the same mean is written into all trailing columns, not only
    into ``field_name`` - confirm this is intended.

    Args:
        df: Table with the columns 'Subsegment', 'Component',
            'AmbientCondPattern' and ``field_name``.
        vehicle_type: 'Subsegment' value whose records are completed.
        field_name: Column the mean is taken from and first column overwritten.

    Returns:
        ``df`` unchanged when nothing is missing, otherwise a new DataFrame
        with the extra rows appended (index reset).

    Raises:
        ValueError: If ``field_name`` is not a column or ``vehicle_type`` has no rows.
    """
    vehicle_rows = df[df['Subsegment'] == vehicle_type]

    components = df['Component'].unique()
    ambient_patterns = df['AmbientCondPattern'].unique()

    column_names = df.columns.tolist()
    fill_columns = column_names[column_names.index(field_name):]

    if vehicle_rows.empty:
        raise ValueError(f"No records found for vehicle type '{vehicle_type}'")
    template_row = vehicle_rows.iloc[0]

    additions = []
    for component in components:
        is_component = vehicle_rows['Component'] == component
        fill_value = vehicle_rows[is_component][field_name].mean()
        for pattern in ambient_patterns:
            already_present = (is_component & (vehicle_rows['AmbientCondPattern'] == pattern)).any()
            if already_present:
                continue
            record = template_row.to_dict()
            record['Component'] = component
            record['AmbientCondPattern'] = pattern
            record.update({column: fill_value for column in fill_columns})
            additions.append(record)

    if not additions:
        return df
    return pd.concat([df, pd.DataFrame(additions)], ignore_index=True)

In [42]:
def check_and_fill_cold_emission_factorsV2(df, vehicle_type='LCV petrol N1-III Euro-6c', field_name='%OfSubsegment',
                                           lack_hours=(6, 7, 8, 9, 10, 11, 12)):
    """Complete the cold-start records of one vehicle type in two steps.

    Step 1: for every (Component, AmbientCondPattern) pair that occurs in
    ``df`` but not for ``vehicle_type``, append a copy of the vehicle's first
    row with the pair filled in and every column from ``field_name`` to the
    last column set to 0.

    Step 2: for every component and every hour ``h`` in ``lack_hours``, make
    sure the patterns ``'<prefix>,<h-1>-<h>h,0-1km'`` and
    ``'<prefix>,<h-1>-<h>h,1-2km'`` exist for the vehicle type. Missing ones
    are created as copies of the vehicle's ``4-5h`` row of the same distance
    class. ``<prefix>`` is the text before the first comma in the vehicle's
    first ambient pattern.

    Args:
        df: Table with the columns 'Subsegment', 'Component',
            'AmbientCondPattern' and ``field_name``.
        vehicle_type: 'Subsegment' value whose records are completed.
        field_name: First column that is zeroed in rows added by step 1.
        lack_hours: Upper bounds ``h`` of the hour bins ``(h-1)-h`` to add;
            the default reproduces the previously hard-coded list.

    Returns:
        A DataFrame with the added rows appended (``df`` itself when nothing
        had to be added).

    Raises:
        ValueError: If ``field_name`` is not a column or ``vehicle_type`` has no rows.
        AssertionError: If a component does not have exactly one ``4-5h`` row
            per distance class for ``vehicle_type`` after step 1.
    """
    sub_df = df[df['Subsegment'] == vehicle_type]

    full_ef = df['Component'].unique()
    full_ambient_patterns = df['AmbientCondPattern'].unique()

    full_fields_list = df.columns.tolist()
    post_fields_list = full_fields_list[full_fields_list.index(field_name):]

    if sub_df.empty:
        raise ValueError(f"No records found for vehicle type '{vehicle_type}'")
    template_record = sub_df.iloc[0].to_dict()

    # Step 1: zero-filled rows for missing (component, ambient pattern) pairs
    new_records = []
    for ef in full_ef:
        ef_rows = sub_df[sub_df['Component'] == ef]
        for pattern in full_ambient_patterns:
            if (ef_rows['AmbientCondPattern'] == pattern).any():
                continue
            print(f"adding missing record for {ef} and {pattern}")
            new_record = dict(template_record)
            new_record['Component'] = ef
            new_record['AmbientCondPattern'] = pattern
            for field in post_fields_list:
                new_record[field] = 0
            new_records.append(new_record)

    if new_records:
        df = pd.concat([df, pd.DataFrame(new_records)], ignore_index=True)

    # Step 2: add the hour bins that are lacking, copied from the 4-5h rows.
    # Re-select the vehicle rows so the rows added in step 1 are included.
    sub_df = df[df['Subsegment'] == vehicle_type]
    # The prefix does not depend on the component, so compute it once.
    sample_prefix = sub_df.iloc[0]['AmbientCondPattern'].split(',')[0]
    distance_classes = ('0-1km', '1-2km')

    new_records = []
    for ef in full_ef:
        ef_rows = sub_df[sub_df['Component'] == ef]
        references = {}
        for distance in distance_classes:
            # Plain substring match; na=False keeps missing patterns from
            # turning the boolean mask into an object array.
            matches = ef_rows[ef_rows['AmbientCondPattern'].str.contains(f'4-5h,{distance}', regex=False, na=False)]
            assert len(matches) == 1, (
                f"expected exactly one '4-5h,{distance}' record for {vehicle_type!r} "
                f"and component {ef!r}, found {len(matches)}"
            )
            references[distance] = matches.iloc[0]

        for h in lack_hours:
            for distance in distance_classes:
                pattern = f'{sample_prefix},{h-1}-{h}h,{distance}'
                if (ef_rows['AmbientCondPattern'] == pattern).any():
                    continue
                new_record = references[distance].to_dict()
                new_record['AmbientCondPattern'] = pattern
                new_records.append(new_record)

    if new_records:
        df = pd.concat([df, pd.DataFrame(new_records)], ignore_index=True)
    return df

## Convert the 'not specified' SizeClasse of eScooter records to 'NA'

In [7]:
def convert_eBike_sizeClass(row: pd.Series, vehicle_type: str = 'eScooter'):
    """Return the size class to store for one record.

    Records of ``vehicle_type`` whose 'SizeClasse' is 'not specified' get the
    literal string 'NA'; every other record keeps its current 'SizeClasse'.

    Despite the function name, the default subsegment is 'eScooter' (the value
    that was previously hard-coded); pass ``vehicle_type='eBike'`` for tables
    that use that subsegment name.

    Note: pandas.read_csv parses the text 'NA' as a missing value by default,
    so read the exported file with ``keep_default_na=False`` if the literal
    string is needed downstream.

    Args:
        row: One table row providing 'SizeClasse' and 'Subsegment'.
        vehicle_type: 'Subsegment' value whose unspecified size class is replaced.

    Returns:
        'NA' or the row's original 'SizeClasse' value.
    """
    if row['SizeClasse'] == 'not specified' and row['Subsegment'] == vehicle_type:
        return 'NA'
    return row['SizeClasse']

# Replace the value of EFA with EFA_WTW for CO2e

In [8]:
def replace_efa4co2e(row: pd.Series):
    """Select the value to store in 'EFA' for one record.

    Rows whose 'Component' is 'CO2e' take their 'EFA_WTW' value; all other
    rows keep their existing 'EFA' value.
    """
    source_column = 'EFA_WTW' if row['Component'] == 'CO2e' else 'EFA'
    return row[source_column]

# Output

In [None]:
# Process every raw workbook: drop invalid rows, fill missing emission
# factors, adjust EFA / SizeClasse and export the result as '<name>V2.csv.gz'.
for filename in filenames:
    df = pd.read_excel(raw_data_path + filename)
    # Remove rows with error region/country info; copy so the column
    # assignments below operate on an independent frame.
    df = df.query("AmbientCondPattern != 'ØGermany'").copy()
    if df.empty:
        # The row-wise apply calls below return a DataFrame (not a Series) for
        # an empty frame, which cannot be assigned to a single column, so an
        # empty table is skipped explicitly instead of falling through.
        print(f"Skipping file {filename}: no records left after filtering")
        continue

    print(f"Processing file {filename}")
    # Fill missing emission factors
    if 'Cold' in filename and 'MC' in filename:
        df = check_and_fill_cold_emission_factors(df, vehicle_type='eBike')
    elif 'Cold' in filename and 'LCV' in filename:
        df = check_and_fill_cold_emission_factors(df, vehicle_type='LCV petrol N1-III Euro-6c')
    elif 'HOT' in filename and 'MC' in filename:
        df = check_and_fill_warm_emission_factors(df, vehicle_type='eBike')
    elif 'HOT' in filename and 'LCV' in filename:
        df = check_and_fill_warm_emission_factors(df, vehicle_type='LCV petrol N1-III Euro-6c')
    # Any other file is an average table and is exported without filling.

    if 'average' not in filename:
        # replace the value of 'EFA' with 'EFA_WTW' for emission factor - CO2e
        df['EFA'] = df.apply(replace_efa4co2e, axis=1)

    # NOTE(review): this guard looks for 'eBike' rows, while
    # convert_eBike_sizeClass only rewrites 'eScooter' rows by default -
    # confirm which subsegment name the tables use.
    if 'MC' in filename and not df.query("Subsegment == 'eBike'").empty:
        # Convert the SizeClasse for eBike
        df['SizeClasse'] = df.apply(convert_eBike_sizeClass, axis=1)

    # filename[:-5] strips the '.XLSX' extension
    output_file = processed_data_path + filename[:-5] + "V2.csv.gz"
    df.to_csv(output_file, sep=';', index=False, compression='gzip', encoding='utf-8-sig')
    print(f"File {filename} processed and saved to {output_file}")




In [None]:
# Second copy of the 'V2' export loop: drop invalid rows, fill missing
# emission factors, adjust EFA / SizeClasse and write '<name>V2.csv.gz'.
for filename in filenames:
    df = pd.read_excel(raw_data_path + filename)
    # Remove rows with error region/country info; copy so the column
    # assignments below operate on an independent frame.
    df = df.query("AmbientCondPattern != 'ØGermany'").copy()
    if df.empty:
        # The row-wise apply calls below return a DataFrame (not a Series) for
        # an empty frame, which cannot be assigned to a single column, so an
        # empty table is skipped explicitly instead of falling through.
        print(f"Skipping file {filename}: no records left after filtering")
        continue

    print(f"Processing file {filename}")
    # Fill missing emission factors
    if 'Cold' in filename and 'MC' in filename:
        df = check_and_fill_cold_emission_factors(df, vehicle_type='eBike')
    elif 'Cold' in filename and 'LCV' in filename:
        df = check_and_fill_cold_emission_factors(df, vehicle_type='LCV petrol N1-III Euro-6c')
    elif 'HOT' in filename and 'MC' in filename:
        df = check_and_fill_warm_emission_factors(df, vehicle_type='eBike')
    elif 'HOT' in filename and 'LCV' in filename:
        df = check_and_fill_warm_emission_factors(df, vehicle_type='LCV petrol N1-III Euro-6c')
    # Any other file is an average table and is exported without filling.

    if 'average' not in filename:
        # replace the value of 'EFA' with 'EFA_WTW' for emission factor - CO2e
        df['EFA'] = df.apply(replace_efa4co2e, axis=1)

    # NOTE(review): this guard looks for 'eBike' rows, while
    # convert_eBike_sizeClass only rewrites 'eScooter' rows by default -
    # confirm which subsegment name the tables use.
    if 'MC' in filename and not df.query("Subsegment == 'eBike'").empty:
        # Convert the SizeClasse for eBike
        df['SizeClasse'] = df.apply(convert_eBike_sizeClass, axis=1)

    # filename[:-5] strips the '.XLSX' extension
    output_file = processed_data_path + filename[:-5] + "V2.csv.gz"
    df.to_csv(output_file, sep=';', index=False, compression='gzip', encoding='utf-8-sig')
    print(f"File {filename} processed and saved to {output_file}")


# Output V2

In [34]:
# Restrict the run to the detailed cold-start workbooks. os.listdir returns
# entries in arbitrary order, so sort them to keep positional access such as
# filenames[-2] reproducible.
filenames = sorted(
    name for name in os.listdir(raw_data_path)
    if '.XLSX' in name and 'Cold' in name and 'detailed' in name
)
filenames

In [44]:
# Add lack hours for cold emission factors: process every selected workbook
# with the V2 cold-start filler and export the result as '<name>V4.csv.gz'.
for filename in filenames:
    df = pd.read_excel(raw_data_path + filename)
    # Remove rows with error region/country info; copy so the column
    # assignments below operate on an independent frame.
    df = df.query("AmbientCondPattern != 'ØGermany'").copy()
    if df.empty:
        # The row-wise apply calls below return a DataFrame (not a Series) for
        # an empty frame, which cannot be assigned to a single column, so an
        # empty table is skipped explicitly instead of falling through.
        print(f"Skipping file {filename}: no records left after filtering")
        continue

    print(f"Processing file {filename}")
    # Fill missing emission factors
    if 'Cold' in filename and 'MC' in filename:
        df = check_and_fill_cold_emission_factorsV2(df, vehicle_type='eScooter')
    elif 'Cold' in filename and 'LCV' in filename:
        df = check_and_fill_cold_emission_factorsV2(df, vehicle_type='LCV petrol N1-III Euro-6c')
        # df = check_and_fill_cold_emission_factorsV2(df, vehicle_type='LCV BEV N1-IIII')  # This is a new added vehicle type for EV
    elif 'HOT' in filename and 'MC' in filename:
        df = check_and_fill_warm_emission_factors(df, vehicle_type='eScooter')
    elif 'HOT' in filename and 'LCV' in filename:
        df = check_and_fill_warm_emission_factors(df, vehicle_type='LCV petrol N1-III Euro-6c')
        # df = check_and_fill_warm_emission_factors(df, vehicle_type='LCV BEV N1-IIII')  # This is a new added vehicle type for EV
    # Any other file is an average table and is exported without filling.

    if 'average' not in filename:
        # replace the value of 'EFA' with 'EFA_WTW' for emission factor - CO2e
        df['EFA'] = df.apply(replace_efa4co2e, axis=1)

    if 'MC' in filename and not df.query("Subsegment == 'eScooter'").empty:
        # Convert the SizeClasse of the eScooter records
        df['SizeClasse'] = df.apply(convert_eBike_sizeClass, axis=1)

    # filename[:-5] strips the '.XLSX' extension
    output_file = processed_data_path + filename[:-5] + "V4.csv.gz"
    df.to_csv(output_file, sep=';', index=False, compression='gzip', encoding='utf-8-sig')
    print(f"File {filename} processed and saved to {output_file}")



# Test

In [8]:
# Load one raw workbook for manual inspection.
# NOTE(review): filenames[-2] depends on the current content and order of
# `filenames` - confirm it selects the intended file.
test_data = pd.read_excel(raw_data_path + filenames[-2])
test_data

In [9]:
# Preview the rows that remain after dropping the 'ØGermany' ambient pattern
# (the result is only displayed; test_data itself is not modified).
test_data.query("AmbientCondPattern != 'ØGermany'")

In [None]:
# Rows for subsegment 'LCV petrol N1-III Euro-6c' in traffic situation
# 'URB/Access/30/Heavy', before any filling.
test_data.query("Subsegment == 'LCV petrol N1-III Euro-6c' and TrafficSit == 'URB/Access/30/Heavy'")

In [None]:
# Fill the missing warm records for the petrol LCV and show the same selection again.
test_data_ = check_and_fill_warm_emission_factors(test_data, vehicle_type='LCV petrol N1-III Euro-6c')
test_data_.query("Subsegment == 'LCV petrol N1-III Euro-6c' and TrafficSit == 'URB/Access/30/Heavy'")

In [None]:
# Last row of that selection in the filled table, shown as a Series.
test_data_.query("Subsegment == 'LCV petrol N1-III Euro-6c' and TrafficSit == 'URB/Access/30/Heavy'").iloc[-1, :]

# TestV2

In [12]:
# Directory from which the cells below read already exported .csv.gz tables (relative path).
input_path = r'../../../../data/intermediate/test/freightEmissions/'

In [24]:
# Load the warm (HOT) LCV table from a gzip-compressed, ';'-separated CSV.
lcv_warm_processed = pd.read_csv(input_path + 'EFA_HOT_Subsegm_LCV2025detailed.csv.gz', sep=';', compression='gzip')
lcv_warm_processed



In [13]:
# Load the cold-start LCV table. No compression argument is passed here, so
# pandas infers gzip from the '.gz' suffix (read_csv default compression='infer').
lcv_cold_processed = pd.read_csv(input_path + 'EFA_ColdStart_Subsegm_LCV2025detailed.csv.gz', sep=';')
lcv_cold_processed

In [20]:
# Apply the first cold-start filler with its default vehicle type and field.
v1_data = check_and_fill_cold_emission_factors(lcv_cold_processed)
v1_data

In [21]:
# Apply the V2 cold-start filler with its default arguments to the same input.
v2_data = check_and_fill_cold_emission_factorsV2(lcv_cold_processed)
v2_data

In [51]:
# CO2e rows whose ambient pattern contains '5-6h,0-1km'.
lcv_cold_processed[lcv_cold_processed['AmbientCondPattern'].str.contains('5-6h,0-1km') & (lcv_cold_processed['Component'] == 'CO2e')]

In [44]:
# Text before the first comma in the first row's ambient pattern (the same
# expression the V2 filler uses to derive its pattern prefix).
lcv_cold_processed.iloc[0]['AmbientCondPattern'].split(',')[0]

# Test V3 for EV

In [23]:
# Distinct Technology values in the cold-start table.
lcv_cold_processed['Technology'].unique()

In [44]:
# Distinct Component values in the cold-start table.
lcv_cold_processed['Component'].unique()

## Warm

In [25]:
# Distinct Technology values in the warm table.
lcv_warm_processed['Technology'].unique()

In [45]:
# Distinct Component values in the warm table.
lcv_warm_processed['Component'].unique()

In [46]:
# Components that occur in the cold-start table but not in the warm table.
set(lcv_cold_processed['Component'].unique()) - set(lcv_warm_processed['Component'].unique())

In [30]:
# Display the warm LCV table.
lcv_warm_processed

In [31]:
# Warm rows of subsegment 'LCV BEV N1-IIII'.
lcv_warm_processed[lcv_warm_processed['Subsegment'] == 'LCV BEV N1-IIII']

In [32]:
# Fill the missing warm records for subsegment 'LCV BEV N1-IIII'.
filled_lcv_warm = check_and_fill_warm_emission_factors(lcv_warm_processed, vehicle_type='LCV BEV N1-IIII')
filled_lcv_warm

In [41]:
# 'CO2(rep)' rows of the petrol LCV subsegment in the filled warm table.
filled_lcv_warm[filled_lcv_warm['Subsegment'] == 'LCV petrol N1-III Euro-6c'].query("Component == 'CO2(rep)'")

In [None]:
# Components present for 'LCV BEV N1-IIII' after filling.
filled_lcv_warm[filled_lcv_warm['Subsegment'] == 'LCV BEV N1-IIII']['Component'].unique()

In [42]:
# CO2e rows of the petrol LCV subsegment in the cold-start table.
lcv_cold_processed[lcv_cold_processed['Subsegment'] == 'LCV petrol N1-III Euro-6c'].query("Component == 'CO2e'")

# Test V4

This test checks whether the **newly-added records** (those missing from the raw table) are filled with the average value,
particularly for the LCVs.

In [22]:
# Load the raw warm (HOT) LCV workbook.
test_warm_lcv = pd.read_excel(raw_data_path + 'EFA_HOT_Subsegm_LCV2025detailed.XLSX')
test_warm_lcv

In [23]:
# Raw PM2.5 rows for emission concept 'LCV P Euro-6' and size class 'N1-III'.
test_warm_lcv[(test_warm_lcv['EmConcept']=='LCV P Euro-6') 
                   & (test_warm_lcv['Component']=='PM2.5')
                    & (test_warm_lcv['SizeClasse']=='N1-III')]

In [33]:
# Fill the missing warm records for the petrol LCV in the raw table.
test_filled_df = check_and_fill_warm_emission_factors(test_warm_lcv, vehicle_type='LCV petrol N1-III Euro-6c')
test_filled_df



In [34]:
# Same PM2.5 selection on the filled table, to compare with the raw rows.
test_filled_df[(test_filled_df['EmConcept']=='LCV P Euro-6') 
                   & (test_filled_df['Component']=='PM2.5')
                    & (test_filled_df['SizeClasse']=='N1-III')]

In [35]:
# Load the raw cold-start LCV workbook.
test_cold_lcv = pd.read_excel(raw_data_path + 'EFA_ColdStart_Subsegm_LCV2025detailed.XLSX')
test_cold_lcv

In [39]:
# Raw CO rows for emission concept 'LCV P Euro-6' and size class 'N1-III'.
test_cold_lcv[(test_cold_lcv['EmConcept']=='LCV P Euro-6') 
                   & (test_cold_lcv['Component']=='CO')
                    & (test_cold_lcv['SizeClasse']=='N1-III')]

In [43]:
# Apply the V2 cold-start filler to the raw table for the petrol LCV.
test_cold_filled = check_and_fill_cold_emission_factorsV2(test_cold_lcv, vehicle_type='LCV petrol N1-III Euro-6c')
test_cold_filled

In [40]:
# Same CO selection on the filled table, to compare with the raw rows.
test_cold_filled[(test_cold_filled['EmConcept']=='LCV P Euro-6') 
                   & (test_cold_filled['Component']=='CO')
                    & (test_cold_filled['SizeClasse']=='N1-III')]

## LCV cold

In [14]:
# Reload the cold-start LCV table from the 'V4' export in processed_data_path.
# This rebinds lcv_cold_processed, which an earlier cell loaded from input_path.
lcv_cold_processed = pd.read_csv(os.path.join(processed_data_path, 'EFA_ColdStart_Subsegm_LCV2025detailedV4.csv.gz'), sep=';', compression='gzip')
lcv_cold_processed

In [20]:
# All rows for emission concept 'LCV P Euro-6' and size class 'N1-III'
# (the component filter is currently commented out).
lcv_cold_processed[(lcv_cold_processed['EmConcept']=='LCV P Euro-6') 
                #    & (lcv_cold_processed['Component']=='PM2.5')
                    & (lcv_cold_processed['SizeClasse']=='N1-III')]

## LCV warm

In [45]:
# Reload the warm LCV table from the 'V4' file in processed_data_path.
# This rebinds lcv_warm_processed, which an earlier cell loaded from input_path.
lcv_warm_processed = pd.read_csv(os.path.join(processed_data_path, 'EFA_HOT_Subsegm_LCV2025detailedV4.csv.gz'), sep=';', compression='gzip')
lcv_warm_processed



In [46]:
# PM2.5 rows for emission concept 'LCV P Euro-6' and size class 'N1-III' in the reloaded warm table.
lcv_warm_processed[(lcv_warm_processed['EmConcept']=='LCV P Euro-6') 
                   & (lcv_warm_processed['Component']=='PM2.5')
                    & (lcv_warm_processed['SizeClasse']=='N1-III')]

## MC warm

In [3]:
# Load the warm MC table from processed_data_path.
# NOTE(review): the export cells in this notebook write file names with a
# 'V2'/'V4' suffix, while this reads the unsuffixed name - confirm this is
# the intended file.
mc_warm_v4 = pd.read_csv(os.path.join(processed_data_path, 'EFA_HOT_Subsegm_MC2025detailed.csv.gz'), sep=';', compression='gzip')
mc_warm_v4

In [18]:
# eScooter rows of the table loaded as mc_warm_v4.
mc_warm_v4[mc_warm_v4['Subsegment'] == 'eScooter']

In [4]:
# Load the warm MC table used as the 'v3' comparison. The file is addressed
# through the notebook's relative input_path instead of a machine-specific
# absolute path so the cell also runs on other machines.
# NOTE(review): assumes input_path points at the same freightEmissions
# directory the absolute path referred to - confirm.
mc_warm_v3 = pd.read_csv(os.path.join(input_path, 'EFA_HOT_Subsegm_MC2025detailed.csv.gz'), sep=';', compression='gzip')
mc_warm_v3

In [19]:
# eScooter rows of the table loaded as mc_warm_v3, for comparison with mc_warm_v4.
mc_warm_v3[mc_warm_v3['Subsegment'] == 'eScooter']