# Processing Global Geospatial Datasets from OpenStreetMap and NASA Sattelites 

<img src="https://hire.withgoogle.com/public_frame/jobs/azaveacom/logo/P_AAAAAADAABpCmr9NGZP2H_.png"
     alt="Azavea Logo"
     style="float: right; margin-right: 10px; width: 400px" />

- Eugene Cheipesh
- echeipesh@azavea.com
- www.azavea.com
- GitHub: @echeipesh

"I hope you like code and workbooks!" 

In [None]:
import geopyspark as gps
import numpy as np
import pyproj
import folium

from functools import partial
from shapely.geometry import Point, Polygon
from shapely.ops import transform
from pyspark import SparkContext, StorageLevel
from pyspark.sql import SparkSession

In [None]:
conf = gps.geopyspark_conf(appName="geopython-notebook-emr", master='local[*]')
conf.set('spark.default.parallelism', 8)
conf.set('spark.ui.enabled', True)
conf.set('spark.master.memory', '9500M')
conf.set('spark.driver.maxResultSize', '3G')
conf.set('spark.task.maxFailures', '33')
conf.set('spark.executor.extraJavaOptions', '-XX:+UseParallelGC')

sc = SparkContext(conf=conf)

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", '')
hadoopConf.set("fs.s3.awsSecretAccessKey", '')

pysc = gps.get_spark_context()
session = SparkSession.builder.config(conf=pysc.getConf()).enableHiveSupport().getOrCreate()

# Motivating Scenario


<img src="https://user-images.githubusercontent.com/1158084/40941771-d8530766-6800-11e8-8a29-ce611825a16c.jpg" 
    style="float: right; width: 200px; margin-right: 10px;" />
    
We are an agency responding to an introduction of an invasive species of the European Short-Horned beetle.

It has arrived in a series of lumber shipments a week ago and may spread through the road network as cargo or on clothing of people handling the shipments.

What are the candidate affected areas for initial treatment?


## Goal: Cost Distance Model

<img src="https://user-images.githubusercontent.com/1158084/40941770-d839880e-6800-11e8-86de-51070d1a1b61.jpg" 
    style="float: right; width: 200px; margin-right: 10px;" />
    
- Accumulated cost from starting point
- Cost described by friction layer
- Every pixel is eight connected
- Iterative algoirthm



## What is our friction surface?


<img src="https://user-images.githubusercontent.com/1158084/40941770-d839880e-6800-11e8-86de-51070d1a1b61.jpg" 
    style="float: right; width: 200px; margin-right: 10px;" />

- Traveling on roads is easy because of cars
- Traveling off roads requires walking
- Walking speed depends on slope
- Walking is easier in the city
- Walking is harder through forest

## Setting our workspace

In [None]:
# The URI that the raster layers will be saved/read from
layer_uri = "s3://geopyspark-demo/geopython/catalog/emr"

# The AttributeStore is a Key/Value metadata store for our layers
store = gps.AttributeStore(layer_uri)

# OpenStreetMap

"OSM is a free, editable map of the world, created and maintained by volunteers and available for use under an open license. In the 12 years of OSM’s existence, editors have created and modified several billion features (physical things on the ground like roads or buildings)."

https://registry.opendata.aws/osm/

## OpenStreeMap Schema

<img src="https://user-images.githubusercontent.com/1158084/36169819-5cf29190-10cb-11e8-9161-6f0386f14194.png" 
     style="float: right; margin-right: 10px;"/>
     
- **Node**
    - POI, Label, Stop
- **Way**
    - Road, River, Boundary
- **Relation**
    - Geographic, Semantic

In [None]:
session.read.orc("s3://osm-pds/planet/planet-latest.orc").printSchema()

## OSM Roads and Trails

In [None]:
# Read in the OSM data from an ORC file
file_uri = "s3://geotrellis-test/xterrain/continental-us.orc"
osm_dataframe = session.read.orc(file_uri)

In [None]:
# Get all of the ways as line features
osm = gps.vector_pipe.osm_reader.from_dataframe(osm_dataframe)
lines = osm.get_line_features_rdd()
highways = lines.filter(lambda feature: 'highway' in feature.properties.tags)

In [None]:
highways.take(1)

## Assign Road or Path Speed

In [None]:
path_tags = ['footway', 'steps', 'bridleway', 'path', 'cycleway', 'escalator']

# Filter out the highways into roads and paths

roads = highways.filter(
    lambda feature: 
        feature.properties.tags['highway'] not in path_tags)

paths = highways.filter(
    lambda feature: 
        feature.properties.tags['highway'] in path_tags)

In [None]:
# This cell contains the logic that assigns each section of road a
# speed based on the type of road that section is.

default_speeds = {
    'motorway': 65,
    'trunk': 45,
    'primary': 40,
    'secondary': 35,
    'tertiary': 30,
    'unclassified': 20,
    'residential': 20,
    'service': 15,
    'motorway_link': 45,
    'trunk_link': 40,
    'primary_link': 35,
    'secondary_link': 30,
    'tertiary_link': 25,
    'living_street': 5,
    'pedestrian': 5,
    'track': 15,
    'road': 30}

words = ['maxspeed', 'ambiguous', 'signals', 
         'none', 'walk', 'variable', 
         'national', 'fixme', 'unposted', 'implicit']

def is_number(s):
    try:
        float(s)
        return True
    except ValueError:
        return False

def default_speed(highway):
    if not highway in default_speeds:
        return default_speeds['road']
    else:
        return default_speeds[highway]

def get_maxspeed(speed, units, highway):
    speeds = speed.split(';|,-')
    maxspeed = 0
    for sp in speeds:
        sp = sp.replace(units, '')
        if (is_number(sp)):
            if units == 'kph':
                sp = float(sp) / 1.609344 
            elif units == 'knots':
                sp = 0.868976 * float(knots)
            else:
                sp = float(sp)
                
            if sp > maxspeed:
                maxspeed = sp
    if maxspeed > 0:
        speed = maxspeed
    else:
        speed = default_speed(highway)

    return speed

def get_highway_cellvalue(osm_feature):   
    highway = osm_feature.properties.tags['highway']
    speed = osm_feature.properties.tags.get('maxspeed', '')
                                
    speed = speed.lower().strip()
        
    # if we don't have a speed, give it a default
    if len(speed) == 0:
        speed = default_speed(highway)
    elif not is_number(speed):
        if 'kph' in speed:
            speed = get_maxspeed(speed, 'kph', highway)
        elif 'km/h' in speed:
            speed = get_maxspeed(speed, 'km/h', highway)
        elif 'kmh' in speed:
            speed = get_maxspeed(speed, 'kmh', highway)
        elif 'mph' in speed:
            speed = get_maxspeed(speed, 'mph', highway)
        elif 'knots' in speed:
            speed = get_maxspeed(speed, 'knots', highway)
        elif speed in words:
            speed = default_speed(highway)
        else:
            speed = get_maxspeed(speed, '', highway)            
    if float(speed) <= 0.0:
        speed = default_speed(highway)

    speed = float(speed)
    return gps.CellValue(speed, speed)

In [None]:
# Encode the the paths with the default walking speed
path_features = paths\
    .map(lambda feature: 
         gps.Feature(feature.geometry, gps.CellValue(3.74, 3.74)))\
    .persist(StorageLevel.MEMORY_AND_DISK_SER)

# Encode the road speeds based on road type, get_highway_cellvalue
road_features = roads\
    .map(
        lambda feature: 
            gps.Feature(feature.geometry, get_highway_cellvalue(feature)))\
    .persist(StorageLevel.MEMORY_AND_DISK_SER)

## Rasterize OSM Features

<img src="https://user-images.githubusercontent.com/11281373/29459223-acdb92f0-83f0-11e7-9dde-b13911986b7c.png" />

## Rasterize OSM Features

In [None]:
# Combine the roads, paths into one RDD and then rasterize
osm_raster = gps.geotrellis.rasterize_features(
    features = pysc.union([road_features, path_features]),
    crs = "EPSG:4326",
    zoom = 13,
    cell_type = gps.CellType.INT8RAW,
    partition_strategy = gps.SpatialPartitionStrategy(1000))\
.convert_data_type(gps.CellType.FLOAT32, -2147483648.0)

- **CRS**: Coordinate Reference System, `4326` is Lat-Long
- **Zoom**: Level of detail in a power of two tile pyramid
- **NoData**: Flag for absence of data, `-2147483648.0`

## Save the Rasterized OSM Features

In [None]:
tiled_osm = osm_raster\
    .tile_to_layout(layout = gps.GlobalLayout(), target_crs = "EPSG:3857")\
    .with_no_data(0.0)

<img src="https://user-images.githubusercontent.com/1158084/40943007-98d2b22c-6804-11e8-82ba-efebd5ceea34.jpg" style="height: 300px; align: center"/>

## Save the Rasterized OSM Features

In [None]:
osm_pyramid = tiled_osm.pyramid(
    partition_strategy = gps.SpatialPartitionStrategy(1000))

<img src="https://user-images.githubusercontent.com/1158084/40943639-bf40b1e6-6806-11e8-8dca-55a8dd46fc99.jpg" 
 style="height: 300px"/>

## Save the Rasterized OSM Features

In [None]:
osm_layer_name = "rasterized-osm-features"

# Save layer histogram for later use
osm_hist = osm_pyramid.get_histogram()
store.layer(osm_layer_name).write("histogram", osm_hist.to_dict())

# Save layer pyramid
for zoom, layer in sorted(osm_pyramid.levels.items(), reverse=True):
    print("Writing zoom", zoom)
    store.layer(osm_layer_name, zoom).delete("metadata")
    gps.write(layer_uri, osm_layer_name, layer)

## Displaying the Rasterized OSM Features

In [None]:
osm_layer_name = "rasterized-osm-features"
osm_hist = gps.Histogram.from_dict(
    store.layer(osm_layer_name).read("histogram"))
osm_color_map = gps.ColorMap.build(osm_hist, 'magma')

osm_tms = gps.TMS.build((layer_uri, osm_layer_name), osm_color_map)
osm_tms.bind("0.0.0.0", 56589)

In [None]:
osm_map = folium.Map()
folium.TileLayer(
    tiles='http://localhost:56589/tile/{z}/{x}/{y}.png',
    attr="GeoPySpark").add_to(osm_map)
osm_map.fit_bounds(bounds = [(32.8283, -98.5795)], max_zoom=4)

In [None]:
osm_map

## Save Pyramid Utility

In [None]:
def save_layer_pyramid(layer_name, layer):
    pyramid = layer.pyramid(partition_strategy = gps.SpatialPartitionStrategy(1000))    
    hist = pyramid.get_histogram()
    store.layer(layer_name).write("histogram", hist.to_dict())

    # Save layer pyramid
    for zoom, layer in sorted(osm_pyramid.levels.items(), reverse=True):
        print("Writing {0} at zoom {1}".format(layer_name, zoom))
        store.layer(layer_name, zoom).delete("metadata")
        gps.write(layer_uri, layer_name, layer)
    
    return pyramid

## Display Saved Pyramid Utility

In [None]:
def get_layer_tms(layer_name, color_map, port = None):
    hist = gps.Histogram.from_dict(store.layer(layer_name).read("histogram"))
    cm = gps.ColorMap.build(hist, color_map)
    tms = gps.TMS.build((layer_uri, layer_name), cm)
    tms.bind("0.0.0.0", port)
    return tms

In [None]:
try: bound_tms
except NameError:  bound_tms = {}

def get_layer_map(layer_name, color_map, port = None):
    key = (layer_name, color_map)
    if not (key) in bound_tms:
        bound_tms[key] = get_layer_tms(layer_name, color_map, port)
    tms = bound_tms[key]
    layer_map = folium.Map()
    folium.TileLayer(tiles = tms.url_pattern, attr = key).add_to(layer_map)
    layer_map.fit_bounds(bounds = [(28.8283, -98.5795)], max_zoom = 4)
    return layer_map

In [None]:
get_layer_map("rasterized-osm-features", "magma")

# Walking Friction

- Elevation
- Slope
- Land Cover

## National Elevation Dataset

"The National Elevation Dataset (NED) is the primary elevation data product of the USGS. The NED is a seamless dataset with the best available raster elevation data of the conterminous United States."

<img src="https://lta.cr.usgs.gov/sites/default/files/u19/ned1.gif"
    style="float: right; margin-right: 10px; width: 400px" />


https://nationalmap.gov/elevation.html

## Reading NED from GeoTiffs

In [None]:
ned = gps.geotiff.get(
    gps.LayerType.SPATIAL, 
    's3://azavea-datahub/raw/ned-13arcsec-geotiff/', 
    num_partitions=1000, max_tile_size=256)

In [None]:
tiled_ned = ned.tile_to_layout(
    layout = osm_raster.layer_metadata,
    partition_strategy = gps.SpatialPartitionStrategy(1000)
).convert_data_type(gps.CellType.FLOAT32, 0.0)\
.persist(StorageLevel.MEMORY_AND_DISK_SER)

save_layer_pyramid("ned-layer", tiled_ned)

In [None]:
get_layer_map("ned-layer", "viridis")

# Shuttle Radar Topography Mission

"Virtually all of the land surface between +/- 60 degrees latitude was mapped by SRTM."

<img src="https://www2.jpl.nasa.gov/srtm/images/bin/srtm_covmap_thu.jpg"
    style="float: right; margin-right: 10px; width: 300px" />


- `SRTMHGT/SRTMHGT` File Format
- Available in 1x1 degree tiles
- 30m resolution at equator

https://www2.jpl.nasa.gov/srtm

<img src="https://user-images.githubusercontent.com/1158084/40938454-c6d7c292-67f6-11e8-8eaf-30f7fbed6314.png" />

http://dwtkns.com/srtm30m/

## SRTM: Not a GeoTiff

- `GDAL` is swish army of raster formats
- `rasterio` wraps `GDAL` for Python

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/d/df/GDALLogoColor.svg/2000px-GDALLogoColor.svg.png" 
     style="float: right; margin-right: 10px; width: 200px"/>


## List some files on S3

In [None]:
import boto3
s3 = boto3.client('s3')
def get_raster_s3_objects(bucket, prefix, extension="hgt"):
    results = []
    paginator = s3.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
    for page in page_iterator:
        for item in page['Contents']:
            if item['Key'].endswith(extension):
                results.append(item)
    return results                

In [None]:
object_names = get_raster_s3_objects("mrgeo-source", "srtm-v3-30")
uris = list(map(lambda x: 's3://mrgeo-source/{}'.format(x['Key']), object_names))
uris[0:3]

In [None]:
# Lets peek and see what we're working with
windows = gps.rasterio._read_windows(
    uri = "s3://mrgeo-source/srtm-v3-30/N00E010.hgt", 
    xcols = 256, ycols = 256, 
    bands = None, 
    crs_to_proj4 = gps.rasterio.crs_to_proj4)

next(windows)

## Continental United States

The cartographic boundary files are simplified representations of selected geographic areas from the Census Bureau’s MAF/TIGER geographic database.

In [None]:
# Source: https://www.census.gov/geo/maps-data/data/cbf/cbf_nation.html

conus_bound = gps.shapefile\
    .get(uri='s3://geopyspark-demo/vector/us/cb_2017_us_nation_20m.shp')\
    .collect()[0].geometry.geoms[78]
conus_bound

## Filter SRTM to Continental United States

In [None]:
import shapely
import rasterio

def uri_and_bounds_in_conus(uri):
    with rasterio.open(uri) as data:
        x0,y0,x1,y1 = data.bounds
        poly = Polygon([(x0,y0), (x1,y0), (x1,y1), (x0,y1), (x0,y0)])
        return [(uri, poly)] if conus_bound.intersects(poly) else []
    
us_uris_and_bounds = pysc.parallelize(uris).flatMap(uri_and_bounds_in_conus)
us_uris = us_uris_and_bounds.map(lambda x: x[0])

## Tile and Save SRTM subset

In [None]:
us_rasters = gps.rasterio.get(us_uris)

raster_layer = gps.RasterLayer.from_numpy_rdd(
    layer_type = gps.LayerType.SPATIAL, 
    numpy_rdd = us_rasters)

tiled_raster_layer = raster_layer.tile_to_layout(
    layout = gps.GlobalLayout(), 
    target_crs = "EPSG:3857")

save_layer_pyramid("srtm", tiled_raster_layer)

In [None]:
get_layer_map("srtm", "viridis")

# National Land Cover Database

<img src="https://www.mrlc.gov/images/NLCD06_conus_lg.gif" 
     style="float: left; margin-left: 5px; margin-right: 5px; width: 600px" />

<img src="https://www.mrlc.gov/downloadfile.php?file=NLCD_Colour_Classification_Update.jpg" 
     style="float: left; margin-left: 50px; margin-right: 50px; width: 250px" />
     
https://www.mrlc.gov/nlcd2011.php

# Reading NLCD Data

In [None]:
nlcd = gps.geotiff.get(
    layer_type = gps.LayerType.SPATIAL, 
    uri = "s3://gt-rasters/nlcd/2011/tiles", 
    crs = "EPSG:4326",
    max_tile_size = 512, 
    num_partitions = 1000)

In [None]:
tiled_nlcd = nlcd.tile_to_layout(
    layout = osm_raster.layer_metadata, 
    target_crs = "EPSG:4326", 
    partition_strategy = gps.SpatialPartitionStrategy(1000))

## Reclassified NLCD Layer

In [None]:
# Reclassify the NLCD values based on estimated walking impact

nlcd_map = {
    11.0: 0.0, 12.0: 0.15, 21.0: 0.9, 22.0: 0.9, 23.0: 0.9, 24.0: 0.95,
    31.0: 0.1, 41.0: 0.7, 42.0: 0.65, 43.0: 0.75, 51.0: 0.75, 52.0: 0.75,
    71.0: 0.8, 81.0: 0.8, 82.0: 0.8, 90.0: 0.2, 95.0: 0.25 }

In [None]:
nlcd_pmts = tiled_nlcd\
    .convert_data_type(gps.CellType.FLOAT32, 0.0)\
    .reclassify(nlcd_map, float, gps.ClassificationStrategy.EXACT)

nlcd_wm = nlcd_pmts.tile_to_layout(
    layout = gps.GlobalLayout(), 
    target_crs = "EPSG:3857")

save_layer_pyramid("raclassified-nlcd", nlcd_wm)

In [None]:
get_layer_map("raclassified-nlcd", 'magma')

## Tobler Hicking Function

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/7/72/Tobler%27s_hiking_function.svg/600px-Tobler%27s_hiking_function.svg.png" />

# Tobler Hicking Function

In [None]:
# Calculate Slope from the NED layer
zfactor = gps.geotrellis.zfactor_lat_lng_calculator('Meters')
slope_raster = tiled_ned.slope(zfactor)

In [None]:
# From the Slope layer, calculate the Tobler walking speed
tobler_raster = slope_raster.tobler()

# Add the Tobler and Reclassified NLCD layers to adjusted the Tobler values
adjusted_tobler = tobler_raster * nlcd_pmts

In [None]:
# The friction layer is per pixel max between the adjusted Tobler and OSM values
friction_with_roads = adjusted_tobler.local_max(osm_raster)

## Reproject and Save Friction layer

In [None]:
friction_layer = friction_with_roads.tile_to_layout(
    target_crs = 3857,
    layout = gps.GlobalLayout(tile_size=256),
    resample_method = gps.ResampleMethod.MAX
).convert_data_type(gps.CellType.FLOAT32, 0.0)

save_layer_pyramid("us-friction-surface-layer-tms", friction_layer)

In [None]:
get_layer_map("us-friction-surface-layer-tms", "magma")

# Cost Distance over the Friction Layer

In [None]:
# The point of origin
point = Point(-75.15415012836456, 39.96134940667086)

In [None]:
# The point of origin needs to be reprojected to WebMercator
project = partial(
    pyproj.transform,  
    pyproj.Proj(init='epsg:4326'),
    pyproj.Proj(init='epsg:3857'))
reprojected_point = transform(project, point)

In [None]:
# Using the Quotient of the average walking speed
cost_distance = gps.cost_distance(
    friction_layer = 3.74 / friction_layer,
    geometries = [reprojected_point],
    max_distance = 50000)

save_layer_pyramid("cost-distance-2", cost_distance)

In [None]:
get_layer_map('cost-distance-2', 'viridis')