# ETL: Extract, Transform, Load

Extract, Transform, Load (ETL) is one of the key processes in data engineering. The basic idea is to get data from one or more sources, manipulate it into a useful, correct format for downstream consumers, and move it to a destination, usually some sort of centralized repository like a *data lake* or *data warehouse*.

This process can be run manually or scheduled to run automatically, and there are several commercial platforms that can be leveraged as well, such as [Snowflake](https://www.snowflake.com/guides/what-etl) or [Databricks](https://www.databricks.com/glossary/extract-transform-load).

There is also a slightly different pipeline pattern, **ELT**, in which all the raw data is loaded into a central repository *first* and then transformed to meet various business needs later on.

Example ETL use cases:
* improve performance of an analytics dashboard
    * pre-calculate as much as possible outside of tools like Tableau
    * decrease load on the dashboard and codify/document your data prep steps
    * save space by limiting the number of columns read in
* central repository for data
    * enforce data standards
    * all downstream consumers start from the same source data
    * access data from multiple sources in one place
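To make the ETL/ELT difference above concrete, here's a minimal sketch of both patterns, using an in-memory SQLite database as a stand-in for the central repository. The table names, the `etl`/`elt` function names, and the two-row CSV (values borrowed from the census data later in this notebook) are illustrative assumptions, not part of any real pipeline. The only difference is *where* the transform runs: in pandas before loading (ETL), or in SQL after loading the raw strings (ELT).

```python
import sqlite3
from io import StringIO

import pandas as pd

# hypothetical raw extract: two counties, every value read in as a string
RAW_CSV = "county,population\nAutauga,55639\nBaldwin,218289\n"


def etl(conn: sqlite3.Connection) -> None:
    """ETL: transform in Python *before* loading."""
    raw = pd.read_csv(StringIO(RAW_CSV), dtype=str)                  # extract
    clean = raw.assign(population=pd.to_numeric(raw["population"]))  # transform
    clean.to_sql("county_etl", conn, index=False)                    # load


def elt(conn: sqlite3.Connection) -> None:
    """ELT: load the raw strings first, then transform with SQL inside the repository."""
    raw = pd.read_csv(StringIO(RAW_CSV), dtype=str)                  # extract
    raw.to_sql("county_raw", conn, index=False)                      # load as-is
    conn.execute(                                                     # transform later
        "CREATE TABLE county_elt AS "
        "SELECT county, CAST(population AS INTEGER) AS population FROM county_raw"
    )


conn = sqlite3.connect(":memory:")  # stand-in for a data warehouse
etl(conn)
elt(conn)
print(conn.execute("SELECT SUM(population) FROM county_etl").fetchone())  # (273928,)
print(conn.execute("SELECT SUM(population) FROM county_elt").fetchone())  # (273928,)
```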

## Extract: gather source data

### Examples of Extraction Methods:
* Read in a downloaded file (e.g. json or csv)
* REST API requests
* connect to a database and run a SQL query
* connect to a cloud resource (e.g. streaming service, storage system)
* parse web pages ([python web scraping libraries](https://towardsdatascience.com/choose-the-best-python-web-scraping-library-for-your-application-91a68bc81c4f))
* connect to a sensor or other IoT data source

Today, we'll be extracting our data from a couple of **APIs** (*application programming interfaces*).

APIs allow systems and computers to communicate. System A submits a *request* to System B in a format System B can understand, and System B sends back a *response* containing the information System A requested.

**REST** stands for "**RE**presentational **S**tate **T**ransfer" and is a common architectural style for designing APIs. A client sends an HTTP request (e.g. `GET`) to a resource's URI, often with query parameters, and the server returns a response over HTTP in a format such as JSON or XML.
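A REST request is ultimately an HTTP method plus a URL. Taking the Census API URL used later in this notebook, Python's standard `urllib.parse` module can split it into its parts, which shows where the endpoint ends and the query parameters begin. This is only a way to inspect the URL; no request is sent here.

```python
from urllib.parse import parse_qs, urlsplit

# the same Census ACS request URL used later in this notebook
url = "https://api.census.gov/data/2020/acs/acs5?get=NAME,B19013_001E&for=county:*"

parts = urlsplit(url)
print(parts.scheme)           # https  (the protocol)
print(parts.netloc)           # api.census.gov  (the server)
print(parts.path)             # /data/2020/acs/acs5  (the resource/endpoint)
print(parse_qs(parts.query))  # {'get': ['NAME,B19013_001E'], 'for': ['county:*']}
```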

In [2]:
# pip install requests
# pip install pandas

In [3]:
import os
import requests
import pandas as pd
import csv
import sys
from io import StringIO

#### Read in a CSV file

In [4]:
cwd = os.getcwd()
cwd

'C:\\Users\\Hanqi-Xiao\\OneDrive - University of North Carolina at Chapel Hill\\Documents\\Python_scripts\\Pandas_and_matplot\\CDC_2022'

In [11]:
# path to the *folder* holding the downloaded files: os.listdir() below needs a directory,
## not a single .csv file. "data" is a placeholder, so point it at your own download folder
path = "data"

1. Loop through a directory and create dataframes for each file

In [12]:
raw_frames = []
for filename in os.listdir(path):
    f = os.path.join(path, filename)
    # checking if it is a file
    if os.path.isfile(f):
        # read_csv infers compression from the file extension, so this works for both
        ## compressed (.gz) and regular .csv files without a compression argument
        # make everything a string to maintain all data (e.g. id numbers with leading zeros)
        df = pd.read_csv(f, dtype=str)
        raw_frames.append(df)

2. Combine the dataframes into one
NOTE: concat and merge are two different pandas methods, and we'll look at both in this demo. Here, we're using `pd.concat` to "stack" dataframes with identical schemas on top of one another.

In [13]:
# peek at the first file's dataframe
raw_frames[0]

In [8]:
stormwater = pd.concat(raw_frames)

In [9]:
stormwater

Unnamed: 0,YEARMONTH,EPISODE_ID,EVENT_ID,LOCATION_INDEX,RANGE,AZIMUTH,LOCATION,LATITUDE,LONGITUDE,LAT2,LON2
0,202008,152218,919675,1,.89,SE,EMIT,35.72,-78.26,3543200,7815600
1,202008,152218,919676,1,2.76,N,MIDDLESEX,35.82,-78.22,3549200,7813200
2,202008,152218,919677,1,.56,W,(RWI)ROCKY MT WILSON,35.85,-77.91,3551000,7754600
3,202008,152218,919679,1,.56,W,TIMBERLAKE,36.28,-78.96,3616800,7857600
4,202008,152218,919680,1,.69,S,ROSEBORO,34.94,-78.53,3456400,7831800
...,...,...,...,...,...,...,...,...,...,...,...
24519,202203,167113,1011295,1,.91,NW,BRANNONVILLE,30.23,-85.6,3013800,8536000
24520,202203,167113,1011295,2,.62,NW,BRANNONVILLE,30.2257,-85.598,3013542,8535880
24521,202203,167113,1011295,3,.51,NW,BRANNONVILLE,30.2261,-85.5949,3013566,8535694
24522,202203,167113,1011295,4,.77,NNW,BRANNONVILLE,30.23,-85.5956,3013800,8535736
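Here's a self-contained sketch of the same extract-and-stack pattern that runs without the original files. It writes two small gzip-compressed CSVs to a temporary directory, reads them back with `dtype=str`, and stacks them with `pd.concat`. The file names and rows are made up (only a few of the column names above are reused), so treat it as a toy demonstration.

```python
import os
import tempfile

import pandas as pd

with tempfile.TemporaryDirectory() as folder:
    # write two hypothetical monthly extracts with identical schemas;
    # the .gz extension makes to_csv gzip-compress the file
    for name, rows in [("events_202008.csv.gz", [["202008", "0001"]]),
                       ("events_202203.csv.gz", [["202203", "0002"]])]:
        pd.DataFrame(rows, columns=["YEARMONTH", "EVENT_ID"]).to_csv(
            os.path.join(folder, name), index=False)

    raw_frames = []
    for filename in sorted(os.listdir(folder)):
        f = os.path.join(folder, filename)
        if os.path.isfile(f):
            # compression is inferred from the extension; dtype=str keeps leading zeros
            raw_frames.append(pd.read_csv(f, dtype=str))

# ignore_index=True renumbers rows 0..n-1 instead of repeating each file's own index
stormwater = pd.concat(raw_frames, ignore_index=True)
print(stormwater)
```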


#### Requesting data from an API

Data Sources: 
* [EPA Envirofacts](https://www.epa.gov/enviro/about-data)
* [Census Bureau ACS](https://www.census.gov/data/developers/data-sets/acs-5year.html)

**EPA Greenhouse Gas Data**

We'll use the [requests](https://pypi.org/project/requests/) library to make our API calls:

In [10]:
gas_request = requests.get('https://data.epa.gov/efservice/PUB_DIM_GHG/EXCEL')

In [11]:
# metadata about the response
gas_request.headers

{'Date': 'Sat, 08 Oct 2022 15:29:11 GMT', 'Content-Type': 'application/vnd.ms-excel', 'Content-Length': '859', 'Connection': 'keep-alive', 'Server': 'nginx/1.21.6', 'Last-Modified': 'Sat, 08 Oct 2022 15:29:12 GMT', 'ETag': '"7c6a408139987d076cca29680efb5b58"', 'Content-Disposition': 'attachment;filename=EnvirofactsRestAPI.CSV', 'Accept-Ranges': 'bytes', 'X-XSS-Protection': '1; mode=block', 'X-Frame-Options': 'SAMEORIGIN', 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains'}

In [12]:
# although we asked for the EXCEL format, the body comes back as CSV text
## (note filename=EnvirofactsRestAPI.CSV in the Content-Disposition header above).
## This particular EPA table only offers XML and EXCEL as format options; normally
## JSON is the preferred response format when it's available, since Python has
## strong JSON support: https://docs.python.org/3/library/json.html
gas_request.text

'GAS_ID,GAS_CODE,GAS_NAME,GAS_LABEL\n"1","CO2","Carbon Dioxide","Carbon Dioxide (CO<sub>2</sub>)"\n"2","CH4","Methane","Methane (CH<sub>4</sub>)"\n"3","N2O","Nitrous Oxide","Nitrous Oxide (N<sub>2</sub>O)"\n"6","SF6","Sulfur Hexafluoride","Sulfur hexafluoride (SF<sub>6</sub>)"\n"7","CHF3","Fluoroform","HFC-23"\n"8","BIOCO2","Biogenic CO2","Biogenic Carbon Dioxide (CO<sub>2</sub>)"\n"9","NF3","Nitrogen Triflouride","Nitrogen trifluoride (NF<sub>3</sub>)"\n"10","HFC","HFCs","Hydrofluorocarbons (HFCs)"\n"11","PFC","PFCs","Perfluorocarbons (PFCs)"\n"12","HFE","HFEs","HFEs"\n"13","Other","Other","Other"\n"14","Other_L","Fluorinated GHG Production (CO<sub>2</sub>e)","Fluorinated GHG Production (CO<sub>2</sub>e)"\n"15","Very_Short","Very Short-lived Compounds","Very Short-lived Compounds"\n"16","Other_Full","Other Fully Fluorinated GHGs","Other Fully Fluorinated GHGs"\n'

In [13]:
gases = pd.read_csv(StringIO(gas_request.text))

In [14]:
gases

Unnamed: 0,GAS_ID,GAS_CODE,GAS_NAME,GAS_LABEL
0,1,CO2,Carbon Dioxide,Carbon Dioxide (CO<sub>2</sub>)
1,2,CH4,Methane,Methane (CH<sub>4</sub>)
2,3,N2O,Nitrous Oxide,Nitrous Oxide (N<sub>2</sub>O)
3,6,SF6,Sulfur Hexafluoride,Sulfur hexafluoride (SF<sub>6</sub>)
4,7,CHF3,Fluoroform,HFC-23
5,8,BIOCO2,Biogenic CO2,Biogenic Carbon Dioxide (CO<sub>2</sub>)
6,9,NF3,Nitrogen Triflouride,Nitrogen trifluoride (NF<sub>3</sub>)
7,10,HFC,HFCs,Hydrofluorocarbons (HFCs)
8,11,PFC,PFCs,Perfluorocarbons (PFCs)
9,12,HFE,HFEs,HFEs


In [15]:
# pull data from other GHG tables
sector_request = requests.get('https://data.epa.gov/efservice/PUB_DIM_SECTOR/EXCEL')
sectors = pd.read_csv(StringIO(sector_request.text))
# the subsector table lives at PUB_DIM_SUBSECTOR (not PUB_DIM_SECTOR)
sub_sector_request = requests.get('https://data.epa.gov/efservice/PUB_DIM_SUBSECTOR/EXCEL')
sub_sector = pd.read_csv(StringIO(sub_sector_request.text))

In [16]:
sectors

Unnamed: 0,SECTOR_ID,SECTOR_CODE,SECTOR_NAME,SECTOR_TYPE,SECTOR_COLOR,SORT_ORDER
0,2,WASTE,Waste,E,#9B9EBB,6.0
1,3,POWERPLANTS,Power Plants,E,#FFBD59,1.0
2,4,REFINERIES,Refineries,E,#FFFF7F,3.0
3,5,CHEMICALS,Chemicals,E,#99D8F5,4.0
4,6,METALS,Metals,E,#B9B9CF,7.0
5,7,PULPANDPAPER,Pulp and Paper,E,#D7D7E4,9.0
6,8,MINERALS,Minerals,E,#DDEDF4,8.0
7,14,OTHER,Other,E,#C0E6E6,5.0
8,9,COAL_TO_LIQUIDS_SUP,Coal-based Liquid Fuel Supply,S,,1.0
9,10,PETROLEUM_SUP,Petroleum Product Suppliers,S,,2.0


As we're going through the tables, this starts to feel repetitive... any repetitive action can usually be streamlined into a **LOOP**:

1. Create list of our table names
2. Loop through the list, making the requests, and storing the responses in a pandas dataframe

In [17]:
tables = ['PUB_DIM_FACILITY', 'PUB_FACTS_SUBP_GHG_EMISSION',
         'PUB_DIM_SUBPART', 'PUB_DIM_GHG', 'PUB_FACTS_SECTOR_GHG_EMISSION',
         'PUB_DIM_SECTOR', 'PUB_DIM_SUBSECTOR']
# by storing these variables, I can easily update the code if the api
## location (url) or pipeline requirements change in the future
base_url = 'https://data.epa.gov/efservice/'
response_format = 'EXCEL'

In [18]:
# mapping table names to shorter, more convenient variables
table_map = {'PUB_DIM_FACILITY': 'facility', 
             'PUB_FACTS_SUBP_GHG_EMISSION': 'join_subpart_ghg',
             'PUB_DIM_SUBPART': 'subpart', 
             'PUB_DIM_GHG': 'ghg', 
             'PUB_FACTS_SECTOR_GHG_EMISSION': 'join_sector_ghg',
             'PUB_DIM_SECTOR': 'sector', 
             'PUB_DIM_SUBSECTOR': 'subsector'}

In [19]:
# dictionary to store our dataframes
dataframes = {}

for table in tables:
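    # base_url already ends with '/', so don't add another one here
    table_request = requests.get(f'{base_url}{table}/{response_format}')
    # fail loudly on HTTP errors instead of silently parsing an error page
    table_request.raise_for_status()
    dataframes[table_map[table]] = pd.read_csv(StringIO(table_request.text), dtype=str)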

In [20]:
# now all seven dataframes are available to us in the "dataframes" dictionary, via the mapped variable names
dataframes['sector']

Unnamed: 0,SECTOR_ID,SECTOR_CODE,SECTOR_NAME,SECTOR_TYPE,SECTOR_COLOR,SORT_ORDER
0,2,WASTE,Waste,E,#9B9EBB,6.0
1,3,POWERPLANTS,Power Plants,E,#FFBD59,1.0
2,4,REFINERIES,Refineries,E,#FFFF7F,3.0
3,5,CHEMICALS,Chemicals,E,#99D8F5,4.0
4,6,METALS,Metals,E,#B9B9CF,7.0
5,7,PULPANDPAPER,Pulp and Paper,E,#D7D7E4,9.0
6,8,MINERALS,Minerals,E,#DDEDF4,8.0
7,14,OTHER,Other,E,#C0E6E6,5.0
8,9,COAL_TO_LIQUIDS_SUP,Coal-based Liquid Fuel Supply,S,,1.0
9,10,PETROLEUM_SUP,Petroleum Product Suppliers,S,,2.0
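If this loop ends up in a scheduled pipeline, it can help to wrap it in a function. The sketch below is one possible shape, not an established API: the name `fetch_tables` and its parameters are hypothetical. It builds each URL without a doubled slash, checks the HTTP status, and takes the HTTP `get` function as a parameter so it can be tested without calling the EPA service. In real use you would pass `requests.get`, e.g. `dataframes = fetch_tables(table_map, get=requests.get)`.

```python
from io import StringIO
from typing import Callable, Dict

import pandas as pd


def fetch_tables(table_map: Dict[str, str], get: Callable,
                 base_url: str = "https://data.epa.gov/efservice/",
                 response_format: str = "EXCEL") -> Dict[str, pd.DataFrame]:
    """Request each EPA table and return {short_name: DataFrame} with all columns as strings."""
    frames = {}
    for table, short_name in table_map.items():
        # strip any trailing '/' so the URL never contains '//'
        url = f"{base_url.rstrip('/')}/{table}/{response_format}"
        response = get(url, timeout=60)  # requests.get accepts a timeout in seconds
        response.raise_for_status()      # raise on 4xx/5xx instead of parsing an error page
        frames[short_name] = pd.read_csv(StringIO(response.text), dtype=str)
    return frames
```

Injecting `get` keeps the function easy to test with a stub, and the same idea works for swapping in a `requests.Session().get` with retries later.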


**Census Bureau American Community Survey 5-Year data**

In [27]:
mhi_request = requests.get('https://api.census.gov/data/2020/acs/acs5?get=NAME,B19013_001E&for=county:*')

In [28]:
mhi_text = mhi_request.text

In [29]:
mhi_text

'[["NAME","B19013_001E","state","county"],\n["Autauga County, Alabama","57982","01","001"],\n["Baldwin County, Alabama","61756","01","003"],\n["Barbour County, Alabama","34990","01","005"],\n["Bibb County, Alabama","51721","01","007"],\n["Blount County, Alabama","48922","01","009"],\n["Bullock County, Alabama","33866","01","011"],\n["Butler County, Alabama","44850","01","013"],\n["Calhoun County, Alabama","50128","01","015"],\n["Lewis County, Kentucky","29844","21","135"],\n["Lincoln County, Kentucky","42231","21","137"],\n["Livingston County, Kentucky","52795","21","139"],\n["Logan County, Kentucky","48912","21","141"],\n["Lyon County, Kentucky","49286","21","143"],\n["McCracken County, Kentucky","47011","21","145"],\n["McCreary County, Kentucky","29499","21","147"],\n["Madison County, Kentucky","51649","21","151"],\n["Marion County, Kentucky","43587","21","155"],\n["Marshall County, Kentucky","57348","21","157"],\n["Mason County, Kentucky","46241","21","161"],\n["Meade County, Kentuc

In [23]:
mhi_text = mhi_text.replace('\n', '')

In [24]:
import ast

In [25]:
mhi_list = ast.literal_eval(mhi_text)

In [19]:
mhi = pd.DataFrame(mhi_list[1:], columns = mhi_list[0], dtype=str)
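The Census API responds with JSON (a list of lists whose first row is the header), so the newline stripping and `ast.literal_eval` steps aren't strictly needed: `json.loads` parses the text directly, and `requests` offers the same thing as `mhi_request.json()`. A sketch using the first few rows of the response text shown above:

```python
import json

import pandas as pd

# first rows of the Census response text shown above (header row + 3 counties)
mhi_text = ('[["NAME","B19013_001E","state","county"],\n'
            '["Autauga County, Alabama","57982","01","001"],\n'
            '["Baldwin County, Alabama","61756","01","003"],\n'
            '["Barbour County, Alabama","34990","01","005"]]')

# equivalent to mhi_request.json() on the live response; newlines are valid JSON whitespace
mhi_list = json.loads(mhi_text)
mhi = pd.DataFrame(mhi_list[1:], columns=mhi_list[0], dtype=str)
print(mhi)
```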

In [20]:
population_request = requests.get('https://api.census.gov/data/2020/acs/acs5?get=NAME,B01003_001E&for=county:*')
pop_text = population_request.text
pop_text = pop_text.replace('\n', '')
pop_list = ast.literal_eval(pop_text)

In [21]:
population = pd.DataFrame(pop_list[1:], columns = pop_list[0], dtype=str)

**CHALLENGE**: write a loop to make requests for multiple census variables at once (see the census api documentation for information on the variables available: https://www.census.gov/data/developers/data-sets/acs-5year.html#:~:text=2020%20ACS%20Detailed%20Tables%20Variables)
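One possible starting point (a sketch, not the only answer): the ACS endpoint used above takes a comma-separated list of variables in its `get` parameter, so you can either loop over variables and request each one separately, or request several at once. The helper `acs5_county_url` is hypothetical; it only builds the URLs, which you would then pass to `requests.get`.

```python
from urllib.parse import urlencode


def acs5_county_url(year: int, variables: list) -> str:
    """Build an ACS 5-year request URL for the given variables, for every county."""
    params = {"get": ",".join(["NAME", *variables]), "for": "county:*"}
    # keep ',', ':' and '*' readable instead of percent-encoding them
    return f"https://api.census.gov/data/{year}/acs/acs5?{urlencode(params, safe=',:*')}"


# one URL per variable (population and median household income, as above)...
urls = {var: acs5_county_url(2020, [var]) for var in ["B01003_001E", "B19013_001E"]}
# ...or both variables in a single request
combined_url = acs5_county_url(2020, ["B01003_001E", "B19013_001E"])
print(combined_url)
```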

## Transform

Now that we have extracted data from a couple of sources, we want to combine and clean them so they are useful to us or other downstream consumers.

**Common transformations**
* select columns to load (i.e. drop unnecessary columns)
* cast to correct datatype (we start with all strings to maintain all the raw data)
* aggregate data (e.g. sum, count, minimum/maximum, etc.)
* join data sources
* derive new columns (e.g. split, concatenate, calculate new values, etc.) 
* validate data (fill in nulls, reject rows, fix incorrect values, etc.)
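A small sketch of several of these transformations on three rows of the census data used below (values copied from the outputs in this notebook; the `state_name` column is a hypothetical derived column): casting the string columns to numbers, deriving a new column, validating, and aggregating.

```python
import pandas as pd

census = pd.DataFrame({
    "name": ["Autauga County, Alabama", "Baldwin County, Alabama", "Barbour County, Alabama"],
    "fips": ["01001", "01003", "01005"],
    "population": ["55639", "218289", "25026"],
    "mhi": ["57982", "61756", "34990"],
})

# cast: numeric columns become numbers; errors="coerce" turns unparseable values into NaN
for col in ["population", "mhi"]:
    census[col] = pd.to_numeric(census[col], errors="coerce")
# fips is deliberately left as a string so "01001" keeps its leading zero

# derive: pull the state name out of "County, State"
census["state_name"] = census["name"].str.split(", ").str[-1]

# validate: reject rows whose numeric values failed to parse
census = census.dropna(subset=["population", "mhi"])

# aggregate: total population per state
by_state = census.groupby("state_name", as_index=False)["population"].sum()
print(by_state)
```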

Current dataset variables: dataframes (dict), mhi, population

In [22]:
dataframes.keys()

dict_keys(['facility', 'join_subpart_ghg', 'subpart', 'ghg', 'join_sector_ghg', 'sector', 'subsector'])

In [23]:
mhi

Unnamed: 0,NAME,B19013_001E,state,county
0,"Autauga County, Alabama",57982,01,001
1,"Baldwin County, Alabama",61756,01,003
2,"Barbour County, Alabama",34990,01,005
3,"Bibb County, Alabama",51721,01,007
4,"Blount County, Alabama",48922,01,009
...,...,...,...,...
3216,"Renville County, Minnesota",58542,27,129
3217,"Roseau County, Minnesota",62304,27,135
3218,"Sherburne County, Minnesota",88671,27,141
3219,"Steele County, Minnesota",68172,27,147


In [24]:
population

Unnamed: 0,NAME,B01003_001E,state,county
0,"Autauga County, Alabama",55639,01,001
1,"Baldwin County, Alabama",218289,01,003
2,"Barbour County, Alabama",25026,01,005
3,"Bibb County, Alabama",22374,01,007
4,"Blount County, Alabama",57755,01,009
...,...,...,...,...
3216,"Renville County, Minnesota",14572,27,129
3217,"Roseau County, Minnesota",15259,27,135
3218,"Sherburne County, Minnesota",96015,27,141
3219,"Steele County, Minnesota",36710,27,147


#### Create new columns to join dataframes

Combine census dataframes
1. Create a FIPS column: the county identifier in the census data, which also appears in the facility GHG table (as `COUNTY_FIPS`)
2. Join population and mhi data
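This is why the columns were pulled as strings: a county FIPS code is a 2-digit state code followed by a 3-digit county code, and both parts can start with zero. If those columns had been read as integers, concatenating them would produce the wrong key, although `str.zfill` can restore the padding. A sketch with two counties from the tables in this notebook:

```python
import pandas as pd

# state/county codes as they would look if they had been parsed as integers
codes = pd.DataFrame({"state": [1, 27], "county": [1, 147]})

# naive concatenation loses the leading zeros: "11" and "27147"
naive = codes["state"].astype(str) + codes["county"].astype(str)

# zero-pad to the fixed widths (2 for state, 3 for county) before concatenating
codes["FIPS"] = codes["state"].astype(str).str.zfill(2) + codes["county"].astype(str).str.zfill(3)
print(codes["FIPS"].tolist())  # ['01001', '27147']
```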

In [25]:
dataframes['facility']

Unnamed: 0,FACILITY_ID,LATITUDE,LONGITUDE,CITY,STATE,ZIP,COUNTY_FIPS,COUNTY,ADDRESS1,ADDRESS2,...,PUBLIC_XML_XML,REPORTED_INDUSTRY_TYPES,FACILITY_TYPES,SUBMISSION_ID,UU_RD_EXEMPT,REPORTING_STATUS,PROCESS_STATIONARY_CML,COMMENTS,RR_MRV_PLAN_URL,RR_MONITORING_PLAN_FILENAME
0,1009051,37.795963,-122.022317,San Ramon,CA,94583,06013,CONTRA COSTA,6001 Bollinger Canyon Road,,...,,W-ONSH,Onshore Oil & Gas Production,111373,,,,,,
1,1008942,33.07567,-96.80594,Plano,TX,75024,48085,COLLIN,5320 Legacy Drive,,...,,W-ONSH,Onshore Oil & Gas Production,110481,,,,,,
2,1008914,32.75619,-97.33441,Fort Worth,TX,76102,48439,TARRANT COUNTY,"100 Throckmorton St., #1200",,...,,W-ONSH,Onshore Oil & Gas Production,112161,,,,,,
3,1008688,32.00316,-102.07945,Midland,TX,79701,48329,MIDLAND COUNTY,600 North Marienfeld Street Suite 301,,...,,W-ONSH,Onshore Oil & Gas Production,65905,,,,,,
4,1008621,35.46734,-97.51406,Oklahoma City,OK,73102,40109,Oklahoma,333 West Sheridan Ave,,...,,W-ONSH,Onshore Oil & Gas Production,66924,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9996,1008897,42.45766,-83.65611,South Lyon,MI,48178,26125,OAKLAND COUNTY,400 South McMunn,,...,,C,Direct Emitter,41587,,,,,,
9997,1008673,40.7164495,-80.1205256,Cranberry Township,PA,16066,,,"260 Executive Drive, Suite 100",,...,,W-ONSH,Onshore Oil & Gas Production,109255,,,,,,
9998,1001085,39.4492,-84.4611,TRENTON,OH,45100,39017,Butler,2100 WOODSDALE RD,,...,,"C,D",Direct Emitter,46466,,,,,,
9999,1007870,28.4272,-81.5803,LAKE BUENA VISTA,FL,32830,12095,Orange,1375 Buena Vista Drive,,...,,"C,D,NN-LDC","Supplier, Direct Emitter",51890,,,,,,


In [26]:
population['FIPS'] = population['state']+population['county']
population.head()

Unnamed: 0,NAME,B01003_001E,state,county,FIPS
0,"Autauga County, Alabama",55639,01,001,01001
1,"Baldwin County, Alabama",218289,01,003,01003
2,"Barbour County, Alabama",25026,01,005,01005
3,"Bibb County, Alabama",22374,01,007,01007
4,"Blount County, Alabama",57755,01,009,01009


In [27]:
mhi['FIPS'] = mhi['state']+mhi['county']
mhi.head()

Unnamed: 0,NAME,B19013_001E,state,county,FIPS
0,"Autauga County, Alabama",57982,01,001,01001
1,"Baldwin County, Alabama",61756,01,003,01003
2,"Barbour County, Alabama",34990,01,005,01005
3,"Bibb County, Alabama",51721,01,007,01007
4,"Blount County, Alabama",48922,01,009,01009


#### JOIN resources:
* types of joins: https://www.w3schools.com/sql/sql_join.asp
* joins/merges in python pandas: https://realpython.com/pandas-merge-join-and-concat/

In [28]:
census_data = population.merge(mhi, how='inner', on='FIPS')

In [29]:
# sanity check that no data was lost on our join
print(len(population), len(mhi), len(census_data))

3221 3221 3221


In [30]:
census_data

Unnamed: 0,NAME_x,B01003_001E,state_x,county_x,FIPS,NAME_y,B19013_001E,state_y,county_y
0,"Autauga County, Alabama",55639,01,001,01001,"Autauga County, Alabama",57982,01,001
1,"Baldwin County, Alabama",218289,01,003,01003,"Baldwin County, Alabama",61756,01,003
2,"Barbour County, Alabama",25026,01,005,01005,"Barbour County, Alabama",34990,01,005
3,"Bibb County, Alabama",22374,01,007,01007,"Bibb County, Alabama",51721,01,007
4,"Blount County, Alabama",57755,01,009,01009,"Blount County, Alabama",48922,01,009
...,...,...,...,...,...,...,...,...,...
3216,"Renville County, Minnesota",14572,27,129,27129,"Renville County, Minnesota",58542,27,129
3217,"Roseau County, Minnesota",15259,27,135,27135,"Roseau County, Minnesota",62304,27,135
3218,"Sherburne County, Minnesota",96015,27,141,27141,"Sherburne County, Minnesota",88671,27,141
3219,"Steele County, Minnesota",36710,27,147,27147,"Steele County, Minnesota",68172,27,147
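The `_x`/`_y` suffixes appear because both dataframes have `NAME`, `state`, and `county` columns. One way to avoid them (a sketch on two rows from the tables above, trimmed to a few columns) is to keep only the key and the new value from the right-hand dataframe before merging. Passing `validate='one_to_one'` additionally makes pandas raise a `MergeError` if `FIPS` is duplicated on either side, which guards against a join silently multiplying rows.

```python
import pandas as pd

population = pd.DataFrame({"NAME": ["Autauga County, Alabama", "Baldwin County, Alabama"],
                           "B01003_001E": ["55639", "218289"],
                           "FIPS": ["01001", "01003"]})
mhi = pd.DataFrame({"NAME": ["Autauga County, Alabama", "Baldwin County, Alabama"],
                    "B19013_001E": ["57982", "61756"],
                    "FIPS": ["01001", "01003"]})

# only FIPS and the income column come from the right side, so no _x/_y suffixes
census_data = population.merge(mhi[["FIPS", "B19013_001E"]], how="inner", on="FIPS",
                               validate="one_to_one")
print(census_data.columns.tolist())  # ['NAME', 'B01003_001E', 'FIPS', 'B19013_001E']
```

If you do want to keep overlapping columns from both sides, the `suffixes=` argument lets you pick clearer names than `_x`/`_y`.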


In [31]:
# only keep the necessary columns
## rename columns to be more meaningful
census = census_data[['NAME_x', 'FIPS', 'B01003_001E', 'B19013_001E']].rename(
    columns={'NAME_x': 'name', 'FIPS': 'fips', 'B01003_001E': 'population', 'B19013_001E': 'mhi'})

In [32]:
census

Unnamed: 0,name,fips,population,mhi
0,"Autauga County, Alabama",01001,55639,57982
1,"Baldwin County, Alabama",01003,218289,61756
2,"Barbour County, Alabama",01005,25026,34990
3,"Bibb County, Alabama",01007,22374,51721
4,"Blount County, Alabama",01009,57755,48922
...,...,...,...,...
3216,"Renville County, Minnesota",27129,14572,58542
3217,"Roseau County, Minnesota",27135,15259,62304
3218,"Sherburne County, Minnesota",27141,96015,88671
3219,"Steele County, Minnesota",27147,36710,68172


**Join dataframes from different sources**

In [33]:
print(len(census), len(dataframes['facility']))

3221 10001


Use a left join to keep every facility for now, even those without a matching county in the census data.

In [34]:
census_facility = dataframes['facility'].merge(census, how='left', left_on='COUNTY_FIPS', right_on='fips')

In [35]:
len(census_facility)

10001

Investigate facilities that don't have census data

In [36]:
census_facility[census_facility['fips'].isna()]

Unnamed: 0,FACILITY_ID,LATITUDE,LONGITUDE,CITY,STATE,ZIP,COUNTY_FIPS,COUNTY,ADDRESS1,ADDRESS2,...,UU_RD_EXEMPT,REPORTING_STATUS,PROCESS_STATIONARY_CML,COMMENTS,RR_MRV_PLAN_URL,RR_MONITORING_PLAN_FILENAME,name,fips,population,mhi
6,1008536,35.53376,-97.52976,Fort Worth,TX,76102,,,301 Commerce Dr. Suite 3701,,...,,,,,,,,,,
49,1012511,32.91606,-96.773085,Dallas,TX,75251,,,12377 Merit Dr. Suite 1200,,...,,,,,,,,,,
77,1011451,13.463579,144.678216,Piti,GU,96915,66010,GUAM,180 Cabras Highway,,...,,,,,,,,,,
101,1010718,39.7541032,-105.0002242,Denver,CO,80202,,,1615 Wynkoop Street,,...,,,,,,,,,,
190,1009797,28.979049,-91.4727,Offshore,LA,00000,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9821,1002569,28.15402604,-89.10355357,Offshore,LA,00000,,,,,...,,,,,,,,,,
9856,1000719,29.7765065,-95.4201377,Houston,TX,77007,,,"55 Waugh Drive, Suite 700",,...,,,,,,,,,,
9978,1009244,40.1064831,-108.8670246,Rangely,CO,81648,,,100 Chevron Road,,...,,,,,,,,,,
9995,1008670,40.49683,-79.8759,Pittsburgh,PA,15220,,,"651 Holiday Drive, Suite 300",,...,,,,,,,,,,
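Another way to find the unmatched rows is `merge(..., indicator=True)`, which adds a `_merge` column saying whether each row matched (`both`) or came only from the left dataframe (`left_only`). A sketch on toy data: the facility IDs `A`/`B`/`C` are made up, and the FIPS and population values are borrowed from the outputs above.

```python
import pandas as pd

facility = pd.DataFrame({"FACILITY_ID": ["A", "B", "C"],
                         "COUNTY_FIPS": ["06013", None, "66010"]})
census = pd.DataFrame({"fips": ["06013"], "population": ["1147788"]})

merged = facility.merge(census, how="left", left_on="COUNTY_FIPS", right_on="fips",
                        indicator=True)
# rows with a missing COUNTY_FIPS *and* rows whose FIPS isn't in the census both show up here
unmatched = merged[merged["_merge"] == "left_only"]
print(unmatched["FACILITY_ID"].tolist())  # ['B', 'C']
```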


In [37]:
census_facility[(census_facility['fips'].isna()) & ~(census_facility['COUNTY_FIPS'].isna())]

Unnamed: 0,FACILITY_ID,LATITUDE,LONGITUDE,CITY,STATE,ZIP,COUNTY_FIPS,COUNTY,ADDRESS1,ADDRESS2,...,UU_RD_EXEMPT,REPORTING_STATUS,PROCESS_STATIONARY_CML,COMMENTS,RR_MRV_PLAN_URL,RR_MONITORING_PLAN_FILENAME,name,fips,population,mhi
77,1011451,13.463579,144.678216,Piti,GU,96915,66010,GUAM,180 Cabras Highway,,...,,,,,,,,,,
912,1008001,17.750141,-64.714793,CHRISTIANSTED,VI,820,78010,ST. CROIX ISLAND,1 Penitentiary Lane,,...,,,,,,,,,,
3044,1004611,13.541741,144.807727,Dededo,GU,96929,66010,GUAM,NCS,,...,,STOPPED_REPORTING_VALID_REASON,,,,,,,,
8237,1006451,17.7102,-64.7544,CHRISTIANSTED,VI,820,78010,ST. CROIX ISLAND,1 ESTATE HOPE,,...,,,,,,,,,,
9470,1005781,61.084857,-146.388531,VALDEZ,AK,99686,2261,VALDEZ-CORDOVA CENSUS AREA,300 DAYVILLE ROAD,,...,,,,,,,,,,
9625,1008001,17.750141,-64.714793,CHRISTIANSTED,VI,820,78010,ST. CROIX ISLAND,1 Penitentiary Lane,,...,,,,,,,,,,


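At this point we can either drop those rows or try to fill them in somehow (e.g. assign them a county based on zip code).

*NOTE: even if we did try to fill in the other values, we would probably still drop the 5 records from U.S. territories (Guam and St. Croix in the U.S. Virgin Islands), which don't appear in the ACS county data, and the "offshore" facilities with 00000 zip codes. The Valdez, AK record is a different case: the Valdez-Cordova Census Area was split into two new census areas in 2019, so it likely has no match in the 2020 census data and could be reassigned rather than dropped.*

For now we'll just drop them. There are a couple of ways to do this: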
1. inner join
2. remove rows based on a condition

In [38]:
len(census_facility) - len(census_facility[census_facility['fips'].isna()])

9770

In [39]:
census_facility_inner = dataframes['facility'].merge(census, how='inner', left_on='COUNTY_FIPS', right_on='fips')

In [40]:
len(census_facility_inner)

9770

In [41]:
# alternatively, drop rows that do not have a FIPS value
census_facility_no_na = census_facility[~census_facility['fips'].isna()]

In [42]:
census_facility_no_na

Unnamed: 0,FACILITY_ID,LATITUDE,LONGITUDE,CITY,STATE,ZIP,COUNTY_FIPS,COUNTY,ADDRESS1,ADDRESS2,...,UU_RD_EXEMPT,REPORTING_STATUS,PROCESS_STATIONARY_CML,COMMENTS,RR_MRV_PLAN_URL,RR_MONITORING_PLAN_FILENAME,name,fips,population,mhi
0,1009051,37.795963,-122.022317,San Ramon,CA,94583,06013,CONTRA COSTA,6001 Bollinger Canyon Road,,...,,,,,,,"Contra Costa County, California",06013,1147788,103997
1,1008942,33.07567,-96.80594,Plano,TX,75024,48085,COLLIN,5320 Legacy Drive,,...,,,,,,,"Collin County, Texas",48085,1006038,100541
2,1008914,32.75619,-97.33441,Fort Worth,TX,76102,48439,TARRANT COUNTY,"100 Throckmorton St., #1200",,...,,,,,,,"Tarrant County, Texas",48439,2077153,70306
3,1008688,32.00316,-102.07945,Midland,TX,79701,48329,MIDLAND COUNTY,600 North Marienfeld Street Suite 301,,...,,,,,,,"Midland County, Texas",48329,171238,83217
4,1008621,35.46734,-97.51406,Oklahoma City,OK,73102,40109,Oklahoma,333 West Sheridan Ave,,...,,,,,,,"Oklahoma County, Oklahoma",40109,792668,55519
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9994,1009436,43.621474,-84.767905,Mount Pleasant,MI,48858,26073,ISABELLA COUNTY,1425 S Mission Rd,,...,,,,,,,"Isabella County, Michigan",26073,70363,46783
9996,1008897,42.45766,-83.65611,South Lyon,MI,48178,26125,OAKLAND COUNTY,400 South McMunn,,...,,,,,,,"Oakland County, Michigan",26125,1255340,81587
9998,1001085,39.4492,-84.4611,TRENTON,OH,45100,39017,Butler,2100 WOODSDALE RD,,...,,,,,,,"Butler County, Ohio",39017,382129,69023
9999,1007870,28.4272,-81.5803,LAKE BUENA VISTA,FL,32830,12095,Orange,1375 Buena Vista Drive,,...,,,,,,,"Orange County, Florida",12095,1373784,61416


Notice that the number of rows for the inner join dataframe and the "no na" dataframe are the same! Checking the number of records throughout your transformation processes is a good way to quickly double check the transformations are successful and doing what you intended.

**Drop columns**

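We saw how to select only the columns we want, but you can also drop columns. Many pandas methods have an `inplace` argument: if set to *True*, the method modifies the existing dataframe instead of returning a new one. This usually doesn't save memory (pandas often makes a copy internally anyway), so reassigning the result, e.g. `df = df.drop(columns=[...])`, works just as well.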

In [43]:
census_facility_inner.drop(columns=['ADDRESS1', 'ADDRESS2', 'COMMENTS', 'PROCESS_STATIONARY_CML',
                                   'RR_MRV_PLAN_URL', 'RR_MONITORING_PLAN_FILENAME'], inplace=True)

In [44]:
census_facility_inner.columns

Index(['FACILITY_ID', 'LATITUDE', 'LONGITUDE', 'CITY', 'STATE', 'ZIP',
       'COUNTY_FIPS', 'COUNTY', 'FACILITY_NAME', 'STATE_NAME', 'NAICS_CODE',
       'YEAR', 'BAMM_USED_DESC', 'EMISSION_CLASSIFICATION_CODE',
       'PROGRAM_NAME', 'PROGRAM_SYS_ID', 'FRS_ID', 'CEMS_USED', 'CO2_CAPTURED',
       'REPORTED_SUBPARTS', 'BAMM_APPROVED', 'EMITTED_CO2_SUPPLIED',
       'TRIBAL_LAND_ID', 'EGGRT_FACILITY_ID', 'PUBLIC_XML', 'PARENT_COMPANY',
       'PUBLIC_XML_XML', 'REPORTED_INDUSTRY_TYPES', 'FACILITY_TYPES',
       'SUBMISSION_ID', 'UU_RD_EXEMPT', 'REPORTING_STATUS', 'name', 'fips',
       'population', 'mhi'],
      dtype='object')

Whether you drop the columns you don't need or select the columns you do want, the result is the same, so go with whichever list is shorter. If you only need to remove a column or two, dropping them is easier; if you only need a couple of columns, it's easier to select them into a new dataframe than to list every column you want to drop.

In [45]:
census_facility = census_facility_inner[['FACILITY_ID','LATITUDE','LONGITUDE','CITY','ZIP',
                                   'COUNTY_FIPS','FACILITY_NAME',
                                   'YEAR','REPORTED_SUBPARTS','REPORTED_INDUSTRY_TYPES',
                                   'FACILITY_TYPES','name', 'population', 'mhi']]

**Aggregate total emissions for a facility for each year**

Note: this transformations section gets a little sketchy! We made some assumptions for the sake of time and to demonstrate the types of transformations, but in a real-world scenario we'd need to document those assumptions and dig a little deeper to understand what's really happening, so that our final results are correct.

In [48]:
print(len(dataframes['join_sector_ghg']['FACILITY_ID'].unique()), len(census_facility.FACILITY_ID.unique()))
print(len(census_facility), len(dataframes['join_sector_ghg']))
print(census_facility.columns)
print(dataframes['join_sector_ghg'].columns)

3291 5583
9770 10001
Index(['FACILITY_ID', 'LATITUDE', 'LONGITUDE', 'CITY', 'ZIP', 'COUNTY_FIPS',
       'FACILITY_NAME', 'YEAR', 'REPORTED_SUBPARTS', 'REPORTED_INDUSTRY_TYPES',
       'FACILITY_TYPES', 'name', 'population', 'mhi'],
      dtype='object')
Index(['FACILITY_ID', 'YEAR', 'SECTOR_ID', 'SUBSECTOR_ID', 'GAS_ID',
       'CO2E_EMISSION'],
      dtype='object')


In [193]:
df = census_facility.merge(dataframes['join_sector_ghg'], how='inner', on=['FACILITY_ID', 'YEAR'])

In [194]:
df

Unnamed: 0,FACILITY_ID,LATITUDE,LONGITUDE,CITY,ZIP,COUNTY_FIPS,FACILITY_NAME,YEAR,REPORTED_SUBPARTS,REPORTED_INDUSTRY_TYPES,FACILITY_TYPES,name,population,mhi,SECTOR_ID,SUBSECTOR_ID,GAS_ID,CO2E_EMISSION
0,1003610,37.938779,-122.396453,RICHMOND,94801,06013,CHEVRON PRODS.CO. RICHMOND REFY,2012,"C,P,PP,Y","C,P,PP,Y","Supplier, Direct Emitter","Contra Costa County, California",1147788,103997,13,73,,
1,1006843,33.774469,-118.290696,WILMINGTON,90744,06037,Phillips 66 Los Angeles Refinery - Wilmington ...,2015,"C,P,PP,Y","C,P,PP,Y","Supplier, Direct Emitter","Los Angeles County, California",10040682,71358,13,73,,
2,1006629,33.8117,-118.16328,Long Beach,90806,06037,City of Long Beach Gas and Oil Department,2016,NN,NN-LDC,Supplier,"Los Angeles County, California",10040682,71358,11,50,1,471075.5
3,1007870,28.4272,-81.5803,LAKE BUENA VISTA,32830,12095,Reedy Creek Improvement District/Walt Disney W...,2018,"C,D,NN","C,D,NN-LDC","Supplier, Direct Emitter","Orange County, Florida",1373784,61416,11,50,1,105509.5
4,1006109,40.382251,-79.549157,Greensburg,15601,42129,Delmont,2011,"C,W","C,W-NGTC",Direct Emitter,"Westmoreland County, Pennsylvania",350722,61398,15,55,3,63.176
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
395,1005343,37.539714,-105.105336,"LA VETA, 4.7 MI W OF",81055,08055,Tabula Rasa Energy - LA VETA GAS P,2012,PP,PP,Supplier,"Huerfano County, Colorado",6769,40255,13,74,1,237898.9
396,1002001,39.20052,-83.59857,Hillsboro,45133,39071,Pike Natural Gas Company,2016,NN,NN-LDC,Supplier,"Highland County, Ohio",43080,47973,11,50,1,41735.8
397,1003719,48.8743194,-102.5457778,Lignite,58752,38013,Lignite Gas Plant,2015,"NN,PP,UU","NN-FRAC,PP,UU","Supplier, CO2 Injection","Burke County, North Dakota",2142,79405,11,51,1,99524.7
398,1003719,48.8743194,-102.5457778,Lignite,58752,38013,Lignite Gas Plant,2015,"NN,PP,UU","NN-FRAC,PP,UU","Supplier, CO2 Injection","Burke County, North Dakota",2142,79405,13,73,,


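That's a lot less data than we'd expect. Looking at the data model, my guess is that facilities appear in *either* the subpart tables or the sector tables. Another possibility worth checking: `facility` and `join_sector_ghg` both came back with exactly 10001 rows, which suggests the API may cap the number of rows returned per request, in which case each table is only a partial extract. If we were building out a real pipeline, we'd probably ask someone who knows the data better (e.g. a data scientist or business analyst) to help guide the joins, or investigate further to see why so many facilities are left out of this join.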

In [178]:
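# build the set once, outside the loop, so each membership check is fast
facility_ids = set(census_facility['FACILITY_ID'])
missing_facilities = []
for facility in dataframes['join_sector_ghg']['FACILITY_ID'].unique():
    if facility not in facility_ids:
        missing_facilities.append(facility)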

In [198]:
facilities = list(census_facility['FACILITY_ID'].unique())

In [200]:
missing_facilities.sort()
facilities.sort()
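For reference, pandas can do the same membership check without an explicit loop: `Series.isin` returns a boolean mask, and negating it with `~` keeps only the IDs that are missing. A sketch on made-up IDs:

```python
import pandas as pd

join_ids = pd.Series(["1000001", "1000002", "1000003", "1000002"])  # hypothetical IDs
facility_ids = pd.Series(["1000001", "1000003"])

# unique IDs in join_ids that do not appear in facility_ids, sorted like the lists below
missing = sorted(join_ids[~join_ids.isin(facility_ids)].unique())
print(missing)  # ['1000002']
```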

In [201]:
facilities

['1000001',
 '1000003',
 '1000004',
 '1000010',
 '1000011',
 '1000015',
 '1000016',
 '1000019',
 '1000022',
 '1000024',
 '1000025',
 '1000026',
 '1000027',
 '1000029',
 '1000030',
 '1000031',
 '1000032',
 '1000033',
 '1000035',
 '1000037',
 '1000038',
 '1000039',
 '1000040',
 '1000041',
 '1000043',
 '1000045',
 '1000046',
 '1000047',
 '1000049',
 '1000050',
 '1000055',
 '1000056',
 '1000057',
 '1000058',
 '1000059',
 '1000061',
 '1000065',
 '1000066',
 '1000067',
 '1000068',
 '1000072',
 '1000073',
 '1000074',
 '1000075',
 '1000076',
 '1000077',
 '1000078',
 '1000080',
 '1000081',
 '1000082',
 '1000084',
 '1000085',
 '1000089',
 '1000090',
 '1000091',
 '1000092',
 '1000093',
 '1000094',
 '1000098',
 '1000099',
 '1000101',
 '1000102',
 '1000103',
 '1000104',
 '1000107',
 '1000108',
 '1000109',
 '1000110',
 '1000114',
 '1000116',
 '1000121',
 '1000122',
 '1000123',
 '1000124',
 '1000125',
 '1000126',
 '1000129',
 '1000131',
 '1000132',
 '1000133',
 '1000134',
 '1000135',
 '1000139',
 '10

In [202]:
missing_facilities

['1000002',
 '1000005',
 '1000006',
 '1000008',
 '1000020',
 '1000021',
 '1000023',
 '1000028',
 '1000051',
 '1000052',
 '1000054',
 '1000060',
 '1000064',
 '1000069',
 '1000070',
 '1000071',
 '1000083',
 '1000086',
 '1000087',
 '1000095',
 '1000096',
 '1000097',
 '1000100',
 '1000105',
 '1000106',
 '1000111',
 '1000112',
 '1000113',
 '1000117',
 '1000118',
 '1000119',
 '1000120',
 '1000127',
 '1000128',
 '1000130',
 '1000136',
 '1000137',
 '1000138',
 '1000141',
 '1000144',
 '1000152',
 '1000169',
 '1000205',
 '1000213',
 '1000245',
 '1000292',
 '1000335',
 '1000350',
 '1000355',
 '1000381',
 '1000435',
 '1000439',
 '1000479',
 '1000529',
 '1000530',
 '1000531',
 '1000533',
 '1000536',
 '1000539',
 '1000542',
 '1000544',
 '1000546',
 '1000550',
 '1000552',
 '1000558',
 '1000559',
 '1000565',
 '1000569',
 '1000797',
 '1000803',
 '1000842',
 '1000854',
 '1000857',
 '1000869',
 '1000872',
 '1000894',
 '1000900',
 '1000903',
 '1000904',
 '1000907',
 '1001057',
 '1001068',
 '1001069',
 '10

Digging in a little further: for this dataset, keeping a "year" for each facility just leads to duplicate facility rows. For our case we want one row per facility, and it doesn't matter to us if, for example, a facility's name changed between years, so we keep each facility's most recent record. **THIS IS A MAJOR ASSUMPTION**.


NOTE: An extension of this exercise could be pulling census data from multiple years, to have a more robust representation of the population, mhi, etc. for a given point in time. 

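This was some tricky Python! A regular `groupby(...).max()` returns one value per group and loses the original row index, so we can't use it directly to select the full rows. Instead, `transform` broadcasts each group's maximum back onto every original row, which gives us a boolean mask. Here's the Stack Overflow post I used: https://stackoverflow.com/questions/15705630/get-the-rows-which-have-the-max-value-in-groups-using-groupby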

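```python
# transform('max') puts each facility's latest YEAR on every one of its rows,
# so this mask is True only for rows from that facility's most recent year
max_year = census_facility.groupby('FACILITY_ID')['YEAR'].transform('max') == census_facility['YEAR']
unique_facilities = census_facility[max_year].drop(columns=['YEAR'])
```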

```python
# number of distinct facility IDs in the original table
len(census_facility.FACILITY_ID.unique())
```

5583

```python
# number of rows left after keeping only each facility's latest year
len(unique_facilities)
```

5583
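The matching counts above confirm we ended up with one row per facility. To see why `transform` works where a plain aggregation doesn't, here's a minimal sketch on a hypothetical toy frame (`toy` and its values are made up for illustration). `YEAR` is kept as a string, as in the real data at this stage; that's fine for four-digit years, since they sort the same way as text and as numbers.

```python
import pandas as pd

# Hypothetical toy data: facility A reported in two years, facility B in one
toy = pd.DataFrame({
    'FACILITY_ID': ['A', 'A', 'B'],
    'YEAR': ['2015', '2017', '2016'],
    'FACILITY_NAME': ['Old Name', 'New Name', 'Plant B'],
})

# A plain aggregation returns one value per group and drops the other columns
print(toy.groupby('FACILITY_ID')['YEAR'].max())

# transform('max') broadcasts each group's max back onto every original row,
# so comparing it with YEAR gives a boolean mask aligned with toy's index
is_latest = toy.groupby('FACILITY_ID')['YEAR'].transform('max') == toy['YEAR']
latest = toy[is_latest].drop(columns=['YEAR'])
print(latest)
```

One behavior to be aware of: if a facility had two rows in its latest year, this mask would keep both, which is why a row-count sanity check like the one above is worth doing.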

```python
# inner join: keep only facilities that appear in both tables
df = unique_facilities.merge(dataframes['join_sector_ghg'], how='inner', on='FACILITY_ID')
```

```python
df
```

|  | FACILITY_ID | LATITUDE | LONGITUDE | CITY | ZIP | COUNTY_FIPS | FACILITY_NAME | REPORTED_SUBPARTS | REPORTED_INDUSTRY_TYPES | FACILITY_TYPES | name | population | mhi | YEAR | SECTOR_ID | SUBSECTOR_ID | GAS_ID | CO2E_EMISSION |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1007390 | 38.0251 | -122.0639 | MARTINEZ | 94553 | 06013 | TESORO REFINING AND MARKETING COMPANY GOLDEN E... | C,P,PP,Y | C,P,PP,Y | Supplier, Direct Emitter | Contra Costa County, California | 1147788 | 103997 | 2015 | 13 | 73 |  |  |
| 1 | 1007390 | 38.0251 | -122.0639 | MARTINEZ | 94553 | 06013 | TESORO REFINING AND MARKETING COMPANY GOLDEN E... | C,P,PP,Y | C,P,PP,Y | Supplier, Direct Emitter | Contra Costa County, California | 1147788 | 103997 | 2012 | 13 | 73 |  |  |
| 2 | 1007390 | 38.0251 | -122.0639 | MARTINEZ | 94553 | 06013 | TESORO REFINING AND MARKETING COMPANY GOLDEN E... | C,P,PP,Y | C,P,PP,Y | Supplier, Direct Emitter | Contra Costa County, California | 1147788 | 103997 | 2014 | 13 | 73 |  |  |
| 3 | 1008961 | 37.75832 | -121.95969 | San Ramon | 94583 | 06013 | Chevron MCA 220 Gulf Coast Basin |  |  |  | Contra Costa County, California | 1147788 | 103997 | 2014 | 15 | 53 | 1 | 21826 |
| 4 | 1008961 | 37.75832 | -121.95969 | San Ramon | 94583 | 06013 | Chevron MCA 220 Gulf Coast Basin |  |  |  | Contra Costa County, California | 1147788 | 103997 | 2014 | 15 | 53 | 2 | 15831.25 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 6133 | 1000855 | 41.4514 | -97.095 | SCHUYLER | 68661 | 31037 | Cargill Meat Solutions Corporation | C,II | C,II | Direct Emitter | Colfax County, Nebraska | 10681 | 64269 | 2020 | 14 | 24 | 1 | 46319 |
| 6134 | 1010242 | 36.8358 | -83.0331 | St. Charles | 24282 | 51105 | Lone Mountain Processing, Inc. | C,FF | C,FF | Direct Emitter | Lee County, Virginia | 23723 | 35006 | 2016 | 14 | 61 | 1 | .1 |
| 6135 | 1009436 | 43.621474 | -84.767905 | Mount Pleasant | 48858 | 26073 | Muskegon Development Company | W | W-ONSH | Onshore Oil & Gas Production | Isabella County, Michigan | 70363 | 46783 | 2014 | 15 | 53 | 1 | 50224.9 |
| 6136 | 1009436 | 43.621474 | -84.767905 | Mount Pleasant | 48858 | 26073 | Muskegon Development Company | W | W-ONSH | Onshore Oil & Gas Production | Isabella County, Michigan | 70363 | 46783 | 2014 | 15 | 53 | 2 | 26883 |


More data, at least! But again, we're making some judgment calls here that we'd want to be really sure about in a real-world pipeline.
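The merge produced more rows than there are unique facilities, because an inner join repeats each facility row once for every matching row in `join_sector_ghg` (one per year/gas combination, as rows 3 and 4 above show). Here's a minimal sketch on hypothetical toy frames (`facilities` and `emissions` are made-up stand-ins, not the real tables) showing that behavior, plus two `merge` options that can make the assumptions explicit: `validate`, which raises a `MergeError` if the left side isn't unique per key, and `indicator`, which labels where each row came from.

```python
import pandas as pd

# Hypothetical toy frames: one row per facility on the left,
# several emission rows (different years) per facility on the right
facilities = pd.DataFrame({'FACILITY_ID': ['A', 'B', 'C'], 'CITY': ['X', 'Y', 'Z']})
emissions = pd.DataFrame({
    'FACILITY_ID': ['A', 'A', 'B', 'D'],
    'YEAR': ['2014', '2015', '2014', '2014'],
    'CO2E_EMISSION': ['10', '20', '5', '7'],
})

# validate='one_to_many' guards the "one row per facility" assumption on the left side
merged = facilities.merge(emissions, how='inner', on='FACILITY_ID', validate='one_to_many')
print(merged)  # A appears twice, B once; C and D are dropped

# An outer merge with indicator=True shows which keys an inner join would drop
check = facilities.merge(emissions, how='outer', on='FACILITY_ID', indicator=True)
print(check['_merge'].value_counts())
```

On the real data, the same `indicator` check is one way to count how many facilities fall out of the inner join before deciding whether that's acceptable.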

Original question: aggregate the total emissions for each facility for each year (i.e., summed across all gases).

#### Cast to correct datatypes

All of our columns are strings (pandas shows them as `object` dtype) right now. To do any calculations, the emissions column needs to be numeric, and we might want other values (e.g. mhi, population, year) to be numeric as well.

Python datatypes: https://docs.python.org/3/library/stdtypes.html


```python
df.dtypes
```

FACILITY_ID                object
LATITUDE                   object
LONGITUDE                  object
CITY                       object
ZIP                        object
COUNTY_FIPS                object
FACILITY_NAME              object
REPORTED_SUBPARTS          object
REPORTED_INDUSTRY_TYPES    object
FACILITY_TYPES             object
name                       object
population                 object
mhi                        object
YEAR                       object
SECTOR_ID                  object
SUBSECTOR_ID               object
GAS_ID                     object
CO2E_EMISSION              object
dtype: object

```python
# cast the columns we need for calculations; everything else stays as strings (object)
df_final = df.astype({'YEAR': 'int32', 'mhi': 'float', 'population': 'int', 'CO2E_EMISSION': 'float'})
```

```python
df_final.dtypes
```

FACILITY_ID                 object
LATITUDE                    object
LONGITUDE                   object
CITY                        object
ZIP                         object
COUNTY_FIPS                 object
FACILITY_NAME               object
REPORTED_SUBPARTS           object
REPORTED_INDUSTRY_TYPES     object
FACILITY_TYPES              object
name                        object
population                   int32
mhi                        float64
YEAR                         int32
SECTOR_ID                   object
SUBSECTOR_ID                object
GAS_ID                      object
CO2E_EMISSION              float64
dtype: object
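The `astype` cast above only works because every value in those columns parses cleanly. If a column might contain stray text, `pd.to_numeric(..., errors='coerce')` is a more forgiving option: it converts what it can and sets the rest to `NaN`. A minimal sketch on a hypothetical toy Series (`raw`; the first three strings mirror `CO2E_EMISSION` values from the merged table, `'n/a'` is made up):

```python
import pandas as pd

# Hypothetical toy column: numbers stored as strings, plus one unparseable value
raw = pd.Series(['21826', '15831.25', '.1', 'n/a'])

# astype(float) raises a ValueError as soon as it hits a value it can't parse
try:
    raw.astype(float)
except ValueError as err:
    print('astype failed:', err)

# pd.to_numeric with errors='coerce' converts what it can and sets the rest to NaN
clean = pd.to_numeric(raw, errors='coerce')
print(clean.dtype)         # float64
print(clean.isna().sum())  # 1
```

The trade-off: coercing hides bad values, so it's worth counting the new `NaN`s (as above) before moving on.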

```python
# total CO2e per facility per year, summed across all gases
df_final.groupby(['FACILITY_ID', 'YEAR']).agg({'CO2E_EMISSION':['sum']})
```

| FACILITY_ID | YEAR | CO2E_EMISSION (sum) |
|---|---|---|
| 1000001 | 2017 | 350890.100 |
| 1000003 | 2017 | 80158.800 |
| 1000004 | 2017 | 57851.200 |
| 1000010 | 2017 | 107562.000 |
| 1000011 | 2013 | 28696.000 |
| ... | ... | ... |
| 1011828 | 2016 | 13.410 |
| 1011987 | 2018 | 26644.800 |
| 1012682 | 2016 | 19.072 |
| 1012685 | 2016 | 14.304 |
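Passing a dict of lists to `agg` is what produces the two-level column header (`CO2E_EMISSION` / `sum`) in the result above. When only one statistic is needed, a flatter alternative is to select the column first and then `reset_index`. A minimal sketch on a hypothetical toy frame whose values are copied from three rows of the merged table; `TOTAL_CO2E` is a column name made up for this example:

```python
import pandas as pd

# Hypothetical toy version of df_final, using values from three rows of the merged table
toy = pd.DataFrame({
    'FACILITY_ID': ['1008961', '1008961', '1000855'],
    'YEAR': [2014, 2014, 2020],
    'GAS_ID': ['1', '2', '1'],
    'CO2E_EMISSION': [21826.0, 15831.25, 46319.0],
})

# Selecting the column before .sum() gives a Series indexed by (FACILITY_ID, YEAR);
# reset_index turns that index back into ordinary columns
totals = (
    toy.groupby(['FACILITY_ID', 'YEAR'])['CO2E_EMISSION']
    .sum()
    .reset_index(name='TOTAL_CO2E')
)
print(totals)
```

Note that `sum()` treats missing values as 0 by default, so a facility-year whose emissions are all blank (like the first rows of the merged table) would total 0; passing `min_count=1` to `sum()` keeps such groups as `NaN` instead.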


**CHALLENGE:**
1. Join the sector and ghg tables to add context to the emissions data.
2. Calculate the percentage of a facility's emissions that each gas type makes up (in total and/or per year).
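If you want a nudge on item 2, one possible starting point (a sketch, not the only approach) reuses the `transform` idea from the deduplication step: broadcast each group's total back onto its rows, then divide. The toy rows below are hypothetical stand-ins taken from the merged table, and `PCT_OF_YEAR` is a made-up column name.

```python
import pandas as pd

# Hypothetical toy rows: two gases reported by one facility in one year
toy = pd.DataFrame({
    'FACILITY_ID': ['1008961', '1008961'],
    'YEAR': [2014, 2014],
    'GAS_ID': ['1', '2'],
    'CO2E_EMISSION': [21826.0, 15831.25],
})

# transform('sum') places each facility-year total on every row of that group,
# so each gas's share is a row-by-row division
year_total = toy.groupby(['FACILITY_ID', 'YEAR'])['CO2E_EMISSION'].transform('sum')
toy['PCT_OF_YEAR'] = 100 * toy['CO2E_EMISSION'] / year_total
print(toy)
```

For a share of the facility's total across all years, grouping by `FACILITY_ID` alone would be the analogous change.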

## Load

The final step of ETL is **Load**: putting the data in its final destination.

Examples:
* write to a local file (e.g. JSON, CSV)
* insert into a database
* write to cloud storage (e.g. an AWS S3 bucket)
* upload to a central repository
    * datalake: raw data stored in its native format (often unstructured or semi-structured)
    * datawarehouse: structured data that has usually been cleaned/processed to some extent
    * datamart: a datawarehouse subset curated for a particular business use case
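For the "insert into a database" option, here's a minimal sketch using Python's built-in `sqlite3` module with pandas' `DataFrame.to_sql`. The in-memory database, the table name `census_ghg`, and the one-row `frame` are hypothetical stand-ins; a real pipeline would connect to its actual target database (for most databases other than SQLite, pandas typically expects a SQLAlchemy engine or connection).

```python
import sqlite3

import pandas as pd

# Hypothetical stand-in for df_final
frame = pd.DataFrame({'FACILITY_ID': ['1000855'], 'YEAR': [2020], 'CO2E_EMISSION': [46319.0]})

# An in-memory SQLite database keeps the sketch self-contained
conn = sqlite3.connect(':memory:')

# if_exists='replace' drops and recreates the table; index=False skips the row index
frame.to_sql('census_ghg', conn, if_exists='replace', index=False)

# Read it back with plain SQL to confirm the load worked
rows = conn.execute('SELECT FACILITY_ID, YEAR, CO2E_EMISSION FROM census_ghg').fetchall()
print(rows)
conn.close()
```

`if_exists='append'` is the usual choice when a scheduled job adds new data to an existing table instead of rebuilding it.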

Here we'll write our final dataframe to a local CSV file:

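```python
import os

# os.path.join builds the path with the right separator on Windows, macOS, and Linux
path = os.path.join(os.getcwd(), 'data')
# exist_ok=True makes this a no-op if the folder already exists
os.makedirs(path, exist_ok=True)
```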

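```python
# write the cast dataframe; index=False leaves out the row index, which would
# otherwise come back as an extra "Unnamed: 0" column when the file is read in
df_final.to_csv(os.path.join(path, 'census_ghg.csv'), index=False)
```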
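Two things are worth knowing when a CSV like this gets read back in downstream: `to_csv` writes the row index as an unnamed first column by default, and `read_csv` infers types, so zero-padded codes such as `COUNTY_FIPS` (`'06013'`) lose their leading zero unless you pass a `dtype`. A minimal round-trip sketch in a temporary directory; `frame` is a hypothetical two-row stand-in for `df_final`.

```python
import os
import tempfile

import pandas as pd

# Hypothetical stand-in for df_final, with zero-padded FIPS codes like the real data
frame = pd.DataFrame({
    'COUNTY_FIPS': ['06013', '31037'],
    'CO2E_EMISSION': [21826.0, 46319.0],
})

with tempfile.TemporaryDirectory() as tmp:
    with_index = os.path.join(tmp, 'with_index.csv')
    without_index = os.path.join(tmp, 'without_index.csv')

    frame.to_csv(with_index)                  # default: row index written as an unnamed first column
    frame.to_csv(without_index, index=False)  # only the real columns are written

    cols_with = list(pd.read_csv(with_index).columns)
    naive = pd.read_csv(without_index)                              # type inference: '06013' -> 6013
    typed = pd.read_csv(without_index, dtype={'COUNTY_FIPS': str})  # keeps the leading zero

print(cols_with)                     # ['Unnamed: 0', 'COUNTY_FIPS', 'CO2E_EMISSION']
print(naive['COUNTY_FIPS'].iloc[0])  # 6013
print(typed['COUNTY_FIPS'].iloc[0])  # 06013
```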

### Sources
* https://restfulapi.net/
* https://www.ibm.com/cloud/learn/etl
* https://www.redhat.com/en/topics/api/what-is-a-rest-api
* https://www.ibm.com/cloud/blog/cloud-data-lake-vs-data-warehouse-vs-data-mart