# Notebook Preamble

## IPython Magic

In [1]:
%load_ext autoreload
%autoreload 3

## Notebook Imports

In [2]:
# Standard Library Imports
import itertools
import logging
import os
import pathlib
import sys
import time
from dataclasses import dataclass, field
from typing import Dict, List
from pathlib import Path

# 3rd Party Imports:
import pandas as pd
from intake import open_catalog
from pudl.output.epacems import year_state_filter

## Set up a logger

In [3]:
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)
logger.handlers = [handler]

# Potential benefits of Intake catalogs:
**Expose metadata:** The Intake catalog doesn't contain any column-level metadata, but I think it could. This would allow a user to see what columns were available, what their types were, and read their descriptions before querying the large dataset.

**Local data caching:** Local file caching is not available. We would expect this to make using many small files more efficient for repeated access, since they would each only need to be transmitted over the network once. However, `fsspec` based file caching hasn't yet been implemented in the `intake-parquet` library.

**Data packaging:** The catalog can be packaged and versioned using conda to manage its dependencies on other software packages and ensure compatibility. With remote access or automatic local file caching, the user also doesn't need to think about where the data is being stored, or putting it in the "right place" -- Intake would manage that.

**Uniform API:** All the data sources of a given type (parquet, SQL) would have the same interface, reducing the number of things a user needs to remember to access the data.

**Decoupling data storage location:** As with DNS, we can change / update the location where the data is being stored without impacting the user directly, since the catalog acts as a decoupling reference.

## Intake References
* [Intake Documentations](https://intake.readthedocs.io/en/latest/start.html)
* [Intake Examples](https://github.com/intake/intake-examples)
* [CarbonPlan Data Catalogs](https://github.com/carbonplan/data)
* [AnacondaCon Presentation Video](https://www.youtube.com/watch?v=oyZJrROQzUs)

# Test Intake & Parquet Functionality & Performance

This notebook demonstrates several different ways of organizing and accessing the same EPA CEMS data:
* Local storage on disk vs. remote storage in Google Cloud Storage buckets
* Directly accessing the data via `pandas.read_parquet()` vs. an Intake catalog.
* Using one big Parquet file for all data vs. separate small files for each combination of state & year.

In [11]:
TEST_YEARS = [2019, 2020]
TEST_STATES = ["ID", "CO", "TX"]

In [None]:
def year_state_filter(years=(), states=()):
    """
    Create filters to read given years and states from partitioned parquet dataset.

    A subset of an Apache Parquet dataset can be read in more efficiently if files
    which don't need to be queried are avoideed. Some datasets are partitioned based
    on the values of columns to make this easier. The EPA CEMS dataset which we
    publish is partitioned by state and report year.

    However, the way the filters are specified can be unintuitive. They use DNF
    (disjunctive normal form) See this blog post for more details:

    https://blog.datasyndrome.com/python-and-parquet-performance-e71da65269ce

    This function takes a set of years, and a set of states, and returns a list of lists
    of tuples, appropriate for use with the read_parquet() methods of pandas and dask
    dataframes. The filter will include all combinations of the specified years and
    states. E.g. if years=(2018, 2019) and states=("CA", "CO") then the filter would
    result in getting 2018 and 2019 data for CO, as well as 2018 and 2019 data for CA.

    Args:
        years (iterable): 4-digit integers indicating the years of data you would like
            to read. By default it includes all years.
        states (iterable): 2-letter state abbreviations indicating what states you would
            like to include. By default it includes all states.

    Returns:
        list: A list of lists of tuples, suitable for use as a filter in the
        read_parquet method of pandas and dask dataframes.

    """
    year_filters = [("year", "=", year) for year in years]
    state_filters = [("state", "=", state.upper()) for state in states]

    if states and not years:
        filters = [
            [
                tuple(x),
            ]
            for x in state_filters
        ]
    elif years and not states:
        filters = [
            [
                tuple(x),
            ]
            for x in year_filters
        ]
    elif years and states:
        filters = [list(x) for x in product(year_filters, state_filters)]
    else:
        filters = None

    return filters

In [15]:
@dataclass
class TestEpaCemsParquet(object):
    table_name: str = "hourly_emissions_epacems"
    gcs_base: str = "gcs://catalyst.coop/intake/test"
    https_base: str = "https://storage.googleapis.com/catalyst.coop/intake/test"
    local_base: str = os.getcwd() + "/"
    pudl_catalog_yml: str = "../src/catalog/pudl-catalog.yml"
    
    def __post_init__(self):
        self.base_paths = {
            "gcs": self.gcs_base,
            "https": self.https_base,
            "local": self.local_base,
        }

    def urlpath(self, protocol: str, partition=False):
        if partition:
            assert protocol != "https"

        urlpath = self.base_paths[protocol] + "/" + self.table_name
        if not partition:
            urlpath += ".parquet"
        return urlpath
    
    def direct(self, protocol, partition, years, states):
        filters = year_state_filter(years=years, states=states)
        start_time = time.time()
        urlpath =  self.urlpath(protocol=protocol, partition=partition)
        logger.info(f"read_parquet, {protocol=}, {partition=}, {years=}, {states=}:")
        df = pd.read_parquet(urlpath, filters=filters)
        elapsed_time = time.time() - start_time
        logger.info(f"    elapsed time: {elapsed_time:.2f}s")
        return df
    
    def intake(self, protocol, partition, years, states):
        filters = year_state_filter(years=years, states=states)
        os.environ["PUDL_INTAKE_PATH"] = self.base_paths[protocol]
        pudl_cat = open_catalog(self.pudl_catalog_yml)
        if partition:
            src = pudl_cat["hourly_emissions_epacems_partitioned"](filters=filters)
        else:
            src = pudl_cat["hourly_emissions_epacems"](filters=filters)
        start_time = time.time()
        logger.info(f"intake, {protocol=}, {partition=}, {years=}, {states=}:")
        df = src.to_dask().compute()
        elapsed_time = time.time() - start_time
        logger.info(f"    elapsed time: {elapsed_time:.2f}s")
        return df
    
    def test_direct(self, years, states, expected_df):
        expected_df = self.direct(protocol="local", partition=False, years=years, states=states)
        direct_kwargs = [
            dict(protocol="local", partition=True),
            dict(protocol="gcs", partition=False),
            dict(protocol="gcs", partition=True),
            # Tries to download the entire 4.7GB file and runs out of memory
            # dict(protocol="https", partition=False),
            # https *must* refer to individual files. Can't list a directory or match patterns.
            # dict(protocol="https", partition=True),
        ]       
        for kwargs in direct_kwargs:
            kwargs.update(dict(years=years, states=states))
            test_df = self.direct(**kwargs)
            pd.testing.assert_frame_equal(expected_df, test_df)

    def test_intake(self, years, states):
        expected_df = self.direct(protocol="local", partition=False, years=years, states=states)
        intake_kwargs = [
            dict(protocol="local", partition=False),
            dict(protocol="local", partition=True),
            dict(protocol="gcs", partition=False),
            dict(protocol="gcs", partition=True),
            # Tries to download the entire 4.7GB file and runs out of memory
            # dict(protocol="https", partition=False),
            # Results in a 403 Forbidden error on parquet file parent directory
            #dict(protocol="https", partition=True),
        ]
        for kwargs in intake_kwargs:
            kwargs.update(dict(years=years, states=states))
            test_df = self.intake(**kwargs)
            logger.info("Verifying that dataframe matches expected output.")
            pd.testing.assert_frame_equal(expected_df, test_df)

    def test_all(self, years, states):
        self.test_direct(years, states)
        self.test_intake(years, states)


In [13]:
os.environ["PUDL_INTAKE_CACHE"] = str(Path.cwd() / "cache")

In [14]:
dude = TestEpaCemsParquet()
dude.test_intake(years=TEST_YEARS, states=TEST_STATES)

read_parquet, protocol='local', partition=False, years=[2019, 2020], states=['ID', 'CO', 'TX']:
    elapsed time: 2.43s
intake, protocol='local', partition=False, years=[2019, 2020], states=['ID', 'CO', 'TX']:
    elapsed time: 4.74s
intake, protocol='local', partition=True, years=[2019, 2020], states=['ID', 'CO', 'TX']:


KeyboardInterrupt: 

In [None]:
assert False

## Examine the Catalog

In [17]:
pudl_cat = open_catalog(dude.pudl_catalog_yml)
list(pudl_cat)

['hourly_emissions_epacems', 'hourly_emissions_epacems_partitioned']

In [18]:
pudl_cat.metadata

{'creator': {'title': 'Catalyst Cooperative',
  'email': 'pudl@catalyst.coop',
  'path': 'https://catalyst.coop'}}

### Source level description
* Basically from the YAML file, but with some extra things, `container`, `direct_access`, `user_parameters`

In [19]:
pudl_cat.hourly_emissions_epacems.describe()

{'name': 'hourly_emissions_epacems',
 'container': 'dataframe',
 'plugin': ['parquet'],
 'driver': ['parquet'],
 'description': 'Hourly pollution emissions and plant operational data reported via Continuous Emissions Monitoring Systems (CEMS) as required by 40 CFR Part 75. Includes CO2, NOx, and SO2, as well as the heat content of fuel consumed and gross power output. Hourly values reported by US EIA ORISPL code and emissions unit (smokestack) ID.',
 'direct_access': 'forbid',
 'user_parameters': [],
 'metadata': {'title': 'Continuous Emissions Monitoring System (CEMS) Hourly Data',
  'type': 'application/parquet',
  'provider': 'US Environmental Protection Agency Air Markets Program',
  'path': 'https://ampd.epa.gov/ampd',
  'license': {'name': 'CC-BY-4.0',
   'title': 'Creative Commons Attribution 4.0',
   'path': 'https://creativecommons.org/licenses/by/4.0'}},
 'args': {'engine': 'pyarrow',
  'urlpath': 'simplecache::{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems.parquet',
  

### Source internals
* Categorical values showing up as integers.
* String values showing up as objects.
* No length in the shape, but 19 columns.
* `npartitions` is apparently referring to file, not row-group based partitions.

In [20]:
pudl_cat.hourly_emissions_epacems.discover()

{'dtype': {'plant_id_eia': 'int32',
  'unitid': 'object',
  'operating_datetime_utc': 'datetime64[ns, UTC]',
  'year': 'int32',
  'state': 'int64',
  'facility_id': 'int32',
  'unit_id_epa': 'object',
  'operating_time_hours': 'float32',
  'gross_load_mw': 'float32',
  'heat_content_mmbtu': 'float32',
  'steam_load_1000_lbs': 'float32',
  'so2_mass_lbs': 'float32',
  'so2_mass_measurement_code': 'int64',
  'nox_rate_lbs_mmbtu': 'float32',
  'nox_rate_measurement_code': 'int64',
  'nox_mass_lbs': 'float32',
  'nox_mass_measurement_code': 'int64',
  'co2_mass_tons': 'float32',
  'co2_mass_measurement_code': 'int64'},
 'shape': (None, 19),
 'npartitions': 1,
 'metadata': {'title': 'Continuous Emissions Monitoring System (CEMS) Hourly Data',
  'type': 'application/parquet',
  'provider': 'US Environmental Protection Agency Air Markets Program',
  'path': 'https://ampd.epa.gov/ampd',
  'license': {'name': 'CC-BY-4.0',
   'title': 'Creative Commons Attribution 4.0',
   'path': 'https://creat

### The other source internals
* Here we have nullable ints, but they're 64-bit?
* Categories show up as `category` not integers.
* Strings show up as `string` not `object`
* `npartitions` is referring to the separate files. How do we get information about how the partitions are structured in here?
* Somehow in `shape` we've lost a couple of columns! There's no state or year. Probably this is because of how I converted from the old hive partitioned version of epacems.

In [21]:
pudl_cat.hourly_emissions_epacems_partitioned.discover()

{'dtype': {'plant_id_eia': 'int32',
  'unitid': 'object',
  'operating_datetime_utc': 'datetime64[ns, UTC]',
  'year': 'int32',
  'state': 'int64',
  'facility_id': 'int32',
  'unit_id_epa': 'object',
  'operating_time_hours': 'float32',
  'gross_load_mw': 'float32',
  'heat_content_mmbtu': 'float32',
  'steam_load_1000_lbs': 'float32',
  'so2_mass_lbs': 'float32',
  'so2_mass_measurement_code': 'int64',
  'nox_rate_lbs_mmbtu': 'float32',
  'nox_rate_measurement_code': 'int64',
  'nox_mass_lbs': 'float32',
  'nox_mass_measurement_code': 'int64',
  'co2_mass_tons': 'float32',
  'co2_mass_measurement_code': 'int64'},
 'shape': (None, 19),
 'npartitions': 1274,
 'metadata': {'title': 'Continuous Emissions Monitoring System (CEMS) Hourly Data',
  'type': 'application/parquet',
  'provider': 'US Environmental Protection Agency Air Markets Program',
  'path': 'https://ampd.epa.gov/ampd',
  'license': {'name': 'CC-BY-4.0',
   'title': 'Creative Commons Attribution 4.0',
   'path': 'https://cr

## PUDL Baseline
Read the test data from your local EPA CEMS outputs directly.
* On an SSD this should take less than 10 seconds.
* The only `string` type columns should be `unitid` and `unit_id_epa`
* The dataframe should take about 1.4 GB of memory and have ~8M rows.

In [22]:
%%time
pudl_epacems = dude.direct(protocol="local", partition=False, years=TEST_YEARS, states=TEST_STATES)

read_parquet, protocol='local', partition=False, years=[2019, 2020], states=['ID', 'CO', 'TX']:
    elapsed time: 2.57s
CPU times: user 2.66 s, sys: 1.34 s, total: 3.99 s
Wall time: 2.57 s


In [23]:
display(pudl_epacems.info(show_counts=True, memory_usage="deep"))

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8006424 entries, 0 to 8006423
Data columns (total 19 columns):
 #   Column                     Non-Null Count    Dtype              
---  ------                     --------------    -----              
 0   plant_id_eia               8006424 non-null  int32              
 1   unitid                     8006424 non-null  object             
 2   operating_datetime_utc     8006424 non-null  datetime64[ns, UTC]
 3   year                       8006424 non-null  int32              
 4   state                      8006424 non-null  category           
 5   facility_id                8006424 non-null  int32              
 6   unit_id_epa                8006424 non-null  object             
 7   operating_time_hours       8003928 non-null  float32            
 8   gross_load_mw              8006424 non-null  float32            
 9   heat_content_mmbtu         8006424 non-null  float32            
 10  steam_load_1000_lbs        33252 non-null 

None

## Single File Local
For the single file local tests, download [this file](https://storage.googleapis.com/catalyst.coop/intake/test/hourly_emissions_epacems.parquet) into the same directory as this notebook.

### Direct access with `read_parquet()`
* This takes 3-4 seconds

In [None]:
%%time
df = pd.read_parquet(
    local_single_file,
    engine="pyarrow",
    filters=TEST_FILTERS,
    use_nullable_dtypes=True,
)

In [None]:
display(df.info(show_counts=True, memory_usage="deep"))
del df

### Via Intake Catalog
* This takes 3-4 seconds

In [None]:
%%time
os.environ["INTAKE_PATH"] = str(INTAKE_PATH_LOCAL)
pudl_cat = open_catalog(pudl_catalog_path)
source = pudl_cat.epacems_one_file(filters=TEST_FILTERS)
dd = source.to_dask()
df = dd.compute()

In [None]:
display(df.info(show_counts=True, memory_usage="deep"))
del df

## Single File Remote

### Direct access with `read_parquet()`
* Using the authenticated `gs://` URL it takes **20 seconds**
* Using the public `https://` URL this takes **10+ minutes**

In [None]:
%%time
df = pd.read_parquet(
    remote_single_file,
    engine="pyarrow",
    filters=TEST_FILTERS,
)

In [None]:
display(df.info(show_counts=True, memory_usage="deep"))
del df

### Via Intake Catalog
* With `gs://` URL this takes **1 minute**
* With `https://` URL it downloads a huge amount of data and then times out.

In [None]:
%%time
os.environ["INTAKE_PATH"] = INTAKE_PATH_REMOTE
pudl_cat = open_catalog(pudl_catalog_path)
source = pudl_cat.epacems_one_file(filters=TEST_FILTERS)
dd = source.to_dask()
df = dd.compute()

In [None]:
display(df.info(show_counts=True, memory_usage="deep"))
del df

## Multi File Local

For the multi-file local tests download [this tarball](https://storage.googleapis.com/catalyst.coop/intake/test/hourly_emissions_epacems.tar) and extract it in the same directory as this notebook.

### Direct access with `read_parquet()`
* This takes 5 seconds, and results in an excessively large 3GB dataframe because I generated these parquet files before fixing the string-to-categorical type issue.

In [None]:
%%time
df = pd.read_parquet(
    local_multi_file,
    engine="pyarrow",
    filters=TEST_FILTERS,
    use_nullable_dtypes=True,
)

In [None]:
display(df.info(show_counts=True, memory_usage="deep"))
del df

### Via Intake Catalog
* This takes about 15 seconds, and results in the 3GB dataframe as above.

In [None]:
%%time
os.environ["INTAKE_PATH"] = str(INTAKE_PATH_LOCAL)
pudl_cat = open_catalog(pudl_catalog_path)
source = pudl_cat.epacems_multi_file(
    filters=TEST_FILTERS,
    engine="pyarrow",
)
dd = source.to_dask()
df = dd.compute()

In [None]:
display(df.info(show_counts=True, memory_usage="deep"))
del df

## Multi File Remote

### Direct access with `read_parquet()`
* With the `gs://` URL this takes **1 minute** and downloads minimal data.
* With the `https://` URL this results in a 403 Forbidden error.

In [None]:
%%time
df = pd.read_parquet(
    remote_multi_file,
    engine="pyarrow",
    filters=TEST_FILTERS,
    use_nullable_dtypes=True,
)

In [None]:
display(df.info(show_counts=True, memory_usage="deep"))
del df

### Via Intake Catalog
* With the `gs://` URL this takes **3 minutes** and downloads a little bit of data across the whole time.
* With the `https://` URL this results in a 403 Forbidden error.

In [None]:
%%time
os.environ["INTAKE_PATH"] = INTAKE_PATH_REMOTE
pudl_cat = open_catalog(pudl_catalog_path)
source = pudl_cat.epacems_multi_file(
    filters=TEST_FILTERS,
    engine="pyarrow",
)
dd = source.to_dask()
df = dd.compute()

In [None]:
display(df.info(show_counts=True, memory_usage="deep"))
del df

# Notes
* Unsurprisingly, local access is blazing fast regardless of whether it's a single file or many, and while the Intake catalog access takes around 3x as long, it seems fast enough to be plenty usable.
* Remote performance using a single file, the `gs://` protocol, and `read_parquet()` was shockingly fast. It took less than 10x as long as direct local access.
* Remote performance over `https://` was painfully slow, to the point of being unusable in all uses of Intake. It also seemed to be transmitting far, far more data than in the `gs://` case.
* Basically none of the `https://` cases were usable. The only one that completed took 10 minutes.
* The only remote Intake catalog case that worked was the single-file `gs://`, which (as with the local catalogs) took about 3x as long as the `read_parquet()` case.
* Over `https://` it seems like we can't use directories or wildcards -- we have to enumerate each filename specifically.
* Some of the issues here have to be network speed, but I have 50-100Mbit download speeds, and the amount of data being transmitted varied widely between the different cases.
* Still some data type issues happening in all of the Intake cases. Strings get turned into objects.

# Questions
* Can non-authenticated users access publicly readable data using `gs://` URLs?
* How do we add column-level metadata to the catalog appropriately? Can we get the embedded descriptions to show up?
* How do we add information about what's in the different partitions (i.e. split by year and state, allowable values)
* Why are we getting jumbled nullable/non-nullable, ints/categories, strings/objects in the types?