# ETL_EDA
This file records the process of acquiring raw data, transforming it, and loading it into MongoDB. The data are stored in (almost) raw form.

## 1. Raw Data from BPA 
The [dataset](https://transmission.bpa.gov/business/operations/Wind/baltwg.txt) is a continuously updated text file in a tab-delimited, CSV-like format. It contains energy production and load at 5-minute intervals for the last 7 days (the file's own header states the exact date range). Expect some blank rows at the end for the current day. The file can be retrieved with a plain `requests.get` call, with no parameters. More complicated APIs may require query parameters.

In [19]:
import requests

url = "https://transmission.bpa.gov/business/operations/Wind/baltwg.txt"
req = requests.get(url, timeout=10)  # allow a few seconds; 0.5 s can time out on a slow connection
req.raise_for_status()
text = req.text
print(text[:1000])
print('...')
print(text[-500:])

BPA Balancing Authority Load & Total Wind Generation
at 5-minute intervals, last 7 days
Dates: 19Nov2019 - 26Nov2019 (last updated 25Nov2019 17:15:47) Pacific Time
Based on 5-min MW readings from the BPA SCADA system for points 45583, 79687, 79682, 164377, 70681

This represents loads and resources in BPA's Balancing Authority (BA) including some that are not BPA's.
It does not include BPA loads served by transfer, scheduled out of region,
or scheduled to customers with their own BAs such as Seattle and Tacoma

BPA/Technical Operations (TOT-OpInfo@bpa.gov)

Date/Time       	Load	Wind	Hydro	Fossil/Biomass	Nuclear
11/19/2019 00:00	5391	710	5149	970	1165
11/19/2019 00:05	5318	791	4958	970	1160
11/19/2019 00:10	5305	861	4803	972	1160
11/19/2019 00:15	5255	910	4736	974	1160
11/19/2019 00:20	5261	908	4715	971	1160
11/19/2019 00:25	5245	928	4680	972	1160
11/19/2019 00:30	5241	914	4646	971	1161
11/19/2019 00:35	5246	930	4668	968	1159
11/19/2019 00:40	5317	909	4672	968	1161
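
For APIs that do need query parameters, `requests` accepts them as a dict through the `params` argument. The sketch below only illustrates that mechanism: the endpoint and the `start`/`end` parameter names are hypothetical and are not part of the BPA service. The request is prepared but never sent, so the final URL can be inspected without network access.

```python
import requests

# Hypothetical endpoint and parameter names, for illustration only
url = "https://example.com/api/generation"
params = {"start": "2019-11-19", "end": "2019-11-26"}

# Build the request without sending it, to inspect the final URL
prepared = requests.Request("GET", url, params=params).prepare()
print(prepared.url)

# To actually send it: requests.get(url, params=params, timeout=10)
```

Passing a dict rather than concatenating strings lets `requests` handle URL encoding of the values.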

## 2. Raw Data to Documents/Dicts
With pandas, parsing an in-memory string is simple. The descriptive lines at the top of the file are skipped, the `Date/Time` column is converted to datetimes, and blank rows are dropped. The result converts easily to a list of dicts, which is the form we want for MongoDB.

In [20]:
import pandas
from io import StringIO

df = pandas.read_csv(StringIO(text), skiprows=11, delimiter='\t')
df.columns = df.columns.str.strip()             # strip whitespace from column names
df['Datetime'] = pandas.to_datetime(df['Date/Time'])
df.drop(columns=['Date/Time'], inplace=True)    # `columns=` already implies axis=1
df.dropna(inplace=True)                         # drop the blank trailing rows
df.head()

|   | Load | Wind | Hydro | Fossil/Biomass | Nuclear | Datetime |
|---|------|------|-------|----------------|---------|----------|
| 0 | 5391.0 | 710.0 | 5149.0 | 970.0 | 1165.0 | 2019-11-19 00:00:00 |
| 1 | 5318.0 | 791.0 | 4958.0 | 970.0 | 1160.0 | 2019-11-19 00:05:00 |
| 2 | 5305.0 | 861.0 | 4803.0 | 972.0 | 1160.0 | 2019-11-19 00:10:00 |
| 3 | 5255.0 | 910.0 | 4736.0 | 974.0 | 1160.0 | 2019-11-19 00:15:00 |
| 4 | 5261.0 | 908.0 | 4715.0 | 971.0 | 1160.0 | 2019-11-19 00:20:00 |
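
The conversion to a list of dicts can be sketched on a tiny sample. The two data rows are copied from the output above. The third row, which has a timestamp but no readings, is an assumption about what the blank trailing rows look like. The explicit `format` string is also an addition, inferred from the timestamps shown in the raw file.

```python
from io import StringIO

import pandas as pd

# Header plus two real rows; the last row is an assumed "blank" row
sample = (
    "Date/Time       \tLoad\tWind\tHydro\tFossil/Biomass\tNuclear\n"
    "11/19/2019 00:00\t5391\t710\t5149\t970\t1165\n"
    "11/19/2019 00:05\t5318\t791\t4958\t970\t1160\n"
    "11/19/2019 00:10\t\t\t\t\t\n"
)

df = pd.read_csv(StringIO(sample), delimiter="\t")
df.columns = df.columns.str.strip()
df["Datetime"] = pd.to_datetime(df["Date/Time"], format="%m/%d/%Y %H:%M")
df = df.drop(columns=["Date/Time"]).dropna()

# One dict per row, keyed by column name
records = df.to_dict("records")
print(len(records))
print(records[0])
```

Each dict maps column names to values, so it can be passed to MongoDB as a document without further reshaping.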


## 3. Upsert MongoDB
If we fetch the data frequently, consecutive runs return many duplicate entries. De-duplication happens at insertion time, using the MongoDB API `collection.replace_one(filter=..., replacement=..., upsert=True)`. This call matches a document in MongoDB with `filter`. If a matching document exists, it is replaced with `replacement`; if `filter` matches nothing, `replacement` is inserted as a new document.

In [14]:
import pymongo

client = pymongo.MongoClient()

In [21]:
db = client.get_database("energy")
collection = db.get_collection("energy")
update_count = 0
for record in df.to_dict('records'):
    result = collection.replace_one(
        filter={'Datetime': record['Datetime']},    # locate the document if exists
        replacement=record,                         # latest document
        upsert=True)                                # update if exists, insert if not
    if result.matched_count > 0:
        update_count += 1
print(f"rows={df.shape[0]}, update={update_count}, "
      f"insert={df.shape[0]-update_count}")

rows=1936, update=1928, insert=8
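
The replace-or-insert behavior described above can be modeled in plain Python, which shows why repeated runs do not create duplicates. This is only an in-memory stand-in, not pymongo: the `replace_one_upsert` and `load` helpers are hypothetical, and a dict keyed by `Datetime` plays the role of the collection.

```python
from datetime import datetime


def replace_one_upsert(store, key, replacement):
    """Mimic replace_one(..., upsert=True) on a dict keyed by the filter value.

    Returns True if an existing document was matched and replaced,
    False if a new document was inserted.
    """
    matched = key in store
    store[key] = replacement
    return matched


def load(store, records):
    """Upsert all records and return (updated, inserted) counts."""
    updated = sum(replace_one_upsert(store, r["Datetime"], r) for r in records)
    return updated, len(records) - updated


store = {}
first_run = [
    {"Datetime": datetime(2019, 11, 19, 0, 0), "Load": 5391},
    {"Datetime": datetime(2019, 11, 19, 0, 5), "Load": 5318},
]
# The next fetch overlaps the first and adds one new reading
second_run = first_run + [{"Datetime": datetime(2019, 11, 19, 0, 10), "Load": 5305}]

first_counts = load(store, first_run)    # (0, 2): nothing to update, two inserts
second_counts = load(store, second_run)  # (2, 1): two updates, one insert
print(first_counts, second_counts, len(store))
```

The store ends with three documents, not five, which mirrors the `update` and `insert` counts printed by the real run above.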
