In [1]:
# ---------------------------------- LIBRARY IMPORTS AND FUNCTION DEFINITIONS ----------------------------------
import pandas as pd
from pathlib import Path
import duckdb
import sys, os
from dotenv import load_dotenv
from urllib.parse import quote_plus

sys.path.append(os.path.abspath(".."))
from utils.utils import data_add_moer
import pyarrow as pa
import pyarrow.parquet as pq
import psycopg2
import shutil
import gc

load_dotenv()


def _require_env(name):
    """Return the value of environment variable ``name``, failing loudly if it is unset.

    Without this guard an unset CLIMATETRACE_USER / CLIMATETRACE_PASS makes
    ``quote_plus(None)`` raise an opaque TypeError, and an unset HOST / PORT / DB
    silently puts the text "None" into the connection URL.

    Raises:
        RuntimeError: if the variable is not set (checked after ``load_dotenv()``).
    """
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(
            f"Environment variable {name} is not set; add it to your .env file or environment."
        )
    return value


# User and password are URL-encoded so characters such as '@', ':', '/' or "'"
# cannot break the URL (or the SQL string literals it is interpolated into later).
user = quote_plus(_require_env("CLIMATETRACE_USER"))
password = quote_plus(_require_env("CLIMATETRACE_PASS"))
host = _require_env("CLIMATETRACE_HOST")
port = _require_env("CLIMATETRACE_PORT")
database = _require_env("CLIMATETRACE_DB")

postgres_url = f"postgresql://{user}:{password}@{host}:{port}/{database}"


def split_or_move_parquet(input_file, output_dir, target_size_mb=40):
    """
    Split a large Parquet file into ~target_size_mb chunks, or move it directly
    to the desired folder if it is at or under the threshold.

    Rows-per-chunk is estimated by writing (up to) the first 10,000 rows to an
    in-memory Parquet buffer and extrapolating, so real chunk sizes can drift
    from the target when later rows compress differently.

    Args:
        input_file (str | Path): Path to input parquet file. It is moved when
            small enough, otherwise deleted after the chunks are written.
        output_dir (str | Path): Directory to save chunked (or moved) parquet
            files. Created (with parents) if it does not exist.
        target_size_mb (int | float): Approx size limit per chunk in MB.

    Chunks are named ``{input_stem}_chunk_{i}.parquet`` with ``i`` starting at 1.

    NOTE(review): chunk files left in ``output_dir`` by a previous run are not
    removed. If this run produces fewer chunks (or moves the file unsplit),
    the old extra chunks remain next to the new ones.
    """
    input_file = Path(input_file)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    file_size_mb = input_file.stat().st_size / (1024 * 1024)

    # If file is already below threshold, just move it
    if file_size_mb <= target_size_mb:
        dest_file = output_dir / input_file.name
        shutil.move(str(input_file), dest_file)
        print(f"✅ File {input_file.name} was {file_size_mb:.1f} MB, moved to {dest_file}")
        return

    print(f"⚡ Splitting {input_file.name} ({file_size_mb:.1f} MB)...")

    # Load full Parquet into DataFrame
    df = pd.read_parquet(input_file)
    total_rows = len(df)

    if total_rows == 0:
        # Nothing to distribute across chunks (the size is all metadata); estimating
        # bytes-per-row would divide by zero, so move the file unchanged instead.
        dest_file = output_dir / input_file.name
        shutil.move(str(input_file), dest_file)
        print(f"✅ File {input_file.name} has no rows, moved to {dest_file}")
        return

    # Estimate bytes per row from a sample written to memory (no temp file in the
    # current working directory that could collide or be left behind on error).
    sample_df = df.iloc[:min(10000, total_rows)]
    sample_sink = pa.BufferOutputStream()
    pq.write_table(pa.Table.from_pandas(sample_df), sample_sink)
    bytes_per_row = sample_sink.getvalue().size / len(sample_df)

    # Rows per chunk (at least one row so the loop always advances)
    target_bytes = target_size_mb * 1024 * 1024
    rows_per_chunk = max(1, int(target_bytes / bytes_per_row))

    # Split and write; `end` is exclusive, matching iloc slicing
    for i, start in enumerate(range(0, total_rows, rows_per_chunk)):
        end = min(start + rows_per_chunk, total_rows)
        chunk_df = df.iloc[start:end]
        chunk_table = pa.Table.from_pandas(chunk_df)
        output_path = output_dir / f"{input_file.stem}_chunk_{i+1}.parquet"
        pq.write_table(chunk_table, output_path)
        size_mb = output_path.stat().st_size / (1024 * 1024)
        print(f"  - Saved {output_path} ({size_mb:.1f} MB, rows {start}–{end})")

    # Delete original only after every chunk has been written
    input_file.unlink()
    print(f"🗑️ Deleted original {input_file.name}")
    print("✅ Splitting complete")



In [None]:
# TODO: delete the archived data and move the most recent data into the archive (this cell is not implemented yet)

In [None]:
"""
This script takes CSVs dropped into data/zzz_landing_zone,
routes them based on filename, and converts them into Parquet.
If the new parquet is larger than 45MB, it will be split into 
smaller chunks before routing to its destination folder.

‚ö†Ô∏è Only processes .csv files
"""

input_dir = Path("zzz_landing_zone")
output_base = Path("statistics")

routing_map = {
    "country_subsector_emissions_statistics": "country_subsector_emissions_statistics",
    "country_subsector_emissions_totals": "country_subsector_emissions_totals",
    "gadm_1_emissions_statistics": "gadm_1_emissions_statistics"
}

# Ensure output subfolders exist
for subfolder in routing_map.values():
    (output_base / subfolder).mkdir(parents=True, exist_ok=True)

# Process only CSVs
for csv_file in input_dir.glob("*.csv"):
    print(f"Converting {csv_file.name}...")

    # Convert CSV ‚Üí Parquet in landing zone
    df = pd.read_csv(csv_file)
    parquet_file = input_dir / csv_file.with_suffix(".parquet").name
    df.to_parquet(parquet_file, engine="pyarrow", index=False)
    print(f"‚úÖ Converted {csv_file.name} ‚Üí {parquet_file.name} in landing zone")

    # Delete original CSV
    csv_file.unlink()
    print(f"üóëÔ∏è Deleted original CSV: {csv_file.name}")

    # Route Parquet into correct stats subfolder (split if needed)
    destination = None
    for pattern, subfolder in routing_map.items():
        if pattern in parquet_file.name:
            destination = output_base / subfolder
            break

    if destination:
        split_or_move_parquet(parquet_file, destination)
    else:
        print(f"‚ö†Ô∏è No matching subfolder for {parquet_file.name}, skipping.")

print("üéâ CSV to Parquet + routing complete.")

In [2]:
# 1. ------------ THIS NEEDS TO BE FIXED AND PROBABLY BE STORED AT ASSET LEVEL... TRY CALCULATING AT ASSET LEVEL FIRST
# 2. CHECK OTHER TABLES FOR TEMP GRAIN (GADM & CITY)

# ------------------------------------- ASSET EMISSIONS COUNTRY SUBSECTOR LEVEL -------------------------------------
# Aggregates most-granular asset emissions (gases co2e_100yr and ch4) to
# country x subsector x start_time, from Jan 1 of (year of the latest start_time - 3)
# onward, exports the result to parquet and routes it with split_or_move_parquet.

parquet_path = "zzz_landing_zone/asset_emissions_country_subsector.parquet"
output_path = "asset_emissions/country_subsector_level"

# Use DuckDB to write directly from PostgreSQL to Parquet.
# The context manager closes the connection even if a query fails.
with duckdb.connect() as con:
    # Load the postgres extension before the first postgres_scan call instead of
    # relying on DuckDB's extension autoloading.
    con.execute("INSTALL postgres; LOAD postgres;")

    print("Getting max month...")
    max_date = con.execute(f"""
        select max(start_time)
        from postgres_scan('{postgres_url}', 'public', 'asset_emissions')
    """).fetchone()[0]
    if max_date is None:
        # Otherwise the query below would be built with DATE 'None' and fail obscurely.
        raise ValueError("asset_emissions returned no start_time; cannot build the 3-year window.")

    print("Aggregating assets to subsector-level and writing to parquet file, this may take a while...")
    con.execute(f"""
    CREATE TABLE asset_emissions_parquet AS
    SELECT ae.iso3_country,
        ae.original_inventory_sector,
        itm.activity_is_temporal,
        ae.start_time,
        ae.gas,
        sch.sector,
        ca.name as country_name,
        ca.continent,
        ca.unfccc_annex,
        ca.em_finance,
        ca.eu,
        ca.oecd,
        ca.developed_un,
        ae.release,
        sum(emissions_quantity) emissions_quantity,
        case when activity_is_temporal = true then sum(activity) else avg(activity) end as activity,
        sum(emissions_quantity) / NULLIF(sum(activity), 0) weighted_average_emissions_factor
    
    FROM postgres_scan('{postgres_url}', 'public', 'asset_emissions') ae
    LEFT JOIN postgres_scan('{postgres_url}', 'public', 'country_analysis') ca
        ON CAST(ca.iso3_country AS VARCHAR) = CAST(ae.iso3_country AS VARCHAR)
    LEFT JOIN (
        SELECT DISTINCT sector, subsector FROM postgres_scan('{postgres_url}', 'public', 'asset_schema')
    ) sch
        ON CAST(sch.subsector AS VARCHAR) = CAST(ae.original_inventory_sector AS VARCHAR)
    left join postgres_scan('{postgres_url}', 'public', 'is_temporal_map') itm
        on itm.original_inventory_sector = ae.original_inventory_sector
    
    WHERE ae.start_time >= (
                date_trunc('year', DATE '{max_date}') - INTERVAL '3 YEARS'
            )
      AND ae.gas in ('co2e_100yr','ch4')
      AND ae.most_granular = TRUE
    
    GROUP BY ae.iso3_country,
        ae.original_inventory_sector,
        itm.activity_is_temporal,
        ae.start_time,
        ae.gas,
        sch.sector,
        ca.name,
        ca.continent,
        ca.unfccc_annex,
        ca.em_finance,
        ca.eu,
        ca.oecd,
        ca.developed_un,
        ae.release;

    COPY asset_emissions_parquet TO '{parquet_path}' (FORMAT PARQUET);
    """)

split_or_move_parquet(parquet_path, output_path)

print("✅ Asset parquet file exported")

Getting max month...
Aggregating assets to subsector-level and writing to parquet file, this may take a while...
‚úÖ File asset_emissions_country_subsector.parquet was 13.6 MB, moved to asset_emissions/country_subsector_level/asset_emissions_country_subsector.parquet
‚úÖ Asset parquet file exported


In [None]:
# ------------------------------------ ERS STRATEGY SCORE ---------------------------------------
# Scores each rank-1 reduction strategy (co2e_100yr) by how far its assets' reduction factors
# (total_emissions_reduced_per_year / old_activity) sit from their (subsector, asset_output)
# activity-weighted mean, in standard deviations, then ranks strategies within each subsector.
# `6 - cs.cost_score` flips the categorical cost score (presumably a 1-5 scale where higher
# meant more expensive — TODO confirm against strategy_categorical_scores.parquet).
# NOTE(review): the parquet is written to zzz_landing_zone and is not routed onward here.

parquet_path = "zzz_landing_zone/effectiveness.parquet"

# The context manager closes the connection even if the query fails.
with duckdb.connect() as con:
    print('Query Running: Getting effectiveness score for strategies')
    con.execute(f'''
    INSTALL postgres;
    LOAD postgres;

    CREATE TEMP TABLE rdf as
    SELECT rdf.*
        , coalesce(om.asset_output, 'other') asset_output
        , cs.confidence
        , cs.confidence_score
        , cs.feasibility
        , cs.feasibility_score
        , cs.cost
        , 6 - cs.cost_score as cost_score

    FROM postgres_scan('{postgres_url}', 'public', 'reductions_data_fusion') rdf
    LEFT JOIN read_parquet('strategy/categorical_scores/strategy_categorical_scores.parquet') cs
        on cs.strategy_id = rdf.strategy_id 
    LEFT JOIN postgres_scan('{postgres_url}', 'public', 'asset_type_to_output_map') om
        on om.subsector = rdf.original_inventory_sector
        and om.asset_type = rdf.asset_type
    
    WHERE strategy_rank = 1
      AND gas = 'co2e_100yr';

    COPY (
        WITH asset_rf AS (
                SELECT
                        asset_id
                        , asset_output
                        , strategy_name
                        , strategy_description
                        , original_inventory_sector AS subsector
                        , old_emissions_factor
                        , old_activity
                        , total_emissions_reduced_per_year
                        , (total_emissions_reduced_per_year / old_activity) AS reduction_factor
                        , (old_emissions_factor * old_activity) AS w_inventory
                        , confidence
                        , confidence_score
                        , feasibility
                        , feasibility_score
                        , cost
                        , cost_score
                
                FROM rdf
                
                WHERE total_emissions_reduced_per_year IS NOT NULL
                        AND old_activity IS NOT NULL
                        AND old_activity > 0
                ),

        -- 1) Subsector stats using the SAME weight (activity) for mean and stdev
        subsector_stats AS (
                SELECT
                        subsector
                        , asset_output
                        , SUM(w_inventory) AS subsector_emissions_inventory
                       
                        , SUM(total_emissions_reduced_per_year) / NULLIF(SUM(old_activity), 0) AS mu_rf
                        --, avg(reduction_factor) mu_rf
                        --, stddev_samp(reduction_factor) sigma_rf
                        , SQRT(
                                GREATEST(
                                        SUM(old_activity * POWER(reduction_factor, 2)) / NULLIF(SUM(old_activity), 0)
                                                - POWER( SUM(old_activity * reduction_factor) / NULLIF(SUM(old_activity), 0), 2 ),
                                0
                        )
                        ) AS sigma_rf
                
                FROM asset_rf
                
                GROUP BY subsector
                        , asset_output
        ),

        -- 2) Asset-level z-scores
        asset_rf_zscore AS (
                SELECT
                        a.asset_id
                        , a.asset_output
                        , a.strategy_name
                        , a.strategy_description
                        , a.subsector
                        , a.old_emissions_factor
                        , a.old_activity
                        , a.total_emissions_reduced_per_year
                        , a.reduction_factor
                        , a.w_inventory
                        , s.mu_rf
                        , s.sigma_rf
                        , CASE
                                WHEN s.sigma_rf IS NULL OR s.sigma_rf < 1e-12 THEN 0.0
                                ELSE (a.reduction_factor - s.mu_rf) / s.sigma_rf
                          END AS asset_rf_zscore
                        , confidence
                        , confidence_score
                        , feasibility
                        , feasibility_score
                        , cost
                        , cost_score

                FROM asset_rf a
                JOIN subsector_stats s 
                        on a.subsector = s.subsector
                        and a.asset_output = s.asset_output

        )

         -- 3) Strategy roll-up: unweighted mean of asset z-scores
         --    (the commented line in the subquery is the inventory-weighted alternative)
        select strategy_name
                , strategy_description
                , subsector
                , asset_count
                , old_activity
                , strategy_emissions_inventory
                , total_emissions_reduced_per_year
                , reduction_factor 
                , strategy_rf_zscore
                , rank() over (partition by subsector order by strategy_rf_zscore desc) score_rank
                , confidence
                , confidence_score
                , feasibility
                , feasibility_score
                , cost
                , cost_score

        from (
                SELECT
                        arz.strategy_name
                        , arz.strategy_description
                        , arz.subsector
                        , count(distinct arz.asset_id) asset_count
                        , sum(arz.old_activity) old_activity
                        , sum(arz.w_inventory) strategy_emissions_inventory
                        , sum(arz.total_emissions_reduced_per_year) total_emissions_reduced_per_year
                        , sum(arz.total_emissions_reduced_per_year) / sum(arz.old_activity) as reduction_factor
                        --, SUM(arz.w_inventory * arz.asset_rf_zscore) / NULLIF(SUM(arz.w_inventory), 0) AS strategy_rf_zscore
                        , avg(arz.asset_rf_zscore) strategy_rf_zscore
                        , confidence
                        , confidence_score
                        , feasibility
                        , feasibility_score
                        , cost
                        , cost_score
                
                FROM asset_rf_zscore arz
                
                GROUP BY arz.strategy_name
                        , arz.strategy_description
                        , arz.subsector
                        , arz.confidence
                        , arz.confidence_score
                        , arz.feasibility
                        , arz.feasibility_score
                        , arz.cost
                        , arz.cost_score
        ) score_subsector_rank

        where total_emissions_reduced_per_year > 0
                and total_emissions_reduced_per_year is not null

        order by subsector, score_rank                                       
    
    ) TO '{parquet_path}' (FORMAT PARQUET);
    ''')

Query Running: Getting effectiveness score for strategies


In [None]:
# -------------------------------- this is asset level scoring! ------------------------------------

# ------------------------------------ ERS ASSET SCORE ---------------------------------------
# One row per rank-1 (co2e_100yr) asset strategy in six subsectors:
#   asset_rf_zscore = (reduction_factor - mu_rf) / sigma_rf within (subsector, asset_output)
#   asset_rf_score  = z clamped to [-3, 3] and mapped linearly onto 1..5 (z = 0 gives 3)
# When sigma_rf is NULL or ~0 every asset in the group has z = 0, so its score is the
# midpoint 3.0 (it used to be 0.0, which falls outside the 1-5 scale).
# NOTE(review): the last saved run of this cell failed with
#   "CatalogException: Catalog Error: Type with name c does not exist!"
# The cause is not visible from the query text — check the column types of the scanned
# Postgres tables and the DuckDB / postgres-extension versions.
# NOTE(review): the parquet is written to zzz_landing_zone and is not routed onward here.

parquet_path = "zzz_landing_zone/effectiveness_asset.parquet"

# The context manager closes the connection even if the query fails.
with duckdb.connect() as con:
    print('Query Running: Getting effectiveness score for assets')
    con.execute(f'''
    INSTALL postgres;
    LOAD postgres;

    CREATE TEMP TABLE rdf as
    SELECT rdf.*
        , coalesce(om.asset_output, 'other') asset_output
        , cs.confidence
        , cs.confidence_score
        , cs.feasibility
        , cs.feasibility_score
        , cs.cost
        , 6 - cs.cost_score as cost_score

    FROM postgres_scan('{postgres_url}', 'public', 'reductions_data_fusion') rdf
    LEFT JOIN read_parquet('strategy/categorical_scores/strategy_categorical_scores.parquet') cs
        on cs.strategy_id = rdf.strategy_id 
    LEFT JOIN postgres_scan('{postgres_url}', 'public', 'asset_type_to_output_map') om
        on om.subsector = rdf.original_inventory_sector
        and om.asset_type = rdf.asset_type
    
    WHERE strategy_rank = 1
      AND gas = 'co2e_100yr'
      and rdf.original_inventory_sector in ('solid-waste-disposal', 'iron-and-steel', 'cement', 'electricity-generation', 'water-reservoirs', 'cropland-fires');

    COPY (
        WITH asset_rf AS (
                SELECT
                        asset_id
                        , asset_output
                        , strategy_name
                        , strategy_description
                        , original_inventory_sector AS subsector
                        , old_emissions_factor
                        , old_activity
                        , total_emissions_reduced_per_year
                        , (total_emissions_reduced_per_year / old_activity) AS reduction_factor
                        , (old_emissions_factor * old_activity) AS w_inventory
                        , confidence
                        , confidence_score
                        , feasibility
                        , feasibility_score
                        , cost
                        , cost_score
                
                FROM rdf
                
                WHERE total_emissions_reduced_per_year IS NOT NULL
                        AND old_activity IS NOT NULL
                        AND old_activity > 0
                ),

        subsector_stats AS (
                SELECT
                        subsector
                        , asset_output
                        , SUM(w_inventory) AS subsector_emissions_inventory
                       
                        , SUM(total_emissions_reduced_per_year) / NULLIF(SUM(old_activity), 0) AS mu_rf
                        --, avg(reduction_factor) mu_rf
                        --, stddev_samp(reduction_factor) sigma_rf
                        , SQRT(
                                GREATEST(
                                        SUM(old_activity * POWER(reduction_factor, 2)) / NULLIF(SUM(old_activity), 0)
                                                - POWER( SUM(old_activity * reduction_factor) / NULLIF(SUM(old_activity), 0), 2 ),
                                0
                        )
                        ) AS sigma_rf
                
                FROM asset_rf
                
                GROUP BY subsector
                        , asset_output
        )
                
        SELECT
                a.asset_id
                , a.asset_output
                , a.strategy_name
                , a.strategy_description
                , a.subsector
                , a.old_emissions_factor
                , a.old_activity
                , a.total_emissions_reduced_per_year
                , a.reduction_factor
                , a.w_inventory
                , s.mu_rf
                , s.sigma_rf
                , CASE
                        WHEN s.sigma_rf IS NULL OR s.sigma_rf < 1e-12 THEN 0.0
                        ELSE (a.reduction_factor - s.mu_rf) / s.sigma_rf
                  END AS asset_rf_zscore
                
                , CASE
                        -- No spread: z-score is 0, which maps to the midpoint of the 1-5 scale
                        WHEN s.sigma_rf IS NULL OR s.sigma_rf < 1e-12 THEN 3.0
                        ELSE (
                            -- Step 1: Z-score bounded to the range -3 .. 3
                            CASE
                                WHEN (a.reduction_factor - s.mu_rf) / s.sigma_rf < -3 THEN -3
                                WHEN (a.reduction_factor - s.mu_rf) / s.sigma_rf > 3 THEN 3
                                ELSE (a.reduction_factor - s.mu_rf) / s.sigma_rf
                            END
                            
                            -- Step 2: Normalize to 1-5 score
                            + 3.0
                        ) * (4.0 / 6.0) + 1.0
                    END AS asset_rf_score
                
                , confidence
                , confidence_score
                , feasibility
                , feasibility_score
                , cost
                , cost_score

        FROM asset_rf a
        JOIN subsector_stats s 
                on a.subsector = s.subsector
                and a.asset_output = s.asset_output

    ) TO '{parquet_path}' (FORMAT PARQUET);
    ''')

Query Running: Getting effectiveness score for assets


CatalogException: Catalog Error: Type with name c does not exist!
Did you mean "char"?

In [3]:
# ------------------------------------ Asset Annual Emissions ------------------------------------

################### CURRENTLY USING DATA FUSION TABLES, NEEDS TO BE CHANGED BACK WHEN READY

# Builds the 2024 asset-level table from the *_data_fusion tables:
#   * 'asset' rows: most-granular co2e_100yr asset emissions grouped per asset (plus location,
#     country, balancing-authority and rank-1 reduction-strategy columns);
#   * 'remainder' rows (UNION ALL): rank-1 strategies from gadm_reductions_data_fusion with
#     NULL year and zero capacity/activity/emissions, located by gadm_1 / gadm_2.
# NOTE(review): capacity is SUM()-ed over every row of the year (likely monthly rows) —
#   confirm whether an annual capacity should be avg()/max() instead.
# NOTE(review): average_emissions_factor is an unweighted avg() of row-level factors.
# NOTE(review): the year 2024 is hard-coded here and in the MOER output folder name.

parquet_path = "zzz_landing_zone/asset_annual_emissions.parquet"

# The context manager closes the connection even if the query fails.
with duckdb.connect() as con:
    print('Query Running: Aggregating asset data to annual level and adding ERS...')
    con.execute(f'''
	INSTALL postgres;
	LOAD postgres;

	CREATE TABLE asset_annual_emissions_parquet AS
	select extract(year from ae.start_time) as year
		, cast(ae.asset_id as text) as asset_id
		, ai.asset_type
		, CASE 
				WHEN ae.original_inventory_sector = 'iron-and-steel' AND ai.asset_type LIKE '%BF%' 
					THEN '{{''iron-and-steel'': [''BF'', ''DRI-EAF'']}}'
				WHEN ae.original_inventory_sector = 'aluminum' AND ai.asset_type LIKE '%Refinery%' 
					THEN '{{''aluminum'': [''Refinery'']}}'
				WHEN ae.original_inventory_sector = 'aluminum' AND ai.asset_type LIKE '%Smelting%' 
					THEN '{{''aluminum'': [''Smelting'']}}'
				ELSE 'all' 
			END AS asset_type_2
		, ai.asset_name
		, ae.iso3_country
		, ca.name as country_name
        , abc.region balancing_authority_region
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
		, asch.sector
		, ae.original_inventory_sector as subsector
        , itm.activity_is_temporal
        , ae.lat_lon
		, al.gadm_1
		, al.gadm_2
		, al.ghs_fua
		, al.city_id
		, ae.other1
		, ae.other2
		, ae.other3
		, ae.other4
		, ae.other5
		, ae.other6
		, ae.other7
		, ae.other8
		, ae.other9
		, ae.other10
		, ae.activity_units
		, sum(capacity) capacity
		, case when activity_is_temporal = true then sum(activity) else avg(activity) end as activity
		, avg(emissions_factor) average_emissions_factor
		, sum(emissions_quantity) emissions_quantity
        , 'asset' as reduction_q_type
        , ers.strategy_id
		, ers.strategy_name
		, ers.strategy_description
		, ers.mechanism
		, ers.old_activity
		, ers.affected_activity
		, ers.old_emissions_factor
		, ers.new_emissions_factor
		, ers.emissions_reduced_at_asset
		, ers.induced_sector_1
		, ers.induced_sector_1_induced_emissions
		, ers.induced_sector_2
		, ers.induced_sector_2_induced_emissions
		, ers.induced_sector_3
		, ers.induced_sector_3_induced_emissions
		, ers.total_emissions_reduced_per_year

	from postgres_scan('{postgres_url}','public', 'asset_emissions_data_fusion') ae
	left join postgres_scan('{postgres_url}','public', 'asset_information_data_fusion') ai
		on ai.asset_id = ae.asset_id
	left join postgres_scan('{postgres_url}','public', 'asset_location_data_fusion') al
		on al.asset_id = ae.asset_id
	left join (
		select distinct sector, subsector from postgres_scan('{postgres_url}','public', 'asset_schema')
	) asch
		on cast(asch.subsector as varchar) = cast(ae.original_inventory_sector as varchar)
	left join postgres_scan('{postgres_url}','public', 'country_analysis') ca
		on cast(ca.iso3_country as varchar) = cast(ae.iso3_country as varchar)
    left join postgres_scan('{postgres_url}','public', 'asset_ba_crosswalk') abc
		on cast(abc.asset_id as text) = cast(ae.asset_id as text)
    left join (
		select rdf.* 
		from postgres_scan('{postgres_url}','public','reductions_data_fusion') rdf
        where strategy_rank = 1
			and rdf.gas = 'co2e_100yr'
    ) ers
		on ers.asset_id = ae.asset_id
    left join postgres_scan('{postgres_url}', 'public', 'is_temporal_map') itm
         on cast(itm.original_inventory_sector as text) = cast(ae.original_inventory_sector as text)

	where extract(year from ae.start_time) = 2024
		and ae.most_granular = true
		and ae.gas = 'co2e_100yr'


	group by extract(year from ae.start_time)
		, ae.asset_id
		, ai.asset_type
        , CASE 
				WHEN ae.original_inventory_sector = 'iron-and-steel' AND ai.asset_type LIKE '%BF%' 
					THEN '{{''iron-and-steel'': [''BF'', ''DRI-EAF'']}}'
				WHEN ae.original_inventory_sector = 'aluminum' AND ai.asset_type LIKE '%Refinery%' 
					THEN '{{''aluminum'': [''Refinery'']}}'
				WHEN ae.original_inventory_sector = 'aluminum' AND ai.asset_type LIKE '%Smelting%' 
					THEN '{{''aluminum'': [''Smelting'']}}'
				ELSE 'all' 
			END
		, ai.asset_name
		, ae.iso3_country
		, ca.name
        , abc.region
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
		, asch.sector
		, ae.original_inventory_sector
        , itm.activity_is_temporal
        , ae.lat_lon
		, al.gadm_1
		, al.gadm_2
		, al.ghs_fua
		, al.city_id
		, ae.other1
		, ae.other2
		, ae.other3
		, ae.other4
		, ae.other5
		, ae.other6
		, ae.other7
		, ae.other8
		, ae.other9
		, ae.other10
		, ae.activity_units
        , ers.strategy_id
		, ers.strategy_name
		, ers.strategy_description
		, ers.mechanism
		, ers.old_activity
		, ers.affected_activity
		, ers.old_emissions_factor
		, ers.new_emissions_factor
		, ers.emissions_reduced_at_asset
		, ers.induced_sector_1
		, ers.induced_sector_1_induced_emissions
		, ers.induced_sector_2
		, ers.induced_sector_2_induced_emissions
		, ers.induced_sector_3
		, ers.induced_sector_3_induced_emissions
		, ers.total_emissions_reduced_per_year
        
        UNION ALL
        
        SELECT 
			NULL AS year,
			asset_id,
			gr.asset_type,
			NULL AS asset_type_2,
			gr.asset_name,
			ca.iso3_country,
			ca.name AS country_name,
			NULL AS balancing_authority_region,
			ca.continent,
			ca.eu,
			ca.oecd,
			ca.unfccc_annex,
			ca.developed_un,
			ca.em_finance,
			asch.sector,
			gr.original_inventory_sector AS subsector,
			itm.activity_is_temporal,
            null as lat_lon,
			CASE 
				WHEN gb.admin_level = 1 THEN gb.gadm_id 
				WHEN gb.admin_level = 2 THEN gb.immediate_parent 
				ELSE NULL 
			END AS gadm_1,
			CASE 
				WHEN gb.admin_level = 2 THEN gb.gadm_id 
				ELSE NULL 
			END AS gadm_2,
			NULL AS ghs_fua,
			NULL AS city_id,
			NULL AS other1,
			NULL AS other2,
			NULL AS other3,
			NULL AS other4,
			NULL AS other5,
			NULL AS other6,
			NULL AS other7,
			NULL AS other8,
			NULL AS other9,
			NULL AS other10,
			NULL AS activity_units,
			0 AS capacity,
			0 AS activity,
			0 AS average_emissions_factor,
			0 AS emissions_quantity,
			'remainder' AS reduction_q_type,
			gr.strategy_id,
			gr.strategy_name,
			gr.strategy_description,
			gr.mechanism,
			gr.old_activity,
			gr.affected_activity,
			gr.old_emissions_factor,
			gr.new_emissions_factor,
			gr.emissions_reduced_at_asset,
			gr.induced_sector_1,
			gr.induced_sector_1_induced_emissions,
			gr.induced_sector_2,
			gr.induced_sector_2_induced_emissions,
			gr.induced_sector_3,
			gr.induced_sector_3_induced_emissions,
			gr.total_emissions_reduced_per_year
        
        FROM postgres_scan('{postgres_url}','public', 'gadm_reductions_data_fusion') gr
        LEFT JOIN (
			select distinct sector, subsector from postgres_scan('{postgres_url}','public', 'asset_schema')
		) asch
			on cast(asch.subsector as varchar) = cast(gr.original_inventory_sector as varchar)
        LEFT JOIN (
			select distinct gadm_id, iso3_country, admin_level, immediate_parent
            from postgres_scan('{postgres_url}','public', 'gadm_boundaries')
        ) gb
			on gb.gadm_id = gr.asset_id
        LEFT JOIN postgres_scan('{postgres_url}','public', 'country_analysis') ca
			on ca.iso3_country = gb.iso3_country
        LEFT JOIN postgres_scan('{postgres_url}', 'public', 'is_temporal_map') itm
        	on cast(itm.original_inventory_sector as text) = cast(gr.original_inventory_sector as text)
            
        WHERE gr.strategy_rank = 1
			and gr.gas = 'co2e_100yr'
        ;
            
    COPY asset_annual_emissions_parquet TO '{parquet_path}' (FORMAT PARQUET);
            
    ''')


# removing forestry sectors from query (kept for reference; add back to the WHERE clause above to re-enable)
		# and ae.original_inventory_sector not in ('forest-land-clearing',
		# 											'forest-land-degradation',
		# 											'forest-land-fires',
		# 											'net-forest-land',
		# 											'net-shrubgrass',
		# 											'net-wetland',
		# 											'removals',
		# 											'shrubgrass-fires',
		# 											'water-reservoirs',
		# 											'wetland-fires')

## ---------------------------------- ADD MOER FACTORS --------------------------------------
# Reads the raw annual asset export, adds MOER columns via utils.data_add_moer, writes the
# enriched frame to the landing zone, then splits/moves it into asset_level_2024.
parquet_path = Path('zzz_landing_zone/asset_annual_emissions.parquet')
landing_zone_path = Path('zzz_landing_zone/asset_annual_emissions_moer.parquet')
output_path = Path('asset_emissions/asset_level_2024')

df_asset = pd.read_parquet(parquet_path)

# adding moer data to assets (cond={"moer": True} is passed to the helper unchanged)
asset_moer_df = data_add_moer(df_asset, cond={"moer": True})

# converting data to new parquet file
asset_moer_df.to_parquet(landing_zone_path, index=False)

# freeing up memory before split_or_move_parquet may load the MOER file again
print("Deleting asset-moer dataframes to free up memory.")
del df_asset
del asset_moer_df
gc.collect()

# Route the MOER file first; the raw export is deleted only after that succeeds, so a
# failed split leaves both files in the landing zone and this step can be re-run.
split_or_move_parquet(landing_zone_path, output_path)

# deleting original asset file
parquet_path.unlink()
print("Original asset file deleted")

print("Successfully updated asset_level data.")

Query Running: Aggregating asset data to annual level and adding ERS...
Deleting asset-moer dataframes to free up memory.
Original asset file deleted
‚ö° Splitting asset_annual_emissions_moer.parquet (673.5 MB)...
  - Saved asset_emissions/asset_level_2024/asset_annual_emissions_moer_chunk_1.parquet (46.1 MB, rows 0‚Äì426113)
  - Saved asset_emissions/asset_level_2024/asset_annual_emissions_moer_chunk_2.parquet (46.0 MB, rows 426113‚Äì852226)
  - Saved asset_emissions/asset_level_2024/asset_annual_emissions_moer_chunk_3.parquet (46.0 MB, rows 852226‚Äì1278339)
  - Saved asset_emissions/asset_level_2024/asset_annual_emissions_moer_chunk_4.parquet (46.1 MB, rows 1278339‚Äì1704452)
  - Saved asset_emissions/asset_level_2024/asset_annual_emissions_moer_chunk_5.parquet (46.2 MB, rows 1704452‚Äì2130565)
  - Saved asset_emissions/asset_level_2024/asset_annual_emissions_moer_chunk_6.parquet (46.1 MB, rows 2130565‚Äì2556678)
  - Saved asset_emissions/asset_level_2024/asset_annual_emissions_moer

In [4]:
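# -------------------------------------- Uncomment and run this if the cell above created the MOER file but failed to split it and move it to the correct folder. If zzz_landing_zone/asset_annual_emissions.parquet no longer exists (the cell above may already have deleted it), run only the split_or_move_parquet(landing_zone_path, output_path) step.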

# parquet_path = Path('zzz_landing_zone/asset_annual_emissions.parquet')
# landing_zone_path = Path('zzz_landing_zone/asset_annual_emissions_moer.parquet')
# output_path =  Path('asset_emissions/asset_level_2024')

# df_asset = pd.read_parquet(parquet_path)

# # adding moer data to assets
# asset_moer_df = data_add_moer(df_asset, cond={"moer": True})

# # converting data to new parquet file
# asset_moer_df.to_parquet(landing_zone_path, index=False)

# # freeing up memory
# print("Deleting asset-moer dataframes to free up memory.")
# del df_asset
# del asset_moer_df
# gc.collect()

# # deleting original asset file
# parquet_path.unlink()
# print("Original asset file deleted")

# # splitting asset data into chunks
# split_or_move_parquet(landing_zone_path, output_path)

# print("Successfully updated asset_level data.")

In [5]:
# ------------------------------------ GADM_0 Emissions ------------------------------------
# Pulls 2024 co2e_100yr rows from public.gadm_0_emissions, joins GADM level-0 boundary
# names, sector, country attributes and the activity_is_temporal flag, aggregates them,
# and writes the result to Parquet using DuckDB's postgres extension.

parquet_path = "zzz_landing_zone/gadm_0_emissions.parquet"
output_path = "gadm_emissions/gadm_0"

# Activity columns are summed when activity_is_temporal is true and averaged otherwise;
# emission columns are always summed. The result is materialised as a table in DuckDB's
# default in-memory database and then COPYed to `parquet_path`.
gadm_0_query = f'''
    INSTALL postgres;
    LOAD postgres;

    CREATE TABLE gadm_0_emissions_parquet AS
    select extract(year from g0e.start_time) as year 
        , g0e.gadm_id
        , gb.gid
        , gb.admin_level
        , g0e.iso3_country
        , ca.name as country_name
        , gb.name gadm_0_name
        , gb.corrected_name gadm_0_corrected_name
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
        , asch.sector
        , g0e.original_inventory_sector subsector
        , itm.activity_is_temporal
        , g0e.gas
        , case when activity_is_temporal = true then sum(asset_activity) else avg(asset_activity) end as asset_activity
        , sum(asset_emissions) asset_emissions
        , case when activity_is_temporal = true then sum(remainder_activity) else avg(remainder_activity) end as remainder_activity
        , sum(remainder_emissions) remainder_emissions
        , sum(asset_emissions) + sum(remainder_emissions) as emissions_quantity

    from postgres_scan('{postgres_url}', 'public', 'gadm_0_emissions') g0e
    left join (
        select distinct gadm_id
            , gid
            , name
            , corrected_name
            , admin_level
        from postgres_scan('{postgres_url}','public', 'gadm_boundaries') 
        where admin_level = 0
    ) as gb
        on g0e.gadm_id = gb.gadm_id
    left join (
        select distinct sector
            , subsector
        from postgres_scan('{postgres_url}','public', 'asset_schema') 
    ) asch
        on cast(asch.subsector as varchar) = cast(g0e.original_inventory_sector as varchar)
    left join postgres_scan('{postgres_url}','public', 'country_analysis') ca
		on cast(ca.iso3_country as varchar) = cast(g0e.iso3_country as varchar)
    left join postgres_scan('{postgres_url}', 'public', 'is_temporal_map') itm
         on itm.original_inventory_sector = g0e.original_inventory_sector

    where g0e.gas = 'co2e_100yr'
        and extract(year from g0e.start_time) = 2024
        
    group by extract(year from g0e.start_time) 
        , g0e.gadm_id
        , gb.gid
        , gb.admin_level
        , g0e.iso3_country
        , ca.name
        , gb.name 
        , gb.corrected_name
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
        , asch.sector
        , g0e.original_inventory_sector
        , itm.activity_is_temporal
        , g0e.gas;

    COPY gadm_0_emissions_parquet TO '{parquet_path}' (FORMAT PARQUET);
'''

# Optional extra WHERE condition (currently disabled); append to the WHERE clause above to re-enable:
# and g0e.original_inventory_sector not in ('forest-land-clearing',
#                                                 'forest-land-degradation',
#                                                 'forest-land-fires',
#                                                 'net-forest-land',
#                                                 'net-shrubgrass',
#                                                 'net-wetland',
#                                                 'removals',
#                                                 'shrubgrass-fires',
#                                                 'water-reservoirs',
#                                                 'wetland-fires')

# Use DuckDB to write directly from PostgreSQL to Parquet. The `with` block closes the
# connection even if the query raises; a bare connect()/close() pair would leak it.
print('Running query')
with duckdb.connect() as con:
    con.execute(gadm_0_query)

split_or_move_parquet(parquet_path, output_path)

print('Successfully refreshed GADM_0 data.')

Running query
‚úÖ File gadm_0_emissions.parquet was 0.5 MB, moved to gadm_emissions/gadm_0/gadm_0_emissions.parquet
Successfully refreshed GADM_0 data.


In [6]:

# ------------------------------------ GADM 1 Emissions ------------------------------------
# Pulls 2024 co2e_100yr rows from public.gadm_1_emissions, joins GADM level-1 boundary
# names, sector, country attributes and the activity_is_temporal flag, aggregates them,
# and writes the result to Parquet using DuckDB's postgres extension.

parquet_path = "zzz_landing_zone/gadm_1_emissions.parquet"
output_path = "gadm_emissions/gadm_1"

# Activity columns are summed when activity_is_temporal is true and averaged otherwise;
# emission columns are always summed. The result is materialised as a table in DuckDB's
# default in-memory database and then COPYed to `parquet_path`.
gadm_1_query = f'''
    INSTALL postgres;
    LOAD postgres;

    CREATE TABLE gadm_1_emissions_parquet AS
    select extract(year from g1e.start_time) as year 
        , g1e.gadm_id
        , gb.gid
        , gb.admin_level
        , g1e.iso3_country
        , ca.name as country_name
        , gb.name gadm_1_name
        , gb.corrected_name gadm_1_corrected_name
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
        , asch.sector
        , g1e.original_inventory_sector subsector
        , itm.activity_is_temporal
        , g1e.gas
        , case when activity_is_temporal = true then sum(asset_activity) else avg(asset_activity) end as asset_activity
        , sum(asset_emissions) asset_emissions
        , case when activity_is_temporal = true then sum(remainder_activity) else avg(remainder_activity) end as remainder_activity
        , sum(remainder_emissions) remainder_emissions
        , sum(asset_emissions) + sum(remainder_emissions) as emissions_quantity

    from postgres_scan('{postgres_url}', 'public', 'gadm_1_emissions') g1e
    left join (
        select distinct gadm_id
            , gid
            , name
            , corrected_name
            , admin_level
        from postgres_scan('{postgres_url}','public', 'gadm_boundaries') 
        where admin_level = 1
    ) as gb
        on g1e.gadm_id = gb.gadm_id
    left join (
        select distinct sector
            , subsector
        from postgres_scan('{postgres_url}','public', 'asset_schema') 
    ) asch
        on cast(asch.subsector as varchar) = cast(g1e.original_inventory_sector as varchar)
    left join postgres_scan('{postgres_url}','public', 'country_analysis') ca
		on cast(ca.iso3_country as varchar) = cast(g1e.iso3_country as varchar)
    left join postgres_scan('{postgres_url}', 'public', 'is_temporal_map') itm
         on itm.original_inventory_sector = g1e.original_inventory_sector

    where g1e.gas = 'co2e_100yr'
        and extract(year from g1e.start_time) = 2024
        

    group by extract(year from g1e.start_time) 
        , g1e.gadm_id
        , gb.gid
        , gb.admin_level
        , g1e.iso3_country
        , ca.name
        , gb.name 
        , gb.corrected_name
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
        , asch.sector
        , g1e.original_inventory_sector
        , itm.activity_is_temporal
        , g1e.gas;

    COPY gadm_1_emissions_parquet TO '{parquet_path}' (FORMAT PARQUET);
'''

# Optional extra WHERE condition (currently disabled); append to the WHERE clause above to re-enable:
# and g1e.original_inventory_sector not in ('forest-land-clearing',
#                                                 'forest-land-degradation',
#                                                 'forest-land-fires',
#                                                 'net-forest-land',
#                                                 'net-shrubgrass',
#                                                 'net-wetland',
#                                                 'removals',
#                                                 'shrubgrass-fires',
#                                                 'water-reservoirs',
#                                                 'wetland-fires')

# Use DuckDB to write directly from PostgreSQL to Parquet. The `with` block closes the
# connection even if the query raises; a bare connect()/close() pair would leak it.
print('Running query')
with duckdb.connect() as con:
    con.execute(gadm_1_query)

split_or_move_parquet(parquet_path, output_path)

print('Successfully refreshed GADM_1 data.')

Running query
‚úÖ File gadm_1_emissions.parquet was 7.7 MB, moved to gadm_emissions/gadm_1/gadm_1_emissions.parquet
Successfully refreshed GADM_1 data.


In [7]:
# --------------------------------------------------------- GADM 2 BATCH -----------------------------------------------------------------
# GADM level-2 emissions are streamed from PostgreSQL through a psycopg2 server-side
# (named) cursor and appended to a single Parquet file in batches of `batch_size` rows,
# so the full result set is never held in memory at once.

batch_size = 10000
output_file = "zzz_landing_zone/gadm_2_emissions.parquet"
output_path = "gadm_emissions/gadm_2"
batch_count = 0
total_rows = 0

# Activity columns are summed when activity_is_temporal is true and averaged otherwise;
# emission columns are always summed. Each level-2 region is also labelled with its
# level-1 parent (gb2.immediate_parent -> gb1.gadm_id).
gadm_2_query = """
     select extract(year from ge.start_time) as year 
        , gb1.gadm_id gadm_1_id
        , gb1.name gadm_1_name
        , gb1.corrected_name gadm_1_corrected_name
        , ge.gadm_id gadm_2_id
        , gb2.name gadm_2_name
        , gb2.corrected_name gadm_2_corrected_name
        , gb2.gid
        , gb2.admin_level
        , ge.iso3_country
        , ca.name as country_name
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
        , asch.sector
        , ge.original_inventory_sector subsector
        , itm.activity_is_temporal
        , case when activity_is_temporal = true then sum(asset_activity) else avg(asset_activity) end as asset_activity
        , sum(asset_emissions) asset_emissions
        , case when activity_is_temporal = true then sum(remainder_activity) else avg(remainder_activity) end as remainder_activity
        , sum(remainder_emissions) remainder_emissions
        , sum(asset_emissions) + sum(remainder_emissions) as emissions_quantity

    from gadm_emissions ge
    inner join (
        select distinct gadm_id
            , gid
            , immediate_parent
            , name
            , corrected_name
            , admin_level
        from gadm_boundaries
        where admin_level = 2
    ) as gb2
        on ge.gadm_id = gb2.gadm_id
    left join (
        select distinct sector
            , subsector
        from asset_schema
    ) asch
        on cast(asch.subsector as varchar) = cast(ge.original_inventory_sector as varchar)
    left join (
        select gadm_id
            , name
            , corrected_name
        from gadm_boundaries
        where admin_level = 1
    ) gb1
        on gb1.gadm_id = gb2.immediate_parent
    left join country_analysis ca
        on cast(ca.iso3_country as varchar) = cast(ge.iso3_country as varchar)
    left join is_temporal_map itm
        on itm.original_inventory_sector = cast(ge.original_inventory_sector as text)

    where ge.gas = 'co2e_100yr'
        and extract(year from ge.start_time) = 2024


    group by extract(year from ge.start_time)
        , gb1.gadm_id 
        , gb1.name
        , gb1.corrected_name
        , ge.gadm_id 
        , gb2.name
        , gb2.corrected_name
        , gb2.gid
        , gb2.admin_level
        , ge.iso3_country
        , ca.name
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
        , asch.sector
        , ge.original_inventory_sector
        , itm.activity_is_temporal
    """

# Optional extra WHERE condition (currently disabled); append to the WHERE clause above to re-enable:
# and ge.original_inventory_sector not in ('forest-land-clearing',
#                                         'forest-land-degradation',
#                                         'forest-land-fires',
#                                         'net-forest-land',
#                                         'net-shrubgrass',
#                                         'net-wetland',
#                                         'removals',
#                                         'shrubgrass-fires',
#                                         'water-reservoirs',
#                                         'wetland-fires')

print("executing gadm_2 query...")

# psycopg2 expects raw credentials. The module-level `user`/`password` were URL-encoded
# with quote_plus for `postgres_url`, so passing them here would break authentication
# whenever the username or password contains a character quote_plus escapes.
conn = psycopg2.connect(
    dbname=database,
    user=os.getenv("CLIMATETRACE_USER"),
    password=os.getenv("CLIMATETRACE_PASS"),
    host=host,
    port=port,
)
try:
    cur = conn.cursor(name='parquet_cursor')  # server-side cursor
    cur.execute(gadm_2_query)

    # Fetch first batch; its inferred Arrow schema becomes the schema of the whole file.
    rows = cur.fetchmany(batch_size)
    if not rows:
        raise RuntimeError("No data returned from query.")

    field_names = [desc[0] for desc in cur.description]
    first_table = pa.Table.from_pylist([dict(zip(field_names, row)) for row in rows])

    # NOTE(review): a column that is entirely NULL in the first batch is inferred as Arrow
    # type `null`, and a later batch with real values in that column would then fail the
    # cast below. If that happens, build an explicit pa.schema instead of inferring it.
    # The `with` block closes the writer (finalising the file footer) even on error.
    with pq.ParquetWriter(output_file, first_table.schema) as writer:
        writer.write_table(first_table)
        batch_count += 1
        total_rows += len(rows)
        print(f"Processed batch {batch_count} ({len(rows)} rows), total rows: {total_rows}")

        # Process remaining batches
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break

            table = pa.Table.from_pylist([dict(zip(field_names, row)) for row in rows])
            table = table.cast(writer.schema)  # ensure schema matches first batch
            writer.write_table(table)

            batch_count += 1
            total_rows += len(rows)
            print(f"Processed batch {batch_count} ({len(rows)} rows), total rows: {total_rows}")

    cur.close()
finally:
    # Always release the connection. Ending the session also discards the server-side
    # cursor if anything above raised before cur.close().
    conn.close()

split_or_move_parquet(output_file, output_path)

print("Successfully refreshed GADM_2 data.")

executing gadm_2 query...
Processed batch 1 (10000 rows), total rows: 10000
Processed batch 2 (10000 rows), total rows: 20000
Processed batch 3 (10000 rows), total rows: 30000
Processed batch 4 (10000 rows), total rows: 40000
Processed batch 5 (10000 rows), total rows: 50000
Processed batch 6 (10000 rows), total rows: 60000
Processed batch 7 (10000 rows), total rows: 70000
Processed batch 8 (10000 rows), total rows: 80000
Processed batch 9 (10000 rows), total rows: 90000
Processed batch 10 (10000 rows), total rows: 100000
Processed batch 11 (10000 rows), total rows: 110000
Processed batch 12 (10000 rows), total rows: 120000
Processed batch 13 (10000 rows), total rows: 130000
Processed batch 14 (10000 rows), total rows: 140000
Processed batch 15 (10000 rows), total rows: 150000
Processed batch 16 (10000 rows), total rows: 160000
Processed batch 17 (10000 rows), total rows: 170000
Processed batch 18 (10000 rows), total rows: 180000
Processed batch 19 (10000 rows), total rows: 190000
Proc

In [8]:
# ------------------------------------ City Emissions ------------------------------------
# Pulls 2024 co2e_100yr rows from public.city_emissions, joins 'ghs-fua' city boundary
# names, sector, country attributes and the activity_is_temporal flag, aggregates them,
# and writes the result to Parquet using DuckDB's postgres extension.

parquet_path = "zzz_landing_zone/city_emissions.parquet"
output_path = "city_emissions"

# Activity columns are summed when activity_is_temporal is true and averaged otherwise;
# emission columns are always summed. Because the WHERE clause requires
# `cb.city_id is not null`, the left join to city_boundaries behaves as an inner join:
# cities without a 'ghs-fua' boundary row are dropped.
city_query = f'''
	INSTALL postgres;
	LOAD postgres;

	CREATE TABLE city_emissions_parquet AS
    
	select extract(year from ce.start_time) as year
		, ce.city_id
		, cb.name as city_name
		, cb.corrected_name as corrected_name
		, ce.iso3_country
		, ca.name as country_name
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
		, asch.sector
		, ce.original_inventory_sector as subsector
        , itm.activity_is_temporal
		, case when activity_is_temporal = true then sum(asset_activity) else avg(asset_activity) end as asset_activity
		, sum(asset_emissions) asset_emissions
		, case when activity_is_temporal = true then sum(remainder_activity) else avg(remainder_activity) end as remainder_activity
		, sum(remainder_emissions) remainder_emissions
		, sum(asset_emissions) + sum(remainder_emissions) as emissions_quantity

	from postgres_scan('{postgres_url}','public', 'city_emissions') ce
	left join postgres_scan('{postgres_url}','public', 'city_boundaries') cb
		on cb.city_id = ce.city_id
        and cb.reporting_entity = 'ghs-fua'
	left join (
		select distinct sector, subsector
		from postgres_scan('{postgres_url}','public', 'asset_schema')
	) asch
		on cast(asch.subsector as varchar) = cast(ce.original_inventory_sector as varchar)
	left join postgres_scan('{postgres_url}','public', 'country_analysis') ca
		on cast(ca.iso3_country as varchar) = cast(ce.iso3_country as varchar)
    left join postgres_scan('{postgres_url}', 'public', 'is_temporal_map') itm
         on itm.original_inventory_sector = ce.original_inventory_sector

	where extract(year from ce.start_time) = 2024
		and ce.gas = 'co2e_100yr'
        and cb.city_id is not null

	group by extract(year from ce.start_time) 
		, ce.city_id
		, cb.name 
		, cb.corrected_name 
		, ce.iso3_country
		, ca.name 
        , ca.continent
        , ca.eu
        , ca.oecd
        , ca.unfccc_annex
        , ca.developed_un
        , ca.em_finance
		, asch.sector
		, ce.original_inventory_sector
        , itm.activity_is_temporal;
            
    COPY city_emissions_parquet TO '{parquet_path}' (FORMAT PARQUET);
            
    '''

# Optional extra WHERE condition (currently disabled); append to the WHERE clause above to re-enable:
# and ce.original_inventory_sector not in ('forest-land-clearing',
# 														'forest-land-degradation',
# 														'forest-land-fires',
# 														'net-forest-land',
# 														'net-shrubgrass',
# 														'net-wetland',
# 														'removals',
# 														'shrubgrass-fires',
# 														'water-reservoirs',
# 														'wetland-fires')

# Use DuckDB to write directly from PostgreSQL to Parquet. The `with` block closes the
# connection even if the query raises; a bare connect()/close() pair would leak it.
print('Running query...')
with duckdb.connect() as con:
    con.execute(city_query)

split_or_move_parquet(parquet_path, output_path)

print('Successfuly refreshed city_emissions data.')

Running query...
‚úÖ File city_emissions.parquet was 25.7 MB, moved to city_emissions/city_emissions.parquet
Successfuly refreshed city_emissions data.


In [9]:
# ------------------------------------ Asset Ownership ------------------------------------
# Copies the whole public.asset_ownership table from PostgreSQL to Parquet via DuckDB,
# then moves (or splits) the file into the `ownership` folder.

parquet_path = "zzz_landing_zone/asset_ownership.parquet"
output_path = "ownership"

ownership_query = f'''
	INSTALL postgres;
	LOAD postgres;

	CREATE TABLE asset_ownership_parquet AS
            
    SELECT *
    FROM postgres_scan('{postgres_url}','public', 'asset_ownership');
    
    COPY asset_ownership_parquet TO '{parquet_path}' (FORMAT PARQUET);
            
    '''

# Use DuckDB to write directly from PostgreSQL to Parquet. The `with` block closes the
# connection even if the query raises; a bare connect()/close() pair would leak it.
print('Running query...')
with duckdb.connect() as con:
    con.execute(ownership_query)

split_or_move_parquet(parquet_path, output_path)

print('Successfully refreshed ownership data.')

Running query...
‚úÖ File asset_ownership.parquet was 7.5 MB, moved to ownership/asset_ownership.parquet
Successfully refreshed ownership data.


In [10]:
# ------------------------------------ Demographic ------------------------------------
# Copies the rows of public.demographic_data for a single dataset version from
# PostgreSQL to Parquet via DuckDB, then moves (or splits) the file into `demographic`.

# NOTE(review): "deomgraphic" is a typo, but it is also the name of the file that lands in
# demographic/. It is left unchanged so existing readers of that path keep working. If it
# is ever renamed, delete the old demographic/deomgraphic.parquet so both files do not coexist.
parquet_path = "zzz_landing_zone/deomgraphic.parquet"
output_path = "demographic"

demographic_query = f'''
	INSTALL postgres;
	LOAD postgres;

	CREATE TABLE demographic_parquet AS
            
    select *
    from postgres_scan('{postgres_url}', 'public', 'demographic_data')
    where version = 'global_pop_2024_CN_1km_R2024B_UA_v1';
    
    COPY demographic_parquet TO '{parquet_path}' (FORMAT PARQUET);
            
    '''

# Use DuckDB to write directly from PostgreSQL to Parquet. The `with` block closes the
# connection even if the query raises; a bare connect()/close() pair would leak it.
print('Running query...')
with duckdb.connect() as con:
    con.execute(demographic_query)

split_or_move_parquet(parquet_path, output_path)

# This cell refreshes demographic data; the previous message wrongly said "ownership".
print('Successfully refreshed demographic data.')

Running query...
‚úÖ File deomgraphic.parquet was 1.6 MB, moved to demographic/deomgraphic.parquet
Successfully refreshed ownership data.


In [11]:
# Final marker. When the notebook is run top to bottom, reaching this cell means none of
# the refresh cells above raised.
print('Data refresh complete!')

Data refresh complete!
