## FFE Part 2 geo-join 

This geo-join operation creates an edge between every pair of buildings whose centres are less than a distance N apart.

Here, we take the Wellington buildings dataset and prepare the data for stage 3.
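The edge rule described above can be sketched in plain Python, without Spark. This is a minimal illustration only: the `centres` dictionary, the value of `N` and the helper `brute_force_edges` are hypothetical and not part of the notebook. It compares every ordered pair of buildings, which is the O(n^2) baseline that the join later in this notebook has to cope with.

```python
from itertools import permutations
from math import hypot

# Hypothetical building centres: id -> (x, y), in the same units as N.
centres = {0: (0.0, 0.0), 1: (40.0, 40.0), 2: (30.0, 0.0), 3: (200.0, 200.0)}
N = 50.0  # distance threshold (assumed value for this sketch)


def brute_force_edges(centres, n):
    """Return sorted directed (id, near_id) pairs whose centres are closer than n."""
    edges = []
    # permutations(..., 2) yields every ordered pair of distinct buildings.
    for (a, (ax, ay)), (b, (bx, by)) in permutations(centres.items(), 2):
        if hypot(ax - bx, ay - by) < n:
            edges.append((a, b))
    return sorted(edges)


edges = brute_force_edges(centres, N)
print(edges)
```

Each undirected neighbour relation appears twice, once in each direction, which matches the `df1._c0 <> df2._c0` condition used in the SQL join.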

In [1]:
import geomesa_pyspark

from pyspark.sql import Row
conf = geomesa_pyspark.configure().setAppName('Demo1')

from pyspark.sql import SparkSession

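# Pass the GeoMesa configuration built above to the session (it was otherwise unused).
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()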
sc = spark.sparkContext

# In later versions (see the 2.4 docs) this becomes geomesa_pyspark.init_sql(spark)
spark._jvm.org.apache.spark.sql.SQLTypes.init(spark._jwrapped)

In [2]:
## CONFIG
MAX_VERTICES = int(1e3)

In [3]:
filename = "file:///home/jovyan/DEMO/buildings_raw.csv"

df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(filename)

In [4]:
df.createOrReplaceTempView('df')

## Export asset dataset to parquet

The Parquet format is richer than CSV, as it includes the schema and partitioning information needed by Spark jobs.
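The schema point can be illustrated from the CSV side using only the standard library. This is a small sketch with a hypothetical row (the column names mirror `_c0`, `X` and `Y` used in the queries here, but the values are made up): after a CSV round trip every value comes back as a string, which is why the CSV reader above needs `inferSchema`, whereas a Parquet file stores the column types alongside the data.

```python
import csv
import io

# Hypothetical row; the values are made up for illustration.
rows = [{"_c0": 0, "X": 1748000.5, "Y": 5427000.25}]

# Write the row to an in-memory CSV file.
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["_c0", "X", "Y"])
writer.writeheader()
writer.writerows(rows)

# Read it back: CSV carries no type information, so everything is a string.
buf.seek(0)
restored = list(csv.DictReader(buf))
types = {name: type(value).__name__ for name, value in restored[0].items()}
print(types)
```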

In [10]:
%%time
qry = "SELECT df.*, st_point(df.X, df.Y) AS location " +\
    "FROM df where df._c0 < %s" % MAX_VERTICES

result = spark.sql(qry)
result.cache()
result.write.save("file:///geodata/ffe_%s_vertices.parquet" % MAX_VERTICES, format="parquet", mode="overwrite")
result.select('_c0', 'location', 'geometry', 'centroid').show()
#resultDataFrame.show()

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

In [6]:
## Repartitioning makes Spark more efficient: more, smaller tasks smooth out the workload.

newdf = df.repartition(72)
newdf.createOrReplaceTempView('df')

## Geo join => beating O(n^2)

In PySpark we must use the SQL approach to access the GeoMesa geo functions (st_point, st_distance).

Note that there is no performance difference between the SQL and JDT APIs: both are handled identically in Spark.Core.
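One general way to avoid comparing every pair is to bucket points into a uniform grid and only test points in neighbouring cells. The sketch below shows that idea in plain Python. It is an illustration of the technique, not a description of how GeoMesa or Spark executes the query in this notebook. The function `grid_edges` and the sample `centres` are hypothetical.

```python
from collections import defaultdict
from math import floor, hypot


def grid_edges(centres, n):
    """Sorted directed (id, near_id) pairs closer than n, using grid cells of side n."""
    # Bucket every point into the grid cell that contains it.
    cells = defaultdict(list)
    for pid, (x, y) in centres.items():
        cells[(floor(x / n), floor(y / n))].append(pid)

    edges = []
    for (cx, cy), ids in cells.items():
        # Any point within n of a point in this cell lies in the 3x3 block around it.
        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                neighbours = cells.get((cx + dx, cy + dy), ())
                for a in ids:
                    ax, ay = centres[a]
                    for b in neighbours:
                        bx, by = centres[b]
                        if a != b and hypot(ax - bx, ay - by) < n:
                            edges.append((a, b))
    return sorted(edges)


# Hypothetical building centres: id -> (x, y).
centres = {0: (0.0, 0.0), 1: (40.0, 40.0), 2: (30.0, 0.0),
           3: (200.0, 200.0), 4: (60.0, 10.0)}
grid_result = grid_edges(centres, 50.0)
print(grid_result)
```

With a cell side equal to the threshold, each point is compared only against points in at most nine cells, so the work scales with local density rather than with the total number of pairs.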

In [7]:
%%time
from datetime import datetime as dt

t0 = dt.utcnow()  # start time for the manual timing printed below
qry = "SELECT df1._c0 as id, " +\
    "df2._c0 as near_id " +\
    "FROM df as df1, df as df2 " +\
    "WHERE st_distance(st_point(df1.X, df1.Y), st_point(df2.X, df2.Y)) < 50 "+\
    "AND df1._c0 <> df2._c0 " +\
    "AND df1._c0 < %s and df2._c0 < %s" % (MAX_VERTICES, MAX_VERTICES)
edges = spark.sql(qry)
count = edges.count()  # count the joined edges, not the earlier `result` frame
print("join + count %s took %s with %s edges" % (MAX_VERTICES, dt.utcnow()-t0, count))

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

## Export edges dataset to parquet

In [8]:
## export as parquet
edges.write.save("file:///geodata/ffe_%s_edges.parquet" % MAX_VERTICES, format="parquet", mode="overwrite")

NameError: name 'edges' is not defined