# ETL_EDA
This file records the process of acquiring raw data, traforming them, and loading them into a MongoDB. The data are store (almost) in their raw form.

## 1. Raw Data from BPA 
The [dataset](https://transmission.bpa.gov/business/operations/Wind/baltwg.txt) is a continuously updated txt file in csv format. It contains the energy producation and load in the last 5 days. There will be some blank lines at the end for today. It can be retrieved simply by making `requests` without parameters. For more complicated API call, you may need to add query parameters.

In [7]:
import requests

url = "https://transmission.bpa.gov/business/operations/Wind/baltwg.txt"
req = requests.get(url, timeout=0.5)
req.raise_for_status()
text = req.text
print(text[:1000])
print('...')
print(text[-500:])

BPA Balancing Authority Load & Total Wind Generation
at 5-minute intervals, last 7 days
Dates: 07Nov2020 - 14Nov2020 (last updated 13Nov2020 23:06:00) Pacific Time
Based on 5-min MW readings from the BPA SCADA system for points 45583, 79687, 79682, 164377, 70681

This represents loads and resources in BPA's Balancing Authority (BA) including some that are not BPA's.
It does not include BPA loads served by transfer, scheduled out of region,
or scheduled to customers with their own BAs such as Seattle and Tacoma

BPA/Technical Operations (TOT-OpInfo@bpa.gov)

Date/Time       	Load	Wind	Hydro	Fossil/Biomass	Nuclear
11/07/2020 00:00	5455	952	5557	480	1162
11/07/2020 00:05	5464	935	5630	476	1162
11/07/2020 00:10	5443	904	5731	475	1163
11/07/2020 00:15	5428	873	5719	475	1166
11/07/2020 00:20	5442	870	5689	482	1165
11/07/2020 00:25	5378	873	5687	488	1169
11/07/2020 00:30	5401	880	5665	480	1167
11/07/2020 00:35	5383	868	5648	482	1167
11/07/2020 00:40	5375	881	5668	482	1167

## 2. Raw Data to Documents/Dicts
Using pandas, it is simple to parse a in-memory string. The first few lines of description need to be skipped. Datetime conversion is made and blank lines are dropped. Now the data can be easily converted to a list of dicts which is what we want for the MongoDB.

In [8]:
import pandas
from io import StringIO

df = pandas.read_csv(StringIO(text), skiprows=11, delimiter='\t')
df.columns = df.columns.str.strip()             # remove space in columns name
df['Datetime'] = pandas.to_datetime(df['Date/Time'])
df.drop(columns=['Date/Time'], axis=1, inplace=True)
df.dropna(inplace=True)  
df.head()

Unnamed: 0,Load,Wind,Hydro,Fossil/Biomass,Nuclear,Datetime
0,5455.0,952.0,5557.0,480.0,1162.0,2020-11-07 00:00:00
1,5464.0,935.0,5630.0,476.0,1162.0,2020-11-07 00:05:00
2,5443.0,904.0,5731.0,475.0,1163.0,2020-11-07 00:10:00
3,5428.0,873.0,5719.0,475.0,1166.0,2020-11-07 00:15:00
4,5442.0,870.0,5689.0,482.0,1165.0,2020-11-07 00:20:00


## 3. Upsert MongoDB
If we fetch the data frequently, there are lots of duplicate data entry between each run. The de-duplication happens at insertion. The MongoDB API to use is `collection.replace_one(filter=..., replacement=..., upsert=True)`. The statement matches a document in MongoDB with `filter`, replaces it with `replacement` if the document exists or inserts `replacement` into the database if `filter` matches nothing. 

In [9]:
import pymongo

client = pymongo.MongoClient()

In [10]:
db = client.get_database("energy")
collection = db.get_collection("energy")
update_count = 0
for record in df.to_dict('records'):
    result = collection.replace_one(
        filter={'Datetime': record['Datetime']},    # locate the document if exists
        replacement=record,                         # latest document
        upsert=True)                                # update if exists, insert if not
    if result.matched_count > 0:
        update_count += 1
print(f"rows={df.shape[0]}, update={update_count}, "
      f"insert={df.shape[0]-update_count}")

ServerSelectionTimeoutError: localhost:27017: [Errno 111] Connection refused, Timeout: 30s, Topology Description: <TopologyDescription id: 5faf82225c58919a06905d6a, topology_type: Single, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 111] Connection refused',)>]>