## DEMO Part 02 - joins using Spark + GeoMesa



In [1]:
import geomesa_pyspark
import random
from pyspark.sql import Row
from h5spark import read

# Spark configuration produced by geomesa_pyspark, with the application name set.
conf = geomesa_pyspark.configure()\
    .setAppName('Demo1')

from pyspark.sql import SparkSession

# Hand `conf` to the builder: without `.config(conf=conf)` the configuration
# built above (including the 'Demo1' application name) is never applied.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext

# Initialise GeoMesa's Spark SQL support (the st_* functions used in the
# queries further down). Later GeoMesa releases (see the 2.4 docs) expose this
# as geomesa_pyspark.init_sql(spark).
spark._jvm.org.apache.spark.sql.SQLTypes.init(spark._jwrapped)

In [2]:
# Execute the Part 00 notebook inside this kernel.
# NOTE(review): `sites_gdf` and `slip_rate_df`, used in the cells below, are not
# defined anywhere in this notebook - presumably this run provides them; confirm.
%run "__DEMO Part 00.ipynb"

CPU times: user 220 ms, sys: 20 ms, total: 240 ms
Wall time: 242 ms
CPU times: user 468 ms, sys: 16 ms, total: 484 ms
Wall time: 481 ms


### Prepare spark dataframes 

In [3]:
# Column subsets handed to Spark, named once so the lists are not repeated.
SITE_COLUMNS = ["POINT_ID", "SiteDB_ID", "geometry_x", "geometry_y", "FEATURE_NA"]
SLIP_RATE_COLUMNS = [
    'Site Data_Site DB ID',
    'Site Data_Site Name',
    'Site Data_Easting (NZTM)',
    'Site Data_Northing (NZTM)',
]

# The Spark DataFrame needs the raw coordinates, not the POINT object,
# so expose x and y as plain numeric columns on a copy of the GeoDataFrame.
sites_df2 = sites_gdf.copy()
sites_df2["geometry_x"] = sites_gdf.geometry.apply(lambda geom: geom.x)
sites_df2["geometry_y"] = sites_gdf.geometry.apply(lambda geom: geom.y)

# No explicit schema is passed, so Spark infers the column types. SiteDB_ID is
# inferred as a string (see the printed schema), which is why the SQL queries
# cast it with int(SiteDB_ID).
# TODO: for stronger integrity, map missing SiteDB_ID values to a sentinel
# (e.g. -1), cast the column to int, and pass an explicit StructType schema
# (POINT_ID/SiteDB_ID as IntegerType, geometry_x/geometry_y as DoubleType,
# FEATURE_NA as StringType) to createDataFrame.
sites_spark_df = spark.createDataFrame(sites_df2.filter(items=SITE_COLUMNS))
slip_rate_spark_df = spark.createDataFrame(
    slip_rate_df.filter(items=SLIP_RATE_COLUMNS))

# Set up SQL views on our two DataFrames.
sites_spark_df.createOrReplaceTempView("sites")
slip_rate_spark_df.createOrReplaceTempView("slip_rates")

# Print the inferred schemas of both Spark DataFrames.
print(sites_spark_df)
print(slip_rate_spark_df)

DataFrame[POINT_ID: bigint, SiteDB_ID: string, geometry_x: double, geometry_y: double, FEATURE_NA: string]
DataFrame[Site Data_Site DB ID: bigint, Site Data_Site Name: string, Site Data_Easting (NZTM): double, Site Data_Northing (NZTM): double]


In [4]:
# Print summary statistics (count, mean, stddev, min, quartiles, ...) for each
# column of the sites DataFrame.
# NOTE(review): the counts in the output differ by column, which suggests
# missing values in SiteDB_ID and FEATURE_NA - confirm.
sites_spark_df.summary().show()

+-------+------------------+-------------------+------------------+------------------+-----------------+
|summary|          POINT_ID|          SiteDB_ID|        geometry_x|        geometry_y|       FEATURE_NA|
+-------+------------------+-------------------+------------------+------------------+-----------------+
|  count|               374|                261|               374|               374|               11|
|   mean|362.78877005347596|0.40229885057471265|1663582.7398980474| 5416410.452055819|             null|
| stddev|4039.8235403001986| 1.9342645399078247| 220095.5855532843|266732.74674936663|             null|
|    min|                 0|                  0|1191945.3343220146| 4877056.067123675|   Balfour Trench|
|    25%|                 0|                0.0|1475220.9210024634| 5200946.011786301|             null|
|    50%|                 0|                0.0|1690022.8332907828| 5439586.981779055|             null|
|    75%|                 0|                0.0| 184175

### Spark SQL join by ID

In [5]:
# Join the two views on site ID. SiteDB_ID is cast to int so that it can be
# compared with the slip-rate table's ID column.
qry = (
    "SELECT int(SiteDB_ID), `Site Data_Site DB ID` as SR_SiteID, geometry_x, geometry_y, "
    " `Site Data_Easting (NZTM)` as easting, `Site Data_Northing (NZTM)` as northing"
    " FROM sites, slip_rates"
    " WHERE int(SiteDB_ID) = `Site Data_Site DB ID`"
    " ORDER BY SiteDB_ID"
)

res = spark.sql(qry)
res.show()

+---------+---------+------------------+------------------+--------------+--------------+
|SiteDB_ID|SR_SiteID|        geometry_x|        geometry_y|       easting|      northing|
+---------+---------+------------------+------------------+--------------+--------------+
|        1|        1|1685349.9999996922| 5390752.999974101|     1685350.0| 5390752.99997|
|        2|        2|1686050.0317498427| 5392467.073217822| 1686050.03175| 5392467.07322|
|        3|        3|1688883.1686561191| 5397088.581135526|     1689691.0| 5398180.99996|
|        4|        4|1690022.8332907828| 5398338.833273707| 1690022.83329| 5398338.83327|
|        7|        7|1776505.3841642379| 5448136.479487686| 1777235.21908|  5448283.5495|
|        8|        8|1777132.1792704347| 5448409.051320459| 1777132.17927| 5448409.05132|
|        9|        9|1777343.5924081975| 5448532.222452894|1777343.592408|5448532.222453|
|       11|       11|1777968.1414732502|5448830.3397795465|1777968.141473| 5448830.33978|
+---------

### Spark SQL join by geometry (coordinates equal within a small tolerance)

In [None]:
# Join on position instead of ID: keep pairs whose x and y coordinates each
# differ by less than 0.00001 (absolute difference).
qry = (
    "SELECT int(SiteDB_ID), `Site Data_Site DB ID` as SR_SiteID, geometry_x, geometry_y, "
    " `Site Data_Easting (NZTM)` as easting, `Site Data_Northing (NZTM)` as northing"
    " FROM sites, slip_rates"
    " WHERE abs(geometry_y - `Site Data_Northing (NZTM)`) < 0.00001"
    " AND abs(geometry_x - `Site Data_Easting (NZTM)`) < 0.00001"
    " ORDER BY SiteDB_ID"
)

res = spark.sql(qry)
res.show()

### Spark SQL join by GeoMesa st_distance

In [6]:
# Join on position using GeoMesa's spatial SQL functions: build a point from
# each table's coordinates and keep pairs closer than 0.001 to each other.
qry = (
    "SELECT int(SiteDB_ID), `Site Data_Site DB ID` as SR_SiteID, geometry_x, geometry_y, "
    " `Site Data_Easting (NZTM)` as easting, `Site Data_Northing (NZTM)` as northing"
    " FROM sites, slip_rates"
    " WHERE st_distance(st_point(`Site Data_Easting (NZTM)`, `Site Data_Northing (NZTM)`), "
    "                   st_point(geometry_x, geometry_y)) < 0.001"
    " ORDER BY SiteDB_ID"
)
res = spark.sql(qry)
res.show()

+---------+---------+------------------+------------------+--------------+--------------+
|SiteDB_ID|SR_SiteID|        geometry_x|        geometry_y|       easting|      northing|
+---------+---------+------------------+------------------+--------------+--------------+
|        1|        1|1685349.9999996922| 5390752.999974101|     1685350.0| 5390752.99997|
|        2|        2|1686050.0317498427| 5392467.073217822| 1686050.03175| 5392467.07322|
|        4|        4|1690022.8332907828| 5398338.833273707| 1690022.83329| 5398338.83327|
|        5|        3|1689690.9999994948| 5398180.999959535|     1689691.0| 5398180.99996|
|        8|        8|1777132.1792704347| 5448409.051320459| 1777132.17927| 5448409.05132|
|        9|        9|1777343.5924081975| 5448532.222452894|1777343.592408|5448532.222453|
|       11|       11|1777968.1414732502|5448830.3397795465|1777968.141473| 5448830.33978|
+---------+---------+------------------+------------------+--------------+--------------+



### Spark SQL join by ID with st_distance (ID matches whose locations differ)

In [7]:
# Consistency check: join by ID, then keep only the pairs whose two locations
# are MORE than 0.001 apart, i.e. the IDs agree but the coordinates do not.
# This is the complement of the st_distance join (same tolerance, opposite
# comparison), so it uses the same st_point constructor and the same literal
# notation as that query.
qry = (
    "SELECT int(SiteDB_ID), `Site Data_Site DB ID` as SR_SiteID, geometry_x, geometry_y, "
    " `Site Data_Easting (NZTM)` as easting, `Site Data_Northing (NZTM)` as northing"
    " FROM sites, slip_rates"
    " WHERE int(SiteDB_ID) = `Site Data_Site DB ID`"
    " AND st_distance(st_point(`Site Data_Easting (NZTM)`, `Site Data_Northing (NZTM)`), "
    "                 st_point(geometry_x, geometry_y)) > 0.001"
    " ORDER BY SiteDB_ID"
)
res = spark.sql(qry)
res.show()

+---------+---------+------------------+-----------------+-------------+-------------+
|SiteDB_ID|SR_SiteID|        geometry_x|       geometry_y|      easting|     northing|
+---------+---------+------------------+-----------------+-------------+-------------+
|        3|        3|1688883.1686561191|5397088.581135526|    1689691.0|5398180.99996|
|        7|        7|1776505.3841642379|5448136.479487686|1777235.21908| 5448283.5495|
+---------+---------+------------------+-----------------+-------------+-------------+

