In [1]:
import pandas as pd
import numpy as np
import os
import requests
import datetime as dt
import time
import json
from config import noaa_token as token

In [2]:
#variables
base = 'https://www.ncdc.noaa.gov/cdo-web/api/v2/data'
header = {'token':token,
          'Content-Type':'application/json'}

datasets = {'Daily':'GHCND',
           'Monthly':'GSOM'}

state_ids = {'PA':'FIPS:42',
            'OH':'FIPS:39',
            'MI':'FIPS:26',
            'IL':'FIPS:17',
            'WI':'FIPS:55',
            'MN':'FIPS:27',
            'IA':'FIPS:18',
            'NY':'FIPS:36'}

lake_st = {'Superior':['MI','MN','WI'],
          'Michigan':['IA','IL','MI','WI'],
          'Huron':['MI'],
          'Erie':['IA','NY','OH','PA'],
          'Ontario':['NY']}

files_dir = os.path.join('..','data_files','api_calls')

In [3]:
variables = pd.read_csv(os.path.join('..', 'data_files','prep_data','variables_to_use.csv'))
variables['dataset'] = ['GSOM','GSOM','GHCND','GHCND',np.nan,'GHCND',np.nan]
variable_ids = variables[['id','dataset']].set_index('id')
variable_ids

Unnamed: 0_level_0,dataset
id,Unnamed: 1_level_1
DT32,GSOM
DX32,GSOM
PRCP,GHCND
SNWD,GHCND
THIC,
TMIN,GHCND
WDMV,


In [4]:
phases_df = pd.read_csv(os.path.join('..', 'data_files','prep_data','data_collection_phases.csv'))
phases_df.head()

Unnamed: 0.1,Unnamed: 0,Start,End,Entries,Days in Range,% Coverage
0,0,1972-12-19,1973-04-07,18,109,16.51
1,1,1973-12-30,1974-05-03,20,124,16.13
2,2,1975-01-01,1975-04-25,19,114,16.67
3,3,1975-12-22,1976-04-21,20,121,16.53
4,4,1976-12-15,1977-05-05,23,141,16.31


To collect all of the calls needed to gather the data for a given lake and year range (called phases from here on), we've created a series of functions to create and group the calls in a series of nested lists.

The top list represents individual variables, as seen in the variables_id dataframe above.
The second layer list represents individual states that border the selected lake, identified in a dictionary in the second cell.
The third layer is comprised of dictionaries, one per phase given the state and variable in the above layers.

Each of these functions are named their layer (ex: variable, state, phase) followed by 'load' as the dictionaries will be used as payloads for the api calls.

In [5]:
#Returns list of calls, 1 call per phase for a specified state and data_id
def phase_loads(st,phase_start,phase_end,data_id):
    load = {}
    st_load = []
    for i in range(phase_start,phase_end):
        load = {'limit':1000,
        'offset':0,
        'includeAttributes':'true',
        'datatypeid': data_id,
        'datasetid': variable_ids.loc[data_id],
        'startdate': phases_df.iloc[i]['Start'],
        'enddate': phases_df.iloc[i]['End'],
        'locationid': state_ids[st]}
        st_load.append(load)
    return st_load

In [6]:
#Returns nested list of calls, st/phase
def state_loads(lake,phase_start,phase_end,data_id):
    """lake is a string name of a Great Lake wihtout the preceding 'Lake'.
    phase_start should be an index from the data_collection_phases.csv (int).
    phase_end is exclusive index from the same csv (int).
    data_id should be an ID from the variables_to_use.csv.
    Returns list of list of of dictionaries to be used for a series of api calls."""
    lk_load = []
    lake_borders = lake_st[lake]
    for e in lake_borders:
        lk_load.append(phase_loads(e,phase_start,phase_end,data_id))
    return lk_load

In [7]:
def var_loads(lake,phase_start,phase_end,dataset=None):
    var_load = []
    for index, row in variable_ids.iterrows():
        if isinstance(row['dataset'],str):
            if row['dataset'] == dataset or dataset is None:
                var_load.append(state_loads(lake,phase_start,phase_end,index))
    print(f'{lake} loads collected.')
    return var_load

To limit the number of api calls needed, we are saving batches at each level in specific directories.
To this end, we need to generate filenames in a standard way and then verify whether the file already exists.

In [8]:
def is_not_file(filename):
    if os.path.isfile(filename):
        print(f"There's already a file for this: {filename}")
        return False
    else:
        return True

The api limits the results within the json to a max of 1000 entries.
This means, for a given phase/state/variable, we need to make a series of calls, updating the offset to pull new data.

To know how far to go, we have a get_c function, to identify the total number of entries.
Then there is the function that makes the individual call, max size = 1000.

In [9]:
def get_c(load):
    load['limit'] = 5
    load['offset']=0
    c = requests.get(base,headers=header,params=load).json()['metadata']['resultset']['count']
    load['limit'] = 1000
    return c

In [10]:
def export(file_path,file,data):
    if is_not_file(os.path.join(file_path,file+'.csv')):
        data.to_csv(os.path.join(file_path,file+'.csv'))
    return

In [11]:
#Exports 1 call of max 1000 entries to csv in data_files/api_calls/individual_calls
def call(load,filename):
    r = requests.get(base,headers=header,params=load).json()
    results = r['results']
    data = pd.DataFrame(results)
    return data

To limit the number of api calls, here is where we conditionally call.
This function also provides progress percentage, to keep the user updated on where they are in the process.

In [12]:
def check(count,load,file):
    file = file + '_' + str(load['offset'])
    df = pd.DataFrame()
    if is_not_file(os.path.join(files_dir,'individual_calls',file)) & count>0:
        df = call(load,file)
    return df

Since we layered the calls in nested lists, we need to unpack them slowly in a controlled manner.
We share which phase/state/variable we are currently processing.

In [13]:
#Exports 1 call, all entries in a given time range to csv in data_files/api_calls/call_batches
#Takes dictionary
def phase_call(load,filebase,status=True):
    phase = phases_df.index[phases_df['Start']==load['startdate']].tolist()[0]
    file = filebase + '_' + str(phase)
    
    try:
        count = get_c(load)
    except:
        return pd.DataFrame()
    
    load['offset'] = 0
    total_data = pd.DataFrame()
    if status:
        print(f"Phase {phase} start: ")
    while load['offset'] < count:
        if status:
            print(f"Initializing call {(load['offset']//1000)+1} of {(count//1000)+1}")
        try:
            df = check(count,load,file)
        except:
            export(files_dir,f'FAILED_{dt.datetime.now()}_'+file,total_data)
            print(f"Failed at {load['offset']}, {phase}, {load['locationid']}, {load['datatypeid']}")
            return total_data
        
        total_data = total_data.append(df)
        load['offset'] = load['offset']+load['limit']
    
    if status:
        print(f"Phase {phase} complete.")
    return total_data

In [14]:
#Takes list of dictionaries
def state_calls(st_ls,filebase,status=True):
    file = filebase + '_' + st_ls[0]['locationid']
    total_data = pd.DataFrame()
    if status:
        print(f"State {st_ls[0]['locationid']} start.")
    for i in range(len(st_ls)):
        if status:
            print(f"Initializing call {i+1} of {len(st_ls)}")
        try:
            total_data = total_data.append(phase_call(st_ls[i],file,status))
        except:
            export(files_dir,f'FAILED_{dt.datetime.now()}_'+file,total_data)
            print(f"Failed at {load['locationid']}, {load['datatypeid']}")
            return total_data
        
    if status:
        print(f"State {st_ls[0]['locationid']} complete.")
    return total_data

In [15]:
#Exports a series call, all entries in a given time range for all states bordering a specific lake 
# to csv in data_files/api_calls
#Takes list of list of dictionaries
def variable_calls(var_ls,filebase,status=True):
    file = filebase + '_' + var_ls[0][0]['datatypeid']
    total_data = pd.DataFrame()
    if status:
        print(f"Variable {var_ls[0][0]['datatypeid']} start.")
    for i in range(len(var_ls)):
        if status:
            print(f"Initializing call {i+1} of {len(var_ls)}")
        try:
            total_data = total_data.append(state_calls(var_ls[i],file,status))
        except:
            export(files_dir,f'FAILED_{dt.datetime.now()}_'+file,total_data)
            print(f"Failed at {load['datatypeid']}")
            return total_data
        
    if status:
        print(f"Variable {var_ls[0][0]['datatypeid']} complete.")
    return total_data

In [16]:
#Takes list of list of list of dictionaries
def lake_calls(lk_ls,lake_name,status=True):
    phase_start = lk_ls[0][0][0]['startdate']
    phase_end = lk_ls[-1][-1][-1]['enddate']
    file = lake_name + '_' + phase_start + '_' + phase_end
    total_data = pd.DataFrame()
    if status:
        print(f"Lake {lake_name} start.")
    for lake in lk_ls:
        try:
            total_data = total_data.append(variable_calls(lake,file,status))
        except:
            export(files_dir,f'FAILED_{dt.datetime.now()}_'+file,total_data)
            print(f"Failed at {lake}")
            return total_data
    if status:
        print(f"Lake {lake_name} complete.")
    return total_data

In [20]:
def make_calls(lakes,start_phase,end_phase,dataset,status=True):
    total_data = pd.DataFrame()
    for lake in lakes:
        file = f'{lake}_{start_phase}-{end_phase}_{dataset}'
        if is_not_file(os.path.join(files_dir,file+'.csv')):
            total_data = total_data.append(
                lake_calls(
                    var_loads(
                        lake,start_phase,end_phase,dataset=dataset),
                    lake,status))
            export(files_dir,file,total_data)
        if status:
            print(f"{lake} finished at {dt.datetime.now()}")
    print(f'{len(total_data)} records gathered and exported.')
    return total_data

After all that, we're finally down to the action.
Here is where we'll generate the nested lists for our chosen lake and in the following cell, we'll use all the code you've skimmed past to pull, save, and export the data.
Just be careful, there's no error handling at all here, if you're not careful, you may stop the whole process!

In [23]:
start_phase = 22
end_phase = 27
lakes = ['Superior','Michigan','Huron','Erie','Ontario']
dataset='GSOM'

start = dt.datetime.now()
make_calls(lakes,start_phase,end_phase,dataset,status=False)
finish = dt.datetime.now()

Superior loads collected.
Failed at 0, 24, FIPS:26, DT32
Failed at 0, 25, FIPS:55, DT32
Failed at 0, 26, FIPS:55, DT32
Failed at 0, 22, FIPS:26, DX32
Failed at 0, 24, FIPS:26, DX32
Failed at 0, 25, FIPS:26, DX32
Failed at 0, 26, FIPS:26, DX32
Failed at 0, 23, FIPS:27, DX32
Failed at 0, 25, FIPS:27, DX32
Failed at 0, 25, FIPS:55, DX32


NameError: name 'data' is not defined

In [22]:
(finish-start).seconds/60

0.0