In [None]:
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Spark configuration for a Kubernetes-managed cluster. The master URL points at
# the in-cluster Kubernetes API server; executors are scheduled in the "sie-555"
# namespace under the "sie-555-sa" service account.
# NOTE(review): the "latest" image tag is unpinned and may drift from the PySpark
# version installed in this notebook's kernel — consider pinning a matching tag.
# NOTE(review): executors call back to spark.driver.host:spark.driver.port, so that
# DNS name presumably resolves to a Service fronting this notebook pod — confirm.
sparkConf = (
  SparkConf()
  .setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
  .setAppName("sie-555-final-project")
  .setAll([
    ("spark.kubernetes.container.image", "apache/spark-py:latest"),
    ("spark.kubernetes.namespace", "sie-555"),
    ("spark.executor.instances", "3"),
    ("spark.executor.cores", "1"),
    ("spark.driver.memory", "512m"),
    ("spark.executor.memory", "512m"),
    ("spark.kubernetes.pyspark.pythonVersion", "3"),
    ("spark.kubernetes.authenticate.driver.serviceAccountName", "sie-555-sa"),
    ("spark.kubernetes.authenticate.serviceAccountName", "sie-555-sa"),
    ("spark.driver.port", "29413"),
    ("spark.driver.host", "sie-555-deployment.sie-555.svc.cluster.local"),
  ])
)

# Building the session is what asks Kubernetes for the executors configured above.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

In [None]:
def calc_quadkey(quadBbox, geomBbox, depth: int, limit: int) -> str:
  """
    Return the quadtree key of the deepest quadrant that fully contains a geometry.

    `quadBbox` is split into four equal quadrants numbered
      0 = (min-x, min-y), 1 = (max-x, min-y), 2 = (min-x, max-y), 3 = (max-x, max-y).
    While one quadrant fully contains `geomBbox`, its digit is appended and the
    search descends into that quadrant. The search stops when no single quadrant
    contains the geometry (it straddles a split line), or when the depth limit
    is reached. Quadrant edges are inclusive, so a tie goes to the lowest index.

    Args:
      quadBbox: (minx, miny, maxx, maxy) of the current quadrant.
      geomBbox: (minx, miny, maxx, maxy) of the geometry being keyed.
      depth: current level; callers normally start at 0.
      limit: maximum level; the key has at most `limit - depth` digits.

    Returns:
      The quadkey string, or '' when depth >= limit or no child quadrant
      contains `geomBbox`.
  """

  if depth >= limit:
    return ''
  halfWidth = (quadBbox[2] - quadBbox[0]) / 2.0
  halfHeight = (quadBbox[3] - quadBbox[1]) / 2.0

  newQuads = [
    (
      quadBbox[0],
      quadBbox[1],
      quadBbox[2] - halfWidth,
      quadBbox[3] - halfHeight
    ), (
      quadBbox[0] + halfWidth,
      quadBbox[1],
      quadBbox[2],
      quadBbox[3] - halfHeight
    ), (
      quadBbox[0],
      quadBbox[1] + halfHeight,
      quadBbox[2] - halfWidth,
      quadBbox[3]
    ), (
      quadBbox[0] + halfWidth,
      quadBbox[1] + halfHeight,
      quadBbox[2],
      quadBbox[3]
    )
  ]

  for i, quad in enumerate(newQuads):
    if contains(quad, geomBbox):
      # Descend one level. Incrementing depth is what lets `limit` bound the
      # recursion; degenerate bboxes (e.g. points) would otherwise recurse forever.
      return str(i) + calc_quadkey(quad, geomBbox, depth + 1, limit)
  return ''

def contains(bbox, geomBbox) -> bool:
  """
    Return True if `geomBbox` lies entirely within `bbox`, edges inclusive.

    Both arguments are (minx, miny, maxx, maxy) sequences.
  """
  return not (
    bbox[0] > geomBbox[0]
    or bbox[1] > geomBbox[1]
    or bbox[2] < geomBbox[2]
    or bbox[3] < geomBbox[3]
  )

In [None]:
from geopandas import gpd
from shapely.geometry import Polygon, box
from shapely.wkt import loads

midcoastLakes = gpd.read_file("MidCoastLakes")
# Query window as (minx, miny, maxx, maxy). NOTE(review): these look like lon/lat
# degrees, so the layer is presumably in a geographic CRS too — confirm.
queryGeom: Polygon = box(-69.2, 44.10, -69.0, 44.20)

# Flatten each lake into a plain, picklable record (id, bounding box, WKT text)
# so it can be shipped to Spark executors without geopandas/shapely objects.
data = [
  {
    'OID': lake.OBJECTID,
    'bbox': lake.geometry.bounds,
    'wkt': lake.geometry.wkt,
  }
  for _, lake in midcoastLakes[["OBJECTID", "geometry"]].iterrows()
]

In [None]:
def get_quadkey(bbox, rootBbox=(-180, -180, 180, 180), limit: int = 20):
  """
    Return the quadkey of `bbox` within a fixed root quadtree.

    Thin wrapper around `calc_quadkey` that starts at depth 0 from `rootBbox`.
    The defaults reproduce the previously hard-coded values.

    Args:
      bbox: (minx, miny, maxx, maxy) of the geometry, e.g. shapely `.bounds`.
      rootBbox: root square of the quadtree. The default spans -180..180 on both
        axes, which covers lon/lat degrees (assumes inputs are in degrees — TODO
        confirm against the layer's CRS).
      limit: depth limit forwarded to `calc_quadkey`.

    Returns:
      The quadkey string ('' for a geometry straddling a root split line).
  """
  return calc_quadkey(rootBbox, bbox, 0, limit)

def _tag_with_quadkey(rec):
  """Project a record to OID/wkt and attach the quadkey of its bounding box."""
  return {'OID': rec['OID'], 'wkt': rec['wkt'], 'quadKey': get_quadkey(rec['bbox'])}

def _shares_quad_path(rec):
  """
    Coarse spatial prefilter: keep a record when its quadkey and the query's
    quadkey are prefixes of one another (one quadrant nests inside the other).
    NOTE(review): geometries in sibling quadrants that only touch the shared
    edge are dropped here.
  """
  key = rec['quadKey']
  return key.startswith(queryQuadKey) or queryQuadKey.startswith(key)

def _drop_quadkey(rec):
  """Strip the quadkey, keeping only what the exact-intersection step needs."""
  return {'OID': rec['OID'], 'wkt': rec['wkt']}

rdd = sc.parallelize(data)
rddWithQuadKey = rdd.map(_tag_with_quadkey)

queryQuadKey = get_quadkey(queryGeom.bounds)
filteredRdd = rddWithQuadKey.filter(_shares_quad_path)

# Bring the (small) candidate set back to the driver.
dataOfInterest = filteredRdd.map(_drop_quadkey).collect()

In [None]:
# Exact geometric test on the candidates that survived the quadkey prefilter.
result = []
for doi in dataOfInterest:
  geom = loads(doi['wkt'])
  if queryGeom.intersects(geom):
    result.append(dict(OID=doi['OID'], geometry=geom))

print(f'There are {len(result)} intersecting geometries!')
# An empty list yields a GeoDataFrame with no geometry column, which cannot be plotted.
if result:
  gpd.GeoDataFrame(result, geometry='geometry', crs=midcoastLakes.crs).plot()

In [None]:
sc.stop()