In [0]:
import dlt
import unicodedata
from pyspark.sql.functions import (
    substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower, create_map, coalesce
)
import json


def get_country_code(country, TOP_DIR = "/Volumes/prd_mega/sboost4/vboost4"):
    """Return the lower-cased ``code3`` value for ``country``.

    The lookup table is read from
    ``{TOP_DIR}/Documents/input/Auxiliary/countries.json``, which is parsed as
    a JSON list of objects; only the ``name`` and ``code3`` keys of each
    object are used.

    Args:
        country: Country name, matched case-insensitively against ``name``.
        TOP_DIR: Root directory that contains ``Documents/input/Auxiliary``.

    Returns:
        The ``code3`` value of the single matching entry, lower-cased.

    Raises:
        AssertionError: If no entry, or more than one entry, matches
            ``country``.
        OSError: If ``countries.json`` cannot be opened.
    """
    auxiliary_dir = f"{TOP_DIR}/Documents/input/Auxiliary"

    # Explicit encoding so non-ASCII country names do not depend on the
    # platform's default locale.
    with open(f"{auxiliary_dir}/countries.json", encoding="utf-8") as f:
        country_codes = json.load(f)

    wanted = country.lower()
    # Kept in a separate name so the error messages below can still report the
    # country that was asked for.
    matches = [entry for entry in country_codes if entry['name'].lower() == wanted]

    # Raised explicitly (rather than via ``assert``) so the check survives
    # ``python -O``; AssertionError is kept for backward compatibility.
    if not matches:
        raise AssertionError(f"Country {country} not found")
    if len(matches) > 1:
        raise AssertionError(
            f"Country {country} is ambiguous: {len(matches)} entries match"
        )

    return matches[0]['code3'].lower()

def process_country(country):
    """Define the bronze and silver BOOST DLT tables for ``country``.

    Two tables are declared through ``@dlt.table`` and named
    ``{country_code}_boost_bronze`` and ``{country_code}_boost_silver``, where
    ``country_code`` comes from ``get_country_code``.

    Args:
        country: Country name. It is used both for the country-code lookup and
            as the sub-directory of ``Workspace/microdata_csv`` that holds
            ``Data.csv``.

    Returns:
        None. The function is called for the table declarations it makes.
    """
    TOP_DIR = "/Volumes/prd_mega/sboost4/vboost4"
    WORKSPACE_DIR = f"{TOP_DIR}/Workspace"
    COUNTRY_MICRODATA_DIR = f'{WORKSPACE_DIR}/microdata_csv/{country}'

    CSV_READ_OPTIONS = {
        "header": "true",
        "multiline": "true",
        "quote": '"',
        "escape": '"',
    }

    country_code = get_country_code(country, TOP_DIR)

    @dlt.table(name=f'{country_code}_boost_bronze')
    def boost_bronze():
        """Read the country's ``Data.csv`` with an inferred schema."""
        # NOTE(review): `spark` is not defined in this file; presumably it is
        # the session injected by the Databricks runtime - confirm.
        return (spark.read
            .format("csv")
            .options(**CSV_READ_OPTIONS)
            .option("inferSchema", "true")
            .load(f'{COUNTRY_MICRODATA_DIR}/Data.csv'))

    @dlt.table(name=f'{country_code}_boost_silver')
    def boost_silver():
        """Derive admin, func, econ, foreign and geo columns from bronze."""
        df = dlt.read(f'{country_code}_boost_bronze')

        # NOTE(review): `rename_mappings` is not defined in the visible part
        # of this file; it must exist at module level (old name -> new name)
        # or this loop raises NameError - confirm where it is declared.
        for old_name, new_name in rename_mappings.items():
            df = df.withColumnRenamed(old_name, new_name)

        # Cast the classification columns to string and replace nulls with ""
        # so the startswith / != comparisons below never evaluate to null.
        columns_to_clean = [
            "Econ0", "Econ1", "Econ2", "Econ3",
            "Econ4", "county", "FUND"
        ]
        for column in columns_to_clean:
            df = df.withColumn(column, coalesce(col(column).cast("string"), lit("")))

        # --- Global Filters ---
        # Filtering out unwanted values at the total expenditures level
        df = df.filter(
            (col('Econ0') != '4 Liabilities')
            & (col('Econ1') != '32 Financial assets')
            & (col('Econ0') != '1 Revenues')
        )

        # used quite often, so to limit repetition
        not_dept = ~col('department').startswith('10401')

        # --- Admin and Geo Data Adjustments ---
        # admin and geo data appear to be swapped
        starts_with_00 = col('county').startswith('00')
        df = df.withColumn(
            'admin0', when(starts_with_00, 'Central').otherwise('Regional')
        ).withColumn(
            'admin1', when(starts_with_00, 'Central Scope')
                    .otherwise(regexp_replace(col('county'), r'^\d+\s+', ''))
        ).withColumn(
            'admin2', regexp_replace(col('ministry'), r'^\d+\s+', '')
        )

        # --- Functional Classifications ---
        # Func1 prefix -> (label, year_dependent). When year_dependent is
        # True the rule differs between year 2012 and every other year.
        func_mapping = {
            "02": ("Defence", False),
            "03": ("Public order and safety", False),
            "04": ("Economic affairs", False),
            "05": ("Environmental protection", True),
            "06": ("Housing and community amenities", False),
            "07": ("Health", True),
            "08": ("Recreation, culture and religion", False),
            "09": ("Education", True),
            "10": ("Social protection", False)
        }
        # Reference spreadsheet formula:
        # =SUM(SUMIFS(executed,Year,O$1,Econ0, "2 Expenses",Func1,"09 Education",Econ0,"<>4 Liabilities",Econ1,"<>32 Financial assets"))
        # Extra conditions
        # Econ0 startswith 2
        # Missing Conditions
        #

        # Collect the branches in order first, then fold them into a single
        # when-chain. This keeps the branch order of the mapping and does not
        # depend on the first entry being a non-year-dependent one.
        func_branches = []
        for prefix, (label, year_dependent) in func_mapping.items():
            in_function = col('Func1').startswith(prefix)
            excluding_dept = in_function & not_dept
            expenses_only = in_function & col('Econ0').startswith('2')

            if year_dependent:
                func_branches.append(((col('year') == '2012') & excluding_dept, label))
                func_branches.append(((col('year') != '2012') & expenses_only, label))
            else:
                func_branches.append((excluding_dept, label))

        first_condition, first_label = func_branches[0]
        func_filter = when(first_condition, first_label)
        for condition, label in func_branches[1:]:
            func_filter = func_filter.when(condition, label)
        # Rows matching none of the branches fall into the default function.
        func_filter = func_filter.otherwise("General public services")
        df = df.withColumn("func", func_filter)

        # --- Sub-Functional Classifications ---
        # Branches are evaluated top to bottom; rows matching none get null.
        # NOTE(review): 'transport' (Func2 startswith '045') is tested before
        # 'roads' (Func3 startswith '0451'); if Func3 codes extend Func2
        # codes, the 'roads' branch can never match - confirm intended order.
        df = df.withColumn(
            # --- nested when
            'func_sub', when((col("func") == "Public order and safety"), when(col('Func2').startswith('033'), "judiciary").otherwise("public safety"))
                .when(not_dept & (col('Func2').startswith('042')), 'agriculture')
                .when(col('Func2').startswith('045'), 'transport')
                .when(not_dept & (col('Func3').startswith('0451')), 'roads')
                .when(col('ministry').startswith('429'), 'air transport')
                .when(not_dept & (col('Func2').startswith('043')), 'energy')
                .when(col('ministry').startswith('418'), 'telecoms')
                .when(col('Func2').startswith('07 ') | col('Func2').startswith('074'), 'primary and secondary health')
                .when(col('Func2').startswith('073'), 'tertiary and quaternary health')
        )

        # --- Econ and sub econ reused filters ---
        pensions_filter = (col('Econ0').startswith('2')) & (col('Econ2').startswith('271'))
        social_assistance_filter = ((col('Func1').startswith('10')) & not_dept)
        allowances_filter = ((col('Econ2').startswith('211')) & not_dept)

        # --- Economic Classifications ---
        # First matching branch wins; anything unmatched is 'Other expenses'.
        df = df.withColumn(
            'econ', when((col('Econ1').startswith('21')) & (~col('Econ0').startswith('4')), 'Wage bill') # removed global condition
                .when(
                    (col('budget').startswith('4'))
                    & not_dept
                    & (~col('Econ1').startswith('21'))
                    , 'Capital expenditures')
                .when(
                    (col('Econ1').startswith('22'))
                    & (col('budget').startswith('1'))
                    , 'Goods and services')
                .when(
                    col('Econ1').startswith('25'),
                    'Subsidies')
                .when(
                    pensions_filter
                    | social_assistance_filter, 'Social benefits')
                .when(
                    (col('Econ1').startswith('24'))
                    & not_dept
                    , 'Interest on debt')
                .when(
                        (col('Econ1').startswith('13') | col('Econ1').startswith('26')) &
                        ~col('Func1').startswith('10') &
                        col('budget').startswith('1') &
                        not_dept,
                        'Other grants and transfers'
                    )
                .otherwise('Other expenses')
        )

        # --- Sub-Economic Classifications ---
        # No otherwise(): rows matching none of the branches get null.
        df = df.withColumn(
            'econ_sub', when((col('econ') == 'Wage bill'), when(allowances_filter, 'allowances').otherwise('basic wages'))
                .when(pensions_filter, 'pensions')
                .when(not_dept & (col('Econ2').startswith('212')), 'social benefits (pension contributions)')
                .when((col('Econ3').startswith('2213')) | (col('Econ3').startswith('2218')), 'basic services')
                .when(col('Econ3').startswith('2215'), 'recurrent maintenance')
                .when(social_assistance_filter, 'social assistance')
        )

        # --- Foreign Classification ---
        df = df.withColumn(
            'is_foreign', (col('fund') == 'Foreign')
        )

        # --- Geo ---
        df = df.withColumn(
            'geo1', lower(col('admin1'))
        )

        return df

# Declare the Liberia bronze and silver tables when this file is executed.
process_country('Liberia')