![Many pancakes vs single pancake](images/pancakes.png)

In [1]:
from csv import DictReader
import geopandas as gpd
import json
import numpy as np
import pandas as pd
import itertools

from camping.mocks.request import RequestsMock
from camping.util.scraper import Scraper
from camping.util.distance import distance_merge

def max_col_width(w=100):
    """Set pandas' ``display.max_colwidth`` display option to *w* (default 100)."""
    pd.options.display.max_colwidth = w

# Base endpoint for RIDB facility queries.
ridb_facilities_url = "https://ridb.recreation.gov/api/v1/facilities"

### Scaling our prototype code

What are some aspects that might not scale?   
*   
  
Opportunities to parallelize?  
*   

#### Exploration code for reference

In [2]:
def transform_campsites(campsite_json):
    """Flatten RIDB campsite records into a table with one row per campsite.

    Each record's ``ATTRIBUTES`` list of
    ``{'AttributeName': ..., 'AttributeValue': ...}`` entries becomes one column
    per attribute name. Campsites that lack a given attribute get ``""`` there.

    Args:
        campsite_json: list of campsite dicts, each with ``ATTRIBUTES``,
            ``CampsiteID``, ``CampsiteName`` and ``FacilityID`` keys.
            The input list and its records are not modified.

    Returns:
        DataFrame with ``CampsiteID``, ``CampsiteName`` and ``FacilityID``
        followed by one column per attribute name. An empty input yields an
        empty DataFrame with just the three ID columns.

    Raises:
        KeyError: if a record is missing one of the required keys.
    """
    id_columns = ['CampsiteID', 'CampsiteName', 'FacilityID']
    if not campsite_json:
        # pd.DataFrame([]) has no columns, so the column selection below would raise.
        return pd.DataFrame(columns=id_columns)

    # Translate each ATTRIBUTES list into a single {AttributeName: AttributeValue} dict.
    # This is built in a local list rather than written back into the caller's records.
    attribute_dicts = [
        {item['AttributeName']: item['AttributeValue'] for item in campsite['ATTRIBUTES']}
        for campsite in campsite_json
    ]

    # Convert the attribute dicts to columns; row i lines up with campsite_json[i].
    attributes = pd.json_normalize(attribute_dicts)
    df = pd.DataFrame(campsite_json)[id_columns].join(attributes)
    return df.replace({np.nan: ""})


ridb_facilities_url = "https://ridb.recreation.gov/api/v1/facilities"
params = {"activity_id":9, "state":"OR"}
headers = {"accept": "application/json", "apikey": "key"}

# Get RIDB Facilities with camping
print("Getting facility data")
response = RequestsMock.get(ridb_facilities_url, params, headers=headers)
camping_json = json.loads(response.text)
df_ridb_camping = pd.DataFrame(camping_json['RECDATA'])

# Get campsite data for each facility.
# Per-facility frames are collected in a list and concatenated once:
# DataFrame.append was removed in pandas 2.0, and appending inside a loop
# re-copies the accumulated frame on every iteration.
print("Getting campsite data")
campsite_frames = []
for facility in camping_json['RECDATA']:
    campground_url = f"{ridb_facilities_url}/{facility['FacilityID']}/campsites"
    resp = RequestsMock.get(campground_url, headers=headers)
    campsites = json.loads(resp.text)
    if campsites['RECDATA']:
        df_campsites = transform_campsites(campsites['RECDATA'])
        campsite_frames.append(df_campsites.merge(df_ridb_camping, on='FacilityID', how='left'))
# pd.concat raises on an empty list; fall back to an empty frame in that case.
campground_info = pd.concat(campsite_frames) if campsite_frames else pd.DataFrame()

# Get NF webscraper data
print("Getting NF data")
nf_data = []
# newline='' is what the csv module expects for files handed to a reader.
with open('../data/NF_sites/OR_sitelist.csv', newline='', encoding='utf-8') as f:
    reader = DictReader(f)
    for row in reader:
        sc = Scraper(row['site_url'], row['site_name'])
        nf_data.append(sc.scrape())
nf_df = pd.DataFrame(nf_data)

print("Merging data")
merged_sites = distance_merge(nf_df, campground_info, 2000, 'ridb', 'nf')
merged_sites.drop(columns=['FacilityLatitude_nf', 'FacilityLongitude_nf', 'index_nf', 'FacilityLongitude_ridb', 'FacilityLatitude_ridb', 'FacilityName', 'geometry'], inplace=True)
combined = campground_info.merge(merged_sites, how='left', on=['FacilityID','CampsiteID'])
combined = combined.replace(np.nan, '')

Getting facility data
Getting campsite data
Getting NF data
Merging data


  return _prepare_from_string(" ".join(pjargs))
  return _prepare_from_string(" ".join(pjargs))



### Pipelines
Data pipelines split data processing into discrete steps.  
Pipelines enable scaling up of data processing:  
* Reduced latency through parallel processing
* Ability to take advantage of "near limitless" cloud compute
* Generalized pipeline steps can be reused 
* Resiliency - data can be saved to disk or cloud storage after a stage to allow retry 


To get a sense of how to break our code into steps, let's take a look at the data processing flow of our prototype:

![Prototype Flow](images/prototype_flow.png)


Data processing steps in our prototype:
* Extracting the data from source
* Transforming campsite data
* Merging data into a single table
* Storing the table in a database (not pictured)

Consider...
* What happens if there is an error converting the attributes of one campsite?
* How long will it take to run if a facility has a large number of campsites?


What can we parallelize?
* Process campsite data in batches
* Data source extraction - NF web scraper can run independently of RIDB API calls
* Run multiple states in parallel

### Scaling up - data structures and formats

**Ask yourself** 
* Are the data structures I'm using optimized for large scale?
* The data pipeline is a means to an end - what is acting on the data being produced? For example, APIs surface json to front end frameworks, do you really need to put all of this information in a table?


##### Getting facilities data
We're converting the facilities to a dataframe so we can explore it, but it's not necessary for the pipeline.
```python
# Get RIDB Facilities with camping
response = RequestsMock.get(ridb_facilities_url, params, headers=headers)
camping_json  = json.loads(response.text)
> df_ridb_camping = pd.DataFrame(camping_json['RECDATA'])
```
  
---------  
  
##### Campsite transformation code
We're doing some expensive operations to convert the ATTRIBUTES list of dicts into columns. This was really useful for data exploration, but it is unnecessarily expensive for the pipeline.  
  
Also, surfacing the attributes as table columns makes our system brittle. Consider what happens if new attributes are added - if we had to convert these to columns, it would require a database migration for our pipeline output.

```python
def transform_campsites(campsite_json):
>    df = pd.DataFrame(campsite_json)[['ATTRIBUTES', 'CampsiteID', 'CampsiteName', 'FacilityID']]
    
    # First, translate the ATTRIBUTES list into a list of dict of {AttributeName: AttributeValue}
>    df['AttributeDict'] = df['ATTRIBUTES'].apply(lambda x: {item['AttributeName']: item['AttributeValue'] for item in x})
    
    # Create a dataframe with the keys of AttributeDict as column names, values as rows
>    norm = pd.json_normalize(df['AttributeDict'])
    
    # Join the normalized data back to the rest of the campground data, dropping the ATTRIBUTES column
>    df = df[['CampsiteID','CampsiteName','FacilityID']].join(norm)
    return df.replace({np.nan: ""})
```

**Keep in mind**  
Keeping data in its original format will help your pipeline be robust to data source changes


In [3]:
# Lighter-weight transform: skip the DataFrame apply/json_normalize/join and keep each
# campsite record in its original JSON shape, adding only an AttributeDict field.
def transform_campsites(campsite_json):
    """Add an ``AttributeDict`` entry to every campsite record, in place.

    ``AttributeDict`` is a list holding one single-key
    ``{AttributeName: AttributeValue}`` dict per item of the record's
    ``ATTRIBUTES`` list. The same list object that was passed in is returned.
    """
    for campsite in campsite_json:
        campsite['AttributeDict'] = [
            {attr['AttributeName']: attr['AttributeValue']}
            for attr in campsite['ATTRIBUTES']
        ]
    return campsite_json
        

### Scaling up through parallel processing - Infrastructure

**Ask yourself:** What parts of my data processing flow are independent?

* We can process the NF web data in parallel with RIDB
* We can also process each state independently

The figure below shows independent pipelines running for Oregon and Washington

**Keep in mind**
* How will you track API use in a distributed system like this?

![RIDB pipeline](images/OR_WA_pipeline.png)

### Scaling up through parallel processing - Distributed data structures

**Ask yourself**  
What processes operate over a large number of the same data? For loops are a good place to look.

[Spark](https://spark.apache.org/) and [Dask](https://docs.dask.org/en/latest/) are examples of big data processing engines that have distributed data frame structures, enabling you to spread computation across many compute nodes to speed up runtime.  

In our case, campsite processing is a good candidate for this, as well as the NF web scraping. 

**Keep in mind**  
As with parallel infrastructure, keep in mind the impact of parallelizing calls to external services (for example, API usage limits).

![Batch processing](images/parallel.png)

### Scaling up - building in resiliency

**Ask yourself** What are potential points of failure in my pipeline, and how could caching data help reduce time and expenses?

Consider what happens in our prototype if a merge fails - we need to go back to the beginning and regenerate all the data. This can be costly when working with large-scale data; building in caching and retry logic can help.

When thinking about separating steps into independent pipeline tasks:
* As with object-oriented design, think about single responsibility and encapsulation when breaking data processing code into pipeline stages
* Consider persisting data that takes a long time to generate  

![Retry on failure](images/retry_on_fail.png)

### Scaling up with configurable components

**Ask yourself** 
* If I were thinking about my data processing code as a function, what would make sense to parameterize?
* How might this system scale in breadth? For example, perhaps we want to find a place to camp at a facility that also has boating (`activity_id: 6`). It might also be interesting to know what events are going on at the facility, available at the `/facilities/{facilityId}/events` endpoint. 

Workflow management tools like Apache [Airflow](https://airflow.apache.org/) can help you set up these kinds of operations.

In [6]:
def get_ridb_data(url, headers, params=None):
    """Fetch a RIDB endpoint and return the ``RECDATA`` part of its JSON response.

    Args:
        url: RIDB endpoint URL.
        headers: request headers (accept type and API key).
        params: optional query parameters; ``None`` means no parameters.

    Returns:
        The response's ``RECDATA`` value when the status code is 200,
        otherwise an empty list.
    """
    # None sentinel instead of a shared mutable {} default.
    if params is None:
        params = {}
    # Pass headers by keyword, matching the other RequestsMock.get calls in this notebook.
    response = RequestsMock.get(url, params, headers=headers)
    if response.status_code == 200:
        result = json.loads(response.text)
        return result['RECDATA']
    # Return an empty list (not {}) so the failure value has the same type as a
    # successful RECDATA list and callers' emptiness checks treat both alike.
    return []

# Base RIDB facilities endpoint and request headers used by the pipeline steps.
RIDB_FACILITIES_URL = "https://ridb.recreation.gov/api/v1/facilities"
HEADERS = {
    "accept": "application/json",
    "apikey": "key",
}

# One entry per independent pipeline run: a label for the results, the CSV of
# NF sites to scrape, and the RIDB query parameters. activity_id is given as a
# string; the OR entry lists two activity IDs ('9,6').
pipeline_config = [
    {
        'label': 'OR',
        'nf_sites': '../data/NF_sites/OR_sitelist.csv',
        'params': {'state': 'OR', 'activity_id': '9,6'},
    },
    {
        'label': 'WA',
        'nf_sites': '../data/NF_sites/WA_sitelist.csv',
        'params': {'state': 'WA', 'activity_id': '9'},
    },
]


# For demonstrating config
def run_pipeline(config):
    """Run the extract/transform/merge steps once per config entry.

    Args:
        config: iterable of dicts with ``label``, ``params`` (RIDB query
            parameters) and ``nf_sites`` (path to the NF site CSV).

    Returns:
        dict mapping each entry's ``label`` to the ``merge_data`` result for
        that entry; a repeated label keeps the later entry's result.
    """
    def _run_entry(entry):
        # Each of these calls could become its own pipeline step.
        facilities = get_ridb_data(RIDB_FACILITIES_URL, HEADERS, entry['params'])
        campsites = get_campsite_data(facilities)
        nf_sites = get_nf_data(entry['nf_sites'])

        # Additional steps to process other data sources would slot in here.

        return merge_data(facilities, campsites, nf_sites)

    # Each entry in the config could spawn a new pipeline job.
    outputs = {}
    for entry in config:
        outputs[entry['label']] = _run_entry(entry)
    return outputs


def get_campsite_data(facilities):
    """Fetch and transform the campsites of each facility.

    Args:
        facilities: iterable of RIDB facility dicts, each with a ``FacilityID``.

    Returns:
        A list with one entry per facility that returned campsites; each entry
        is that facility's campsite list as returned by ``transform_campsites``.
    """
    data = []
    for facility in facilities:
        # The base URL constant must be inside braces to be interpolated;
        # without them the literal text "RIDB_FACILITIES_URL" ends up in the URL.
        url = f"{RIDB_FACILITIES_URL}/{facility['FacilityID']}/campsites"
        campsite_data = get_ridb_data(url, HEADERS)
        # Truthiness skips an empty list as well as any other empty failure value (e.g. {}).
        if campsite_data:
            data.append(transform_campsites(campsite_data))
    return data


# Keeping transformation code separate makes it easier to test and modify without
# impacting extraction and loading code.
def transform_campsites(campsite_json):
    """Attach ``AttributeDict`` to each campsite record (mutating it) and return the list.

    ``AttributeDict`` mirrors the record's ``ATTRIBUTES`` list, with each entry
    reduced to a one-key ``{AttributeName: AttributeValue}`` dict.
    """
    def _as_pairs(attributes):
        return [{a['AttributeName']: a['AttributeValue']} for a in attributes]

    for record in campsite_json:
        record['AttributeDict'] = _as_pairs(record['ATTRIBUTES'])
    return campsite_json
      
    
def get_nf_data(file_name):
    """Scrape every National Forest site listed in a CSV file.

    Args:
        file_name: path to a CSV file with ``site_url`` and ``site_name`` columns.

    Returns:
        DataFrame built from the ``Scraper(...).scrape()`` result of each row,
        one row per listed site (empty if the CSV has no data rows).
    """
    # newline='' lets the csv module handle line endings itself (needed for quoted
    # fields containing newlines); an explicit encoding avoids platform-dependent decoding.
    with open(file_name, newline='', encoding='utf-8') as f:
        nf_data = [Scraper(row['site_url'], row['site_name']).scrape() for row in DictReader(f)]
    return pd.DataFrame(nf_data)

def merge_data(facilities, campsite_data, nf_data):
    """Placeholder merge step for the demo pipeline; not implemented yet.

    Receives the facility records, the per-facility campsite lists and the
    NF scraper DataFrame, and currently returns ``None``.
    """
    return None