# CS 236 Database Management Project Final Submission

## By: Steven Ryan Leonido (SID:[Taken out for public GitHub])

Content after Milestone 2 is under sections 2.2 and 2.3

## 1.1 Installing Apache Sedona and Dependencies

## Installing Drivers for Google Colab

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
os.environ["PYTHONPATH"] = "/content/spark-3.5.3-bin-hadoop3/python"

In [None]:
!pip install findspark
import findspark
findspark.init()



In [None]:
!pip install apache-sedona[spark] # installed all dependencies to run Apache Sedona on Google Colab. Followed example from TA



## 1.2 Creating Apache Sedona and Reading in Data

In [None]:
#  Creating Apache Sedona Instance
from sedona.spark import *
config = SedonaContext.builder(). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-3.0_2.12:1.6.1,'
           'org.datasyslab:geotools-wrapper:1.6.1-28.2'). \
    config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all'). \
    getOrCreate()
sedona = SedonaContext.create(config)

In [None]:
# Importing helper libraries to read and manipulate data
import json
import pandas as pd
import numpy as np

# Creating Helper Variables
cols = ['longitude', 'latitude', 'created_at'] # Created for when creating pandas dataframe
data = [] # Where data will be stored
file_name = '2017-07-22_09-02-53.json' # Changed sample data from .txt to .json to read with json library

# Open file, read line by line, pull coordinates for longitude and latitude as well as created_at data.
# Was unable to implement ability to take bounding box data and find center. Opted to skip data point.
num = 0
with open(file_name, encoding='latin-1') as f:
    for line in f:
        doc = json.loads(line)
        if doc is None: # Skip if decoding failed
            continue
        if 'coordinates' in doc and 'created_at' in doc:
            if doc['coordinates'] is None:
                continue
            coordinates = doc['coordinates']['coordinates']
            longitude, latitude = coordinates
            lst = [longitude, latitude, doc['created_at']]
            data.append(lst)

# Create pandas data frame from list
df = pd.DataFrame(data=data, columns=cols)
print(df[:5])

# Put pandas dataframe into csv file to be read by Apache Sedona
df.to_csv('data.csv', index=False, header=False)

    longitude   latitude                      created_at
0  129.493744  28.377247  Sat Jul 22 09:02:53 +0000 2017
1  -66.898300  10.496000  Sat Jul 22 09:02:53 +0000 2017
2  129.493744  28.377247  Sat Jul 22 09:02:53 +0000 2017
3  149.693333 -34.859167  Sat Jul 22 09:02:53 +0000 2017
4 -121.944620  37.325510  Sat Jul 22 09:02:53 +0000 2017


## 1.3.1 Import Data Into Apache Sedona Via SpatialRDD to Put Into Worker Nodes

In [None]:
# Imported PointRDD and FileDataSplitter to turn csv to SpatialRDD data. Followed TA example.
from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import FileDataSplitter

RDDdata = PointRDD(sedona.sparkContext, 'data.csv', 0, FileDataSplitter.CSV, True)
#before any partitioning the data is stored in rawSpatialRDD
all_records = RDDdata.rawSpatialRDD.collect()
all_records[0:5]

[Geometry: Point userData: Sat Jul 22 09:02:53 +0000 2017,
 Geometry: Point userData: Sat Jul 22 09:02:53 +0000 2017,
 Geometry: Point userData: Sat Jul 22 09:02:53 +0000 2017,
 Geometry: Point userData: Sat Jul 22 09:02:53 +0000 2017,
 Geometry: Point userData: Sat Jul 22 09:02:53 +0000 2017]

## 1.3.2 and 1.4 Build R-Tree Index and Complete Range Queries

In [None]:
#  Used Apache Sedona API Docs and followed examples to build RTree Index.
from sedona.core.geom.envelope import Envelope
from sedona.core.enums import IndexType
from sedona.core.spatialOperator import RangeQuery
import time

consider_boundary_intersection = False ## Only return gemeotries fully covered by the window

build_on_spatial_partitioned_rdd = False ## Set to TRUE only if run join query
RDDdata.buildIndex(IndexType.RTREE, build_on_spatial_partitioned_rdd)

# Create variables for range query bounding box
using_index = True
range_query_window = Envelope(-90.01, 80.01, 30.01, 40.01)

# Using time library to determine how long it took to run query.
start_time = time.time()

query_result = RangeQuery.SpatialRangeQuery(
    RDDdata,
    range_query_window,
    consider_boundary_intersection,
    using_index
)

end_time = time.time()

# Time:
elapsed_time = end_time - start_time

print(f"Function took {elapsed_time} seconds to execute")
# Function took 0.11197066307067871 seconds to execute

# Using geopandas libary to check output of data to make sure it is not empty. Followed from Apache Sedona API docs.
import geopandas as gpd
gpd.GeoDataFrame(
    query_result.map(lambda x: [x.geom, x.userData]).collect(),
    columns=["geom", "user_data"],
    geometry="geom"
)

Function took 0.00990605354309082 seconds to execute


Unnamed: 0,geom,user_data
0,POINT (10.54473 36.37491),Sat Jul 22 09:05:10 +0000 2017
1,POINT (1.42818 38.70348),Sat Jul 22 09:20:02 +0000 2017
2,POINT (1.42829 38.70567),Sat Jul 22 09:23:37 +0000 2017
3,POINT (1.40545 38.88533),Sat Jul 22 09:04:04 +0000 2017
4,POINT (1.43333 38.9),Sat Jul 22 09:18:03 +0000 2017
...,...,...
1481,POINT (32.75178 39.98828),Sat Jul 22 09:37:03 +0000 2017
1482,POINT (41.25526 39.90741),Sat Jul 22 09:28:55 +0000 2017
1483,POINT (41.24727 39.91511),Sat Jul 22 09:27:22 +0000 2017
1484,POINT (77.55 34.7833),Sat Jul 22 09:34:49 +0000 2017


In [None]:
# Second query with different parameters
range_query_window = Envelope(-100.0, 0.0, 0.01, 30.01)

start_time = time.time()

query_result = RangeQuery.SpatialRangeQuery(
    RDDdata,
    range_query_window,
    consider_boundary_intersection,
    using_index
)

end_time = time.time()

elapsed_time = end_time - start_time

print(f"Function took {elapsed_time} seconds to execute")
# Function took 0.0261385440826416 seconds to execute

import geopandas as gpd
gpd.GeoDataFrame(
    query_result.map(lambda x: [x.geom, x.userData]).collect(),
    columns=["geom", "user_data"],
    geometry="geom"
)

Function took 0.010827779769897461 seconds to execute


Unnamed: 0,geom,user_data
0,POINT (-99.8489 16.84594),Sat Jul 22 09:18:50 +0000 2017
1,POINT (-84.05712 9.93384),Sat Jul 22 09:13:47 +0000 2017
2,POINT (-84.19607 9.95948),Sat Jul 22 09:25:01 +0000 2017
3,POINT (-84.08994 9.98007),Sat Jul 22 09:16:28 +0000 2017
4,POINT (-84.08832 9.98212),Sat Jul 22 09:16:40 +0000 2017
...,...,...
401,POINT (-90.16245 29.99751),Sat Jul 22 09:30:46 +0000 2017
402,POINT (-16.60556 28.26861),Sat Jul 22 09:35:26 +0000 2017
403,POINT (-16.60556 28.26861),Sat Jul 22 09:48:46 +0000 2017
404,POINT (-13.55461 28.95762),Sat Jul 22 09:44:02 +0000 2017


In [None]:
# Third query with different parameters
range_query_window = Envelope(-100.0, 200.0, 0.01, 100.01)

start_time = time.time()

query_result = RangeQuery.SpatialRangeQuery(
    RDDdata,
    range_query_window,
    consider_boundary_intersection,
    using_index
)

end_time = time.time()

elapsed_time = end_time - start_time

print(f"Function took {elapsed_time} seconds to execute")
# Function took 0.04613089561462402 seconds to execute

import geopandas as gpd
gpd.GeoDataFrame(
    query_result.map(lambda x: [x.geom, x.userData]).collect(),
    columns=["geom", "user_data"],
    geometry="geom"
)

Function took 0.0097198486328125 seconds to execute


Unnamed: 0,geom,user_data
0,POINT (73.4879 4.31193),Sat Jul 22 09:06:06 +0000 2017
1,POINT (76.27108 10.85052),Sat Jul 22 09:11:37 +0000 2017
2,POINT (77.45391 11.45946),Sat Jul 22 09:09:28 +0000 2017
3,POINT (98.66334 3.5826),Sat Jul 22 09:05:46 +0000 2017
4,POINT (98.67162 3.58317),Sat Jul 22 09:11:09 +0000 2017
...,...,...
15528,POINT (-21.9 64.14),Sat Jul 22 09:33:11 +0000 2017
15529,POINT (-51.75 64.1833),Sat Jul 22 09:38:03 +0000 2017
15530,POINT (-1.48889 55.0425),Sat Jul 22 09:30:04 +0000 2017
15531,POINT (-2.33601 56.79431),Sat Jul 22 09:38:45 +0000 2017


## Project Milestone 2 Section
2.1 Obtain Hilbert Numbers

In [None]:
# define a grid to use for hilbert
# Needs to be 0 -> Max value that is a power of 2
# Grid must be integer values
# Latitude will be + 90
# Longitude will be + 180
# Lat max = 180, Long max = 360
# Will just turn float to int

from datetime import datetime

def hilbert_curve(point):
    # Extract the latitude and longitude from the Point object
    lat = point.geom.x
    lon = point.geom.y

    # Standardize Data: Make so all positive values. Then get rid of decimal
    normLat = lat + 90
    normLon = lon + 180
    # Convert to integer values
    intLat = int(normLat)
    intLon = int(normLon)

    # Compute the hilbert value
    # Going to need to involve getting bit manipulation
    # Iterate by the number of bits in a coordinate
    # Take corresponding bit value, put together and add onto final bit value
    hilbert = 0
    place = 1

    while (intLat > 0 or intLon > 0):
        # Extract the least significant bit of each number
        bit1 = intLat & 1
        bit2 = intLon & 1

        # Combine the bits
        if bit1 == 1 and bit2 == 0:
            combined = 2  # Binary 10
        elif bit1 == 0 and bit2 == 1:
            combined = 1  # Binary 10
        elif bit1 == 1 and bit2 == 1:
            combined = 3  # Binary 11
        else:
            combined = 0  # Binary 00

        # Add the combined result to the final number
        hilbert += combined * place
        place *= 4  # Move to the next "digit" (2 bits at a time)

        # Shift the input numbers to process the next bits
        intLat >>= 1
        intLon >>= 1

    timestamp = point.getUserData()  # Adjust if necessary
    try:
        # Attempt to parse with the original format
        newtimestamp = int(datetime.strptime(timestamp, "%a %b %d %H:%M:%S %z %Y").timestamp())
    except ValueError:
        # If the original format fails, try an alternative format
        newtimestamp = int(datetime.strptime(timestamp, "%a %b %d %H:%M:%S %Z %Y").timestamp())

    return str(hilbert) + ',' + str(newtimestamp)

# Map the PointRDD to a new RDD with the hilbert value and timestamps
updated_data = RDDdata.rawSpatialRDD.map(hilbert_curve)

# Store the updated data in new file
output_path = '/content/updated_data.csv'
if os.path.exists(output_path):
    !rm -rf {output_path}  # Remove the directory and its contents

updated_data.saveAsTextFile(output_path)

# Follow similar steps in the first milestone to read the new updated data
all_records = updated_data.collect()
print(all_records[0:5])
print(len(all_records))

['62346,1500714173', '18302,1500714173', '62346,1500714173', '59819,1500714173', '63811,1500714173']
18722


## 2.2 Partion the Data Set and Build R-Tree Index

In [None]:
NewRDDdata = PointRDD(sedona.sparkContext, 'updated_data.csv', 0, FileDataSplitter.CSV, True)
#before any partitioning the data is stored in rawSpatialRDD
all_records = NewRDDdata.rawSpatialRDD.collect()
all_records[0:5]

consider_boundary_intersection = False ## Only return gemeotries fully covered by the window

build_on_spatial_partitioned_rdd = False ## Set to TRUE only if run join query
NewRDDdata.buildIndex(IndexType.RTREE, build_on_spatial_partitioned_rdd)

## 2.3 Run Range Query with Time Filter On The Data



In [None]:
def PointToHilbert (lon, lat):
  normLat = lat + 90
  normLon = lon + 180
  # Convert to integer values
  intLat = int(normLat)
  intLon = int(normLon)

  hilbert = 0
  place = 1

  # Compute the hilbert value
  # Going to need to involve getting bit manipulation
  # Iterate by the number of bits in a coordinate
  # Take corresponding bit value, put together and add onto final bit value
  while (intLat > 0 or intLon > 0):
      # Extract the least significant bit of each number
      # print(intLat, intLon)
      bit1 = intLat & 1
      bit2 = intLon & 1

      # Combine the bits
      if bit1 == 1 and bit2 == 0:
          combined = 2  # Binary 10
      elif bit1 == 0 and bit2 == 1:
          combined = 1  # Binary 10
      elif bit1 == 1 and bit2 == 1:
          combined = 3  # Binary 11
      else:
          combined = 0  # Binary 00

      # Add the combined result to the final number
      hilbert += combined * place
      place *= 4  # Move to the next "digit" (2 bits at a time)

      # Shift the input numbers to process the next bits
      intLat >>= 1
      intLon >>= 1

  return hilbert

def HilbertToPoint(hilbert):
    intLat = 0
    intLon = 0
    bit_position = 0
    intHilbert = int(hilbert)

    while intHilbert > 0:
        # Extract the last 2 bits (base-4 digit)
        combined = intHilbert & 3  # Mask the last 2 bits

        # Decode the bits back into lat and lon components
        if combined == 0:  # Binary 00
            bit1 = 0
            bit2 = 0
        elif combined == 1:  # Binary 01
            bit1 = 0
            bit2 = 1
        elif combined == 2:  # Binary 10
            bit1 = 1
            bit2 = 0
        elif combined == 3:  # Binary 11
            bit1 = 1
            bit2 = 1

        # Reconstruct the bits for lat and lon
        intLat |= (bit1 << bit_position)
        intLon |= (bit2 << bit_position)

        # Shift the Hilbert value to process the next pair
        intHilbert >>= 2
        bit_position += 1

    return intLon - 180, intLat - 90


def RunQuery(lowerLeft, topRight, Time):
  # Filter Step 1: Enlarge query to smallest grid points, not needed for binary manipulation implementation
  newLeftLon = lowerLeft[0]
  newLeftLat = lowerLeft[1]
  newRightLon = topRight[0]
  newRightLat = topRight[1]

  # Filter Step 2: Find Hilbert Numbers and run range query
  # Standardize Data: Make so all positive values. Then get rid of decimal
  normLat = newLeftLat + 90
  normLon = newLeftLon + 180
  # Convert to integer values
  intLat = int(normLat)
  intLon = int(normLon)

  # Compute the hilbert value
  hilbertLeft = PointToHilbert(newLeftLon, newLeftLat)
  hilbertRight = PointToHilbert(newRightLon, newRightLat)

  # Run Range Query

  range_query_window = Envelope(hilbertLeft, Time[0], hilbertRight, Time[1])

  start_time = time.time()

  query_result = RangeQuery.SpatialRangeQuery(
      NewRDDdata,
      range_query_window,
      consider_boundary_intersection,
      using_index
  )

  end_time = time.time()

  elapsed_time = end_time - start_time

  print(f"Function took {elapsed_time} seconds to execute")

  import geopandas as gpd
  query_gdf = gpd.GeoDataFrame(
      query_result.map(lambda x: [x.geom, x.userData]).collect(),
      columns=["geom", "user_data"],
      geometry="geom"
  )

  # Refinement Step: Convert to x, y, t points and output
  output = pd.DataFrame(columns=['Longitude', 'Latitude', 'Time'])
  for index, row in query_gdf.iterrows():
      point = row['geom']
      x_coordinate = point.x  # Access the x-coordinate

      # Perform your transformation on x_coordinate
      lon, lat = HilbertToPoint(x_coordinate)

      t = point.y

      # Append to output
      new_row = {'Longitude': lon, 'Latitude': lat, 'Time': t}
      output.loc[len(output)] = new_row

  print(output[0:5])
  print(output.shape)

  # print(output.min())
  # print(query_gdf)

### Running Queries

In [None]:
LowerLeft = [30, 0]
TopRight = [60, 30]
Time = [1650000000, 1700000000]
RunQuery (LowerLeft, TopRight, Time)

Function took 0.008368968963623047 seconds to execute
   Longitude  Latitude          Time
0         42        18  1.500714e+09
1         38        27  1.500714e+09
2         40        17  1.500714e+09
3         41        25  1.500714e+09
4         42        18  1.500714e+09
(16313, 3)


In [None]:
LowerLeft = [0, 0]
TopRight = [50, 50]
Time = [1600000000, 1650000000]
RunQuery (LowerLeft, TopRight, Time)

Function took 0.02675628662109375 seconds to execute
   Longitude  Latitude          Time
0         42        18  1.500714e+09
1         38        27  1.500714e+09
2         40        17  1.500714e+09
3         41        25  1.500714e+09
4         42        18  1.500714e+09
(17161, 3)


In [None]:
LowerLeft = [-90, -90]
TopRight = [90, 90]
Time = [1500000000, 1800000000]
RunQuery (LowerLeft, TopRight, Time)

Function took 0.012551307678222656 seconds to execute
   Longitude  Latitude          Time
0         42        18  1.500714e+09
1         38        27  1.500714e+09
2         40        17  1.500714e+09
3         41        25  1.500714e+09
4         42        18  1.500714e+09
(18722, 3)


In [None]:
LowerLeft = [100, 40]
TopRight = [180, 50]
Time = [1530000000, 1550000000]
RunQuery (LowerLeft, TopRight, Time)

Function took 0.016635656356811523 seconds to execute
   Longitude  Latitude          Time
0        -37       174  1.500714e+09
1        -40       175  1.500714e+09
2        -45       169  1.500714e+09
3        -37       174  1.500714e+09
4        -39       175  1.500714e+09
(31, 3)


In [None]:
LowerLeft = [60, 40]
TopRight = [180, 90]
Time = [1530000000, 1550000000]
RunQuery (LowerLeft, TopRight, Time)

Function took 0.010028839111328125 seconds to execute
   Longitude  Latitude          Time
0         30        75  1.500715e+09
1         23        72  1.500715e+09
2         13        99  1.500715e+09
3         28        77  1.500715e+09
4         13        99  1.500715e+09
(9316, 3)


### Overview of Results:
Time it took to execute for each query (Seconds):

Q1: 0.008368968963623047

Q2: 0.02675628662109375

Q3: 0.012551307678222656

Q4: 0.016635656356811523

Q5: 0.010028839111328125

In review, the times varied based on what seemed to be mainly the coordinate input more than the time input range. As a note, time is inputted in Unix Time for ease of coding with Apache Sedona API. Amount of spurious points returned may have been a matter of input values being out range in one dimension or another, leading to a larger or sometimes full scan of the data. Another note, discrepencies between times in comments and time is due to that being the time gotten when running the code for milestone 1.