## Install PUDL
* Until we get our custom Docker image built, PUDL needs to be installed in your user environment each session.
* If you are using this notebook on the Catalyst JupyterHub, and this is the first notebook you've used this session, then uncomment the commands in the following cell and run it before anything else.

In [None]:
#!conda install --yes --quiet python-snappy
#!pip install --upgrade pip
#!pip install --quiet git+https://github.com/catalyst-cooperative/pudl.git@jupyterhub-beta
#!cp ~/shared/shared-pudl.yml ~/.pudl.yml

# An Introduction to Working with EPA CEMS data

CEMS or <a href='https://www.epa.gov/emc/emc-continuous-emission-monitoring-systems'>**Continusous Emissions Monitoring Systems**</a> are used to track power plant's compliance with EPA emission standards. Among the data are hourly measurements of SO2, CO2, and NOx emissions associated with a given plant. The EPA's <a href='https://www.epa.gov/airmarkets'>Clean Air Markets Division</a> has collected CEMS data stretching back to 1990 and publicized it in their <a href='https://ampd.epa.gov/ampd/'>data portal</a>. Combinging the CEMS data with EIA and FERC data provides access to greater and more specific information about utilities and generation facilities. This notebook provides a glimpse into the analysis potential of CEMS data usage and integration.

**NOTE**: This Notebook presuposes access to the parquet files in which the CEMS data is stored.

# PUDL Notebook Setup

The following packages enable interaction with the CEMS dataset through pudl.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# Standard libraries
import logging
import pathlib
import sys

# 3rd party libraries
import geopandas as gpd
import dask.dataframe as dd
import numpy as np
import pandas as pd
import sqlalchemy as sa

# Local libraries
import pudl

In [None]:
logger=logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
logger.handlers = [handler]

In [None]:
import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns
sns.set()
%matplotlib inline

In [None]:
pudl_settings = pudl.workspace.setup.get_defaults()
#display(pudl_settings)

ferc1_engine = sa.create_engine(pudl_settings['ferc1_db'])
#display(ferc1_engine)

pudl_engine = sa.create_engine(pudl_settings['pudl_db'])
#display(pudl_engine)

#pudl_engine.table_names()
pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine, freq='AS') #annual frequency

# Bigger Data in Pandas

The CEMS dataset is enormous! It contains hourly operations and emissions data for thousands of power plants between 1995 and 2019. The full dataset is close to a billion rows and takes up 100 GB of space uncompressed, which means it will not all fit in memory on your laptop at the same time. One of the best tools for dealing with larger-than-memory data analysis in Python is the [Dask](https://dask.org) library, which extends Pandas, while maintaining a pretty similiar interface. We'll be using some of Dask's features below to work with this larger dataset.

Additional resources you might want to check out:
* [Scaling Pandas](https://tomaugspurger.github.io/modern-8-scaling.html) is a brief introduction to working with larger datasets using Dask.
* [How to learn Dask in 2020](https://coiled.io/blog/how-to-learn-dask-in-2020/) is a more extensive collection of self-guided educational resources.

We store the EPA CEMS data in [Apache Parquet files](https://parquet.apache.org/), which are optimized for fast reading of individual columns of tabular data. We partition the dataset by state and year, to make it easy to read in only the chunks that you actually need.

One of the main features of Dask is "lazy execution" -- it doesn't read data or do any computation until it has to, or you explicitly tell it to. Instead, it allows you to build of a series of computational instructions, and then it intelligently looks at those instructions and executes them so as to minimize memory utilization and/or parallelize the tasks if possible.

For example, the following cell will "read" in the full EPA CEMS dataset almost instantly... but that's because no data has been read off of disk yet. The instructions for reading off of the disk have been added to the list of things the Dask dataframe will do when you actually as it for a result.

In [None]:
epacems_path = pathlib.Path(pudl_settings["parquet_dir"]) / "epacems"
epacems_dd = dd.read_parquet(epacems_path)

However, some information about the dataframe is available at this point -- including the names of the columns and their data types -- because that information is stored in the Parquet file metadata:

In [None]:
epacems_dd.dtypes

If you want to know anything about the contents of the data itself, some data will have to be read in, and some computation performed. Before any aggregations or calculations or selections have happened, you can ask how many rows there are in the whole parquet dataset, and it will be returned relatively quickly, since that information is also stored in the metadata. This would behave differently later on, since after you've told the dataframe to do some computation, you'd be asking for the length of the **result of that computation** not the original inputs.

In [None]:
%%time
len(epacems_dd)

# Analyzing a Subset of EPA CEMS Data

One strategy for working with a large dataset is to only look at small subsets of it in any given analysis.

You can point Dask at a large parquet dataset like EPA CEMS and tell it to read in only a subset of the columns, and the select a subset of the rows based on some criteria of interest. So long as the collection of rows and columns you've specified end up being smaller than the available memory on you computer, this operation will succeed. However, depending on how fast your disk is, it may still take longer than you would like.

## Simple Filtering
The following statement will tell Dask where the dataset is, and describe a subset of the data that should be returned as a pandas dataframe (only data about power plants in Colorado in 2019). This should be small enough to fit in memory, but it won't actually read the data in yet:

In [None]:
slow_epacems_dd = (
    dd.read_parquet(epacems_path)
    .query("year==2019")
    .query("state=='CO'")
)

If we're satisfied that this is the data we want, then we can go ahead and ask Dask to read it in and give us a pandas dataframe back. This will allows us to search through a billion rows of data on disk and select only the small subset that we are interested in without blowing up our memory usage. This is very flexible -- you can select on whatever criteria you want -- but it's not very efficient. It will still take maybe 5 minutes on a laptop with a fast SSD (uncomment it if you want to actually give this a try...):

In [None]:
#%%time
#slow_epacems_df = slow_epacems_dd.compute()

## Filter using Parquet partitions
To make operations like this faster and easier, we have pre-partitioned the EPA CEMS Parquet dataset by `state` and `year` -- there's a separate parquet file that contains each state's data for each year, in a directory structure that many tools for working with Parquet files understand. This means you can tell Dask that there are only certain files it needs to look in for the data you're interested in. Rather than scanning through 100 GB of data across 1300 different files, it can just read data from the single file that contains the 2019 data for Colorado. On a computer with a fast SSD, this should takes about a fraction of a second:

In [None]:
%%time
epacems_dd = dd.read_parquet(
    epacems_path,
    filters=[[("state", "=", "CO"), ("year", "=", 2019)]],
)
epacems_df = epacems_dd.compute()
epacems_df.sample(10)

### A Partition Filtering Shortcut
Unfortunately, the nested list of lists of boolean predicates used to specify the filters (known as [disjunctive normal form](https://en.wikipedia.org/wiki/Disjunctive_normal_form) is a little unwieldy, so we have some helper functions that can construct them for you. `pudl.output.epacems.year_state_filter()` will give you a filter that selects the data for all combinations of the years and states you specify:

In [None]:
demo_filter = pudl.output.epacems.year_state_filter(years=[2018, 2019], states=["CO", "CA"])
demo_filter

In [None]:
%%time
epacems_dd = dd.read_parquet(
    epacems_path,
    filters=demo_filter
)
epacems_df = epacems_dd.compute()
epacems_df.sample(10)

## Limiting columns
Often you will only be interested in a certain subset of the columns stored in a parquet dataset. Parquet files are organized internally to make it very efficient to only select certain columns, and the `read_parquet()` method makes this easy. Say you're only interested in power outputs, fuel consumed, and CO2 emissions, but not traditional air pollutants:

In [None]:
%%time
ghg_cols = [
    "state",
    "year",
    "operating_datetime_utc",
    "plant_id_eia",
    "facility_id",
    "unit_id_epa",
    "unitid",
    "operating_time_hours",
    "gross_load_mw",
    "heat_content_mmbtu",
    "co2_mass_tons",
]
epacems_dd = dd.read_parquet(
    epacems_path,
    filters=demo_filter,
    columns=ghg_cols,
)
epacems_df = epacems_dd.compute()
epacems_df.sample(10)

## Combining these selection tools
We can combine these methods to select a small subset of the large dataset very quickly. Let's just load data related to Xcel Energy's Comanche coal plant in Colorado. Since the EPA CEMS data only contains the EIA Plant ID, we need to look that up first.

In [None]:
comanche_colorado = (
    pudl_out.plants_eia860()
    .query("plant_name_eia=='Comanche'")
    .query("state=='CO'")
    .loc[:,["report_date", "plant_id_eia", "plant_name_eia", "utility_id_eia", "city", "state"]]
    .merge(pudl_out.utils_eia860()[["utility_id_eia", "utility_name_eia", "report_date"]])
)
comanche_colorado

Based on the above, we can see that Comanche has `plant_id_eia == 470` and we already know that it's in Colorado. So let's load all the years of data from Colorado, but only for Comanche, and calculate these additional values
* Gross Generation (MWh)
* Heat Rate (mmBTU / MWh)
* Gross CO2 intensity (tons / MWh)

In [None]:
%%time
only_colorado = pudl.output.epacems.year_state_filter(states=["CO"])
comanche_dd = (
    dd.read_parquet(
        epacems_path,
        filters=only_colorado,
        columns=ghg_cols,
    )
    .query("plant_id_eia==470")
)
comanche_df = (
    comanche_dd.compute()
    .assign(
        gross_generation_mwh=lambda x: x.operating_time_hours * x.gross_load_mw,
        heat_rate_mmbtu_mwh=lambda x: x.heat_content_mmbtu / x.gross_generation_mwh,
        gross_co2_intensity=lambda x: x.co2_mass_tons / x.gross_generation_mwh
    )
)
comanche_df.sample(10)

This has selected almost 500,000 rows and 50 MB of data out of ~1 billion rows and ~100 GB in a few seconds:

In [None]:
comanche_df.info(memory_usage="deep")

## Visualizing Plant Operations Through Time
Now that we've got a managable dataframe, we can visualize what's inside it! Here's the heat content of the fuel consumed each hour for the last 25 years in the three Comanche coal units.

### Fuel Consumption

In [None]:
unit_ids = comanche_df.unitid.unique()
fig, axs = plt.subplots(nrows=len(unit_ids), ncols=1, sharex=True, figsize=(20,15))
for n, unitid in enumerate(comanche_df.unitid.unique()):
    axs[n].scatter(
        comanche_df.query("unitid==@unitid").operating_datetime_utc,
        comanche_df.query("unitid==@unitid").heat_content_mmbtu,
        s=0.2, alpha=0.1,
    )
    axs[n].set_ylim(-100,10_000)
    axs[n].set_title(f"Comanche Unit {unitid}")
    axs[n].set_ylabel("Fuel consumed [mmbtu / hr]")
plt.show();

### Heat Rate

In [None]:
unit_ids = comanche_df.unitid.unique()
fig, axs = plt.subplots(nrows=len(unit_ids), ncols=1, sharex=True, figsize=(20,15))
for n, unitid in enumerate(comanche_df.unitid.unique()):
    axs[n].scatter(
        comanche_df.query("unitid==@unitid").operating_datetime_utc,
        comanche_df.query("unitid==@unitid").heat_rate_mmbtu_mwh,
        s=0.2, alpha=0.1,
    )
    axs[n].set_ylim(-1,20)
    axs[n].set_title(f"Comanche Unit {unitid}")
    axs[n].set_ylabel("Heat Rate [mmBTU / MWh]")
plt.show();

### Carbon Intensity

In [None]:
unit_ids = comanche_df.unitid.unique()
fig, axs = plt.subplots(nrows=len(unit_ids), ncols=1, sharex=True, figsize=(20,15))
for n, unitid in enumerate(comanche_df.unitid.unique()):
    axs[n].scatter(
        comanche_df.query("unitid==@unitid").operating_datetime_utc,
        comanche_df.query("unitid==@unitid").gross_co2_intensity,
        s=0.2, alpha=0.1,
    )
    axs[n].set_ylim(0.5,1.5)
    axs[n].set_title(f"Comanche Unit {unitid}")
    axs[n].set_ylabel("Gross CO2 Intensity [tons / MWh]")
plt.show();

### Gross Load

In [None]:
unit_ids = comanche_df.unitid.unique()
fig, axs = plt.subplots(nrows=len(unit_ids), ncols=1, sharex=True, figsize=(20,15))
for n, unitid in enumerate(comanche_df.unitid.unique()):
    axs[n].scatter(
        comanche_df.query("unitid==@unitid").operating_datetime_utc,
        comanche_df.query("unitid==@unitid").gross_load_mw,
        s=0.2, alpha=0.1,
    )
    axs[n].set_ylim(-100,1000)
    axs[n].set_title(f"Comanche Unit {unitid}")
    axs[n].set_ylabel("Gross Load [MW]")
plt.show();

# Aggregating / Downsampling EPA CEMS Data

Rather than selecting a small subset of a large dataset at its full resolution, we can aggregate records to reduce large datasets to a managable size. Now that you know what's available, you can pick the columns you'd like to work with, and aggregate rows if necessary. 

**Note** that in CEMS the `state` and `measurement_code` columns are categorical datatypes, meaning that they will overwhelm your computer's memory if included in the list of columns you'd like to groupby. In pandas, this is solved by including the statement `observed=True` in the groupby, but with Dask we'll solve this by changing the datatype to string. As mentioned previously, the dataset is very large. If the Dask dataframe you construct is too similar to the original dataset -- imagine the example below without the `groupby` -- the client will be unable to load it in pandas. The dataset below should load in a couple of minutes.

## Comparing Plant-level Emissions for Two Years

In [None]:
%%time
# Prepare CEMS data

# A list of the columns you'd like to include in your analysis
my_cols = [
    'year',
    'plant_id_eia', 
    'unitid',
    'so2_mass_lbs',
    'nox_mass_lbs',
    'co2_mass_tons'
]

# Select emissions data are grouped by state, plant_id and unit_id
# Remember to change the datatype for 'state' from category to string
my_cems_dd = (
    dd.read_parquet(epacems_path, columns=my_cols)
    .astype({
        'year': int,
    })
    .groupby(['year', 'plant_id_eia'])[[
        'so2_mass_lbs',
        'nox_mass_lbs',
        'co2_mass_tons'
    ]]
    .sum()
).reset_index()
cems_byplant = my_cems_dd.compute()

In [None]:
# Prepare EIA data for integration with CEMS
plants_df = (
    pudl_out.plants_eia860()
    .loc[:, ["plant_id_eia", "report_date", "plant_name_eia", "state", "longitude", "latitude" ]]
    .assign(year=lambda x: x.report_date.dt.year)
    .drop("report_date", axis="columns")
    .pipe(pudl.helpers.oob_to_nan, ["latitude"], lb=-90.0, ub=90.0)
    .pipe(pudl.helpers.oob_to_nan, ["longitude"], lb=-180.0, ub=180.0)
    .dropna()
    .query("state not in ('AK', 'HI')")
    .query("latitude < 50.0")
)

# Merge EIA plant data with CEMS aggregated plant data
merged_cems_df = (
    plants_df.merge(
        cems_byplant,
        on=["year", "plant_id_eia"],
        how="inner",
    )
)

# Make a geodataframe for mapping out of the merged plant data
cems_gdf = (
    gpd.GeoDataFrame(
        merged_cems_df,
        geometry=gpd.points_from_xy(merged_cems_df.longitude, merged_cems_df.latitude),
        crs="EPSG:4326",  # Geographic coordinates: Latitude / Longitude in degrees.
    )
    .drop(["latitude", "longitude"], axis="columns")
    .to_crs("EPSG:3857")
)
cems_gdf.sample(5)

In [None]:
cems_gdf[cems_gdf['year']==2009]

In [None]:
def map_point_source_emissions(gdf, year, states=None, scaling=1e4, alpha=0.25):
    """Map of emissions by plant coordinates in proportion to their annual amount."""
    import contextily as ctx
    gdf_to_plot = gdf.query("year==@year")
    if states is not None:
        gdf_to_plot = gdf_to_plot[gdf_to_plot.state.isin(states)]
        
    ax = gdf_to_plot.plot(
        figsize=(40,20),
        color="black",
        alpha=alpha,
        markersize=gdf_to_plot.co2_mass_tons / scaling,
    )
    ax.set_title(f'CO2 Emissions {year}', fontsize=50)
    ax.set_axis_off()
    ctx.add_basemap(ax)
    
    return ax

map_point_source_emissions(cems_gdf, 2009, states=None, scaling=1e4, alpha=0.5)
map_point_source_emissions(cems_gdf, 2019, states=None, scaling=1e4, alpha=0.5);

Now lets take a look at the difference. Of the plants that opperated both in 2009 and 2019, the red shows where there was an increase in CO2 emissions and the green shows where there was a decrease in CO2 emissions.

In [None]:
def calculate_emissions_difference(df):
    """Calculate the difference in emission between two years"""
    df['year_diff'] = (
        df.groupby('plant_id_eia')['co2_mass_tons']
        .diff()
    )
    df = df.dropna()

    #pos_df = df[df['co2_mass_tons']>0].copy()
    pos_df = df.loc[df.co2_mass_tons>=0]
    neg_df = (
        df.loc[df.year_diff<0]
        .assign(year_diff=lambda x: abs(x.year_diff))
    )
    
    return pos_df, neg_df

def map_emissions_difference(gdf, year_old, year_new, states=None, scaling=1e4, alpha=0.25):
    """Map of emissions by plant coordinates in proportion to their annual amount."""
    import contextily as ctx
    gdf_to_plot = gdf.query("year==[@year_old, @year_new]")
    if states is not None:
        gdf_to_plot = gdf_to_plot[gdf_to_plot.state.isin(states)]
        
    # Calculate emissions difference between years    
    pos_df, neg_df = calculate_emissions_difference(gdf_to_plot)
    
    # Plot figure
    fig, ax = plt.subplots(figsize=(40, 20))
    pos_df.plot(
        ax=ax,
        color="green",
        alpha=alpha,
        markersize=pos_df.year_diff / scaling,
    )
    neg_df.plot(
        ax=ax,
        color="red",
        alpha=alpha,
        markersize=neg_df.year_diff / scaling,
    )
    ax.set_axis_off()
    ax.set_title(f'CO2 Emissions Difference {year_old} - {year_new}', fontsize=50)
    ctx.add_basemap(ax)
    plt.show()
        
    return plt.show()

map_emissions_difference(cems_gdf, 2009, 2019, states=None, scaling=1e4, alpha=0.5)
