# Geodata Manipulation

EDA and data manipulation of GIS data using H3, Geopandas, and Shapely.

In [9]:
import json
from typing import List

import h3
import pandas as pd
import dask.dataframe as dd

In [12]:
def get_geodata(filepath: str):
    gdf = read_file(filepath)
    return (gdf
            .astype({"INTPTLAT": float, "INTPTLON": float, "GEOID": "category"})
            .drop(["STATEFP", "COUNTYFP", "TRACTCE", "ALAND", "AWATER", "NAME", 
                   "NAMELSAD", "MTFCC", "FUNCSTAT"], axis=1)
            .rename({"INTPTLAT": "lat", "INTPTLON": "lon", "GEOID": "geoid"}, axis=1)
           )


def prepare_districts(gdf_districts):
    """Loads a geojson files of polygon geometries and features,
    swaps the latitude and longitude andstores geojson"""    
    return (gdf_districts
            .assign(geom_swap_geojson = lambda x: x["geometry"].map(lambda polygon: transform(
                       lambda x, y: (y, x), polygon)).apply(lambda y: mapping(y))))


def hex_fill_tract(geom_geojson: dict, res: int = 13, flag_swap: bool = False) -> set:
    """Fill a tract with small, res 13 hexagons.

    :param geom_geojson: The polygon to fill.
    :param res: The resolution to fill the polygons with.
    :param flag_swap: A flag indicating whether the polygon is geojson conformant or swapped.
    """
    try:
        set_hexagons = h3.compact(h3.polyfill(geom_geojson, res, geo_json_conformant = flag_swap))
    except ValueError:
        print(f"Error on data of type {geom_geojson['type']}. Continuing.")
        return set()
    return list(set_hexagons)


def hex_fill_df(gdf):
    """Fill the tracts with hexagons."""
    return gdf.assign(hex_fill = gdf["geom_swap_geojson"].apply(hex_fill_tract))

datadir = "../data/zipfiles"
zipfile = os.listdir(datadir)[0]
path = os.path.join(datadir, zipfile)

gdf = (get_geodata(path)
       .pipe(prepare_districts)
       .pipe(hex_fill_df)
      )

KeyboardInterrupt: 

In [2]:
all_tracts = []

for filename in os.listdir("../data/tract_polygons"):
    gdf = read_file(f"../data/tract_polygons/{filename}/{filename}.shp")
    # Unify the CT boundries
    union_poly = unary_union(gdf.geometry)
    
    # Convert to hexagon
    temp  = mapping(g)
    temp['coordinates']=[[[j[1],j[0]] for j in i] for i in temp['coordinates']]  
    gdf['hexes'] = h3.polyfill(temp, APERTURE_SIZE)
    all_tracts.append(gdf)
    
gdf = pd.concat(all_tracts)

In [9]:
APERTURE_SIZE = 3

gdf = read_file(f"../data/tract_polygons/tl_2020_01_tract/tl_2020_01_tract.shp")
union_poly = unary_union(gdf.geometry)
temp  = mapping(union_poly)
temp['coordinates']=[[[j[1],j[0]] for j in i] for i in temp['coordinates']]
hexes = h3.polyfill(temp, APERTURE_SIZE)

In [16]:
import os

import h3
import numpy as np
import pandas as pd
from shapely.ops import transform
from shapely.geometry import mapping
from dask.distributed import Client, LocalCluster
from dask_geopandas import read_parquet

from logging import getLogger

DATA_DIR = "../data"
ZIP_DIR = os.path.join(DATA_DIR, "zipfiles")
PARQUET_DIR = os.path.join(DATA_DIR, "tract_parquet")
TILED_CENSUS_DIR = os.path.join(DATA_DIR, "tiled_states")

LOGGER = getLogger(__name__)


def prepare_districts(gdf):
    """Loads a geojson files of polygon geometries and features,
    swaps the latitude and longitude and stores geojson"""
    return gdf.assign(
        geom_swap_geojson=lambda x: x["geometry"]
        .map(lambda polygon: transform(lambda x, y: (y, x), polygon))
        .apply(lambda y: mapping(y))
    )


def hex_fill_tract(geom_geojson: dict, res: int = 13, flag_swap: bool = False) -> set:
    """Fill a tract with small, res 13 hexagons.

    :param geom_geojson: The polygon to fill.
    :param res: The resolution to fill the polygons with.
    :param flag_swap: A flag indicating whether the polygon is geojson conformant or swapped.
    """
    try:
        set_hexagons = h3.compact(
            h3.polyfill(geom_geojson, res, geo_json_conformant=flag_swap)
        )
    except ValueError:
        LOGGER.debug("Error on data of type %s. Continuing.", geom_geojson["type"])
        return set()
    return list(set_hexagons)


def hex_fill_df(gdf):
    """Fill the tracts with hexagons."""
    return gdf.assign(hex_fill=gdf["geom_swap_geojson"].apply(hex_fill_tract)).drop(
        ["geometry", "geom_swap_geojson"], axis=1
    )


def tile_partition(df: pd.DataFrame):
    """Tile a single tract."""
    return df.pipe(prepare_districts).pipe(hex_fill_df)


cluster = LocalCluster(n_workers=8)
# Get the list of files to read
infiles = set(os.listdir(PARQUET_DIR))
donefiles = set(os.listdir(TILED_CENSUS_DIR))
files_todo = infiles.difference(donefiles)
LOGGER.info("%s states to tile.", len(files_todo))

dd = (
    read_parquet([os.path.join(PARQUET_DIR, file) for file in files_todo])
    .map_partitions(tile_partition, meta={
                "geoid": pd.CategoricalDtype,
                "lat": np.float64,
                "lon": np.float64,
                "hex_fill": object,
            })).compute()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40805 instead
  out[:] = [_pygeos_to_shapely(geom) for geom in data]
2022-11-18 18:07:11,113 - distributed.compatibility - ERROR - 
Traceback (most recent call last):
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/psutil/_common.py", line 443, in wrapper
    ret = self._cache[fun]
AttributeError: 'Process' object has no attribute '_cache'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/distributed/utils.py", line 742, in wrapper
    return await func(*args, **kwargs)
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/distributed/worker_memory.py", line 196, in memory_monitor
    memory = worker.monitor.get_process_memory()
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/

2022-11-18 18:07:24,353 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f373d32f640>>, <Task finished name='Task-351161' coro=<ProfileTimePlot.trigger_update.<locals>.cb() done, defined at /home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/distributed/utils.py:740> exception=AttributeError("'NoneType' object has no attribute 'add_next_tick_callback'")>)
Traceback (most recent call last):
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/distributed/utils.py", line 742, in wrapper
    return

KeyboardInterrupt: 

Task exception was never retrieved
future: <Task finished name='Task-492733' coro=<Client._gather.<locals>.wait() done, defined at /home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/distributed/client.py:2119> exception=AllExit()>
Traceback (most recent call last):
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/distributed/client.py", line 2128, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-492735' coro=<Client._gather.<locals>.wait() done, defined at /home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/distributed/client.py:2119> exception=AllExit()>
Traceback (most recent call last):
  File "/home/evan.azevedo/.pyenv/versions/awsdx-hackathon/lib/python3.8/site-packages/distributed/client.py", line 2128, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished 