In [None]:
# requirements
%pip install -q fiona folium geopandas numpy requests shapely

# third-party
import fiona
import folium
import geopandas
import numpy as np
import requests
from shapely import make_valid, MultiPolygon, Polygon
from shapely.geometry import mapping, shape

# standard
from datetime import datetime
import json
import os
from zipfile import ZipFile

1. Get Raw Zipcode Files
    - Download most recent data from census.gov
    - Unzip downloaded file
    - Make raw zipcodes directory
    - Write raw zipcode files using unzipped files

In [None]:
# constants
CENSUS_URL = 'https://www2.census.gov/geo/tiger/TIGER'
MB = 1 << 20
DOWNLOADS_DIRPATH = 'downloads/'
RAW_ZIPCODES_DIRPATH = 'raw_zipcodes/'
ZIPCODES_DIRPATH = 'Zipcodes/'
RAW_JSON_EXT = '_raw.json'

# helpers
def get_download_url():
    for YEAR in range((y := datetime.now().year), y - 20, -1):
        zip_url = f'{CENSUS_URL}{YEAR}/'
        response = requests.get(zip_url, stream=True)
        if response.status_code != 200: continue
        i = response.text.find('ZCTA5')
        j = response.text.find('/', i)
        ZCTA5 = response.text[i:j]
        download_url = f'{CENSUS_URL}{YEAR}/{ZCTA5}/tl_{YEAR}_us_{ZCTA5.lower()}.zip'
        return download_url
    else:
        raise Exception('no good response')

def progress_bar(count, total, tag='', normalize=None, length=32):
    quotient = count / total
    percent = f'{quotient * 100:.1f}%'
    progress = f'[{(int(quotient * length) * '=')[:length].ljust(length, '_')}]'
    counter = f'({tag} {total} | {count})' if normalize == None else f'({tag} {total / normalize:.1f} | {count / normalize:.1f})'
    return f'{percent} {progress} {counter}'

def download_zip_file(download_url):
    try:
        zip_filename = os.path.basename(download_url)
        zip_filepath = os.path.join(DOWNLOADS_DIRPATH, zip_filename)
        if os.path.exists(zip_filepath):
            print(f'zip file already downloaded: {zip_filepath}')
            return zip_filepath
        os.makedirs(DOWNLOADS_DIRPATH, exist_ok=True)
        response = requests.get(download_url, stream=True)
        response.raise_for_status()
        DOWNLOADED = 0
        DOWNLOAD_SIZE = int(response.headers.get('content-length', 0))
        with open(zip_filepath, 'wb') as FILE:
            for CHUNK in response.iter_content(chunk_size=MB):
                if not CHUNK: continue
                FILE.write(CHUNK)
                DOWNLOADED += len(CHUNK)
                PROGRESS_BAR = progress_bar(DOWNLOADED, DOWNLOAD_SIZE, 'MB', MB)
                print(f'\rDownloading `{zip_filename}`: {PROGRESS_BAR}', end='')
        print(f'\n`{zip_filename}` downloaded successfully.')
        return zip_filepath
    except requests.exceptions.RequestException as error:
        print(error)

def unzip_downloaded_file(zip_filepath):
    shape_filepath = zip_filepath.replace('.zip', '.shp')
    if os.path.exists(shape_filepath):
        print(f'shape file already unzipped: {shape_filepath}')
        return None
    with ZipFile(zip_filepath, 'r') as FILE:
        FILE.extractall(DOWNLOADS_DIRPATH)

def count_files(dirpath):
    return sum(len(files) for _, _, files in os.walk(dirpath))

def make_zipcode_directory(dirpath):
    os.makedirs(dirpath, exist_ok=True)
    for NUM in range(500, 99999, 500):
        sub_dirpath = os.path.join(dirpath, f'{NUM:05}-{NUM+499:05}')
        os.makedirs(sub_dirpath, exist_ok=True)

def get_zipcode_filepath(zipcode_filename):
    dirpath = RAW_ZIPCODES_DIRPATH if zipcode_filename.endswith(RAW_JSON_EXT) else ZIPCODES_DIRPATH
    floor = (int(zipcode_filename[:5]) // 500) * 500
    return os.path.join(dirpath, f'{floor:05d}-{floor+499:05d}', zipcode_filename)

def write_raw_zipcode_files(zip_filepath):
    shape_filepath = zip_filepath.replace('.zip', '.shp')
    with fiona.open(shape_filepath, 'r') as FILE:
        FEATURE_SIZE = len(FILE)

        if FEATURE_SIZE == count_files(RAW_ZIPCODES_DIRPATH):
            print(f'raw zipcode files already written: {FEATURE_SIZE} files')
            return None
        
        ZCTA5_KEY = None
        for COUNT, FEATURE in enumerate(FILE, start=1):
            ZCTA5_KEY = ZCTA5_KEY or next((key for key in dict(FEATURE.properties).keys() if key.startswith('ZCTA5')), None)
            zipcode = FEATURE.properties[ZCTA5_KEY]
            PROGRESS_BAR = progress_bar(COUNT, FEATURE_SIZE, 'files')
            print(f'\rWriting Zipcode `{zipcode}`: {PROGRESS_BAR}', end='')

            zipcode_filepath = get_zipcode_filepath(zipcode + RAW_JSON_EXT)
            zipcode_json = {
                'type': 'Feature',
                'properties': dict(FEATURE.properties),
                'geometry': dict(FEATURE.geometry),
            }

            with open(zipcode_filepath, 'w') as ZIPCODE_FILE:
                json.dump(zipcode_json, ZIPCODE_FILE)

In [None]:
download_url = get_download_url()
zip_filepath = download_zip_file(download_url)
unzip_downloaded_file(zip_filepath)
make_zipcode_directory(RAW_ZIPCODES_DIRPATH)
write_raw_zipcode_files(zip_filepath)

2. Simplify Raw Zipcodes
    - Make Zipcodes Directory
    - Simplify Raw Zipcodes

In [None]:
# constants
ANGLE_THRESHOLD = 1.0
PRECISION = 0.0001
SIGFIGS = min(len(f'{PRECISION:.7f}'.rstrip('0').split('.')[1]), 6) # restricts significant decimal figures equal to that of PRECISION
GEOJSON_EXT = '.geojson'

# helpers
def get_geometry_from_filepath(filepath):
    with open(filepath, 'r') as file:
        data = json.loads(file.read())
        return shape(data['geometry'])

def get_coordinates_from_geometry(geometry):
    return json.loads(json.dumps(mapping(geometry)['coordinates']))

def prune_coordinates(coordinates):

    def triplets(x): # generates triplets of sequentially adjacent elements with wrap-around
        return zip(*[x[i:] + x[:i] for i in range(-1, 2)])

    def angle(points): # returns the angle in degrees measured offset from perpendicular [0, 90]
        a, b, c = points # unpack points
        V, W = np.array(b) - np.array(a), np.array(b) - np.array(c) # compute vectors
        m = np.linalg.norm(V) * np.linalg.norm(W) # compute magnitude
        return 90.0 if m == 0 else abs(90.0 - np.arccos(np.clip(np.dot(V, W) / m, -1.0, 1.0)) * 180.0 / np.pi) # compute angle (degrees)

    n = len(coordinates); angles = [angle(points) for points in triplets(coordinates)]
    while n > 2: # stop if less than 3 points
        sorted_angles = sorted(enumerate(angles), key=lambda x: x[1]) # assign an ordered index and sort by angle size
        i, max_angle = sorted_angles[-1] # get max angle and index
        if max_angle < 90.0 - ANGLE_THRESHOLD: break # stop if max angle does not exceed angle threshold
        coordinates.pop(i); angles.pop(i); n -= 1 # update angles adjacent to removed index
        prev = (i - 1) % n; next = i % n # update angles adjacent to removed index
        angles[prev] = angle([coordinates[prev - 1], coordinates[prev], coordinates[next]])
        angles[next] = angle([coordinates[prev], coordinates[next], coordinates[(next + 1) % n]])
    return coordinates if len(coordinates) > 2 else []

def snap_to_grid(point):

    def grid_mod(x, y): # returns true if x >= y/2 (with modulus clipping)
        return x % (2 * y) > y 
    
    def point_line(x, y, m, b): # result is positive if point (x, y) is above linear equation y = m * x + b
        return y - x * m - b

    def adjusted_point(x_min, y_min, t, bool): # snaps point to grid by rounding based on proximity to PRECISION
        t_bool = t > 0
        return [
            round(x_min + PRECISION * ((t_bool + bool) % 2), SIGFIGS),
            round(y_min + PRECISION * t_bool, SIGFIGS),
        ]

    Px_MOD, Py_MOD = point[0] % PRECISION, point[1] % PRECISION # determine relative position using modulus of point x and y
    X_MIN, Y_MIN = point[0] - Px_MOD, point[1] - Py_MOD # compute lower X and Y
    BOOL = (grid_mod(point[0], PRECISION) + grid_mod(point[1], PRECISION)) % 2 # compute oddity
    m, b = BOOL * 2 - 1, PRECISION * (1 - BOOL) # compute slopy and y-intercept
    t = point_line(Px_MOD, Py_MOD, m, b)
    if t * 8 ** .5 > PRECISION: # zone classification for point in grid
        return adjusted_point(X_MIN, Y_MIN, t, BOOL)
    t = point_line(Px_MOD, Py_MOD, -m, PRECISION - b)
    return adjusted_point(X_MIN, Y_MIN, t, BOOL)

def polify(geoms):
    result = []
    for geom in geoms:
        if geom.geom_type.endswith('Polygon'):
            result.append(geom)
        elif geom.geom_type == 'LinearRing':
            result.append(Polygon(geom))
    return result

def coordinates_need_further_simplification(arr):

    def flatten_array(arr):
        if not isinstance(arr, list):
            return [arr]
        flattened = []
        for item in arr:
            flattened.extend(flatten_array(item))
        return flattened

    def num_decimals(value):
        return len(str(value).split('.')[1]) if '.' in str(value) else 0

    return any(num_decimals(num) > SIGFIGS for num in flatten_array(arr))

def union(geometries, exit=False):
    result = geopandas.GeoDataFrame(geometry=geometries).union_all()
    if result.geom_type == 'GeometryCollection':
        return union(polify(result.geoms))
    if not result.is_valid:
        return simplify(make_valid(result))
    coords = get_coordinates_from_geometry(result)
    if not exit and coordinates_need_further_simplification(coords):
        return simplify(result, exit=True)
    return result

def force_multipolygon(geometry):
    return geometry if geometry.geom_type == 'MultiPolygon' else MultiPolygon([geometry])

def simplify(geometry, exit=False):
    new_geometry = []
    geometry_coordinates = get_coordinates_from_geometry(force_multipolygon(geometry))
    for polygon in geometry_coordinates:
        new_polygon = []
        for coordinates in polygon:
            new_coordinates = prune_coordinates([snap_to_grid(point) for point in prune_coordinates(coordinates)])
            if not new_coordinates: continue
            new_polygon.append(new_coordinates)
        if not new_polygon: continue
        outer, *inner = new_polygon
        new_polygon = Polygon(outer, inner)
        if not new_polygon.is_valid:
            new_polygon = make_valid(new_polygon)
        new_geometry.append(new_polygon)
    return union(new_geometry, exit)

def generate_geojson(geometry, key_name, value):
    if not geometry.geom_type.endswith('Polygon'):
        raise Exception('Geometry Type not Polygon or MultiPolygon')
    return {
        'type': 'Feature',
        'properties': {
            key_name: value,
            'bounds': geometry.bounds,
        },
        'geometry': {
            'type': geometry.geom_type,
            'coordinates': get_coordinates_from_geometry(geometry),
        },
    }

def simplify_raw_zipcodes():
    TOTAL_FILES = count_files(RAW_ZIPCODES_DIRPATH)

    CONVERSION_RATE = count_files(ZIPCODES_DIRPATH) / TOTAL_FILES
    if CONVERSION_RATE > .9:
        print(f'zipcode files already written: {TOTAL_FILES} raw | {count_files(ZIPCODES_DIRPATH)} simplified | {CONVERSION_RATE * 100:.1f}% conversion rate')
        return None

    make_zipcode_directory(ZIPCODES_DIRPATH)
    COUNT = 0
    for raw_zipcode_dir in os.listdir(RAW_ZIPCODES_DIRPATH):
        raw_zipcode_dirpath = os.path.join(RAW_ZIPCODES_DIRPATH, raw_zipcode_dir)
        for raw_zipcode_filename in os.listdir(raw_zipcode_dirpath):
            zipcode = raw_zipcode_filename[:5]

            COUNT += 1
            PROGRESS_BAR = progress_bar(COUNT, TOTAL_FILES, 'files')
            print(f'\rSimplifying Zipcode `{zipcode}`: {PROGRESS_BAR}', end='')

            new_zipcode_filepath = get_zipcode_filepath(zipcode + GEOJSON_EXT)
            if os.path.exists(new_zipcode_filepath) and os.path.getsize(new_zipcode_filepath) > 0: continue # file already exists
            raw_geometry = get_geometry_from_filepath(get_zipcode_filepath(raw_zipcode_filename))
            new_geojson = generate_geojson(simplify(raw_geometry), 'zipcode', zipcode)

            with open(new_zipcode_filepath, 'w') as file:
                json.dump(new_geojson, file)

In [None]:
simplify_raw_zipcodes()

3. Merge Zipcodes and Plot

In [None]:
def remove_holes(geometry):
    if isinstance(geometry, Polygon):
        return Polygon(geometry.exterior)
    if isinstance(geometry, MultiPolygon):
        polygons = [Polygon(p.exterior) for p in geometry.geoms]
        return MultiPolygon(polygons) if len(polygons) > 1 else polygons[0]
    else:
        raise TypeError('geometry must be a Polygon or MultiPolygon')

def merge_geometry_filepaths_to_file(geometry_filepaths, new_filepath, property_name, property_value, no_holes=True):
    if os.path.exists(new_filepath) and os.path.getsize(new_filepath) > 0:
        return # file already exists
    geometries = []
    for filepath in geometry_filepaths:
        try: geometries.append(get_geometry_from_filepath(filepath))
        except: pass
    if geometries:
        merged_geometry = union(geometries)
        if no_holes:
            merged_geometry = remove_holes(merged_geometry)
        if geojson := generate_geojson(merged_geometry, property_name, property_value):
            with open(new_filepath, 'w') as file:
                json.dump(geojson, file)

def get_center(geojson):
    min_x, min_y, max_x, max_y = geojson['properties']['bounds']
    center = [(min_y + max_y) / 2, (min_x + max_x) / 2]
    return center

def get_zoom(geojson):
    min_x, _, max_x, _ = geojson['properties']['bounds']
    diff = (max_x - min_x) % 360
    return np.floor(10 - np.log2(diff))

In [None]:
# create merged geometry file
SELECTED_ZIPCODES = [f'{zipcode:05d}' for zipcode in range (1001, 1101)]
GEOMETRY_FILEPATHS = [get_zipcode_filepath(ZIPCODE + GEOJSON_EXT) for ZIPCODE in SELECTED_ZIPCODES]
OUTPUT_FILENAME = 'merged_geometry.test'
OUTPUT_FILEPATH = OUTPUT_FILENAME + GEOJSON_EXT
os.path.exists(OUTPUT_FILEPATH) and os.remove(OUTPUT_FILEPATH)
merge_geometry_filepaths_to_file(GEOMETRY_FILEPATHS, OUTPUT_FILEPATH, 'Region', OUTPUT_FILENAME)

# initialize map
with open(OUTPUT_FILEPATH, 'r') as file:
    region_geojson = json.load(file)
map = folium.Map(
    location=get_center(region_geojson),
    zoom_start=get_zoom(region_geojson),
)

# add individual zipcodes with their labels
for GEOMETRY_FILEPATH in GEOMETRY_FILEPATHS:
    try:
        with open(GEOMETRY_FILEPATH, 'r') as file:
            geojson = json.load(file)
        folium.GeoJson(
            geojson,
            style_function=lambda feature: {
                'weight': 1,
                'color': 'red',
                'opacity': 1.0,
                'fillColor': 'blue',
                'fillOpacity': 0.2,
            },
        ).add_to(map)
        folium.Marker(
            location=get_center(geojson),
            icon=folium.DivIcon(
                html=f'<div style="font-size: 12px; color: black; font-weight: bold;">{geojson['properties']['zipcode']}</div>'
            ),
        ).add_to(map)
    except: pass

# add merged region border on top
folium.GeoJson(
    region_geojson,
    style_function=lambda feature: {
        'weight': 3,
        'color': 'blue',
        'opacity': .5,
        'fillOpacity': 0.0,
    }
).add_to(map)

map