# Grid Creation

Requires a local Nominatim server covering at least the continental United States. Because tens of thousands of reverse-geocoding queries must be made and then organized, the code below performs many operations asynchronously. See https://ipython-books.github.io/59-distributing-python-code-across-multiple-cores-with-ipython/ and https://ipython-books.github.io/510-interacting-with-asynchronous-parallel-tasks-in-ipython/ for setup and more information.

In [1]:
import math
import time
import _pickle as pickle  # using cPickle
import urllib
import datetime
import ipyparallel
import ipywidgets

import numpy as np
import pandas as pd

from geopy import Point, distance
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderServiceError, GeocoderTimedOut
from IPython.display import clear_output, display

## Coordinate to Zip

In [None]:
def coord_to_zip(coord):
    """
    Reverse-geocodes a coordinate to its US zip code.

    A zip code is easier to map to a FIPS code than a (state, county)
    pair, because county names are often ambiguous and need manual
    editing (e.g. "St." vs "Saint") before they can be matched.

    coord: tuple consisting of (latitude, longitude)

    Returns a tuple (coord, result) where result is
      * the postcode string reported by the geocoder,
      * None when there is no usable US postcode for the coordinate, or
      * the caught exception object when the lookup itself failed, so
        the caller can collect those coordinates and retry them.

    Relies on the global `geolocator` and the `time` module being
    available in the namespace the function runs in.
    """
    try:
        time.sleep(0)
        location = geolocator.reverse(coord, timeout=15)
        # the geocoder may return None instead of a location when it has
        # no result; that is "no zip code", not a failure worth retrying
        if location is None:
            return (coord, None)
        raw = location.raw
        # occurs when coord is outside available region
        if 'error' in raw:
            return (coord, None)
        address = raw['address']
        # occurs when coord is not in the United States
        if address['country_code'] != 'us':
            return (coord, None)
        return (coord, address['postcode'])
    # occurs when address, country_code or postcode is not available
    except KeyError:
        return (coord, None)
    # catch all for potential geopy exceptions but ideally should not occur;
    # the exception is returned (not raised) so one bad coordinate cannot
    # abort the whole parallel map
    except Exception as e:
        return (coord, e)

In [None]:
def coordinate_generator(min_lat, max_lat, min_lon, max_lon, dist):
    """
    Generates (latitude, longitude) tuples forming a grid bounded by the
    given minimum and maximum latitudes and longitudes.

    The grid starts at the north-west corner (max_lat, min_lon). Each row
    is walked eastward in steps of `dist` miles until the longitude passes
    max_lon; the next row then starts `dist` miles further south of the
    starting corner. Generation stops at the first coordinate whose
    latitude is below min_lat.

    min_lat, max_lat, min_lon, max_lon: bounds in degrees
    dist: spacing between neighbouring grid points in miles, must be > 0

    Raises ValueError (when iteration starts) if dist is not positive,
    because the walk would then never reach the bounds.
    """
    if dist <= 0:
        raise ValueError(f'dist must be positive, got {dist!r}')

    def point_to_tuple(point):
        return (point.latitude, point.longitude)

    start = Point(max_lat, min_lon)
    hor_dist = distance.distance(miles=dist)
    # offset of the *next* row south of the starting corner
    ver_dist = distance.distance(miles=dist)

    point = start
    coord = point_to_tuple(point)

    while True:
        # eastmost bound reached
        if coord[1] > max_lon:
            # go back to the starting corner and move south to the next row
            point = ver_dist.destination(point=start, bearing=180)
            # refresh coord so the out-of-bounds point is not yielded and the
            # first point of the new row is not skipped
            coord = point_to_tuple(point)
            # rows are spaced by `dist`, the same as the columns
            ver_dist += distance.distance(miles=dist)
        # southmost bound reached
        if coord[0] < min_lat:
            break
        yield coord
        # move east
        point = hor_dist.destination(point=point, bearing=90)
        coord = point_to_tuple(point)

In [None]:
# reverse geocoder pointed at the local Nominatim instance (plain http, port 8080);
# coord_to_zip looks this object up as a global
geolocator = Nominatim(domain='localhost:8080/nominatim', scheme='http')

In [None]:
# sanity check: reverse geocode a single coordinate and display the raw response
geolocator.reverse((37.873522, -122.257692), timeout=15).raw

From https://en.wikipedia.org/wiki/List_of_extreme_points_of_the_United_States, for the 48 contiguous states:
* northernmost latitude: 49°23′04.1″
* southernmost latitude: 24°26.8′
* westernmost longitude: -124°47.1′
* easternmost longitude: -66°56′49.3″

In [None]:
# bounding box in whole degrees and the grid spacing in miles
min_lat, max_lat = 24, 50
min_lon, max_lon = -125, -66
dist = 5
# materialize the whole grid so it can be counted and mapped over
coords = [*coordinate_generator(min_lat, max_lat, min_lon, max_lon, dist)]

In [None]:
%%capture
# connect to the ipyparallel cluster and get a load-balanced view for task submission
rc = ipyparallel.Client()
view = rc.load_balanced_view()

# import `time` on every engine as well (coord_to_zip calls time.sleep)
with rc[:].sync_imports():
    import time

# send the geolocator to every engine's namespace, where coord_to_zip
# looks it up as a global
rc[:].push(dict(
    geolocator=geolocator
))

In [None]:
# submit one reverse-geocoding task per grid coordinate; the cells below poll
# or wait on the returned async result
ar = view.map_async(coord_to_zip, coords)

In [None]:
# total number of grid points; used as the denominator in the progress output
num_coords = len(coords)

In [None]:
# run this cell (ctrl + enter) at any time to check progress without blocking the kernel
# the full grid takes around 15 minutes with 12 logical processors (i7 8700k) and 16gb ram
print(f'Time Elapsed: {datetime.timedelta(seconds=math.ceil(ar.elapsed))}')
print(f'Coordinates Completed: {ar.progress}/{num_coords}')
# the speed-up is only computed once every task has finished
if ar.ready():
    print(f'Completed {round(ar.serial_time/ar.wall_time, 2)}x faster than serial computation')
else:
    print('Still Running')

In [None]:
# blocks the kernel until all tasks are done; interrupt the kernel to unblock
ar.wait_interactive()
# after an interrupt the tasks may still be running, so check before reporting
if ar.ready():
    print(f'Completed {round(ar.serial_time/ar.wall_time, 2)}x faster than serial computation')
else:
    print('Still Running')

In [None]:
# split the results: coordinates with a zip code go into coord_zip_dict,
# coordinates whose lookup raised go into exception_coords, and
# coordinates without a zip code (None) are dropped
coord_zip_dict = {}
exception_coords = []
for i, (coord, maybe_zip) in enumerate(ar):
    print(f'processing grid points: {i+1}/{num_coords}')
    clear_output(wait=True)
    if isinstance(maybe_zip, Exception):
        exception_coords.append(coord)
        continue
    if maybe_zip is not None:
        coord_zip_dict[coord] = maybe_zip

In [None]:
# coordinates whose lookup returned an exception object instead of a zip code
# ideally none of the coords should raise exceptions
# in the case there are, try to manually rerun them
exception_coords

In [None]:
# persist the coordinate -> zip code mapping so the geocoding run need not be repeated
with open('pickles/coord_zip_dict.pkl', mode='wb') as pkl_file:
    pickle.dump(coord_zip_dict, pkl_file)

### Extra Grids

In [None]:
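# disabled cell: creates slightly different grids for testing pipeline robustness
# NOTE: as written, the `view.shutdown()` and `break` lines below stop it right after the first grid is submitted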
# min_lat = 24
# max_lon = -66
# max_lat_options = [50 + 0.1 * i for i in range(1, 11)]
# min_lon_options = [-125 - 0.1 * i for i in range(1, 11)]
# dist = 5
# exception_coords = list()

# for i, max_lat in enumerate(max_lat_options):
#     for j, min_lon in enumerate(min_lon_options):
#         print(f'generating grid: {i*10+j+1}/100')
#         coords = list(coordinate_generator(min_lat, max_lat, min_lon, max_lon, dist))
#         ar = view.map_async(coord_to_zip, coords)
#         view.shutdown()
#         break
# #         progress_bar(ar)
#         coord_zip_dict = dict()
#         num_coords = len(coords)
#         clear_output(wait=True)
#         for k, (coord, maybe_zip) in enumerate(ar):
#             print(f'grid: {i*10+j+1}/100\nprocessing grid points: {k+1}/{num_coords}')
#             clear_output(wait=True)
#             if isinstance(maybe_zip, Exception):
#                 exception_coords.append(coord)
#             elif maybe_zip is not None:
#                 coord_zip_dict[coord] = maybe_zip
#         with open(f'pickles/coord_zip_dict{i}{j}.pkl', 'wb') as file:
#             pickle.dump(coord_zip_dict, file)
#     break

In [None]:
# exception_coords

## Zip Coordinates to FIPS Codes

In [None]:
# reload the coordinate -> zip code mapping written by the grid-creation section
with open('pickles/coord_zip_dict.pkl', mode='rb') as pkl_file:
    coord_zip_dict = pickle.load(pkl_file)

In [None]:
# drop postcodes shorter than 5 characters and keep only the first 5 characters
# of the rest (this cuts zip+4 codes down to plain zips)
coord_zip_dict = {
    coord: zip_code[:5]
    for coord, zip_code in coord_zip_dict.items()
    if len(zip_code) >= 5
}

In [None]:
# cheap to recompute, so the result is not pickled
# dict from zip code to the list of grid coordinates that fall inside it
zip_coords_dict = {}
for coord, z in coord_zip_dict.items():
    zip_coords_dict.setdefault(z, []).append(coord)

In [None]:
# download from https://www.kaggle.com/danofer/zipcodes-county-fips-crosswalk/version/1#
# requires kaggle account
# ZIP and STCOUNTYFP are read as strings (presumably to keep leading zeros - confirm)
zip_fips_crosswalk = pd.read_csv('zipcodes-county-fips-crosswalk.zip', dtype={'ZIP': str, 'STCOUNTYFP': str})

In [None]:
# preview the first rows of the crosswalk
zip_fips_crosswalk.head()

In [None]:
# lookup table from zip code to county FIPS code, built straight from the two
# columns instead of iterating the frame row by row
# NOTE: when a zip appears on several crosswalk rows the last row wins, exactly
# as with row-by-row assignment
zip_fips_dict = dict(zip(zip_fips_crosswalk['ZIP'], zip_fips_crosswalk['STCOUNTYFP']))

In [None]:
# remove bad(?) zips: anything the crosswalk does not know cannot be mapped to a county
# collect the keys first so the dict is not modified while it is being iterated
unknown_zips = [zip_code for zip_code in zip_coords_dict if zip_code not in zip_fips_dict]
for zip_code in unknown_zips:
    del zip_coords_dict[zip_code]

In [None]:
# dict from county FIPS code to every grid coordinate that falls in that county
fips_coords_dict = dict()
# the loop variable is deliberately not called `coords`, so the grid list of
# that name is not overwritten
for zip_code, zip_coords in zip_coords_dict.items():
    # always extend a list owned by fips_coords_dict; storing the zip's own list
    # would alias it, and later extends would then also grow
    # zip_coords_dict[zip_code]
    fips_coords_dict.setdefault(zip_fips_dict[zip_code], []).extend(zip_coords)

## FIPS Stations

In [3]:
# read the fixed-width GHCN-Daily station list; each colspec is a (start, end)
# character range, end exclusive, for the column with the matching entry in `names`
stations_df = pd.read_fwf('https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt',
   colspecs=[(0,11), (12, 20), (21, 30), (31, 37), (38, 40), (41, 71), (72, 75), (76, 79), (80, 85)],
   header=None,
   names=['ID', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'STATE', 'NAME', 'GSN FLAG', 'HCN/CRN FLAG', 'WMO ID'])

In [4]:
# keep only stations whose ID starts with 'US' and that carry the HCN flag
# (intended as: HCN stations in the continental United States)
stations_df = stations_df.loc[
    stations_df['ID'].str.startswith('US') & stations_df['HCN/CRN FLAG'].eq('HCN')
].reset_index(drop=True)

In [5]:
# persist the filtered station table
with open('pickles/stations_df.pkl', mode='wb') as pkl_file:
    pickle.dump(stations_df, pkl_file)

In [None]:
def add_stations_to_fips(fips):
    """
    Finds the stations within 50 miles of any grid point of a county.

    fips: county FIPS code, a key of the global fips_coords_dict

    Returns a tuple (fips, station_ids), where station_ids lists the ID
    of every row of the global stations_df that lies at most 50 miles
    from at least one coordinate of the county, in stations_df order.

    Relies on the globals `fips_coords_dict`, `stations_df`, `time` and
    `distance`; `distance` must be the callable distance class (it is
    pushed to the engines as `distance.distance`), not the geopy module.
    """
    county_coords = fips_coords_dict[fips]
    nearby_ids = []
    time.sleep(0)
    for _, station in stations_df.iterrows():
        station_coord = (station['LATITUDE'], station['LONGITUDE'])
        time.sleep(0)
        # any() stops at the first grid point that is close enough
        if any(distance(station_coord, coord).miles <= 50 for coord in county_coords):
            nearby_ids.append(station['ID'])
    return (fips, nearby_ids)

In [None]:
# every county FIPS code that has at least one grid coordinate
fips_codes = [*fips_coords_dict]

In [None]:
# connect to the ipyparallel cluster and get a load-balanced view for task submission
rc = ipyparallel.Client()
view = rc.load_balanced_view()

# import `time` on every engine as well (add_stations_to_fips calls time.sleep)
with rc[:].sync_imports():
    import time

# send the globals add_stations_to_fips reads to every engine; note that on the
# engines the name `distance` is bound to the distance class (distance.distance),
# not to the geopy `distance` module
rc[:].push(dict(
    distance=distance.distance,
    fips_coords_dict=fips_coords_dict,
    stations_df=stations_df
))

In [None]:
# submit one station-matching task per FIPS code; the cells below poll or wait
# on the returned async result
ar = view.map_async(add_stations_to_fips, fips_codes)

In [None]:
# total number of FIPS codes; used as the denominator in the progress output
num_fips_codes = len(fips_codes)

In [None]:
# run this cell (ctrl + enter) at any time to check progress without blocking the kernel
print(f'Time Elapsed: {datetime.timedelta(seconds=math.ceil(ar.elapsed))}')
print(f'FIPS Codes Completed: {ar.progress}/{num_fips_codes}')
# the speed-up is only computed once every task has finished
if ar.ready():
    print(f'Completed {round(ar.serial_time/ar.wall_time, 2)}x faster than serial computation')
else:
    print('Still Running')

In [None]:
# blocks the kernel until all tasks are done; interrupt the kernel to unblock
ar.wait_interactive()
# after an interrupt the tasks may still be running, so check before reporting
if ar.ready():
    print(f'Completed {round(ar.serial_time/ar.wall_time, 2)}x faster than serial computation')
else:
    print('Still Running')

In [None]:
# TODO: what to do with counties with no stations?

In [None]:
# dict from FIPS code to the list of station IDs returned for it by
# add_stations_to_fips (the list is empty when no station is close enough)
fips_stations_dict = {fips: stations for fips, stations in ar}

In [None]:
# persist the FIPS -> station IDs mapping
with open('pickles/fips_stations_dict.pkl', mode='wb') as pkl_file:
    pickle.dump(fips_stations_dict, pkl_file)

In [None]:
# set index to ID for fast lookup
# NOTE: this modifies stations_df in place and consumes the 'ID' column, so
# running this cell a second time raises KeyError
stations_df.set_index('ID', inplace=True)

In [None]:
# dict from fips to dict from station to the mean of the inverse distances
# (in 1/miles) between that station and every grid coordinate of the county
# expects stations_df to be indexed by station ID
fips_stations_inverse_distances_dict = {}
num_fips_codes = len(fips_stations_dict)
for i, (fips, stations) in enumerate(fips_stations_dict.items()):
    num_stations = len(stations)
    for j, station in enumerate(stations):
        print(f'fips: {i+1}/{num_fips_codes}\nstations: {j+1}/{num_stations}')
        clear_output(wait=True)
        station_row = stations_df.loc[station]
        station_coord = (station_row['LATITUDE'], station_row['LONGITUDE'])
        distances = np.array(
            [distance.distance(station_coord, coord).miles for coord in fips_coords_dict[fips]],
            dtype=float,
        )
        mean_inverse_distance = np.mean(np.reciprocal(distances))
        # the per-county dict is created lazily, so counties without any
        # station get no entry at all
        fips_stations_inverse_distances_dict.setdefault(fips, {})[station] = mean_inverse_distance

In [None]:
# persist the FIPS -> station -> mean inverse distance mapping
with open('pickles/fips_stations_inverse_distances_dict.pkl', mode='wb') as pkl_file:
    pickle.dump(fips_stations_inverse_distances_dict, pkl_file)