## Imports

In [1]:
# Main  Imports
import numpy as np
from IPython.core.display import display
import time

# Spark Imports
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql import SQLContext

# Sedona Imports
from sedona.utils import KryoSerializer, SedonaKryoRegistrator
from sedona.register import SedonaRegistrator

# Python shapely imports
import shapely.geometry as sg
import geog

## Spark Initialization
- Load Sample Dataset (One Month)
- Create a spark_df with the sample dataset

### **SOS!**
we have to reload dataset every time we reconect to CoLab


In [2]:
# full program timer
total_time = time.time()

test_sample_path = "../testData/ais_one_hour.csv"

# Create spark session
# spark = SparkSession.builder.appName("DistributedDistanceJoin_Test")\
#     .master("local")\
#     .getOrCreate()


# create sedona spark session
spark = SparkSession. \
    builder. \
    appName('appName'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,'
           'org.datasyslab:geotools-wrapper:geotools-24.0'). \
    getOrCreate()

SedonaRegistrator.registerAll(spark)

# Load CSV File
df = spark.read.csv(header="True", inferSchema="True", path=test_sample_path)
df.show(10)



+---+--------------------+----------+---------+
|_c0|                 _id|         X|        Y|
+---+--------------------+----------+---------+
|  0|60039bef2bc024099...|  -4.50088| 48.35698|
|  1|60039b8f2bc024099...|-4.4970617|48.379555|
|  2|60039b9c2bc024099...|-4.4137635| 48.15062|
|  3|60039bef2bc024099...| -4.500963|48.356915|
|  4|60039bbe2bc024099...| -4.474253|48.341007|
|  5|60039bef2bc024099...| -4.501045|48.356842|
|  6|60039a6c2bc024099...| -4.497015|48.382206|
|  7|60039c032bc024099...|  -4.56926| 48.13976|
|  8|60039e032bc024099...| -4.476655| 48.38109|
|  9|60039bef2bc024099...| -4.501128|48.356773|
+---+--------------------+----------+---------+
only showing top 10 rows



## Create test RDD

### **SOS! BUG**
DEN PAIZEI KALA AMA FORTONO RDD OPOS EINAI STO SXOLIO

In [3]:
# Load As RDD
print("--------------- Load As RDD ---------------")


pointSet_A = spark.read.csv(path=test_sample_path).rdd
pointSet_B = spark.read.csv(path=test_sample_path).rdd


# Test preview Datasets
print("Pointset A stats, Count: {}, \n Sample Row: {}".format(pointSet_A.count(),pointSet_A.take(1)))
print("Pointset B stats, Count: {}, \n Sample Row: {}".format(pointSet_B.count(),pointSet_B.take(1)))

--------------- Load As RDD ---------------
Pointset A stats, Count: 112599, 
 Sample Row: [Row(_c0=None, _c1='_id', _c2='X', _c3='Y')]
Pointset B stats, Count: 112599, 
 Sample Row: [Row(_c0=None, _c1='_id', _c2='X', _c3='Y')]


# Declare Create MBB map function

This helper func is used on 1st map-reduce stage in order to create the Minimum Bounding Box for eatch point of the dataset. MBB by dafinition consists of 2 points (bottom left and upper right).

In order to **convert** point into MBB we use the target distance theta in order to expand point for theta/2 on eatch side of the box

This way when we check for intersection between 2 MBB evry pair of MBB's that intersets meand tha is highly posible that those 2 MMB's satisfing ***theta*** condition

In [4]:
def calculateMBB(Px, Py, d, polyPoints=5):
  """
    :param Px: target point x coordinate
    :param Py: target point y coordinate
    :param d: the distance that we want to expand point on each side (in km)
    :param polyPoints: the shape of the poly that we want to produce
           @SOS: for square polyPoints = 5 (4+1 in order to create polygon.)
    :return bottomLeftMBB: the bottom left point of the mbb
    :return upperRightMBB: the upper right point of the mbb
  """
  p = sg.Point(Px,Py)
  angles = np.linspace(0, 360, polyPoints)
  polygon = geog.propagate(p, angles, d*1000)

  # un zip x and y coords
  x_coordinates, y_coordinates = zip(*polygon)

  # ----------- Example To Rturn points ------------
  # bottomLeftMBB =  sg.Point(min(x_coordinates), min(y_coordinates))
  # upperRightMBB =  sg.Point(max(x_coordinates), max(y_coordinates))

  # return  bottomLeftMBB, upperRightMBB
  #
  #
  # ----------- Example To Rturn Polygon ------------
  return sg.Polygon(polygon)
  #
  # ----------- Example To Rturn Coord Tuples ------------
  # bottomLeftMBB = [min(x_coordinates), min(y_coordinates)]
  # upperRightMBB = [max(x_coordinates), max(y_coordinates)]

  # return [bottomLeftMBB, upperRightMBB]


# test
# bottomLeftMBB, upperRightMBB = calculateMBB(-4.50088, 48.35698, 10)

# print("bottomLeftMBB: {}".format(bottomLeftMBB))
# print("upperRightMBB: {}".format(upperRightMBB))


# Get MBB for each dataset
For test we will use 2 times the same dataset

### Provlimata ektelesis
- Den mporei na kanei map olokliro to dataset (1305201) skaei apo mnimi.
- mexri 1m ta katafernei
- eite xrisimopoiisoume row eite oxi den exei diafora sto na ta xoresei
- episis to map akoma kai me apli prosthesi metaksi ton 2 row den einai arketa grigoro.
****
### Gia to test tha paiksw me to one day sample
skopos einai na kanw map me tin func pou eftiaksa kai meta spatial join me intersect gia ta 2 datasets

- **Step 1** Ftiaxnw mia custom row sturct (den tin exw xriimopoiisi akoma)
- **Step 2** kanw apply to map sto pointset A
- **Step 3** kanw cache to pointSet A me ta MBB's

- **kanw ta idia kai gia to pointset B**
****

In [5]:
# timing procedure
start_time = time.time()

# step1
MBB_Stracture = Row("_id", "X", "Y", "geometry")

# step2
pointSet_A_MBB = df.rdd.map(lambda row: MBB_Stracture(row[1], row[2], row[3], calculateMBB(row[2], row[3], 10))).cache()
pointSet_B_MBB = df.rdd.map(lambda row: MBB_Stracture(row[1], row[2], row[3], calculateMBB(row[2], row[3], 10))).cache()

# Test preview Datasets
print("Pointset A stats, Count: {}, \n Sample Row: {}".format(pointSet_A_MBB.count(),pointSet_A_MBB.take(2)[1]))
print("Pointset B stats, Count: {}, \n Sample Row: {}".format(pointSet_B_MBB.count(),pointSet_B_MBB.take(2)[1]))

print("--- %s seconds ---" % (time.time() - start_time))

Pointset A stats, Count: 112598, 
 Sample Row: Row(_id='60039b8f2bc0240991848034', X=-4.4970617, Y=48.379555, geometry=<shapely.geometry.polygon.Polygon object at 0x11c4450a0>)
Pointset B stats, Count: 112598, 
 Sample Row: Row(_id='60039b8f2bc0240991848034', X=-4.4970617, Y=48.379555, geometry=<shapely.geometry.polygon.Polygon object at 0x11c445b20>)
--- 98.29065108299255 seconds ---


## Spark SQL Test

- Target Query 1: Perform a simple query
- Target Query 2: perform a join based on _id

### Methodology
- step1: We need to create a dataFrame from SQL context and our RDD
- setp2: We have to register this dataframe as a Temp Table
- step3: Then we can run  SQL queries on our dataframe.

In [6]:
sqlContext = SQLContext(spark)

pointSet_A_SQL = sqlContext.createDataFrame(pointSet_A_MBB)
display(pointSet_A_SQL.head(10))

pointSet_A_SQL.printSchema()

pointSet_A_SQL.registerTempTable("raw_data")

[Row(_id='60039bef2bc0240991b16857', X=-4.50088, Y=48.35698, geometry=<shapely.geometry.polygon.Polygon object at 0x11e9f6790>),
 Row(_id='60039b8f2bc0240991848034', X=-4.4970617, Y=48.379555, geometry=<shapely.geometry.polygon.Polygon object at 0x11e9f6850>),
 Row(_id='60039b9c2bc02409918a7d6d', X=-4.4137635, Y=48.15062, geometry=<shapely.geometry.polygon.Polygon object at 0x11e9f6910>),
 Row(_id='60039bef2bc0240991b16855', X=-4.500963, Y=48.356915, geometry=<shapely.geometry.polygon.Polygon object at 0x11e9f69d0>),
 Row(_id='60039bbe2bc02409919ab3fc', X=-4.474253, Y=48.341007, geometry=<shapely.geometry.polygon.Polygon object at 0x11e9f6a90>),
 Row(_id='60039bef2bc0240991b16853', X=-4.501045, Y=48.356842, geometry=<shapely.geometry.polygon.Polygon object at 0x11e9f6b50>),
 Row(_id='60039a6c2bc024099171a177', X=-4.497015, Y=48.382206, geometry=<shapely.geometry.polygon.Polygon object at 0x11e9f6c10>),
 Row(_id='60039c032bc0240991bb75e8', X=-4.56926, Y=48.13976, geometry=<shapely.geome

root
 |-- _id: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- geometry: geometry (nullable = true)



In [7]:
pointSet_A_SQL.orderBy('X', ascending=False).show(10)

+--------------------+-----------+---------+--------------------+
|                 _id|          X|        Y|            geometry|
+--------------------+-----------+---------+--------------------+
|600398d52bc024099...|  -0.066665| 48.03318|POLYGON ((0.06782...|
|600398d82bc024099...|  -0.066665|48.341614|POLYGON ((0.06863...|
|600398d52bc024099...|  -0.066665| 48.03318|POLYGON ((0.06782...|
|60039e512bc024099...|-0.26731667|49.616188|POLYGON ((-0.1285...|
|600398d52bc024099...| -0.7333317|48.083332|POLYGON ((-0.5987...|
|600398d52bc024099...| -0.7599033|48.148026|POLYGON ((-0.6251...|
|600398982bc024099...| -1.0425516|50.431885|POLYGON ((-0.9013...|
|600398ba2bc024099...|    -1.5356|50.056896|POLYGON ((-1.3955...|
|60039e312bc024099...|  -1.595915| 47.19171|POLYGON ((-1.4635...|
|60039e312bc024099...| -1.5959216| 47.19171|POLYGON ((-1.4635...|
+--------------------+-----------+---------+--------------------+
only showing top 10 rows



In [8]:
# write raw sql
x = sqlContext.sql("""
                           SELECT X
                           FROM raw_data
                           """)
x.show(5)

+----------+
|         X|
+----------+
|  -4.50088|
|-4.4970617|
|-4.4137635|
| -4.500963|
| -4.474253|
+----------+
only showing top 5 rows



In [9]:
# use select sql command on dataframe with where and order by
pointSet_A_SQL\
    .select("_id", "X", "Y")\
    .where(pointSet_A_SQL.Y > 49)\
    .orderBy("X")\
    .show(5)

+--------------------+----------+---------+
|                 _id|         X|        Y|
+--------------------+----------+---------+
|600398982bc024099...|-7.0706515|49.238274|
|600398982bc024099...| -7.070542|49.238495|
|600398982bc024099...|-6.9999585| 49.35194|
|600398982bc024099...|-6.9997835|49.352276|
|600398982bc024099...| -6.999658|49.352432|
+--------------------+----------+---------+
only showing top 5 rows



### Perform Join Between 2 DF
- Step 1 create df for 2nd RDD
- Step 2 create an alias to call tables with shorter name (optional)
- Step 3 perform Join on _id column


In [10]:
#Step 1
pointSet_B_SQL = sqlContext.createDataFrame(pointSet_A_MBB)
display(pointSet_B_SQL.head(10))

pointSet_B_SQL.printSchema()

pointSet_B_SQL.registerTempTable("raw_data")

[Row(_id='60039bef2bc0240991b16857', X=-4.50088, Y=48.35698, geometry=<shapely.geometry.polygon.Polygon object at 0x11c2121c0>),
 Row(_id='60039b8f2bc0240991848034', X=-4.4970617, Y=48.379555, geometry=<shapely.geometry.polygon.Polygon object at 0x11c22d130>),
 Row(_id='60039b9c2bc02409918a7d6d', X=-4.4137635, Y=48.15062, geometry=<shapely.geometry.polygon.Polygon object at 0x11c212e80>),
 Row(_id='60039bef2bc0240991b16855', X=-4.500963, Y=48.356915, geometry=<shapely.geometry.polygon.Polygon object at 0x11c22d190>),
 Row(_id='60039bbe2bc02409919ab3fc', X=-4.474253, Y=48.341007, geometry=<shapely.geometry.polygon.Polygon object at 0x11c212ac0>),
 Row(_id='60039bef2bc0240991b16853', X=-4.501045, Y=48.356842, geometry=<shapely.geometry.polygon.Polygon object at 0x11ea530d0>),
 Row(_id='60039a6c2bc024099171a177', X=-4.497015, Y=48.382206, geometry=<shapely.geometry.polygon.Polygon object at 0x11c212850>),
 Row(_id='60039c032bc0240991bb75e8', X=-4.56926, Y=48.13976, geometry=<shapely.geome

root
 |-- _id: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- geometry: geometry (nullable = true)



In [11]:
#Step 2
ta = pointSet_A_SQL.alias('ta')
tb = pointSet_B_SQL.alias('tb')

# Step 3
inner_join = ta.join(tb, ta._id == tb._id)
inner_join.show()

+--------------------+----------+---------+--------------------+--------------------+----------+---------+--------------------+
|                 _id|         X|        Y|            geometry|                 _id|         X|        Y|            geometry|
+--------------------+----------+---------+--------------------+--------------------+----------+---------+--------------------+
|600398982bc024099...| -4.993778|  47.6649|POLYGON ((-4.8602...|600398982bc024099...| -4.993778|  47.6649|POLYGON ((-4.8602...|
|600398982bc024099...|-5.2876334|47.815315|POLYGON ((-5.1537...|600398982bc024099...|-5.2876334|47.815315|POLYGON ((-5.1537...|
|600398982bc024099...| -5.374895|  48.3804|POLYGON ((-5.2394...|600398982bc024099...| -5.374895|  48.3804|POLYGON ((-5.2394...|
|600398982bc024099...| -5.335962| 48.55122|POLYGON ((-5.2001...|600398982bc024099...| -5.335962| 48.55122|POLYGON ((-5.2001...|
|600398982bc024099...|-7.3271866| 46.90078|POLYGON ((-7.1955...|600398982bc024099...|-7.3271866| 46.9007

## Perorm Spatial Join

- The idea here is to perform a spatial join by checking intersection between geometry property of 2 datasets
- In order to achive this we have

In [12]:
# timing procedure
start_time = time.time()

sampleA = pointSet_A_SQL.limit(1000)
sampleB = pointSet_B_SQL.limit(1000)

sampleA.createOrReplaceTempView("sampleA")
sampleB.createOrReplaceTempView("sampleB")

spatial_join_result = spark.sql(
    """
        SELECT A._id, count(*) as freq
        FROM sampleA AS A, sampleB AS B
        WHERE ST_Intersects(A.geometry, B.geometry)
        GROUP BY A._id
    """
)

spatial_join_result.show()
print("--- %s seconds ---" % (time.time() - start_time))

+--------------------+----+
|                 _id|freq|
+--------------------+----+
|60039bbe2bc024099...| 816|
|60039bbe2bc024099...| 816|
|60039a922bc024099...| 816|
|60039dd32bc024099...| 816|
|600399ca2bc024099...| 816|
|60039a912bc024099...| 816|
|600398ee2bc024099...| 816|
|60039a6b2bc024099...| 816|
|60039bef2bc024099...| 816|
|60039b8e2bc024099...| 816|
|60039bef2bc024099...| 816|
|600398982bc024099...|  11|
|60039e0d2bc024099...|   7|
|60039bef2bc024099...| 816|
|60039bef2bc024099...| 816|
|600399bc2bc024099...| 106|
|60039b8e2bc024099...| 816|
|60039e642bc024099...| 816|
|60039a692bc024099...| 816|
|60039bef2bc024099...| 816|
+--------------------+----+
only showing top 20 rows

--- 162.53914880752563 seconds ---


In [13]:
print("--- TOTAL EXECUTION TIME ---\n--- %s seconds ---" % (time.time() - total_time))


--- TOTAL EXECUTION TIME ---
--- 321.267569065094 seconds ---
