# Environment Set up

Download and Install Apache Sedona \
Code adopted and modified from: https://colab.research.google.com/drive/15cLZWf8Jnk2CvJePAgfjz-t00JPLDTSP?usp=sharing

## Install Java

In [159]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

## Download Spark

In [160]:
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz

## Set Environment Variables

In [161]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
os.environ["PYTHONPATH"] = "/content/spark-3.5.3-bin-hadoop3/python"

## Install PySpark

In [162]:
!pip install findspark
import findspark
findspark.init()



In [163]:
!pip install pyspark



## Install Apache Sedona

In [164]:
!pip install apache-sedona[spark]



## Start Sedona

In [165]:
from sedona.spark import *
config = SedonaContext.builder(). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-3.0_2.12:1.6.1,'
           'org.datasyslab:geotools-wrapper:1.6.1-28.2'). \
    config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all'). \
    getOrCreate()
sedona = SedonaContext.create(config)

## Start Spark Session

In [166]:
# reference: https://spark.apache.org/docs/latest/sql-getting-started.html

from pyspark.sql import SparkSession
# Creating a SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark") \
    .getOrCreate()

# Regular 3D Spatio-temporal Data Processing

Milestone I

## Read the input data

In [167]:
import json
import pandas as pd

# Read the input data
# We will be working with Twitter Dataset: https://drive.google.com/file/d/1wt2YNiimTe_7-ig9Vf73SJ6N2fLsDug3/view
input_file = "2017-07-22_09-02-53.txt"

In [168]:
# Parse the spatio-temporal data from the text corpus
spatio_temporal_data = []

# Loop through corpus
with open(input_file, 'r', encoding='utf-8') as f:
  for line in f:
    twitter = json.loads(line)

    # Get twitter ID (identifier)
    # id = twitter.get("id")
    # Choose str for interpretation purpose
    id = twitter.get("id_str")

    # If applicable, get country name
    place = twitter.get("place")
    if place is not None:
      country = place.get("country")
    else:
      country = None

    # Get the bounding box, the box should consists of four coordinates
    if place is not None:
      bounding_box = place.get("bounding_box")
      if bounding_box is not None:
        bounding_box_coordinates = bounding_box.get("coordinates")[0]
      else:
        bounding_box_coordinates = None
    else:
      bounding_box_coordinates = None

    # If applicable, get coordinates
    geo = twitter.get("geo")
    if geo is not None:
      coordinates = geo.get("coordinates")
      latitude = coordinates[0]
      longitude = coordinates[1]
    else:
      # We will pick the middle of the bounding box as coordinate
      if bounding_box_coordinates is not None:
        # Somehow it is reversed
        # Bounding box coordinates [longitude, latitude]
        longitude = (bounding_box_coordinates[0][0] + bounding_box_coordinates[2][0]) / 2
        latitude = (bounding_box_coordinates[0][1] + bounding_box_coordinates[1][1]) / 2
      else:
        latitude = None
        longitude = None

    # Get the timestamp
    timestamp_ms = twitter.get("timestamp_ms")

    # Append all necessary information into the list
    spatio_temporal_data.append({
        "twitter_id": id,
        "latitude": latitude,
        "longitude": longitude,
        "country": country,
        "bounding_box_coordinates": bounding_box_coordinates,
        # Was thinking changing to int, but it will be converted to string anyways
        # When applying RDD, so we will keep it in the type of string
        "timestamp_ms": timestamp_ms
    })

# Convert the data into Pandas Dataframe
df = pd.DataFrame(spatio_temporal_data)

In [169]:
# Create another column of type geometry: (latitude, longtitude)
# reference: https://automating-gis-processes.github.io/CSC18/lessons/L1/Geometric-Objects.html
from shapely.geometry import Point
df["geometry"] = df.apply(lambda x: Point(x["latitude"], x["longitude"]), axis=1)

In [170]:
# Check the dataframe
df.head()

Unnamed: 0,twitter_id,latitude,longitude,country,bounding_box_coordinates,timestamp_ms,geometry
0,888685771020009472,53.456952,-2.23348,United Kingdom,"[[-2.319934, 53.343623], [-2.319934, 53.570282...",1500714173451,POINT (53.4569525 -2.23348)
1,888685771208613889,28.377247,129.493744,日本,"[[129.343186, 28.193773], [129.343186, 28.5307...",1500714173496,POINT (28.377247 129.493744)
2,888685771602878465,10.496,-66.8983,Venezuela,"[[-66.998581, 10.440464], [-66.998581, 10.5266...",1500714173590,POINT (10.496 -66.8983)
3,888685767148544001,22.869936,113.419725,People's Republic of China,"[[109.664659, 20.221264], [109.664659, 25.5186...",1500714172528,POINT (22.869936000000003 113.4197245)
4,888685770881368069,35.701657,139.709181,日本,"[[139.673228, 35.673404], [139.673228, 35.7299...",1500714173418,POINT (35.701657 139.7091805)


## Partition and Index the data

reference: https://sedona.apache.org/1.6.1/tutorial/rdd/ \
https://sedona.apache.org/1.0.0-1.2.0-incubating/tutorial/core-python/ \
https://sparkbyexamples.com/spark/spark-read-options/

In [171]:
# Convert pandas DataFrame to spark DataFrame
spark_df = spark.createDataFrame(df)

In [172]:
# Check Spark DataFrame
spark_df.show()

+------------------+------------------+-------------------+--------------------+------------------------+-------------+--------------------+
|        twitter_id|          latitude|          longitude|             country|bounding_box_coordinates| timestamp_ms|            geometry|
+------------------+------------------+-------------------+--------------------+------------------------+-------------+--------------------+
|888685771020009472|        53.4569525|           -2.23348|      United Kingdom|    [[-2.319934, 53.3...|1500714173451|POINT (53.4569525...|
|888685771208613889|         28.377247|         129.493744|                日本|    [[129.343186, 28....|1500714173496|POINT (28.377247 ...|
|888685771602878465|            10.496|           -66.8983|           Venezuela|    [[-66.998581, 10....|1500714173590|POINT (10.496 -66...|
|888685767148544001|22.869936000000003|        113.4197245|People's Republic...|    [[109.664659, 20....|1500714172528|POINT (22.8699360...|
|88868577088136

In [173]:
# Create a temporary SQL view from Spark Dataframe
spark_df.createOrReplaceTempView("twitter")

In [174]:
# Create SedonaSQL DataFrame
# We will only take Spatio-temporal data and an identifier for each data point
spatial_df = spark.sql(
    """
        SELECT twitter_id, geometry, timestamp_ms
        FROM twitter
    """
)
spatial_df.printSchema()

root
 |-- twitter_id: string (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- timestamp_ms: string (nullable = true)



In [175]:
# We will use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD
# SpatialRDD carry non-spatial attributes, twitter_id and timestamp_ms are in the UserData, separated by "\t"
from sedona.utils.adapter import Adapter
spatial_rdd = Adapter.toSpatialRdd(spatial_df, "geometry")
spatial_rdd.analyze()

# spatial_rdd.boundaryEnvelope

True

In [176]:
# We can check our UserData() field here
spatial_rdd.rawSpatialRDD.map(lambda x: x.getUserData()).take(1)

['888685771020009472\t1500714173451']

In [177]:
# Use spatialPartitioning function to partition the data across the nodes
# GridType does not have RTREE, we use default KDBTREE
from sedona.core.enums import GridType
spatial_rdd.spatialPartitioning(GridType.KDBTREE)
# Check if partition is successful
print(spatial_rdd.getPartitioner())

SpatialPartitioner(name='KDBTreePartitioner', jvm_partitioner=JavaObject id=o313)


In [178]:
# Build an R-Tree index on each data partition
# RDD will be indexed based on spatial coordinates
from sedona.core.enums import IndexType
# According to Laila, we don't need to run join query
build_on_spatial_partitioned_rdd = False ## Set to TRUE only if run join query
spatial_rdd.buildIndex(IndexType.RTREE, build_on_spatial_partitioned_rdd)

## Run Range query with time filter on the data

reference: https://sedona.apache.org/1.0.0-1.2.0-incubating/tutorial/core-python/

In [179]:
# We will first run query to find a spatial region
# Defined by bounding rectangle (Envelope here)
from sedona.core.spatialOperator import RangeQuery
from sedona.core.geom.envelope import Envelope
import timeit

# Envelope (x_min, x_max, y_min, y_max)
# We will try cover China
# Reference: https://gist.github.com/graydon/11198540
range_query_window = Envelope(18.198, 53.459, 73.675, 135.026)
# Only return gemeotries fully covered by the window
consider_boundary_intersection = False
using_index = True

spatio_start = timeit.default_timer()
query_result = RangeQuery.SpatialRangeQuery(
    spatial_rdd,
    range_query_window,
    consider_boundary_intersection,
    using_index
)
spatio_end = timeit.default_timer()
# Get query time for querying through spatio data
spatio_time = spatio_end - spatio_start

In [180]:
# Transforme query result to GeoPandas GeoDataFrame for better interpretation
import geopandas as gpd
geo_df = gpd.GeoDataFrame(
    # We will split the userData back to twitter_id and timestamp_ms
    # And here, we will change timestamp_ms's type to int
    query_result.map(lambda x: [x.userData.split("\t")[0], x.geom, int(x.userData.split("\t")[1])]).collect(),
    columns=["twitter_id", "geometry", "timestamp_ms"],
    geometry="geometry"
)

In [181]:
geo_df.head()

Unnamed: 0,twitter_id,geometry,timestamp_ms
0,888685888955457536,POINT (34.282 74.465),1500714201569
1,888689577564786688,POINT (34.585 77.471),1500715081002
2,888688490145304576,POINT (34.278 77.604),1500714821741
3,888690796622381057,POINT (34.278 77.604),1500715371648
4,888685927278792704,POINT (35.666 134.539),1500714210706


In [182]:
geo_df.shape

(5373, 3)

In [183]:
temporal_start = timeit.default_timer()

# Run query to apply time filters on the candidate set
final_result_df = geo_df[(geo_df["timestamp_ms"] <= 1500715081002) & (geo_df["timestamp_ms"] >= 1500714201569)]
temporal_end = timeit.default_timer()
# Get query time for querying through temporal data
temporal_time = temporal_end - temporal_start

In [184]:
final_result_df.shape

(1675, 3)

In [185]:
print(f"Time used to query the spatio data: {spatio_time} seconds")
print(f"Time used to query the temporal data: {temporal_time} seconds")
print(f"Total time needed to query through spatio-temporal data is: {spatio_time + temporal_time} seconds")

Time used to query the spatio data: 0.018559520998678636 seconds
Time used to query the temporal data: 0.003120638999462244 seconds
Total time needed to query through spatio-temporal data is: 0.02168015999814088 seconds


# Hilbert Curve Based 2D Spatio-temporal Data Processing

We will read in the data similarly as we did in regular approach

## Read the input data

In [186]:
import json
import pandas as pd

# Read the input data
# We will be working with Twitter Dataset: https://drive.google.com/file/d/1wt2YNiimTe_7-ig9Vf73SJ6N2fLsDug3/view
input_file = "2017-07-22_09-02-53.txt"

In [187]:
# Parse the spatio-temporal data from the text corpus
spatio_temporal_data = []

# Loop through corpus
with open(input_file, 'r', encoding='utf-8') as f:
  for line in f:
    twitter = json.loads(line)

    # Get twitter ID (identifier)
    # id = twitter.get("id")
    # Choose str for interpretation purpose
    id = twitter.get("id_str")

    # If applicable, get country name
    place = twitter.get("place")
    if place is not None:
      country = place.get("country")
    else:
      country = None

    # Get the bounding box, the box should consists of four coordinates
    if place is not None:
      bounding_box = place.get("bounding_box")
      if bounding_box is not None:
        bounding_box_coordinates = bounding_box.get("coordinates")[0]
      else:
        bounding_box_coordinates = None
    else:
      bounding_box_coordinates = None

    # If applicable, get coordinates
    geo = twitter.get("geo")
    if geo is not None:
      coordinates = geo.get("coordinates")
      latitude = coordinates[0]
      longitude = coordinates[1]
    else:
      # We will pick the middle of the bounding box as coordinate
      if bounding_box_coordinates is not None:
        # Somehow it is reversed
        # Bounding box coordinates [longitude, latitude]
        longitude = (bounding_box_coordinates[0][0] + bounding_box_coordinates[2][0]) / 2
        latitude = (bounding_box_coordinates[0][1] + bounding_box_coordinates[1][1]) / 2
      else:
        latitude = None
        longitude = None

    # Get the timestamp
    timestamp_ms = twitter.get("timestamp_ms")

    # Append all necessary information into the list
    spatio_temporal_data.append({
        "twitter_id": id,
        "latitude": latitude,
        "longitude": longitude,
        "country": country,
        "bounding_box_coordinates": bounding_box_coordinates,
        # Was thinking changing to int, but it will be converted to string anyways
        # When applying RDD, so we will keep it in the type of string
        "timestamp_ms": timestamp_ms
    })

# Convert the data into Pandas Dataframe
df = pd.DataFrame(spatio_temporal_data)

In [188]:
# Create another column of type geometry: (latitude, longtitude)
# reference: https://automating-gis-processes.github.io/CSC18/lessons/L1/Geometric-Objects.html
from shapely.geometry import Point
df["geometry"] = df.apply(lambda x: Point(x["latitude"], x["longitude"]), axis=1)

In [189]:
# Check the dataframe
df.head()

Unnamed: 0,twitter_id,latitude,longitude,country,bounding_box_coordinates,timestamp_ms,geometry
0,888685771020009472,53.456952,-2.23348,United Kingdom,"[[-2.319934, 53.343623], [-2.319934, 53.570282...",1500714173451,POINT (53.4569525 -2.23348)
1,888685771208613889,28.377247,129.493744,日本,"[[129.343186, 28.193773], [129.343186, 28.5307...",1500714173496,POINT (28.377247 129.493744)
2,888685771602878465,10.496,-66.8983,Venezuela,"[[-66.998581, 10.440464], [-66.998581, 10.5266...",1500714173590,POINT (10.496 -66.8983)
3,888685767148544001,22.869936,113.419725,People's Republic of China,"[[109.664659, 20.221264], [109.664659, 25.5186...",1500714172528,POINT (22.869936000000003 113.4197245)
4,888685770881368069,35.701657,139.709181,日本,"[[139.673228, 35.673404], [139.673228, 35.7299...",1500714173418,POINT (35.701657 139.7091805)


In [190]:
# Remove rows if the column "geometry" is NULL
df = df[df["twitter_id"].notnull()]

## Milestone II: apply Hilbert Curve (Existing Hilbert Curve Library)

reference: https://pypi.org/project/hilbertcurve/
https://github.com/jokergoo/HilbertCurve

In [191]:
!pip install hilbertcurve



In [192]:
# Extract the latitude and longitude information from the original dataframe
points = df[["latitude", "longitude"]].values
# Check the first data point
print(points[0])

[53.4569525 -2.23348  ]


In [193]:
# reference: https://docs.python.org/3/library/decimal.html
from decimal import Decimal, ROUND_HALF_UP

# Using the Hilbert Curve method, all data points must be integer and satisfy [>=0, >=0]
# Thus we have to normalize our geometric data (for having negative and decimal points)
def normalize(point):
  # Input will be in form: (latitude, longitude)
  # We will use decimal library to bypass the python digit precision error
  # ROUND_HALF_UP is to keep consistence on the rounding method
  # Since latitude ranges from [-90, 90], we will increment it by 90
  # Since longitude ranges from [-180, 180], we will increment it by 180
  # We times it by 100 to idicate that we will keep two decimal places for all the coordinates
  latitude = point[0]
  longitude = point[1]
  latitude = int(100 * float(Decimal(latitude).quantize(Decimal("0.01"), rounding = ROUND_HALF_UP) + Decimal("90.00")))
  longitude = int(100 * float(Decimal(longitude).quantize(Decimal("0.01"), rounding = ROUND_HALF_UP) + Decimal("180.00")))
  return latitude, longitude

In [194]:
# Apply normalization on each data point
for i in range(len(points)):
  points[i] = normalize(points[i])

# Check that the first data point is updated successfully
print(points[0])

[14346. 17777.]


In [195]:
# We will be utilizing the existing python library: https://pypi.org/project/hilbertcurve/
from hilbertcurve.hilbertcurve import HilbertCurve

# Number of iterations
# Hilbert Curve must be N x N grid, and the side for our grid will be 2^p
# I choose 16 because 2^16 = 65536
# Longitude has higher range of [-180, 180], if we normalize it, it will become [0, 360]
# We will keep 2 decimals for each coordinates, thus we need at least 36000 points on each axis
p = 16
# Number of dimension (2, since we are dealing with geometric data)
n = 2
# Call Hilbert Curve class
hilbert_curve = HilbertCurve(p, n)

In [196]:
# Sample run to ensure accuracy of the approach
sample_points = [(-90, -180), (-90, -179.99), (-89.99, -179.99), (-89.99, -180), (90, 180), (89.99, 180), (90, 179.99), (89.99, 179.99)]
# normalize sample_points
for i in range(len(sample_points)):
  sample_points[i] = normalize(sample_points[i])

# Calculate distances along the Hilbert curve for all points
distances = hilbert_curve.distances_from_points(sample_points)

# Verify results:
for point, dist in zip(sample_points, distances):
  print(f"Point: {point}, Distance: {dist}")

Point: (0, 0), Distance: 0
Point: (0, 1), Distance: 3
Point: (1, 1), Distance: 2
Point: (1, 0), Distance: 1
Point: (18000, 36000), Distance: 2051464618
Point: (17999, 36000), Distance: 2051464789
Point: (18000, 35999), Distance: 2051461717
Point: (17999, 35999), Distance: 2051461546


The results are totally expected and proves that the Hilbert Curve method works.

In [197]:
# Calculate distances along the Hilbert curve for all points
distances = hilbert_curve.distances_from_points(points)

# Assign distances/Hilbert Value back to dataset
df["hilbert_value"] = distances

# Group with timestamp_ms to create new geometry column
df["hilb_geometry"] = df.apply(lambda x: Point(x["hilbert_value"], x["timestamp_ms"]), axis=1)

In [198]:
# Check our new dataframe
df.head()

Unnamed: 0,twitter_id,latitude,longitude,country,bounding_box_coordinates,timestamp_ms,geometry,hilbert_value,hilb_geometry
0,888685771020009472,53.456952,-2.23348,United Kingdom,"[[-2.319934, 53.343623], [-2.319934, 53.570282...",1500714173451,POINT (53.4569525 -2.23348),896614247,POINT (896614247 1500714173451)
1,888685771208613889,28.377247,129.493744,日本,"[[129.343186, 28.193773], [129.343186, 28.5307...",1500714173496,POINT (28.377247 129.493744),823587783,POINT (823587783 1500714173496)
2,888685771602878465,10.496,-66.8983,Venezuela,"[[-66.998581, 10.440464], [-66.998581, 10.5266...",1500714173590,POINT (10.496 -66.8983),147830008,POINT (147830008 1500714173590)
3,888685767148544001,22.869936,113.419725,People's Republic of China,"[[109.664659, 20.221264], [109.664659, 25.5186...",1500714172528,POINT (22.869936000000003 113.4197245),838232297,POINT (838232297 1500714172528)
4,888685770881368069,35.701657,139.709181,日本,"[[139.673228, 35.673404], [139.673228, 35.7299...",1500714173418,POINT (35.701657 139.7091805),821689672,POINT (821689672 1500714173418)


## Milestone II: apply Hilbert Curve (Custom Hilbert Curve Library)

In [199]:
# Extract the latitude and longitude information from the original dataframe to a list
points = df[["latitude", "longitude"]].values.tolist()
# Check the first data point
print(points[0])

[53.4569525, -2.23348]


In [200]:
# reference: https://docs.python.org/3/library/decimal.html
from decimal import Decimal, ROUND_HALF_UP

# Using the Hilbert Curve method, all data points must be integer and satisfy [>=0, >=0]
# Thus we have to normalize our geometric data (for having negative and decimal points)
def normalize(point):
  # Input will be in form: (latitude, longitude)
  # We will use decimal library to bypass the python digit precision error
  # ROUND_HALF_UP is to keep consistence on the rounding method
  # Since latitude ranges from [-90, 90], we will increment it by 90
  # Since longitude ranges from [-180, 180], we will increment it by 180
  # We times it by 100 to idicate that we will keep two decimal places for all the coordinates
  latitude = point[0]
  longitude = point[1]
  latitude = int(100 * float(Decimal(latitude).quantize(Decimal("0.01"), rounding = ROUND_HALF_UP) + Decimal("90.00")))
  longitude = int(100 * float(Decimal(longitude).quantize(Decimal("0.01"), rounding = ROUND_HALF_UP) + Decimal("180.00")))
  return latitude, longitude

In [201]:
# Apply normalization on each data point
for i in range(len(points)):
  points[i] = normalize(points[i])

# Make sure all points are in form of list (not tuple) to match our implementation
for i in range(len(points)):
  points[i] = list(points[i])

# Check that the first data point is updated successfully
print(points[0])

[14346, 17777]


Python implementation, given coordinates, return the Hilbert Value along the 2^p X 2^p Hilbert Curve\
Code based on paper: "Programming the Hilbert curve" by John Skilling \
Reference: https://stackoverflow.com/questions/499166/mapping-n-dimensional-value-to-a-point-on-hilbert-curve \
Refernece: https://github.com/galtay/hilbertcurve \
Reference: https://pubs.aip.org/aip/acp/article-abstract/707/1/381/719611/Programming-the-Hilbert-curve?redirectedFrom=fulltext \
Reference: https://www.grant-trebbin.com/2017/03/calculating-hilbert-curve-coordinates.html

In [202]:
# Number of iterations
# Hilbert Curve must be N x N grid, and the side for our grid will be 2^p
# I choose 16 because 2^16 = 65536
# Longitude has higher range of [-180, 180], if we normalize it, it will become [0, 360]
# We will keep 2 decimals for each coordinates, thus we need at least 36000 points on each axis
p = 16

# Function to return the Hilbert Value, given iteration p and coordiante
# Since we are only dealing with coordinates, the dimension is fixed 2
def Get_Hilbert_value(point, bits):
  # Initialize the parameter
  hilbert_value = 0
  # Copy the input coordinates
  X = point[:]
  # Get the highest bit mask
  M = 1 << (bits - 1)
  # Number of dimension, since we will work on geometric data, it is 2
  n = 2

  # Inverse Undo
  Q = M
  while Q > 1:
    P = Q - 1
    for i in range(n):
      # If top bit of X[i] is OFF
      if X[i] & Q:
        # Invert
        # More particularly, invert low bits of X_0
        X[0] ^= P
      else:
        # If top bit of X[i] is ON
        # Exchange
        # More particularly, exchange low bits of X_0 with X_i
        t = (X[0] ^ X[i]) & P
        X[0] ^= t
        X[i] ^= t
    # Shift bit to the right
    Q >>= 1

  # Gray encode
  # Applying Gray code to improve spatial locality
  # The key is to: apply Gray code to all bits of the Hilbert integer at once
  # Every coordinate XOR with previous one (except first one)
  for i in range(1, n):
    X[i] ^= X[i - 1]
  # Check on the last coordinate
  t = 0
  Q = M
  while Q > 1:
    if X[n - 1] & Q:
      t ^= Q - 1
    Q >>= 1
  # Final adjustment to ensure encoding correctness
  for i in range(n):
    X[i] ^= t

  # X now stores the transposed Hilbert Indexes
  # Decode the transposed Hilbert Index
  # This is achieved by interleavingly join the bits among transposed Hilbert Index
  # Get the binary representation of the coordinates, fill up to bits
  # reference: https://stackoverflow.com/questions/10411085/converting-integer-to-binary-in-python
  X_binary = [bin(x)[2:].zfill(bits) for x in X]
  # Form new binary to interleavingly join the bits
  bi_hilbert_value = ""
  # We start with left most bit from first coordinate
  for i in range (bits):
    for j in range(len(X_binary)):
      bi_hilbert_value += X_binary[j][i]
  # Convert the Hilbert index to integer, which is our Hilbert Value
  hilbert_value = int(bi_hilbert_value, 2)

  return hilbert_value

In [203]:
# Sample run to ensure accuracy of the approach
sample_points = [(-90, -180), (-90, -179.99), (-89.99, -179.99), (-89.99, -180), (90, 180), (89.99, 180), (90, 179.99), (89.99, 179.99), (53.4569525, -2.23348)]
# normalize sample_points
for i in range(len(sample_points)):
  sample_points[i] = list(normalize(sample_points[i]))

# Calculate distances along the Hilbert curve for all points
distances = []
for point in sample_points:
  distances.append(Get_Hilbert_value(point, p))

# Verify results:
for point, dist in zip(sample_points, distances):
  print(f"Point: {point}, Distance: {dist}")

Point: [0, 0], Distance: 0
Point: [0, 1], Distance: 3
Point: [1, 1], Distance: 2
Point: [1, 0], Distance: 1
Point: [18000, 36000], Distance: 2051464618
Point: [17999, 36000], Distance: 2051464789
Point: [18000, 35999], Distance: 2051461717
Point: [17999, 35999], Distance: 2051461546
Point: [14346, 17777], Distance: 896614247


The results are totally expected, it matches the existing hilbertcurve library and proves that the Hilbert Curve method works.

In [204]:
# Calculate distances along the Hilbert curve for all points
distances = []
for point in points:
  distances.append(Get_Hilbert_value(point, p))

# Assign distances/Hilbert Value back to dataset
df["hilbert_value"] = distances

# Group with timestamp_ms to create new geometry column
df["hilb_geometry"] = df.apply(lambda x: Point(x["hilbert_value"], x["timestamp_ms"]), axis=1)

In [205]:
df.head()

Unnamed: 0,twitter_id,latitude,longitude,country,bounding_box_coordinates,timestamp_ms,geometry,hilbert_value,hilb_geometry
0,888685771020009472,53.456952,-2.23348,United Kingdom,"[[-2.319934, 53.343623], [-2.319934, 53.570282...",1500714173451,POINT (53.4569525 -2.23348),896614247,POINT (896614247 1500714173451)
1,888685771208613889,28.377247,129.493744,日本,"[[129.343186, 28.193773], [129.343186, 28.5307...",1500714173496,POINT (28.377247 129.493744),823587783,POINT (823587783 1500714173496)
2,888685771602878465,10.496,-66.8983,Venezuela,"[[-66.998581, 10.440464], [-66.998581, 10.5266...",1500714173590,POINT (10.496 -66.8983),147830008,POINT (147830008 1500714173590)
3,888685767148544001,22.869936,113.419725,People's Republic of China,"[[109.664659, 20.221264], [109.664659, 25.5186...",1500714172528,POINT (22.869936000000003 113.4197245),838232297,POINT (838232297 1500714172528)
4,888685770881368069,35.701657,139.709181,日本,"[[139.673228, 35.673404], [139.673228, 35.7299...",1500714173418,POINT (35.701657 139.7091805),821689672,POINT (821689672 1500714173418)


## Partition and Index the data

In [206]:
# Convert pandas DataFrame to spark DataFrame
spark_df = spark.createDataFrame(df)

In [207]:
# Check Spark DataFrame
spark_df.show()

+------------------+------------------+-------------------+--------------------+------------------------+-------------+--------------------+-------------+--------------------+
|        twitter_id|          latitude|          longitude|             country|bounding_box_coordinates| timestamp_ms|            geometry|hilbert_value|       hilb_geometry|
+------------------+------------------+-------------------+--------------------+------------------------+-------------+--------------------+-------------+--------------------+
|888685771020009472|        53.4569525|           -2.23348|      United Kingdom|    [[-2.319934, 53.3...|1500714173451|POINT (53.4569525...|    896614247|POINT (896614247 ...|
|888685771208613889|         28.377247|         129.493744|                日本|    [[129.343186, 28....|1500714173496|POINT (28.377247 ...|    823587783|POINT (823587783 ...|
|888685771602878465|            10.496|           -66.8983|           Venezuela|    [[-66.998581, 10....|1500714173590|POI

In [208]:
# Create a temporary SQL view from Spark Dataframe
spark_df.createOrReplaceTempView("twitter")

In [209]:
# Create SedonaSQL DataFrame
# We will only take Spatio-temporal data and an identifier for each data point
# We have now converted spatiotemporal point (xy, t) into 2D point (h_value, t)
spatial_df = spark.sql(
    """
        SELECT twitter_id, hilb_geometry
        FROM twitter
    """
)
spatial_df.printSchema()

root
 |-- twitter_id: string (nullable = true)
 |-- hilb_geometry: geometry (nullable = true)



In [210]:
# We will use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD
# SpatialRDD carry non-spatial attributes, twitter_id and timestamp_ms are in the UserData, separated by "\t"
spatial_rdd = Adapter.toSpatialRdd(spatial_df, "hilb_geometry")
spatial_rdd.analyze()

# spatial_rdd.boundaryEnvelope

True

In [211]:
# We can check our UserData() field here
# Should only have an data point identifier, which is the twitter ID
spatial_rdd.rawSpatialRDD.map(lambda x: x.getUserData()).take(1)

['888685771020009472']

In [212]:
# Use spatialPartitioning function to partition the data across the nodes
# GridType does not have RTREE, we use default KDBTREE
spatial_rdd.spatialPartitioning(GridType.KDBTREE)
# Check if partition is successful
print(spatial_rdd.getPartitioner())

SpatialPartitioner(name='KDBTreePartitioner', jvm_partitioner=JavaObject id=o371)


In [213]:
# Build an R-Tree index on each data partition
# RDD will be indexed based on spatial coordinates
# According to Laila, we don't need to run join query
build_on_spatial_partitioned_rdd = False ## Set to TRUE only if run join query
spatial_rdd.buildIndex(IndexType.RTREE, build_on_spatial_partitioned_rdd)

## Run Range query with time lter on the data

### Filter Step: Enlarge the query, convert corners to Hilbert Value and run range query

In [214]:
# We want to match our previous range query
# Which has:
# Latitude range
# x_min: 18.198, x_max: 53.459
# Longitude range
# y_min: 73.675, y_max = 135.026
# Timestamp range
# t_min: 1500714201569, t_max: 1500715081002

# Since our grid are with two decimal space
# The smallest enclosing rectangle should have corner:
high_point = (53.46, 135.03)
low_point = (18.19, 73.67)

In [215]:
# Normalize the points
high_point = list(normalize(high_point))
low_point = list(normalize(low_point))

In [216]:
print("For normalized high point:", high_point, ", its corresponding Hilbert Value is:", Get_Hilbert_value(high_point, p))
print("For normalized low point:", low_point, ", its corresponding Hilbert Value is:", Get_Hilbert_value(low_point, p))

For normalized high point: [14346, 31502] , its corresponding Hilbert Value is: 807774776
For normalized low point: [10819, 25367] , its corresponding Hilbert Value is: 852204314


In [217]:
# Envelope (x_min, x_max, y_min, y_max)
# We will try cover China
# We will replace range with:
# x: Hilbert Value, y: timestamp
range_query_window = Envelope(807774776, 852204314, 1500714201569, 1500715081002)
# Only return gemeotries fully covered by the window
consider_boundary_intersection = False
using_index = True

query_start = timeit.default_timer()
query_result = RangeQuery.SpatialRangeQuery(
    spatial_rdd,
    range_query_window,
    consider_boundary_intersection,
    using_index
)
query_end = timeit.default_timer()
# Get query time for querying through spatiotemporal data
query_time = query_end - query_start

In [218]:
# Transforme query result to GeoPandas GeoDataFrame for better interpretation
new_geo_df = gpd.GeoDataFrame(
    # We will split the userData back to twitter_id and timestamp_ms
    # And here, we will change timestamp_ms's type to int
    query_result.map(lambda pt: [pt.userData, int(pt.geom.x), int(pt.geom.y)]).collect(),
    columns=["twitter_id", "hilb_value", "timestamp"]
)

In [219]:
new_geo_df.head()

Unnamed: 0,twitter_id,hilb_value,timestamp
0,888685908450353152,833258361,1500714206217
1,888685913240293377,833002040,1500714207359
2,888685915018809345,831409330,1500714207783
3,888685889349705729,821705912,1500714201663
4,888685903706640384,823427275,1500714205086


In [220]:
new_geo_df.shape

(9970, 3)

We can see that the results it provided does not exactly match our previous result, so we will need a refinement/validation step to remove all the false positives

### Refinement Step: Convert result values back and remove false positive

In [221]:
# Denormalize the input coordinates
# Input will be in form: (latitude, longitude)
# We will use this function to reverse the normalize step
def denormalize(point):
  # Decrement latitude by 90
  # Decrement longtitude by 180
  latitude = point[0]
  longitude = point[1]
  latitude = float(Decimal(latitude / 100).quantize(Decimal("0.01"), rounding = ROUND_HALF_UP) - Decimal("90.00"))
  longitude = float(Decimal(longitude / 100).quantize(Decimal("0.01"), rounding = ROUND_HALF_UP) - Decimal("180.00"))
  return latitude, longitude

#### Convert Hilbert Value back to Coordinates using existing Python Library

In [222]:
# Using existing Hilbert Curve Library to convert Hilbert Value back to Coordinates
# reference: https://pypi.org/project/hilbertcurve/
hilbert_curve = HilbertCurve(16, 2)

In [223]:
# Check the correctness
distances = (807774776, 852204314)
points = hilbert_curve.points_from_distances(distances)
for point, dist in zip(points, distances):
  print(f"Point: {point}, Distance: {dist}")

# For normalized high point: [14346, 31502] , its corresponding Hilbert Value is: 807774776
# For normalized low point: [10819, 25367] , its corresponding Hilbert Value is: 852204314

Point: [14346, 31502], Distance: 807774776
Point: [10819, 25367], Distance: 852204314


In [224]:
# Retrieve Hilbert Value from the Dataframe
hilb_values = new_geo_df["hilb_value"]

convertion_start = timeit.default_timer()
# Convert the Hilbert Values to Points
points = hilbert_curve.points_from_distances(hilb_values)

latitude_list = []
longitude_list = []
# Loop through the points list
for point in points:
  # Denormalize point
  point = denormalize(point)
  # Append the coordinates to lists
  latitude_list.append(point[0])
  longitude_list.append(point[1])
convertion_end = timeit.default_timer()
# Get time for converting data back from 2D to 3D
convertion_time = convertion_end - convertion_start

# Add the lists as columns to the Dataframe
new_geo_df["latitude"] = latitude_list
new_geo_df["longitude"] = longitude_list

In [225]:
new_geo_df.head()

Unnamed: 0,twitter_id,hilb_value,timestamp,latitude,longitude
0,888685908450353152,833258361,1500714206217,-6.92,107.77
1,888685913240293377,833002040,1500714207359,-7.98,112.62
2,888685915018809345,831409330,1500714207783,6.49,124.85
3,888685889349705729,821705912,1500714201663,35.3,138.94
4,888685903706640384,823427275,1500714205086,31.47,130.98


#### Convert Hilbert Value back to Coordinates using custom function

In [226]:
# Main reference: https://pubs.aip.org/aip/acp/article-abstract/707/1/381/719611/Programming-the-Hilbert-curve?redirectedFrom=fulltext
# Function to return the point value, given iteration p and Hilbert Value
# Since we are only dealing with coordinates, the dimension is fixed 2
def Get_Coordinates(hilb_value, bits):
  # Number of dimension, since we will work on geometric data, it is 2
  n = 2
  # Number of iterations, same as bits
  p = bits
  N = 2 << (p - 1)

  # Convert the Hilbert Value back to Hilbert Index
  # First convert Hilbert Value to its binary representation, length should be n * p
  hilbert_binary = bin(hilb_value)[2:].zfill(n * p)
  # The Hilbert Index is formed by interleaving bits among the coordinates
  # Undo this interleaving step and get a list of binaries for the coordiantes (length should be n)
  # Initialize X_binary with n empty strings
  X_binary = [""] * n
  # We start with left most bit from first coordinate
  for i in range (len(hilbert_binary)):
    X_binary[i % n] += hilbert_binary[i]
  # Convert back to transposed Hilbert Indexes in integer form
  X = [int(x, 2) for x in X_binary]

  # Gray decode by H ^ (H/2)
  t = X[n - 1] >> 1
  # Range decrement from n-1 to > 0 (not >= 0 as written in original paper)
  # Error in original paper that "the X array is accessed using a negative index when the variable i becomes zero"
  # Reference of change: https://stackoverflow.com/questions/499166/mapping-n-dimensional-value-to-a-point-on-hilbert-curve
  for i in range(n-1, 0, -1):
    # XOR coordinates with previous one
    X[i] ^= X[i - 1]
  # Final XOR
  X[0] ^= t

  # Undo excess work
  Q = 2
  while Q != N:
    P = Q - 1
    # Range decrement from n-1 to >= 0
    for i in range(n-1, -1, -1):
      if X[i] & Q:
        # Invert Step
        X[0] ^= P
      else:
        # Exchange Step
        t = (X[0] ^ X[i]) & P
        X[0] ^= t
        X[i] ^= t
    Q <<= 1

  # Return the transformed Coordinates
  return X

In [227]:
# Test correctness of our implementation
print("Converting back the high point:", Get_Coordinates(807774776, 16))
print("Converting back the low point:", Get_Coordinates(852204314, 16))

# For normalized high point: [14346, 31502] , its corresponding Hilbert Value is: 807774776
# For normalized low point: [10819, 25367] , its corresponding Hilbert Value is: 852204314

Converting back the high point: [14346, 31502]
Converting back the low point: [10819, 25367]


In [228]:
# Retrieve Hilbert Value from the Dataframe
hilb_values = new_geo_df["hilb_value"]

convertion_start = timeit.default_timer()
points = []
# Convert the Hilbert Values to Points using our implemented function
for hilb_value in hilb_values:
  points.append(Get_Coordinates(hilb_value, 16))

latitude_list = []
longitude_list = []
# Loop through the points list
for point in points:
  # Denormalize point
  point = denormalize(point)
  # Append the coordinates to lists
  latitude_list.append(point[0])
  longitude_list.append(point[1])
convertion_end = timeit.default_timer()
# Get time for converting data back from 2D to 3D
convertion_time = convertion_end - convertion_start

# Add the lists as columns to the Dataframe
new_geo_df["latitude"] = latitude_list
new_geo_df["longitude"] = longitude_list

In [229]:
new_geo_df.head()

Unnamed: 0,twitter_id,hilb_value,timestamp,latitude,longitude
0,888685908450353152,833258361,1500714206217,-6.92,107.77
1,888685913240293377,833002040,1500714207359,-7.98,112.62
2,888685915018809345,831409330,1500714207783,6.49,124.85
3,888685889349705729,821705912,1500714201663,35.3,138.94
4,888685903706640384,823427275,1500714205086,31.47,130.98


#### Validation based on convert latitude/longitude, final observation

In [230]:
# Loop through Dataframe and check if:
# Latitude in range: [18.198, 53.459]
# Longitude in range: [73.675, 135.026]
# Timestamp in range: [1500714201569, 1500715081002]
# Only keep the valid points/rows
valid_rows = []
validation_start = timeit.default_timer()
for index, row in new_geo_df.iterrows():
  if row["latitude"] >= 18.198 and row["latitude"] <= 53.459:
    if row["longitude"] >= 73.675 and row["longitude"] <= 135.026:
      if row["timestamp"] >= 1500714201569 and row["timestamp"] <= 1500715081002:
        valid_rows.append(row)
validation_end = timeit.default_timer()
# Get time for validation
validation_time = validation_end - validation_start

# Create a new GeoDataframe with these validpoints
new_final_result_df = gpd.GeoDataFrame(valid_rows, columns=["twitter_id", "hilb_value", "timestamp", "latitude", "longitude"])

In [231]:
new_final_result_df.shape

(1284, 5)

In [232]:
# We can run another evaluation step
# See how many points in the new Dataframe is included in old Dataframe

# List of twitter ids from old Dataframe
old_ids = final_result_df["twitter_id"].tolist()
# List of twitter ids from new Dataframe
new_ids = new_final_result_df["twitter_id"].tolist()

count = 0
for id in new_ids:
  if id in old_ids:
    count += 1

print(count)

1284


From the validation, we have removed around 87% spurious points that does not belong in the range query (false positive). \
We can see all the remaining points are indeed included in the original dataframe result, but there are a few of them missing, and the missing fraction is about: 23%.

In [233]:
# Running time
print(f"Time used to query the spatiotemporal data: {query_time} seconds")
print(f"Time used to convert the 3D data back to 2D data: {convertion_time} seconds")
print(f"Time used to validate the queried result: {validation_time} seconds")
print(f"Total time needed to query through spatio-temporal data with the new method is: {query_time + convertion_time + validation_time} seconds")

Time used to query the spatiotemporal data: 0.01456455100014864 seconds
Time used to convert the 3D data back to 2D data: 0.5266086520005047 seconds
Time used to validate the queried result: 1.075355649998528 seconds
Total time needed to query through spatio-temporal data with the new method is: 1.6165288529991813 seconds


We can see the query time is significant shorter, but there is additional overhead when trying to validate the data, and cause the overful query time much longer. \
Maybe a more careful/better designed algorithm may reduce this overhead.