# Install Java

In [73]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [74]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark

In [75]:
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz

# Set Environment Variables

In [76]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
os.environ["PYTHONPATH"] = "/content/spark-3.5.3-bin-hadoop3/python"

# Install PySpark

In [77]:
!pip install findspark
import findspark
findspark.init()



# Install Apache Sedona

In [78]:
!pip install apache-sedona[spark]



# Start Sedona

In [79]:
from sedona.spark import SedonaContext
from pyspark.sql import SparkSession
from sedona.core.geom.envelope import Envelope
from sedona.core.spatialOperator import RangeQueryRaw
from sedona.utils.adapter import Adapter
import geopandas as gpd
import time
from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import FileDataSplitter, IndexType, GridType

config = SedonaContext.builder(). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-3.0_2.12:1.6.1,'
           'org.datasyslab:geotools-wrapper:1.6.1-28.2'). \
    config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all'). \
    getOrCreate()
sedona = SedonaContext.create(config)

# Example Reading Data
Using earthquake data from https://www.kaggle.com/datasets/warcoder/earthquake-dataset?select=earthquake_data.csv

In [80]:
# SQL approach
earthquakes = sedona.read.option("delimiter", ",").option("header", "true").csv("/content/drive/MyDrive/Colab Notebooks/CS236_FinalProject/earthquake_data.csv")
earthquakes.createOrReplaceTempView("earthquake")
earthquakes_info = sedona.sql(
      "SELECT latitude, longitude, date_time from earthquake"
)
earthquakes_info.show(5)

+--------+---------+----------------+
|latitude|longitude|       date_time|
+--------+---------+----------------+
| -9.7963|  159.596|22-11-2022 02:03|
| -4.9559|  100.738|18-11-2022 13:37|
|-20.0508| -178.346|12-11-2022 07:09|
|-19.2918| -172.129|11-11-2022 10:48|
|-25.5948|  178.278|09-11-2022 10:14|
+--------+---------+----------------+
only showing top 5 rows



In [81]:
# RDD (Resilient Distributed Database) approach
# more information here: https://sedona.apache.org/1.5.1/tutorial/rdd/
# First modify the file to be longitude, latitude, time format
df = sedona.read.csv('/content/drive/MyDrive/Colab Notebooks/CS236_FinalProject/earthquake_data.csv', header=True, inferSchema=True)
df = df[["latitude", "longitude", "date_time"]]
display(df)
df.write.csv("output", header=False, mode="overwrite")
df.show(5)
# the output is a directory, with part 1 file that contains the data

DataFrame[latitude: double, longitude: double, date_time: string]

+--------+---------+----------------+
|latitude|longitude|       date_time|
+--------+---------+----------------+
| -9.7963|  159.596|22-11-2022 02:03|
| -4.9559|  100.738|18-11-2022 13:37|
|-20.0508| -178.346|12-11-2022 07:09|
|-19.2918| -172.129|11-11-2022 10:48|
|-25.5948|  178.278|09-11-2022 10:14|
+--------+---------+----------------+
only showing top 5 rows



In [82]:
from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import FileDataSplitter

data = PointRDD(sedona.sparkContext, '/content/output/part-00000-*.csv', 0, FileDataSplitter.CSV, True)
#before any partitioning the data is stored in rawSpatialRDD
all_records = data.rawSpatialRDD.collect()
print(type(all_records))

<class 'list'>


In [83]:
all_records[0:5]

[Geometry: Point userData: 22-11-2022 02:03,
 Geometry: Point userData: 18-11-2022 13:37,
 Geometry: Point userData: 12-11-2022 07:09,
 Geometry: Point userData: 11-11-2022 10:48,
 Geometry: Point userData: 09-11-2022 10:14]

# Twitter Project First Milestone

In [84]:
# 1.2 READ THE INPUT DATA

from sedona.spark import *
from pyspark.sql import functions as F
import json

# JSON file into spark dataframe
tweets_df = sedona.read.json('/content/drive/MyDrive/Colab Notebooks/CS236_FinalProject/2017-07-22_09-02-53.txt')

# if geo.coordinates is null, use center of bounding box instead (lat, long)
bounding_box_center = F.expr("""
    array(
        (place.bounding_box.coordinates[0][0][1] + place.bounding_box.coordinates[0][2][1]) / 2,
        (place.bounding_box.coordinates[0][0][0] + place.bounding_box.coordinates[0][2][0]) / 2
    )
""")

tweets_df = tweets_df.withColumn(
    "latitude",
    F.when(
        F.col("geo.coordinates").isNotNull(),
        F.col("geo.coordinates")[0]
    ).otherwise(bounding_box_center[0])
).withColumn(
    "longitude",
    F.when(
        F.col("geo.coordinates").isNotNull(),
        F.col("geo.coordinates")[1]
    ).otherwise(bounding_box_center[1])
)

# extract latitude, longitude, and timestamp_ms
tweets_df = tweets_df.select("latitude", "longitude", "timestamp_ms")

# remove rows where either latitude, longitude, or timestamp_ms is NULL
# important to remove such that the limit rows are removed
tweets_df = tweets_df.filter(
    F.col("latitude").isNotNull() & F.col("longitude").isNotNull() & F.col("timestamp_ms").isNotNull()
)

display(tweets_df)
display(tweets_df.count())
# display(type(tweets_df))

# first, merge all parts into one part, and then write to CSV
tweets_df.coalesce(1).write.csv("output", header=False, mode="overwrite")

# show first 5 records
tweets_df.show(5)

DataFrame[latitude: double, longitude: double, timestamp_ms: string]

98321

+------------------+-----------+-------------+
|          latitude|  longitude| timestamp_ms|
+------------------+-----------+-------------+
|        53.4569525|   -2.23348|1500714173451|
|         28.377247| 129.493744|1500714173496|
|            10.496|   -66.8983|1500714173590|
|22.869936000000003|113.4197245|1500714172528|
|         35.701657|139.7091805|1500714173418|
+------------------+-----------+-------------+
only showing top 5 rows



In [85]:
from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import FileDataSplitter

data = PointRDD(sedona.sparkContext, '/content/output/part-00000*.csv', 0, FileDataSplitter.CSV, True)
#before any partitioning the data is stored in rawSpatialRDD
all_records = data.rawSpatialRDD.collect()
print(type(all_records))
all_records[0:5]

<class 'list'>


[Geometry: Point userData: 1500714173451,
 Geometry: Point userData: 1500714173496,
 Geometry: Point userData: 1500714173590,
 Geometry: Point userData: 1500714172528,
 Geometry: Point userData: 1500714173418]

In [86]:
# 1.3 PARTITION AND INDEX THE DATA

from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import GridType, IndexType
from sedona.core.spatialOperator import JoinQueryRaw

data.analyze()

# partition using KDB-Tree
data.spatialPartitioning(GridType.KDBTREE)

# build R-Tree index
build_on_spatial_partitioned_rdd = False
data.buildIndex(IndexType.RTREE, build_on_spatial_partitioned_rdd)

print("Number of partitions:", data.spatialPartitionedRDD.getNumPartitions())
print("Example record from first partition:", data.rawSpatialRDD.take(1))

Number of partitions: 3
Example record from first partition: [Geometry: Point userData: 1500714173451]


In [87]:
# 1.4 Run Range query with time filter on the data

import time
from sedona.core.spatialOperator import RangeQuery
from sedona.core.spatialOperator import RangeQueryRaw
from sedona.core.geom.envelope import Envelope
from sedona.utils.adapter import Adapter
import geopandas as gpd

def range_query(xmin, ymin, xmax, ymax, t1, t2):
  print(f"xmin: {xmin}, ymin: {ymin}, xmax: {xmax}, ymax: {ymax}, t1: {t1}, t2: {t2}")

  # spatial query window
  spatial_query_window = Envelope(xmin, xmax, ymin, ymax)
  consider_boundary_intersection = False  ## Only return geometries fully covered by the window
  using_index = True

  start_time = time.time()

  # spatial query
  filtered_tweets_rdd = RangeQuery.SpatialRangeQuery(data, spatial_query_window, consider_boundary_intersection, using_index)
  # display(filtered_tweets_rdd)

  res = gpd.GeoDataFrame(
      filtered_tweets_rdd.map(lambda x: [x.geom, x.userData]).collect(),
      columns=["geom", "user_data"],
      geometry="geom"
  )
  res = res[res['user_data'].astype(int).between(t1, t2)] # within timeframe
  display(res[:])
  display(len(res))

  end_time = time.time()
  exec_time = end_time - start_time
  print(f"Execution time: {exec_time} seconds\n\n")

# RUN 3 EXAMPLES OF THE RANGE QUERY AND RECORD OUTPUT DATAFRAME AND EXECUTION TIME

# parameters for bounding rectangle (lat, long) and time frame
# Example 1
xmin, ymin, xmax, ymax = -50, -30, 50, 30
t1, t2 = 1500715000000, 1500716000000
range_query(xmin, ymin, xmax, ymax, t1, t2)

# Example 2
xmin, ymin, xmax, ymax = -10, -70, 30, 70
t1, t2 = 1500714000000, 1500714500000
range_query(xmin, ymin, xmax, ymax, t1, t2)

# Example 3
xmin, ymin, xmax, ymax = 60, 110, 80, 150
t1, t2 = 1500714500000, 1500715500000
range_query(xmin, ymin, xmax, ymax, t1, t2)

# Example Testing
xmin, ymin, xmax, ymax = -90, -180, 90, 180
t1, t2 = 0, 9500715500000
range_query(xmin, ymin, xmax, ymax, t1, t2)

xmin: -50, ymin: -30, xmax: 50, ymax: 30, t1: 1500715000000, t2: 1500716000000


Unnamed: 0,geom,user_data
2,POINT (49.21261 -2.1325),1500715527803
5,POINT (49.18759 -2.1006),1500715499666
8,POINT (48.4035 -4.94533),1500715082252
11,POINT (44.6667 -1.16667),1500715207741
14,POINT (44.8649 -1.13544),1500715442818
...,...,...
17606,POINT (41.86298 27.78297),1500715812746
17610,POINT (40.63598 29.98002),1500715683017
17611,POINT (40.76668 29.98815),1500715633597
17614,POINT (41.03064 29.22418),1500715688245


6426

Execution time: 2.362489700317383 seconds


xmin: -10, ymin: -70, xmax: 30, ymax: 70, t1: 1500714000000, t2: 1500714500000


Unnamed: 0,geom,user_data
0,POINT (18.45969 -69.94851),1500714375336
4,POINT (28.50891 -16.33779),1500714455903
8,POINT (28.13884 -16.72976),1500714334854
9,POINT (28.13884 -16.72976),1500714492404
24,POINT (25.37481 56.14553),1500714355535
...,...,...
3431,POINT (29.31885 48.03267),1500714195233
3432,POINT (29.31885 48.03267),1500714331503
3440,POINT (29.27021 48.03362),1500714409556
3441,POINT (29.27021 48.03373),1500714427778


796

Execution time: 1.939476728439331 seconds


xmin: 60, ymin: 110, xmax: 80, ymax: 150, t1: 1500714500000, t2: 1500715500000


Unnamed: 0,geom,user_data
1,POINT (63.285 118.366),1500714632965
2,POINT (63.285 118.366),1500714719585
3,POINT (63.285 118.366),1500715089979
11,POINT (62.066 129.731),1500714535066
12,POINT (62.066 129.731),1500714764128
13,POINT (62.066 129.731),1500714775659
14,POINT (62.066 129.731),1500714788402
15,POINT (62.066 129.731),1500714794424
16,POINT (62.066 129.731),1500714860852
17,POINT (62.066 129.731),1500714861713


27

Execution time: 1.474959135055542 seconds


xmin: -90, ymin: -180, xmax: 90, ymax: 180, t1: 0, t2: 9500715500000


Unnamed: 0,geom,user_data
0,POINT (37.781 -122.399),1500715127868
1,POINT (37.781 -122.399),1500715353312
2,POINT (37.781 -122.399),1500715477621
3,POINT (37.617 -122.385),1500714554131
4,POINT (37.621 -122.379),1500714902402
...,...,...
98316,POINT (-40.935 175.853),1500716231827
98317,POINT (-38.801 176.108),1500716331272
98318,POINT (-37.698 176.245),1500715658597
98319,POINT (-37.698 176.245),1500716896803


98321

Execution time: 7.607287168502808 seconds




# Twitter Project Second Milestone

In [88]:
# read data again

# JSON file into spark dataframe
tweets_df = sedona.read.json('/content/drive/MyDrive/Colab Notebooks/CS236_FinalProject/2017-07-22_09-02-53.txt')

# if geo.coordinates is null, use center of bounding box instead (lat, long)
bounding_box_center = F.expr("""
    array(
        (place.bounding_box.coordinates[0][0][1] + place.bounding_box.coordinates[0][2][1]) / 2,
        (place.bounding_box.coordinates[0][0][0] + place.bounding_box.coordinates[0][2][0]) / 2
    )
""")

tweets_df = tweets_df.withColumn(
    "latitude",
    F.when(
        F.col("geo.coordinates").isNotNull(),
        F.col("geo.coordinates")[0]
    ).otherwise(bounding_box_center[0])
).withColumn(
    "longitude",
    F.when(
        F.col("geo.coordinates").isNotNull(),
        F.col("geo.coordinates")[1]
    ).otherwise(bounding_box_center[1])
)

# extract latitude, longitude, and timestamp_ms
tweets_df = tweets_df.select("latitude", "longitude", "timestamp_ms")

# remove rows where either latitude, longitude, or timestamp_ms is NULL
# important to remove such that the limit rows are removed
tweets_df = tweets_df.filter(
    F.col("latitude").isNotNull() & F.col("longitude").isNotNull() & F.col("timestamp_ms").isNotNull()
)

display(tweets_df)
display(tweets_df.count())
# display(type(tweets_df))

# show first 5 records
tweets_df.show(5)

DataFrame[latitude: double, longitude: double, timestamp_ms: string]

98321

+------------------+-----------+-------------+
|          latitude|  longitude| timestamp_ms|
+------------------+-----------+-------------+
|        53.4569525|   -2.23348|1500714173451|
|         28.377247| 129.493744|1500714173496|
|            10.496|   -66.8983|1500714173590|
|22.869936000000003|113.4197245|1500714172528|
|         35.701657|139.7091805|1500714173418|
+------------------+-----------+-------------+
only showing top 5 rows



In [89]:
# 2.1 Obtain Hilbert Numbers

def compute_hilbert(lat, lon):
    return (lat+90) * 361 + (lon + 180)

hilbert_udf = F.udf(lambda lat, lon: compute_hilbert(lat, lon))

# add hilbert index to DataFrame
tweets_df = tweets_df.withColumn("hilbert_index", hilbert_udf(F.col("latitude"), F.col("longitude")))

temp_df = tweets_df.select(
    F.col("hilbert_index").cast("double").alias("hilbert_index"),
    F.col("timestamp_ms").cast("double").alias("timestamp_ms"),
    F.col("latitude").cast("string").alias("latitude"),
    F.col("longitude").cast("string").alias("longitude"),
)

display(temp_df)
temp_df.show(5)

# first, merge all parts into one part, and then write to CSV
temp_df.coalesce(1).write.csv("hilbert_output", header=False, mode="overwrite")


DataFrame[hilbert_index: double, timestamp_ms: double, latitude: string, longitude: string]

+------------------+-----------------+------------------+-----------+
|     hilbert_index|     timestamp_ms|          latitude|  longitude|
+------------------+-----------------+------------------+-----------+
|51965.726372499994|1.500714173451E12|        53.4569525|   -2.23348|
|      43043.679911|1.500714173496E12|         28.377247| 129.493744|
|36392.157699999996| 1.50071417359E12|            10.496|   -66.8983|
|     41039.4666205|1.500714172528E12|22.869936000000003|113.4197245|
|     45698.0073575|1.500714173418E12|         35.701657|139.7091805|
+------------------+-----------------+------------------+-----------+
only showing top 5 rows



In [90]:
data = PointRDD(sedona.sparkContext,'/content/hilbert_output/part-00000*.csv', 0, FileDataSplitter.CSV,True)
#before any partitioning the data is stored in rawSpatialRDD
all_records = data.rawSpatialRDD.collect()
print(type(all_records))
print(all_records[0:5])

<class 'list'>
[Geometry: Point userData: 53.4569525	-2.23348, Geometry: Point userData: 28.377247	129.493744, Geometry: Point userData: 10.496	-66.8983, Geometry: Point userData: 22.869936000000003	113.4197245, Geometry: Point userData: 35.701657	139.7091805]


In [91]:
# 2.2 Partition and Index the data

data.analyze()
data.spatialPartitioning(GridType.KDBTREE)
build_on_spatial_partitioned_rdd = False
data.buildIndex(IndexType.RTREE, build_on_spatial_partitioned_rdd)

In [105]:
# 2.3 Run Range query with time filter on the data

def compute_lat_lon(hilVal):
    lat = hilVal // 361 - 90
    lon = (hilVal % 361) - 180

    return lat, lon


def hilbert_range_query(x1, y1, x2, y2, t1, t2):
    # Filter step 1: enlarge query to grid boundaries
    # Filter step 2: determine hilbert values
    u1 = compute_hilbert(x1, y1)
    u2 = compute_hilbert(x2, y2)
    # print(u1, u2)
    # print(compute_lat_lon(u1), compute_lat_lon(u2))

    # spatial query with hilbert values and time ranges
    spatial_query_window = Envelope(min(u1, u2), max(u1, u2), t1, t2)
    consider_boundary_intersection = False
    using_index = True

    start_time = time.time()
    filtered_rdd = RangeQuery.SpatialRangeQuery(data, spatial_query_window, consider_boundary_intersection, using_index)

    res = gpd.GeoDataFrame(
      filtered_rdd.map(lambda x: [x.geom, x.userData]).collect(),
      columns=["geom", "user_data"],
      geometry="geom"
    )

    res[['latitude', 'longitude']] = res['user_data'].str.split('\t', expand=True).astype(float)
    res['timestamp_ms'] = res['geom'].y


    def lat_lon_filter(row):
      lat, lon = row['user_data'].split('\t')
      # print(x1, lat, x2, y1, lon, y2)
      # print(lat, lon)
      # print(x1 <= float(lat) <= x2 and y1 <= float(lon) <= y2)
      return x1 <= row['latitude'] <= x2 and y1 <= row['longitude'] <= y2

    res_filtered = res[res.apply(lat_lon_filter, axis=1)][["latitude", "longitude", "timestamp_ms"]]


    display(res_filtered[:])
    display(f"Number of Rows: {len(res_filtered)}")
    print(f"Fraction of Spurious Points: {len(res_filtered) / len(res)}")
    # display(res["geom"].x[0])
    # display(compute_lat_lon(res["geom"].x[0]))
    # display(compute_lat_lon(res["geom"].x))

    end_time = time.time()
    exec_time = end_time - start_time
    print(f"Execution time: {exec_time} seconds\n\n")

    return res_filtered

# RUN 3 EXAMPLES OF THE RANGE QUERY AND RECORD OUTPUT DATAFRAME AND EXECUTION TIME

# parameters for bounding rectangle (lat, long) and time frame
# Example 1
xmin, ymin, xmax, ymax = -50, -30, 50, 30
t1, t2 = 1500715000000, 1500716000000
hilbert_range_query(xmin, ymin, xmax, ymax, t1, t2)

# Example 2
xmin, ymin, xmax, ymax = -10, -70, 30, 70
t1, t2 = 1500714000000, 1500714500000
hilbert_range_query(xmin, ymin, xmax, ymax, t1, t2)

# Example 3
xmin, ymin, xmax, ymax = 60, 110, 80, 150
t1, t2 = 1500714500000, 1500715500000
hilbert_range_query(xmin, ymin, xmax, ymax, t1, t2)


# Example Testing
xmin, ymin, xmax, ymax = -10, -70, 30, 70
t1, t2 = 1500714000000, 1500714500000
# res = hilbert_range_query(xmin, ymin, xmax, ymax, t1, t2)
#print(len(res))

Unnamed: 0,latitude,longitude,timestamp_ms
3,43.999523,12.645314,1.500715e+12
4,45.188524,6.099764,1.500715e+12
7,42.136529,-0.427346,1.500715e+12
8,43.136640,5.933468,1.500715e+12
9,41.991135,-8.605562,1.500715e+12
...,...,...,...
29672,5.623198,-0.184293,1.500716e+12
29674,4.824630,7.031275,1.500716e+12
29675,5.623198,-0.184293,1.500716e+12
29702,-15.306759,29.095333,1.500716e+12


'Number of Rows: 6426'

Fraction of Spurious Points: 0.21574618096357226
Execution time: 3.3612942695617676 seconds




Unnamed: 0,latitude,longitude,timestamp_ms
0,10.496000,-66.898300,1.500714e+12
1,10.496000,-66.898300,1.500714e+12
2,10.496000,-66.898300,1.500714e+12
3,10.496000,-66.898300,1.500714e+12
4,10.496000,-66.898300,1.500714e+12
...,...,...,...
3358,26.353592,49.994546,1.500714e+12
3359,27.238464,31.643874,1.500714e+12
3362,29.000369,29.273024,1.500714e+12
3363,29.000369,29.273024,1.500714e+12


'Number of Rows: 796'

Fraction of Spurious Points: 0.23620178041543027
Execution time: 1.1124653816223145 seconds




Unnamed: 0,latitude,longitude,timestamp_ms
4,62.066246,129.730657,1500715000000.0
11,66.304051,134.190403,1500715000000.0
23,63.284818,118.365972,1500715000000.0
40,63.284818,118.365972,1500715000000.0
44,62.066246,129.730657,1500715000000.0
47,62.066246,129.730657,1500715000000.0
49,62.066246,129.730657,1500715000000.0
52,62.066246,129.730657,1500715000000.0
72,62.066246,129.730657,1500715000000.0
73,62.066246,129.730657,1500715000000.0


'Number of Rows: 27'

Fraction of Spurious Points: 0.14835164835164835
Execution time: 0.9520924091339111 seconds


