## Recreating the Master Unit List

#### setup/imports

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
import pudl
import pudl.constants as pc
import pudl.extract.ferc1
import sqlalchemy as sa
import logging
import sys

In [None]:
from copy import deepcopy

In [None]:
# Send every record that reaches the root logger to stdout as a bare
# message (no level / timestamp prefix).
formatter = logging.Formatter('%(message)s')
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(formatter)

logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Replace, rather than append to, whatever handlers the root logger
# already has, so re-running this cell does not stack up handlers.
logger.handlers = [handler]

#### defining a table grabbing object

In [None]:
class compile_tables(object):
    """
    Lazily grab and cache the tables used to build the master unit list.

    Each table is pulled at most once: either straight from the PUDL
    database (when the table name is found in the database metadata) or
    from the method of the same name on a ``PudlTabl`` output object.
    """

    def __init__(self, pudl_engine, freq=None, start_date=None, end_date=None):
        """
        Initialize a table compiler.

        Args:
            pudl_engine: the engine used to read the PUDL database.
                Required.
            freq (str): passed through to the ``PudlTabl`` output object.
            start_date: anything ``pd.to_datetime`` accepts. Defaults to
                January 1st of the earliest year in
                ``pc.working_years['eia923']``.
            end_date: anything ``pd.to_datetime`` accepts. Defaults to
                December 31st of the latest year in
                ``pc.working_years['eia923']``.

        Raises:
            AssertionError: if ``pudl_engine`` is falsy.
        """
        # Validate the one required input before doing any other work.
        if not pudl_engine:
            raise AssertionError('PudlTabl object needs a pudl_engine')
        self.pudl_engine = pudl_engine
        self.freq = freq

        if start_date is None:
            first_year = min(pc.working_years['eia923'])
            self.start_date = pd.to_datetime(f'{first_year}-01-01')
        else:
            # Make sure it's a date... and not a string.
            self.start_date = pd.to_datetime(start_date)

        if end_date is None:
            last_year = max(pc.working_years['eia923'])
            self.end_date = pd.to_datetime(f'{last_year}-12-31')
        else:
            # Make sure it's a date... and not a string.
            self.end_date = pd.to_datetime(end_date)

        # grabbing the metadata object for the sqlite db
        self.pt = pudl.output.pudltabl.get_table_meta(self.pudl_engine)

        self.pudl_out = pudl.output.pudltabl.PudlTabl(
            pudl_engine=pudl_engine, freq=self.freq)

        # Cache of dataframes keyed by table name; None means "not
        # grabbed yet". Only names listed here can be requested.
        self._dfs = {
            'generation_fuel_eia923': None,
            'fuel_receipts_costs_eia923': None,
            'generators_eia860': None,
            'boiler_generator_assn_eia860': None,
            'generation_eia923': None,

            'fuel_cost': None,
        }

    def grab_the_table(self, table):
        """
        Return the dataframe for ``table``, grabbing it on first use.

        Args:
            table (string): a key of the internal cache, or None.

        Returns:
            pandas.DataFrame, or None when ``table`` is None.

        Raises:
            KeyError: if ``table`` is not one of the cached table names.
        """
        if table is None:
            return None

        if self._dfs[table] is None:
            # Use this instance's own metadata (not a notebook global) to
            # decide whether the table lives in the database. Only the
            # lookup is guarded, so a KeyError raised while reading the
            # table is not mistaken for "table not in the db".
            try:
                tbl = self.pt[table]
            except KeyError:
                # Not a database table, so it is an output function.
                # getattr turns the string of the table into an attribute
                # of the object, so this runs the output function.
                print(f'   grabbing {table} from the output object')
                self._dfs[table] = getattr(self.pudl_out, table)()
            else:
                print(f'   grabbing {table} from the sqlite db')
                select = sa.sql.select([tbl, ])
                if self.start_date is not None:
                    select = select.where(
                        tbl.c.report_date >= self.start_date)
                if self.end_date is not None:
                    select = select.where(
                        tbl.c.report_date <= self.end_date)
                # NOTE(review): assumes every db table read here has
                # `id` and `report_date` columns -- TODO confirm.
                self._dfs[table] = pd.read_sql(
                    select,
                    self.pudl_engine,
                    parse_dates=['report_date'],
                    index_col=['id'])
        return self._dfs[table]

In [None]:
def grab_denormalize_table(table, denorm_table=None, denorm_cols=None, indicator=False):
    """
    Grab a table and, optionally, denormalize it against a second table.

    The requested table is pulled through the module-level
    ``table_compiler``. When ``denorm_table`` is given, it is pulled the
    same way and outer-merged onto the first table on ``denorm_cols``.

    Args:
        table (string): a table name
        denorm_table (string): the name of the table you want to merge in
        denorm_cols (list): the columns to use to merge the tables
        indicator (bool): True or False for whether or not you want to
            include an indicator column in your merge that notes where
            each row came from.
    Returns:
        pandas.DataFrame
    """
    base_df = table_compiler.grab_the_table(table)

    # Nothing to merge in: hand back the table as grabbed.
    if not denorm_table:
        return base_df

    logger.info(f'   denormalizing {table}')
    # An outer merge keeps rows that appear in only one of the two tables.
    return base_df.merge(
        table_compiler.grab_the_table(denorm_table),
        how='outer',
        on=denorm_cols,
        indicator=indicator,
    )

In [None]:
# Configuration for the compile loop, keyed by plant granularity.
#
# For each granularity:
#   'id_cols'   - the identifying columns; together with 'report_date'
#                 they are the groupby / merge keys.
#   'ag_tables' - tables to aggregate, keyed by table name. Each entry has:
#       'denorm_table' - table to merge in before aggregating (or None)
#       'denorm_cols'  - columns to merge on (or None)
#       'ag_cols'      - {column: aggregation} mapping handed to .agg()
plant_granularities = {
    'plant': {
        'id_cols': ['plant_id_eia'],
        'ag_tables': {
            'fuel_cost':{ 
                'denorm_table': None,
                'denorm_cols': None,
                'ag_cols':{
                    'fuel_cost_per_mwh': 'mean',
                    },
            },
            'generation_fuel_eia923': {
                'denorm_table': None,
                'denorm_cols': None,
                'ag_cols': {
                    'fuel_consumed_mmbtu': pudl.helpers.sum_na,
                    'net_generation_mwh': pudl.helpers.sum_na,
                    },
            },
            'fuel_receipts_costs_eia923': {
                'denorm_table': None,
                'denorm_cols': None,
                'ag_cols': {
                    # NOTE(review): this sums a per-mmbtu cost -- confirm a
                    # sum (rather than some kind of mean) is intended.
                    'fuel_cost_per_mmbtu': pudl.helpers.sum_na,
                    },
            },
            'generators_eia860': {
                'denorm_table': None,
                'denorm_cols': None,
                'ag_cols': {            
                    'capacity_mw': pudl.helpers.sum_na,
                    },
            },
        }
    },
    'plant_unit': {
        'id_cols': ['plant_id_eia','unit_id_pudl'],
        # unit_id_pudl are associated with plant_id's and plant_id's/generator_id's
        'ag_tables': {
            'generators_eia860': {
                'denorm_table': 'boiler_generator_assn_eia860',
                'denorm_cols': ['plant_id_eia','generator_id', 'report_date'],
                'ag_cols': {
                    'capacity_mw': pudl.helpers.sum_na,
                },
            },
            'generation_fuel_eia923': {
                'denorm_table': 'boiler_generator_assn_eia860',
                # NOTE(review): merged on plant and date only (no
                # generator_id), unlike the other entries here -- this may
                # repeat rows across units before summing; confirm.
                'denorm_cols': ['plant_id_eia', 'report_date'],
                'ag_cols': {
                    'fuel_consumed_mmbtu': pudl.helpers.sum_na,
                    'net_generation_mwh': pudl.helpers.sum_na,
                },
            },
            'fuel_cost': {
                'denorm_table': 'boiler_generator_assn_eia860',
                'denorm_cols': ['plant_id_eia','generator_id', 'report_date'],
                'ag_cols': {
                    'fuel_cost_per_mwh': pudl.helpers.sum_na,
                },

            }

        },
    },
    'plant_prime_fuel': {
        'id_cols': ['plant_id_eia','energy_source_code_1'],
        # no tables configured for this granularity yet
        'ag_tables': {
            
        }
    }
    
}

In [None]:
# Read the default PUDL workspace settings, create an engine from the
# "pudl_db" entry, and load the table metadata for that database.
pudl_settings = pudl.workspace.setup.get_defaults()
pudl_engine = sa.create_engine(pudl_settings["pudl_db"])
pt = pudl.output.pudltabl.get_table_meta(pudl_engine)

In [None]:
# The shared table compiler used by grab_denormalize_table; freq is
# forwarded to the PudlTabl output object ('AS' is presumably the pandas
# annual-start frequency -- confirm).
table_compiler = compile_tables(pudl_engine, freq='AS')

In [None]:
# Build one compiled dataframe per plant granularity.
compiled_dfs = {}
for plant_gran, gran_details in plant_granularities.items():
    logger.info(f'compiling data for {plant_gran}')
    # the id columns plus the date are the groupby and merge keys
    cols_to_grab = gran_details['id_cols'] + ['report_date']
    # start from an empty frame that only has the key columns
    all_the_stuff = pd.DataFrame(columns=cols_to_grab)
    for table, table_details in gran_details['ag_tables'].items():
        logger.info(f'   aggregating {table}')
        # grab the table (merged with its denorm table, if it has one)
        denormed = grab_denormalize_table(
            table,
            denorm_table=table_details['denorm_table'],
            denorm_cols=table_details['denorm_cols'])
        # aggregate on the key columns with whatever functions are
        # defined in the ag_cols dictionary, then turn the group keys
        # back into regular columns
        aggregated = (
            denormed.groupby(cols_to_grab)
            .agg(table_details['ag_cols'])
            .reset_index()
        )
        # merge the new table into the compiled df
        all_the_stuff = aggregated.merge(
            all_the_stuff, how='outer', on=cols_to_grab)
    # add the df into a dictionary of dfs
    compiled_dfs[plant_gran] = all_the_stuff

## Playing with the table compiler

In [None]:
# grab (or reuse the cached copy of) a single table through the compiler
table_compiler.grab_the_table('generators_eia860')

In [None]:
# outer-merge generation_eia923 with the boiler generator association
# table; indicator=True adds a column noting where each row came from
grab_denormalize_table('generation_eia923',
                       denorm_table='boiler_generator_assn_eia860',
                       denorm_cols=['plant_id_eia','generator_id', 'report_date'],
                       indicator=True)

## Trying the groupby/aggregation

In [None]:
# You can change these inputs and run the following cell
# to see what an aggregated dataframe with these inputs will
# result in.
# The table to aggregate, and the table / columns to merge in first.
table = 'generation_fuel_eia923'
denorm_table = 'boiler_generator_assn_eia860'
denorm_cols = ['plant_id_eia', 'report_date']
# the id_cols + report_date (these are the groupby keys)
cols_to_grab = ['plant_id_eia', 'report_date']
# {column: aggregation function} mapping handed to .agg()
ag_cols = {'fuel_consumed_mmbtu': pudl.helpers.sum_na,
           'net_generation_mwh': pudl.helpers.sum_na,}

In [None]:
# Denormalize the table, group on cols_to_grab and aggregate with
# ag_cols, using the inputs set in the previous cell.
(grab_denormalize_table(
    table,
    denorm_table=denorm_table,
    denorm_cols=denorm_cols).
 groupby(cols_to_grab).
 agg(ag_cols).reset_index())

## Playing with the compiled outputs 

In [None]:
# printing out the keys of the dictionary so you can see
# which plant granularities have a compiled dataframe
compiled_dfs.keys()

In [None]:
# pull out the compiled dataframe for the plant_unit granularity
plant_unit = compiled_dfs['plant_unit']

In [None]:
# if you want to look at an individual plant, select on its plant_id_eia
plant_unit[plant_unit['plant_id_eia'] == 3]

In [None]:
# selecting on two criteria (plant_id_eia and report_date);
# the two masks are combined with & so both must hold
plant_unit[(plant_unit['plant_id_eia'] == 3) & (plant_unit['report_date'] == '2017-01-01')]

In [None]:
# you can see where fields are empty (here: rows with a null capacity_mw)
plant_unit[plant_unit['capacity_mw'].isnull()]

In [None]:
# you can see where fields are not empty (here: rows with a capacity_mw value)
plant_unit[plant_unit['capacity_mw'].notnull()]