# CEMS Allocator

In [2]:
%load_ext autoreload
%autoreload 2

import numpy as np
import pudl
import pandas as pd
import logging
import sys
import sqlalchemy as sa
import dask.dataframe as dd

# basic setup for logging
# Configure the root logger to write INFO-and-above records to stdout using a
# bare "%(message)s" format (no timestamp or level prefix).
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
# Assign (rather than append to) the handler list so that this handler is the
# only one attached, even if the cell is executed more than once.
logger.handlers = [handler]

In [3]:
# Load the default PUDL workspace settings and open a SQLAlchemy engine on the
# database URL stored under the "pudl_db" key.
pudl_settings = pudl.workspace.setup.get_defaults()
pudl_engine = sa.create_engine(pudl_settings["pudl_db"])
# Output-table helper bound to that engine.
# NOTE(review): freq='AS' presumably requests annual (year-start) aggregation
# -- confirm against the PudlTabl documentation.
pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine,freq='AS')

#### CEMS

In [8]:
# Path of the hourly EPA CEMS parquet dataset inside the PUDL parquet directory.
epacems_path = "/".join(
    [pudl_settings['parquet_dir'], "epacems", "hourly_emissions_epacems.parquet"]
)
# Only the grouping keys and the CO2 mass are needed, so restrict the read to
# those columns.
cems_columns = ["year", "plant_id_eia", "emissions_unit_id_epa", "co2_mass_tons"]
cems_dd = dd.read_parquet(epacems_path, columns=cems_columns)

In [9]:
# Collapse the hourly records to one row per (year, plant, emissions unit) by
# summing the remaining loaded column (co2_mass_tons); .compute() evaluates the
# dask graph and returns the result.
cems_df = cems_dd.groupby(["year", "plant_id_eia", "emissions_unit_id_epa"]).sum().compute()

In [49]:
# Move the group keys out of the index and back into ordinary columns so they
# can be used as merge keys.
cems_df = cems_df.reset_index()

In [118]:
# Cast the plant ID to pandas' nullable integer dtype.
cems_df["plant_id_eia"] = cems_df.plant_id_eia.astype("Int64")
# Replace missing annual CO2 totals with 0.
cems_df["co2_mass_tons"] = cems_df.co2_mass_tons.fillna(0)

#### Crosswalk

In [23]:
# EPA CAMD <-> EIA crosswalk from the PUDL output helper. The cells below use
# its plant_id_eia, generator_id and emissions_unit_id_epa columns.
crosswalk_df = pudl_out.epacamd_eia_crosswalk()

#### EIA

In [12]:
# EIA-860 generator table from the PUDL output helper. The cells below use its
# report_date, plant_id_eia, generator_id, capacity_mw and
# technology_description columns.
eia_gens_df  = pudl_out.gens_eia860()

Filling technology type
Filled technology_type coverage now at 98.1%


#### Allocate

In [355]:
def allocate_cols(
    to_allocate: pd.DataFrame, by: list, data_and_allocator_cols: dict
) -> pd.DataFrame:
    """
    Allocate data columns proportionally within groups of records.

    For every allocator column, each record's share of its group total is
    computed, where groups are defined by ``by`` (null keys form their own
    group). Each data column is then multiplied by the share derived from the
    first allocator column that yields a non-null result.

    Args:
        to_allocate: table of data that has been merged with the records of the
            scale that you want the output to be in. It is not modified.
        by: columns to group by.
        data_and_allocator_cols: dict of data columns that you want to allocate
            (keys) and ordered list of columns to allocate proportionally based
            on (values). Values are ordered by priority: if the first column
            gives a non-null result it is used, otherwise the second, and so on.

    Returns:
        A new dataframe with the same rows and index as ``to_allocate`` in which
        every data column (keys of ``data_and_allocator_cols``) has been
        replaced by its proportionally allocated value. The allocated columns
        are placed after all remaining columns.
    """
    # De-duplicate the allocator columns while keeping first-seen order, so the
    # work done is deterministic from run to run.
    all_allocator_cols = list(
        dict.fromkeys(
            col
            for allocator_cols in data_and_allocator_cols.values()
            for col in allocator_cols
        )
    )
    grouped = to_allocate.groupby(by=by, dropna=False)
    # Share of the group total held by each record. The aggregation is named by
    # string ("sum") so the ``min_count`` keyword reaches the pandas
    # implementation; min_count=1 keeps the total null (instead of 0) for groups
    # whose allocator values are all null, letting the next allocator column in
    # the priority list take over. Shares are kept outside of the frame so the
    # caller's table is never modified and no input column can be clobbered.
    proportions = {
        col: to_allocate[col] / grouped[col].transform("sum", min_count=1)
        for col in all_allocator_cols
    }
    allocated = {}
    for data_col, allocator_cols in data_and_allocator_cols.items():
        result = None
        # choose the first non-null option. The order of the allocator columns
        # determines which one is used.
        for allocator_col in allocator_cols:
            candidate = to_allocate[data_col] * proportions[allocator_col]
            result = candidate if result is None else result.fillna(candidate)
        if result is None:
            # No allocator columns were given for this data column: nothing can
            # be allocated, so the column is entirely null.
            result = pd.Series(pd.NA, index=to_allocate.index, dtype="object")
        allocated[data_col] = result
    # Replace the original data columns with their allocated counterparts.
    return to_allocate.drop(columns=list(allocated)).assign(**allocated)

#### TEST ALLOCATE

In [299]:
# Single plant-year slice (plant 3, report year 2020) used to try out the
# allocation on a small table.
is_plant_3 = eia_gens_df["plant_id_eia"] == 3
is_2020 = eia_gens_df["report_date"].dt.year == 2020
test = eia_gens_df[is_plant_3 & is_2020]

In [300]:
# Attach the EPA emissions unit(s) to each generator via the crosswalk, keep
# only the columns needed for allocation, derive the merge year from the report
# date, then bring in the annual CEMS CO2 for each unit. Generators without a
# CEMS match end up with 0 tons.
test_merge = (
    test.merge(
        crosswalk_df[["plant_id_eia", "generator_id", "emissions_unit_id_epa"]],
        how="left",
        on=["plant_id_eia", "generator_id"],
    )
    .loc[
        :,
        ["report_date", "plant_id_eia", "generator_id", "emissions_unit_id_epa", "capacity_mw"],
    ]
    .assign(year=lambda gens: gens.report_date.dt.year.astype("Int64"))
    .merge(cems_df, how="left", on=["year", "plant_id_eia", "emissions_unit_id_epa"])
    .assign(co2_mass_tons=lambda merged: merged.co2_mass_tons.fillna(0).astype(int))
)

In [301]:
# Row counts after and before the left merges. A larger merged count means at
# least one generator matched more than one crosswalk row.
print(len(test_merge))
print(len(test))

15
13


In [302]:
# If you want to allocate by something other than generator (plant or prime mover),
# make sure the capacity value is for that level of aggregation.

# Split each emissions unit's annual CO2 across the generators linked to it, in
# proportion to generator capacity.
tt = allocate_cols(
    to_allocate=test_merge,
    by=["report_date", "plant_id_eia", "emissions_unit_id_epa"],
    data_and_allocator_cols={"co2_mass_tons": ["capacity_mw"]}  
)

In [308]:
# Now sum up to generator level.
# A generator linked to several emissions units appears once per unit, and each
# of those rows carries the generator's full capacity. Only the allocated CO2
# is therefore summed; capacity is taken once per generator so it is not
# multiplied by the number of linked units. Naming the aggregated columns
# explicitly also avoids summing the non-numeric / key columns.
tt.groupby(["report_date", "plant_id_eia", "generator_id"], as_index=False).agg(
    capacity_mw=("capacity_mw", "first"),
    co2_mass_tons=("co2_mass_tons", "sum"),
)

Unnamed: 0,report_date,plant_id_eia,generator_id,capacity_mw,co2_mass_tons
0,2020-01-01,3,1,153.1,4667.0
1,2020-01-01,3,2,153.1,1697.0
2,2020-01-01,3,3,272.0,0.0
3,2020-01-01,3,4,403.7,164045.0
4,2020-01-01,3,5,788.8,3128656.0
5,2020-01-01,3,A1CT,170.1,396405.2
6,2020-01-01,3,A1CT2,170.1,430972.8
7,2020-01-01,3,A1ST,390.4,949466.1
8,2020-01-01,3,A2C1,170.1,410815.5
9,2020-01-01,3,A2C2,170.1,413754.2


#### ALLOCATE WITH ALL GENS

In [356]:
## Merge with whole CEMS!
# Same steps as the single-plant test, applied to every EIA generator record.
# cems_merge1: generators joined to their EPA emissions unit(s) plus the merge
# year; cems_merge2: cems_merge1 joined to the annual CEMS CO2 per unit, with
# unmatched rows set to 0 tons.
crosswalk_keys = crosswalk_df[["plant_id_eia", "generator_id", "emissions_unit_id_epa"]]
cems_merge1 = (
    eia_gens_df.merge(crosswalk_keys, how="left", on=["plant_id_eia", "generator_id"])
    .loc[
        :,
        [
            "report_date",
            "plant_id_eia",
            "generator_id",
            "emissions_unit_id_epa",
            "capacity_mw",
            "technology_description",
        ],
    ]
    .assign(year=lambda gens: gens.report_date.dt.year.astype("Int64"))
)
cems_merge2 = cems_merge1.merge(
    cems_df, how="left", on=["year", "plant_id_eia", "emissions_unit_id_epa"]
).assign(co2_mass_tons=lambda merged: merged.co2_mass_tons.fillna(0).astype(int))

In [357]:
# Allocate each emissions unit's annual CO2 to its generators by capacity, then
# roll the result up to one row per generator and report date.
# A generator linked to several emissions units appears once per unit with its
# full capacity on every row, so only the allocated CO2 is summed; capacity is
# taken once per generator instead of being multiplied by the number of units.
# Naming the aggregated columns explicitly also keeps non-numeric columns
# (unit ID, technology description) out of the aggregation.
cems_gen_agg = (
    allocate_cols(
        to_allocate=cems_merge2,
        by=["report_date", "plant_id_eia", "emissions_unit_id_epa"],
        data_and_allocator_cols={"co2_mass_tons": ["capacity_mw"]},
    )
    .groupby(["report_date", "plant_id_eia", "generator_id"], as_index=False)
    .agg(
        capacity_mw=("capacity_mw", "first"),
        co2_mass_tons=("co2_mass_tons", "sum"),
    )
)

In [343]:
# Number of rows in the aggregated result: one per
# (report_date, plant_id_eia, generator_id) group.
print(len(cems_gen_agg))

491469


In [346]:
# How many generator records have no EPA emissions unit in the crosswalk:
# percentage, count, and total -- first for all generators, then only for the
# technologies listed below.
gen_keys = ["report_date", "plant_id_eia", "generator_id"]

bb = cems_merge1.drop_duplicates(subset=gen_keys)
bb_unmatched = int(bb["emissions_unit_id_epa"].isna().sum())
print(bb_unmatched / len(bb) * 100)
print(bb_unmatched)
print(len(bb))
print("")

fossil_technologies = [
    "Conventional Steam Coal",
    "Natural Gas Fired Combined Cycle",
    "Natural Gas Fired Combustion Turbine",
    "Natural Gas Steam Turbine",
    "Petroleum Liquids",
    "Natural Gas Internal Combustion Engine",
    "Municipal Solid Waste",
    "Wood/Wood Waste Biomass",
    "Coal Integrated Gasification Combined Cycle",
    "Petroleum Coke",
    "Landfill Gas",
    "Natural Gas with Compressed Air Storage",
    "Other Gases",
    "Other Waste Biomass",
    "Other Natural Gas",
]
is_fossil = cems_merge1["technology_description"].isin(fossil_technologies)
fossil = cems_merge1[is_fossil]

ff = fossil.drop_duplicates(subset=gen_keys)
ff_unmatched = int(ff["emissions_unit_id_epa"].isna().sum())
print(ff_unmatched / len(ff) * 100)
print(ff_unmatched)
print(len(ff))

78.90833399461614
387810
491469

68.23238873700286
219376
321513


In [360]:
# NOTE(review): `non_agg` is not assigned in any of the visible cells, so this
# line raises NameError on a fresh top-to-bottom run -- confirm which frame it
# is meant to be. The absolute, user-specific path also only exists on one
# machine.
non_agg.to_pickle("/Users/austensharpe/Desktop/non_agg.pkl")

In [361]:
# NOTE(review): `agg` is not assigned in any of the visible cells, so this line
# raises NameError on a fresh top-to-bottom run -- confirm which frame it is
# meant to be (possibly `cems_gen_agg`). The absolute, user-specific path also
# only exists on one machine.
agg.to_pickle("/Users/austensharpe/Desktop/agg.pkl")

Unnamed: 0,report_date,plant_id_eia,generator_id,capacity_mw,co2_mass_tons
0,2001-01-01,2,1,45.0,0.0
1,2001-01-01,3,1,153.1,951508.0
2,2001-01-01,3,2,153.1,902068.0
3,2001-01-01,3,3,272.0,1969314.0
4,2001-01-01,3,4,403.7,2843765.0
...,...,...,...,...,...
491464,2021-01-01,65333,785,200.0,0.0
491465,2021-01-01,65334,PLTVW,81.0,0.0
491466,2021-01-01,65335,WAPPA,171.8,0.0
491467,2021-01-01,65337,MAYBK,5.0,0.0
