In [None]:
import pandas as pd
import yaml

# Function to read the YAML config file
def read_yaml(file_path):
    with open(file_path, 'r') as file:
        try:
            data = yaml.safe_load(file)
            return data
        except yaml.YAMLError as exc:
            print(f"Error reading YAML file: {exc}")
            return None

config = read_yaml('./config.yaml')

dataframes = {}
if config and 'sheets' in config:
    sheets_list = config['sheets']
    for sheet in sheets_list:
        dataframes[sheet] = pd.read_excel(
            './data/raw_data-SA Exercise.xlsx', sheet_name=sheet, dtype={'EF ID CO2': 'Int64', 'EF CO2': 'Int64'})

## Create mock SAP data

In [None]:
sap_df = pd.DataFrame({
    'SAP ID': [2, 5, 7, 27, 81, 45, 8, 12, 6, 24],
    'Supplier Name': ['Glassy Glass inc.', 'Glassy Glass inc.', 'Spice girls inc.','Spice girls inc.', 'Spice girls inc.', 'Spice girls inc.', 'Ship happens inc.', 'Ship happens inc.', 'Dumpster Divers inc.', 'Dumpster Divers inc.']
})


for sheet_name, df in dataframes.items():
    updated_df = df.merge(sap_df[['SAP ID', 'Supplier Name']], on='SAP ID', how='left')
    dataframes[sheet_name] = updated_df
    print(updated_df.to_string())

## Validate schema against config
**Required**: Activity Label | Activity Unit	| Scope	| Quantity | GHG Category

**Optional**: Global Region	| Country | EF CO2 | EF ID CO2	| Taxonomy	| Region | CO2AI Taxonomy	

**Custom Fields**: GBU	| Very Custom	Category	| SAP ID			

In [None]:
import pandera as pa
from pandera import Column, DataFrameSchema

# Function to create a pandera schema from the YAML config
def create_schema_from_config(config):
    schema_dict = {}
    for field, dtype in config['schema']['required_fields'].items():
        schema_dict[field] = Column(getattr(pa, dtype.capitalize()), nullable=False, coerce=True)
    for field, dtype in config['schema']['optional_fields'].items():
        schema_dict[field] = Column(getattr(pa, dtype.capitalize()), nullable=True, required=False)
    for field, dtype in config['schema']['custom_fields'].items():
        schema_dict[field] = Column(getattr(pa, dtype.capitalize()), nullable=True, required=False)
    schema = DataFrameSchema(schema_dict, strict=True)
    return schema

# Get Schema
schema = create_schema_from_config(config)

# Validate the DataFrame
for sheet_name, df in dataframes.items():
    try:
        dataframes[sheet_name] = schema.validate(df)
        print(f"{sheet_name} sheet is valid.")
    except pa.errors.SchemaError as e:
        print(f"{sheet_name} sheet is invalid.")
        print(e)


In [None]:
for sheet_name, df in dataframes.items():
  print(df.to_string())

### Check required fields for allowed values
GHG Categories:

**Scope 1**

- Direct emissions

**Scope 2**

- Energy and Electricity Consumption

**Scope 3**

- Purchased goods and services
- Capital goods
- Fuel and energy related activities
- Upstream transportation and distribution
- Waste generated in operations
- Business travel
- Employee commuting
- Upstream leased assets
- Downstream transportation and distribution
- Processing of sold products
- Use of sold products
- End of life treatment of sold products
- Downstream leased assets
- Franchises
- Investments

In [None]:
# Check to ensure Scope values are 1, 2, or 3

valid_scope_values = [1, 2, 3]

for sheet_name, df in dataframes.items():
  df["is_valid_scope"] = df["Scope"].isin(valid_scope_values)

  # Filter out invalid rows
  invalid_scope_rows = df[~df["is_valid_scope"]]
  if not invalid_scope_rows.empty:
      print("Scope Rows with invalid scope values:")
      print(invalid_scope_rows)
  else:
      print("All Scope Rows have valid scope values")

  df.drop(columns=["is_valid_scope"], inplace=True)

In [None]:
# Check to ensure scope value has valid corresponding category

allowed_ghg_categories = {1: ["Direct Emissions"], 2: ["Energy and Electricity Consumption"], 3: ["Purchased goods and services", "Capital goods", "Fuel and energy related activites", "Upstream transportation and distribution",
                                                                                                                          "Waste generated in operations", "Business travel", "Employee commuting", "Upstream leased assets", "Downstream transportation and distribution", "Processing of sold products", "Use of sold products", "End of life treatment of sold products", "Downstream leased assets", "Franchises", "Investments"]}
def ghg_category_check(row):
    scope = row["Scope"]
    ghg_category = row["GHG Category"]
    valid_categories = allowed_ghg_categories.get(scope)
    return ghg_category in valid_categories

# Apply the custom check function
for sheet_name, df in dataframes.items():
    df["valid"] = df.apply(ghg_category_check, axis=1)

    # Filter out invalid rows
    invalid_rows = df[~df["valid"]]
    if not invalid_rows.empty:
        print("Custom check validation errors:")
        print(invalid_rows)
    else:
        print("All rows passed the custom check validation")

    # Drop the auxiliary 'valid' column
    df.drop(columns=["valid"], inplace=True)


In [None]:
# Add default taxonomy values based on config

# default_taxonomies = config['default_taxonomies']
# 
# for sheet_name, df in dataframes.items():
#   df['Category'] = df['Category'].str.title()
#   df['Taxonomy'] = df['Category'].map(default_taxonomies)
#   dataframes[sheet_name] = df
#   print(dataframes[sheet_name].to_string())

### Check for valid units
If an invalid dimension exists then we need to update it to the valid dimension and update the quantity

Dimension	Unit
- CO₂ Emissions : kgCO2eq
- Currency : usd
- Weight: kg
- Distance : km
- Surface: m2
- Volume: m3
- Energy: kWh
- Time: day
- Person: individual
- Vehicle: vehicle
- Dimensionless: dimensionless
- Item: item

In [None]:
conversion_factors = {
    "g": {"to": "kg", "factor": 0.001},        # 1 gram is 0.001 kg
    "lbs": {"to": "kg", "factor": 0.453592},   # 1 lb is approximately 0.453592 kg
    "miles": {"to": "km", "factor": 1.60934}   # 1 mile is approximately 1.60934 km
}

def update_activity_unit(row):
    activity_unit = row["Activity Unit"]
    quantity = row["Quantity"]
    
    if activity_unit in conversion_factors:
        row["Activity Unit"] = conversion_factors[activity_unit]["to"]
        row["Quantity"] = quantity * conversion_factors[activity_unit]["factor"]
    
    return row

for sheet_name, df in dataframes.items():
    dataframes[sheet_name] = df.apply(update_activity_unit, axis=1)
    print(dataframes[sheet_name].to_string())

### Check country names
Ensure country names are a part of valid list -> https://co2-ai.gitbook.io/delivery-documentation/appendix/allowed-values/locations 

In [None]:
import pycountry

country_mapping = {
    "España": "Spain",
    "Russia": "Russia"
}

def normalize_country_name(country_name):
    try:
        country = pycountry.countries.lookup(country_name)
        return country.name.lower()
    except LookupError:
        if country_name in country_mapping:
            return country_mapping[country_name].lower()
        elif country_name not in country_mapping and country_name not in country_mapping.values():
            raise ValueError(f"Country name '{country_name}' is not in the mapping. Please add it to the mapping.")

for sheet_name, df in dataframes.items():
    try:
        df['Country'] = df['Country'].apply(normalize_country_name)
        dataframes[sheet_name] = df
        print(dataframes[sheet_name].to_string())
    except ValueError as e:
        print(e)

### Combine sheets into one
Add data source column and drop null columns

In [None]:
df_list = []

for sheet_name, df in dataframes.items():
    df["Data Source"] = sheet_name
    df_list.append(df)

combined_df = pd.concat(df_list, ignore_index=True)

df_cleaned = combined_df.dropna(axis=1, how='all')

print(df_cleaned.to_string())

### Drop fields based on config
  - If `use_preset_ef` is set to `false` then we will not use the `EF CO2 || EF ID CO2` fields

In [None]:
if config['use_preset_ef'] == False:
  preset_ef_cols = ['EF CO2', 'EF ID CO2', "CO2AI Taxonomy"]
  cols_to_drop = [col for col in preset_ef_cols if col in df.columns]
  if cols_to_drop:
    df_cleaned = df_cleaned.drop(columns=cols_to_drop)
elif config['use_preset_ef'] == True:
  df_cleaned['EF ID CO2'] = df_cleaned['EF ID CO2'].astype('Int64')
  df_to_validate = df_cleaned

### Final Validation
And create new excel spreadsheet that will be uploaded to platform

In [None]:
try:
    final_validated_df = schema.validate(df_to_validate)
    print("Final spreadsheet is valid.")
except pa.errors.SchemaError as e:
    print("Spreadsheet is invalid.")
    print(df.dtypes.to_string())
    print(e)

print(final_validated_df.to_string())

In [None]:
# Save as excel
combined_excel_path = "./data/combined_cleansed-SA Exercise.xlsx"

final_validated_df.to_excel(combined_excel_path, index=False)