In [None]:
from utils.run_sql import run_sql

# grabbing data from asset table
asset_query = '''
    select ca.name country_name
        , ae.iso3_country
        , original_inventory_sector
        , to_char(start_time, 'YYYY-MM') as year_month
        , ae.gas
        , sum(activity) activity
        , avg(emissions_factor) mean_emissions_factor
        , sum(emissions_quantity) emissions_quantity

    from asset_emissions ae
    left join country_analysis ca
        on cast(ca.iso3_country as varchar) = cast(ae.iso3_country as varchar)

    where start_time >= '2022-02-01'
        and ae.gas = 'co2e_100yr'
        -- and original_inventory_sector in ('international-shipping','domestic-shipping','electricity-generation')
        and most_granular = true

    group by ca.name
        , ae.iso3_country
        , original_inventory_sector
        , to_char(start_time, 'YYYY-MM')
        , ae.gas
'''

asset_df = run_sql(asset_query)



In [None]:
from tqdm import tqdm
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv
from urllib.parse import quote_plus

# Load .env
load_dotenv()

# Connection vars
user = quote_plus(os.getenv("CLIMATETRACE_USER"))
password = quote_plus(os.getenv("CLIMATETRACE_PASS"))
host = os.getenv("CLIMATETRACE_HOST")
port = os.getenv("CLIMATETRACE_PORT")
database = os.getenv("CLIMATETRACE_DB")

engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")

# Your raw (non-aggregated) query
query = '''
    SELECT ae.*,
           ca.name AS country_name
    FROM asset_emissions ae
    LEFT JOIN country_analysis ca
      ON CAST(ca.iso3_country AS VARCHAR) = CAST(ae.iso3_country AS VARCHAR)
    WHERE ae.start_time >= '2022-02-01'
      AND ae.gas = 'co2e_100yr'
      AND ae.most_granular = TRUE
'''

# Output path
os.makedirs("data/asset_parquet", exist_ok=True)
parquet_path = "data/asset_parquet/asset_emissions_most_granular.parquet"

print('Running query... this will take a while...')
# Start query with chunking
chunk_iter = pd.read_sql_query(query, engine, chunksize=100_000)

# Init variables
parquet_writer = None
total_rows = 0
progress_bar = tqdm(desc="Exporting full raw data to Parquet", unit="chunk")

# First chunk to define schema
first_chunk = next(chunk_iter)
table = pa.Table.from_pandas(first_chunk)
parquet_writer = pq.ParquetWriter(parquet_path, table.schema)
parquet_writer.write_table(table)
total_rows += len(first_chunk)
progress_bar.update(1)

# Remaining chunks
for chunk in chunk_iter:
    # Align schema (just to be sure, if columns were reordered)
    chunk_table = pa.Table.from_pandas(chunk).cast(table.schema)
    parquet_writer.write_table(chunk_table)
    total_rows += len(chunk)
    progress_bar.update(1)

parquet_writer.close()
progress_bar.close()
print(f"\n✅ Done! Total rows exported: {total_rows:,}")


Running query... this will take a while...


Exporting full raw data to Parquet: 348chunk [03:55,  1.47chunk/s]


✅ Done! Total rows exported: 34,778,764





In [6]:
import duckdb

con = duckdb.connect()

# Replace with your actual path if needed
parquet_path = "data/asset_parquet/asset_emissions_most_granular.parquet"

# Count how many rows you actually wrote
result = con.execute(f"SELECT extract(year from start_time) as year, sum(emissions_quantity) FROM '{parquet_path}' group by extract(year from start_time)").fetchall()

for row in result:
    print(row)
print(result)


(2022, 32463955811.255875)
(2023, 50000020799.169)
(2025, 7630239785.8226795)
(2024, 51870569802.52423)
[(2022, 32463955811.255875), (2023, 50000020799.169), (2025, 7630239785.8226795), (2024, 51870569802.52423)]
