# ETL_EDA
This file records the process of acquiring raw data, transforming them, and loading them into MongoDB. The data are stored (almost) in their raw form.

## 1. Raw Data from BPA 
The [dataset](https://transmission.bpa.gov/business/operations/Wind/baltwg.txt) is a continuously updated, tab-separated text file. It contains load and generation by source (wind, hydro, fossil/biomass, nuclear) at 5-minute intervals for the last several days (the header in the output below reports the last 7 days). The last lines, covering the rest of today, are blank. Because the URL needs no parameters, a plain `requests.get` call retrieves it; more complex APIs may require query parameters.

In [1]:
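```python
import requests

url = "https://transmission.bpa.gov/business/operations/Wind/baltwg.txt"
# This URL needs no query parameters. For endpoints that do,
# pass them as a dict via requests.get(url, params=...).
resp = requests.get(url, timeout=10)  # a 0.5 s timeout is easily exceeded by a remote server
resp.raise_for_status()               # fail loudly on HTTP errors
text = resp.text
print(text[:1000])
print('...')
print(text[-500:])
```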

```
BPA Balancing Authority Load & Total Wind Generation
at 5-minute intervals, last 7 days
Dates: 20Nov2019 - 27Nov2019 (last updated 26Nov2019 06:35:46) Pacific Time
Based on 5-min MW readings from the BPA SCADA system for points 45583, 79687, 79682, 164377, 70681

This represents loads and resources in BPA's Balancing Authority (BA) including some that are not BPA's.
It does not include BPA loads served by transfer, scheduled out of region,
or scheduled to customers with their own BAs such as Seattle and Tacoma

BPA/Technical Operations (TOT-OpInfo@bpa.gov)

Date/Time       	Load	Wind	Hydro	Fossil/Biomass	Nuclear
11/20/2019 00:00	5657	272	5261	970	1163
11/20/2019 00:05	5659	275	5348	971	1170
11/20/2019 00:10	5616	284	5360	971	1163
11/20/2019 00:15	5688	277	5402	975	1163
11/20/2019 00:20	5693	259	5403	973	1165
11/20/2019 00:25	5661	253	5407	975	1164
11/20/2019 00:30	5659	257	5400	976	1164
11/20/2019 00:35	5651	268	5377	975	1163
11/20/2019 00:40	5649	267	5389	975	1167
```
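The BPA URL takes no parameters, but for an endpoint that does, `requests` can build the query string from a dict through its `params` argument, which avoids concatenating strings by hand. A minimal sketch; the base URL and the `start`/`end` parameter names are hypothetical. Preparing the request without sending it shows the URL that would be fetched:

```python
import requests

# Hypothetical endpoint and query parameters, for illustration only.
base_url = "https://example.com/api/generation"
params = {"start": "2019-11-20", "end": "2019-11-27"}

# Build, but do not send, the request so the encoded URL can be inspected.
prepared = requests.Request("GET", base_url, params=params).prepare()
print(prepared.url)  # https://example.com/api/generation?start=2019-11-20&end=2019-11-27

# Fetching works the same way as in the cell above:
# resp = requests.get(base_url, params=params, timeout=10)
```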

## 2. Raw Data to Documents/Dicts
With pandas, parsing an in-memory string is straightforward. The 11 description lines above the column header are skipped, the `Date/Time` strings are converted to datetimes, and rows with missing values (the blank entries at the end of the file) are dropped. The resulting DataFrame converts directly to a list of dicts, one per row, which is the document shape pymongo writes to MongoDB.

In [2]:
```python
import pandas
from io import StringIO

# The file is tab-separated; the first 11 lines are the description above the header.
df = pandas.read_csv(StringIO(text), skiprows=11, delimiter='\t')
df.columns = df.columns.str.strip()             # remove padding spaces in column names
df['Datetime'] = pandas.to_datetime(df['Date/Time'])
df.drop(columns=['Date/Time'], inplace=True)    # keep only the parsed datetime column
df.dropna(inplace=True)                         # drop rows with missing readings
df.head()
```

|   | Load | Wind | Hydro | Fossil/Biomass | Nuclear | Datetime |
|---|---|---|---|---|---|---|
| 0 | 5657.0 | 272.0 | 5261.0 | 970.0 | 1163.0 | 2019-11-20 00:00:00 |
| 1 | 5659.0 | 275.0 | 5348.0 | 971.0 | 1170.0 | 2019-11-20 00:05:00 |
| 2 | 5616.0 | 284.0 | 5360.0 | 971.0 | 1163.0 | 2019-11-20 00:10:00 |
| 3 | 5688.0 | 277.0 | 5402.0 | 975.0 | 1163.0 | 2019-11-20 00:15:00 |
| 4 | 5693.0 | 259.0 | 5403.0 | 973.0 | 1165.0 | 2019-11-20 00:20:00 |
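Hard-coding `skiprows=11` breaks if BPA ever adds or removes a description line. A minimal sketch of a more defensive parser, assuming the column header is always the first line that starts with `Date/Time`; the function name `read_bpa_table` is hypothetical. It also passes an explicit `format` to `to_datetime`, matching the `MM/DD/YYYY HH:MM` stamps shown above, so parsing does not depend on pandas guessing the format:

```python
from io import StringIO

import pandas


def read_bpa_table(text: str) -> pandas.DataFrame:
    """Parse the BPA text file without a fixed number of description lines."""
    lines = text.splitlines()
    # Assumption: the column header is the first line starting with "Date/Time".
    # next() raises StopIteration if no such line exists.
    header_idx = next(i for i, line in enumerate(lines) if line.startswith("Date/Time"))
    df = pandas.read_csv(StringIO("\n".join(lines[header_idx:])), sep="\t")
    df.columns = df.columns.str.strip()  # "Date/Time       " -> "Date/Time"
    df["Datetime"] = pandas.to_datetime(df.pop("Date/Time"), format="%m/%d/%Y %H:%M")
    return df.dropna()  # drop rows that have no readings yet


sample = (
    "BPA Balancing Authority Load & Total Wind Generation\n"
    "\n"
    "Date/Time       \tLoad\tWind\n"
    "11/20/2019 00:00\t5657\t272\n"
    "11/20/2019 00:05\t5659\t275\n"
    "11/20/2019 00:10\t\t\n"
)
df_sample = read_bpa_table(sample)
print(df_sample)
```

The last sample row mimics an empty entry for later today and is removed by `dropna()`.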


## 3. Upsert into MongoDB
If the data are fetched frequently, consecutive runs overlap heavily and would produce many duplicate entries, so de-duplication happens at insertion time. The MongoDB call to use is `collection.replace_one(filter=..., replacement=..., upsert=True)`. It looks for a document matching `filter`; if one exists, it is replaced with `replacement`, otherwise `replacement` is inserted as a new document. Here the filter is each row's `Datetime`, so rerunning the load updates existing timestamps instead of duplicating them.
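To make the upsert semantics concrete, here is a toy, dictionary-based model of `replace_one(..., upsert=True)` keyed on `Datetime`. It is not pymongo (no `_id`, no server), only an illustration of why two overlapping fetches yield some updates and some inserts; `replace_one_upsert` and `ToyResult` are hypothetical names:

```python
from dataclasses import dataclass


@dataclass
class ToyResult:
    matched_count: int  # loosely mirrors pymongo's UpdateResult.matched_count
    inserted: bool


def replace_one_upsert(store: dict, key_field: str, doc: dict) -> ToyResult:
    """Toy stand-in for replace_one(filter={key_field: ...}, replacement=doc, upsert=True)."""
    key = doc[key_field]
    matched = key in store
    store[key] = dict(doc)  # replace if matched, insert otherwise
    return ToyResult(matched_count=int(matched), inserted=not matched)


store = {}
first_fetch = [{"Datetime": "2019-11-20 00:00", "Load": 5657},
               {"Datetime": "2019-11-20 00:05", "Load": 5659}]
second_fetch = [{"Datetime": "2019-11-20 00:05", "Load": 5659},   # overlaps the first fetch
                {"Datetime": "2019-11-20 00:10", "Load": 5616}]   # new reading

for batch in (first_fetch, second_fetch):
    results = [replace_one_upsert(store, "Datetime", doc) for doc in batch]
    updated = sum(r.matched_count for r in results)
    print(f"rows={len(batch)}, update={updated}, insert={len(batch) - updated}")
# rows=2, update=0, insert=2
# rows=2, update=1, insert=1
```

Each rerun therefore adds only the readings published since the previous fetch, while overlapping timestamps are overwritten in place.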

In [3]:
```python
import pymongo

client = pymongo.MongoClient()  # connects to localhost:27017 by default
```

In [21]:
```python
db = client.get_database("energy")
collection = db.get_collection("energy")
update_count = 0
for record in df.to_dict('records'):
    result = collection.replace_one(
        filter={'Datetime': record['Datetime']},    # locate the document if it exists
        replacement=record,                         # latest version of the row
        upsert=True)                                # replace if it exists, insert if not
    if result.matched_count > 0:
        update_count += 1
print(f"rows={df.shape[0]}, update={update_count}, "
      f"insert={df.shape[0]-update_count}")
```

```
rows=1936, update=1928, insert=8
```
