# Notebook Preamble

## IPython Magic

In [None]:
%load_ext autoreload
%autoreload 3

## Notebook Imports

In [None]:
# Standard Library Imports
import logging
import os
import pathlib
import sys
from typing import List
from pathlib import Path

# 3rd Party Imports:
import numpy as np
import pandas as pd
import sqlalchemy as sa
import pyarrow as pa
import pyarrow.parquet as pq
from intake import open_catalog

# Local Imports
import pudl
from pudl.output.pudltabl import PudlTabl
from pudl.metadata.classes import Resource
from pudl.output.epacems import year_state_filter

## Set up a logger

In [None]:
# Send root-logger records at INFO and above to stdout as the bare message text.
# Assigning ``handlers`` replaces (rather than appends to) whatever handlers are
# already attached, so re-running this cell does not duplicate output.
formatter = logging.Formatter("%(message)s")
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(formatter)

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers = [handler]

## Set up standard PUDL DB connections

In [None]:
# Locate the local PUDL workspace configuration.
pudl_settings = pudl.workspace.setup.get_defaults()

# SQLAlchemy engines for the databases named by the "pudl_db" and "ferc1_db"
# settings (create_engine does not connect until the engine is first used).
pudl_engine = sa.create_engine(pudl_settings["pudl_db"])
ferc1_engine = sa.create_engine(pudl_settings["ferc1_db"])

# A single PudlTabl output object, reachable under both names.
pudl_out = pudl_out_raw = pudl.output.pudltabl.PudlTabl(pudl_engine=pudl_engine)

# Display the resolved settings as the cell output.
pudl_settings

# Re-organize PUDL Parquet Files
Reorganize the existing PUDL Parquet datasets (here, the EPA CEMS hourly emissions) so that they can be included in the experimental catalog.

In [None]:
# Location of the existing EPA CEMS parquet dataset inside the PUDL parquet dir.
epacems_dir = pudl_settings["parquet_dir"] + "/epacems"

# Measurement-code columns to cast to pandas categoricals after each read.
# A bare CategoricalDtype() (categories=None) lets pandas infer the categories
# from the data being converted.
emissions_categories = {
    col: pd.CategoricalDtype()
    for col in (
        "so2_mass_measurement_code",
        "co2_mass_measurement_code",
        "nox_mass_measurement_code",
        "nox_rate_measurement_code",
    )
}

## Single file, year-state row groups

In [None]:
def epacems_single_file(
    years: List[int],
    input_dir: str,
    outfile: str,
    states: List[str] = pudl.metadata.enums.EPACEMS_STATES,
) -> None:
    """Concatenate selected EPA CEMS year/state slices into one parquet file.

    Each (year, state) combination is read from ``input_dir`` with its own
    filter and appended to ``outfile`` by a separate ``write_table`` call, so
    row groups in the output never straddle a year/state boundary. Large
    slices may still be split across several row groups by pyarrow's default
    row-group size. Any existing ``outfile`` is deleted first, and each year
    is printed as it starts, as a progress indicator.

    Args:
        years: Years to include, written in the order given.
        input_dir: Location of the existing EPA CEMS parquet dataset.
        outfile: Local path of the parquet file to (re)create.
        states: State abbreviations to include for every year.
    """
    # ParquetWriter needs the schema up front; the 2020/ID slice serves as the
    # sample it is taken from.
    schema = pq.read_table(
        source=input_dir,
        filters=year_state_filter(years=[2020], states=["ID"]),
    ).schema

    target = Path(outfile)
    if target.exists():
        target.unlink()

    with pq.ParquetWriter(
        where=outfile,
        schema=schema,
        compression="snappy",
        version="2.6",
    ) as writer:
        for year in years:
            print(year, end=" ")
            for state in states:
                # Read and immediately write one slice; the table is released
                # as soon as write_table returns.
                writer.write_table(
                    pq.read_table(
                        source=input_dir,
                        filters=year_state_filter(years=[year], states=[state]),
                    )
                )
        print("")

In [None]:
%%time
# Export 1995 through 2020 (the range end is exclusive), for all default
# states, into a single file in the current working directory.
epacems_single_file(
    input_dir=epacems_dir,
    outfile="hourly_emissions_epacems.parquet",
    years=range(1995, 2021),
)

## Year + State Partitioning + Row Groups

In [None]:
def epacems_multi_file(
    years: List[int],
    input_dir: str,
    output_dir: str,
    states: List[str] = pudl.metadata.enums.EPACEMS_STATES,
) -> None:
    """Split EPA CEMS parquet data into one file per (year, state).

    Writes ``{output_dir}/epacems-{year}-{state}.parquet`` for every
    combination of ``years`` and ``states``, replacing any existing file of
    that name. Every file is written with the same schema (taken from the
    2020/ID slice of ``input_dir``), so the files can be read back together
    as a single dataset. Progress is printed as "year: state state ...".

    Args:
        years: Years to export.
        input_dir: Location of the existing EPA CEMS parquet dataset.
        output_dir: Local directory to write the per-(year, state) files into.
            It is created (including parents) if it does not already exist.
        states: State abbreviations to export for every year.
    """
    # All output files share one schema, sampled from the 2020/ID slice.
    schema = pq.read_table(
        source=input_dir,
        filters=year_state_filter(years=[2020], states=["ID"]),
    ).schema

    # ParquetWriter does not create missing parent directories, so a first run
    # against a fresh output_dir would otherwise fail on the very first write.
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    for year in years:
        print(year, end=": ")
        for state in states:
            print(state, end=" ")
            outfile = output_dir + f"/epacems-{year}-{state}.parquet"
            if Path(outfile).exists():
                os.unlink(outfile)
            # Read the slice before opening the writer so that a failed read
            # does not leave an empty or partial output file behind.
            filters = year_state_filter(years=[year], states=[state])
            table = pq.read_table(source=input_dir, filters=filters)
            with pq.ParquetWriter(
                where=outfile, schema=schema, compression="snappy", version="2.6",
            ) as pqwriter:
                pqwriter.write_table(table)
            # Drop the slice before reading the next one to limit peak memory.
            del table
        print("")

In [None]:
%%time
# Same 1995-2020 span as the single-file export, but written as one file per
# year/state into the relative directory "hourly_emissions_epacems".
epacems_multi_file(
    input_dir=epacems_dir,
    output_dir="hourly_emissions_epacems",
    years=range(1995, 2021),
)

# Test Parquet Performance

In [None]:
# Subset read by every benchmark below: 2019-2020 data for CO, TX and ID.
TEST_FILTERS = year_state_filter(years=[2019, 2020], states=["CO", "TX", "ID"])

# Local copies are in the current working directory (where the export cells
# above wrote their relative outputs); remote copies are in a gs:// bucket.
INTAKE_PATH_LOCAL = Path.cwd()
INTAKE_PATH_REMOTE = "gs://catalyst.coop/intake/test"

local_single_file = str(INTAKE_PATH_LOCAL / "hourly_emissions_epacems.parquet")
local_multi_file = str(INTAKE_PATH_LOCAL / "hourly_emissions_epacems")
remote_single_file = f"{INTAKE_PATH_REMOTE}/hourly_emissions_epacems.parquet"
remote_multi_file = f"{INTAKE_PATH_REMOTE}/hourly_emissions_epacems"

pudl_catalog_path = str(INTAKE_PATH_LOCAL / "pudl-catalog.yml")

## PUDL Hive Baseline

In [None]:
%%time
# Baseline: read the test subset straight from the original PUDL hive dataset.
pudl_hive = pd.read_parquet(
    epacems_dir,
    engine="pyarrow",
    filters=TEST_FILTERS,
    use_nullable_dtypes=True,
)
pudl_hive = pudl_hive.astype(emissions_categories)
pudl_hive.info(memory_usage="deep", show_counts=True)

## Single File Local Direct

In [None]:
%%time
# Read the test subset from the local single-file copy with pandas directly.
single_file_local_direct_df = pd.read_parquet(
    local_single_file,
    engine="pyarrow",
    filters=TEST_FILTERS,
    use_nullable_dtypes=True,
)
single_file_local_direct_df = single_file_local_direct_df.astype(emissions_categories)
single_file_local_direct_df.info(memory_usage="deep", show_counts=True)

## Single File Remote Direct

In [None]:
%%time
# Read the test subset from the remote single-file copy with pandas directly.
single_file_remote_direct_df = pd.read_parquet(
    remote_single_file,
    engine="pyarrow",
    filters=TEST_FILTERS,
    use_nullable_dtypes=True,
)
single_file_remote_direct_df = single_file_remote_direct_df.astype(emissions_categories)
single_file_remote_direct_df.info(memory_usage="deep", show_counts=True)

## Single File Local Intake

In [None]:
%%time
# Point INTAKE_PATH at the local copies before opening the catalog; presumably
# pudl-catalog.yml resolves its data paths from this variable (TODO confirm).
# INTAKE_PATH_LOCAL is a pathlib.Path, so it must be stringified for os.environ.
# (The original referenced the undefined name ``intake_path_local``.)
os.environ["INTAKE_PATH"] = str(INTAKE_PATH_LOCAL)
pudl_cat = open_catalog(pudl_catalog_path)
single_file_local_intake = pudl_cat.epacems_one_file(
    filters=TEST_FILTERS,
    engine="pyarrow",
)
# Apply the same categorical cast as every other read in this benchmark, so the
# resulting frames are comparable.
single_file_local_intake_dd = single_file_local_intake.to_dask().astype(emissions_categories)
single_file_local_intake_df = single_file_local_intake_dd.compute()
single_file_local_intake_df.info(show_counts=True, memory_usage="deep")

## Single File Remote Intake

In [None]:
%%time
# Point INTAKE_PATH at the remote copies before opening the catalog; presumably
# pudl-catalog.yml resolves its data paths from this variable (TODO confirm).
# (The original referenced the undefined name ``intake_path_remote``.)
os.environ["INTAKE_PATH"] = INTAKE_PATH_REMOTE
pudl_cat = open_catalog(pudl_catalog_path)
single_file_remote_intake = pudl_cat.epacems_one_file(
    filters=TEST_FILTERS,
    engine="pyarrow",
)
# Apply the same categorical cast as every other read in this benchmark, so the
# resulting frames are comparable.
single_file_remote_intake_dd = single_file_remote_intake.to_dask().astype(emissions_categories)
single_file_remote_intake_df = single_file_remote_intake_dd.compute()
single_file_remote_intake_df.info(show_counts=True, memory_usage="deep")

## Multi File Local Direct

In [None]:
%%time
# Read the test subset from the local per-year/state files with pandas directly.
multi_file_local_direct_df = pd.read_parquet(
    local_multi_file,
    engine="pyarrow",
    filters=TEST_FILTERS,
    use_nullable_dtypes=True,
)
multi_file_local_direct_df = multi_file_local_direct_df.astype(emissions_categories)
multi_file_local_direct_df.info(memory_usage="deep", show_counts=True)

## Multi File Remote Direct

In [None]:
%%time
# Read the test subset from the remote per-year/state files with pandas directly.
multi_file_remote_direct_df = pd.read_parquet(
    remote_multi_file,
    engine="pyarrow",
    filters=TEST_FILTERS,
    use_nullable_dtypes=True,
)
multi_file_remote_direct_df = multi_file_remote_direct_df.astype(emissions_categories)
multi_file_remote_direct_df.info(memory_usage="deep", show_counts=True)

## Multi File Local Intake

In [None]:
%%time
# Point INTAKE_PATH at the local copies before opening the catalog; presumably
# pudl-catalog.yml resolves its data paths from this variable (TODO confirm).
# INTAKE_PATH_LOCAL is a pathlib.Path, so it must be stringified for os.environ.
# (The original referenced the undefined name ``intake_path_local``.)
os.environ["INTAKE_PATH"] = str(INTAKE_PATH_LOCAL)
pudl_cat = open_catalog(pudl_catalog_path)
multi_file_local_intake = pudl_cat.epacems_multi_file(
    filters=TEST_FILTERS,
    engine="pyarrow",
)
multi_file_local_intake_dd = multi_file_local_intake.to_dask().astype(emissions_categories)
multi_file_local_intake_df = multi_file_local_intake_dd.compute()
multi_file_local_intake_df.info(show_counts=True, memory_usage="deep")

## Multi File Remote Intake

In [None]:
%%time
# Point INTAKE_PATH at the remote copies before opening the catalog; presumably
# pudl-catalog.yml resolves its data paths from this variable (TODO confirm).
# (The original referenced the undefined name ``intake_path_remote``.)
os.environ["INTAKE_PATH"] = INTAKE_PATH_REMOTE
pudl_cat = open_catalog(pudl_catalog_path)
multi_file_remote_intake = pudl_cat.epacems_multi_file(
    filters=TEST_FILTERS,
    engine="pyarrow",
)
multi_file_remote_intake_dd = multi_file_remote_intake.to_dask().astype(emissions_categories)
multi_file_remote_intake_df = multi_file_remote_intake_dd.compute()
multi_file_remote_intake_df.info(show_counts=True, memory_usage="deep")