# Install Java

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark

In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz

# Set Environment Variables

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
os.environ["PYTHONPATH"] = "/content/spark-3.5.3-bin-hadoop3/python"

# Install PySpark

In [None]:
!pip install findspark
import findspark
findspark.init()

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


# Install Apache Sedona

In [None]:
!pip install apache-sedona[spark]

Collecting apache-sedona[spark]
  Downloading apache_sedona-1.6.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.0 kB)
Collecting rasterio>=1.2.10 (from apache-sedona[spark])
  Downloading rasterio-1.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.1 kB)
Collecting affine (from rasterio>=1.2.10->apache-sedona[spark])
  Downloading affine-2.4.0-py3-none-any.whl.metadata (4.0 kB)
Collecting cligj>=0.5 (from rasterio>=1.2.10->apache-sedona[spark])
  Downloading cligj-0.7.2-py3-none-any.whl.metadata (5.0 kB)
Collecting click-plugins (from rasterio>=1.2.10->apache-sedona[spark])
  Downloading click_plugins-1.1.1-py2.py3-none-any.whl.metadata (6.4 kB)
Downloading rasterio-1.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (22.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m22.2/22.2 MB[0m [31m67.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading apache_sedona-1.6.1-cp310-cp310-manylinux_2_17_x86_64.manylinux

# Start Sedona

In [None]:
from sedona.spark import *
config = SedonaContext.builder(). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-3.0_2.12:1.6.1,'
           'org.datasyslab:geotools-wrapper:1.6.1-28.2'). \
    config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all'). \
    getOrCreate()
sedona = SedonaContext.create(config)

# Example Reading Data
Using earthquake data from https://www.kaggle.com/datasets/warcoder/earthquake-dataset?select=earthquake_data.csv

In [None]:
# SQL approach
earthquakes = sedona.read.option("delimiter", ",").option("header", "true").csv("earthquake_data.csv")
earthquakes.createOrReplaceTempView("earthquake")
earthquakes_info = sedona.sql(
      "SELECT latitude, longitude, date_time from earthquake"
)
earthquakes_info.show(5)

+--------+---------+----------------+
|latitude|longitude|       date_time|
+--------+---------+----------------+
| -9.7963|  159.596|22-11-2022 02:03|
| -4.9559|  100.738|18-11-2022 13:37|
|-20.0508| -178.346|12-11-2022 07:09|
|-19.2918| -172.129|11-11-2022 10:48|
|-25.5948|  178.278|09-11-2022 10:14|
+--------+---------+----------------+
only showing top 5 rows



In [None]:
# RDD (Resilient Distributed Database) approach
# more information here: https://sedona.apache.org/1.5.1/tutorial/rdd/
# First modify the file to be longitude, latitude, time format
df = sedona.read.csv('earthquake_data.csv', header=True, inferSchema=True)
df = df[["latitude", "longitude", "date_time"]]
df.write.csv("output", header=False)
# the output is a directory, with part 1 file that contains the data

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/earthquake_data.csv.

In [None]:
from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import FileDataSplitter

data = PointRDD(sedona.sparkContext, 'earthquake_modified.csv', 0, FileDataSplitter.CSV, True)
#before any partitioning the data is stored in rawSpatialRDD
all_records = data.rawSpatialRDD.collect()

Py4JJavaError: An error occurred while calling None.org.apache.sedona.core.spatialRDD.PointRDD.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/content/earthquake_modified.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2488)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1228)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1221)
	at org.apache.spark.api.java.JavaRDDLike.aggregate(JavaRDDLike.scala:427)
	at org.apache.spark.api.java.JavaRDDLike.aggregate$(JavaRDDLike.scala:425)
	at org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:45)
	at org.apache.sedona.core.spatialRDD.SpatialRDD.analyze(SpatialRDD.java:420)
	at org.apache.sedona.core.spatialRDD.PointRDD.<init>(PointRDD.java:158)
	at org.apache.sedona.core.spatialRDD.PointRDD.<init>(PointRDD.java:61)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Input path does not exist: file:/content/earthquake_modified.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 38 more


In [None]:
all_records[0:5]

NameError: name 'all_records' is not defined

Map To 1D

In [None]:
# define a grid to use for hilbert
#
#

def hilbert_curve(point):
    # Extract the latitude and longitude from the Point object
    lat = point.geom.x
    lon = point.geom.y

    # Compute the hilbert value
    #
    #

    timestamp = point.getUserData()  # Adjust if necessary

    return str(hilbert) + ',' + str(timestamp)


# Map the PointRDD to a new RDD with the hilbert value and timestamps
updated_data = data.rawSpatialRDD.map(hilbert_curve)

# Store the updated data in new file

# Follow similar steps in the first milestone to read the new updated data

In [None]:
from hilbertcurve.hilbertcurve import HilbertCurve

# Define the grid size and spatial boundaries for the Hilbert curve
grid_size = 1024  # A 1024x1024 grid
min_lat, max_lat = 10.0, 60.0  # Latitude range
min_lon, max_lon = -70.0, 140.0  # Longitude range

# Function to normalize coordinates to the grid
def normalize(value, min_val, max_val, grid_size):
    """
    Normalize a spatial coordinate to fit into a grid of given size.
    :param value: Coordinate value to normalize
    :param min_val: Minimum value of the coordinate range
    :param max_val: Maximum value of the coordinate range
    :param grid_size: Size of the grid (e.g., 1024 for a 1024x1024 grid)
    :return: Normalized coordinate as an integer in the range [0, grid_size-1]
    """
    return int((value - min_val) / (max_val - min_val) * (grid_size - 1))

# Initialize the Hilbert curve
p = int(grid_size).bit_length() - 1  # Order of the Hilbert curve
hilbert_curve = HilbertCurve(p, 2)  # 2D Hilbert curve

def hilbert_curve(point):
    """
    Compute the Hilbert value for a given point and append the timestamp.
    :param point: Spatial point with latitude, longitude, and timestamp
    :return: A string of the form "Hxy,timestamp"
    """
    # Extract the latitude and longitude from the Point object
    lat = point.getX()
    lon = point.getY()

    # Normalize latitude and longitude to the grid
    x = normalize(lat, min_lat, max_lat, grid_size)
    y = normalize(lon, min_lon, max_lon, grid_size)

    # Compute the Hilbert index
    hilbert = hilbert_curve.distance_from_coordinates([x, y])

    # Extract timestamp from user data
    timestamp = point.getUserData()  # Adjust if your PointRDD stores timestamp differently

    return str(hilbert) + ',' + str(timestamp)

# Map the PointRDD to a new RDD with the Hilbert value and timestamps
updated_data = data.rawSpatialRDD.map(hilbert_curve)

# Save the updated data to a new file
output_file_path = "hilbert_transformed_data.txt"
updated_data.saveAsTextFile(output_file_path)

# Steps to read the new updated data
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Read Hilbert Transformed Data").getOrCreate()

# Define schema for the new dataset
from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField("Hxy", LongType(), True),
    StructField("timestamp", StringType(), True)
])

# Read the transformed data into a DataFrame
hilbert_df = spark.read.csv(output_file_path, schema=schema, sep=",", header=False)

# Show the DataFrame for verification
hilbert_df.show(5, truncate=False)


ModuleNotFoundError: No module named 'hilbertcurve'

Own code here onwards


In [None]:
import json
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("TwitterDataSchema") \
    .getOrCreate()

# Load the .txt dataset
file_path = '/content/2017-07-22_09-02-53.txt'

# Function to parse JSON lines and infer schema
def parse_twitter_data(file_path):
    # Read the dataset as text file
    raw_data = spark.read.text(file_path)

    # Parse each line as JSON
    json_rdd = raw_data.rdd.map(lambda row: json.loads(row[0]))

    # Convert RDD to DataFrame
    dataframe = spark.read.json(json_rdd)

    return dataframe

# Parse the dataset
twitter_df = parse_twitter_data(file_path)

# Show the inferred schema
twitter_df.printSchema()

# Display a sample of the data
twitter_df.show(truncate=False)


root
 |-- _corrupt_record: string (nullable = true)
 |-- limit: struct (nullable = true)
 |    |-- timestamp_ms: string (nullable = true)
 |    |-- track: long (nullable = true)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
import json
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("TwitterDataGeospatialProcessing") \
    .getOrCreate()

# Define the path to the dataset
file_path = '/content/2017-07-22_09-02-53.txt'  # Replace with the actual path

#Load the raw dataset as text (each line is a JSON object)
raw_data = spark.read.text(file_path)

#Parse and convert to DataFrame
json_rdd = raw_data.rdd.map(lambda row: json.loads(row[0]))
df = spark.read.json(json_rdd)

# Show the schema
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- limit: struct (nullable = true)
 |    |-- timestamp_ms: string (nullable = true)
 |    |-- track: long (nullable = true)



In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("TwitterDataFix") \
    .getOrCreate()

# Define the path to the dataset
file_path = "/content/2017-07-22_09-02-53.txt"  # Replace with actual path

# Load the dataset with multiLine=True
df = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json(file_path)

# Print the schema
df.printSchema()

# Show some rows to verify the structure
df.show(5, truncate=False)

root
 |-- contributors: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- symbols: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- urls: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |-- favorite_count:

In [None]:
# Inspect the schema
df.printSchema()

# Preview the first few rows
df.show(5, truncate=False)

root
 |-- contributors: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- symbols: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- urls: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |-- favorite_count:

In [None]:
from pyspark.sql.functions import from_json, split,col
from pyspark.sql.types import ArrayType, DoubleType

# Parse the coordinates field if it's a JSON string
df = df.withColumn("parsed_coordinates", from_json(col("coordinates"), ArrayType(DoubleType())))

# If coordinates are comma-separated strings (e.g., "123.45,-67.89"), use split instead:
# df = df.withColumn("parsed_coordinates", split(col("coordinates"), ","))

# Check the schema and preview the parsed data
df.select("coordinates", "parsed_coordinates").show(5, truncate=False)


+-----------+------------------+
|coordinates|parsed_coordinates|
+-----------+------------------+
|NULL       |NULL              |
+-----------+------------------+



In [None]:
from pyspark.sql.functions import expr, col, when

# Calc. latitude (average of second element [1] of each point in bounding box)
df = df.withColumn(
    "latitude",
    when(
        col("place.bounding_box.coordinates").isNotNull(),
        expr("""
            aggregate(
                transform(place.bounding_box.coordinates[0], point -> point[1]),
                0.0D,
                (sum, value) -> sum + value
            ) / 4
        """)  # Average latitude from bounding box
    ).otherwise(None)  # Set to null if bounding_box is null
)

# Calc. longitude (average of first element [0] of each point in bounding box)
df = df.withColumn(
    "longitude",
    when(
        col("place.bounding_box.coordinates").isNotNull(),
        expr("""
            aggregate(
                transform(place.bounding_box.coordinates[0], point -> point[0]),
                0.0D,
                (sum, value) -> sum + value
            ) / 4
        """)  # Average longitude from bounding box
    ).otherwise(None)  # Set to null if bounding_box is null
)

# Show the resulting DataFrame with latitude and longitude
df.select("latitude", "longitude", "place.bounding_box.coordinates").show(5, truncate=False)


+----------+---------+--------------------------------------------------------------------------------------------------+
|latitude  |longitude|coordinates                                                                                       |
+----------+---------+--------------------------------------------------------------------------------------------------+
|53.4569525|-2.23348 |[[[-2.319934, 53.343623], [-2.319934, 53.570282], [-2.147026, 53.570282], [-2.147026, 53.343623]]]|
+----------+---------+--------------------------------------------------------------------------------------------------+



In [None]:

from pyspark.sql.functions import expr, col, when

# Calc. latitude (average of second element [1] of each point in bounding box)
df = df.withColumn(
    "latitude",
    when(
        col("place.bounding_box.coordinates").isNotNull(),
        expr("""
            aggregate(
                transform(place.bounding_box.coordinates[0], point -> point[1]),
                0.0D,
                (sum, value) -> sum + value
            ) / 4
        """)  # Avg latitude from bounding box
    ).otherwise(None)  # Set to null if bounding_box is null
)

# Calc. longitude (avg of 1st element [0] of each point in bounding box)
df = df.withColumn(
    "longitude",
    when(
        col("place.bounding_box.coordinates").isNotNull(),
        expr("""
            aggregate(
                transform(place.bounding_box.coordinates[0], point -> point[0]),
                0.0D,
                (sum, value) -> sum + value
            ) / 4
        """)  # Avg longitude from bounding box
    ).otherwise(None)  # Set to null if bounding_box is null
)

# Create the location col. as a geospatial point (ST_Point)
df = df.withColumn("location", expr("ST_Point(longitude, latitude)"))

# Show the resulting DataFrame
df.select("latitude", "longitude", "place.bounding_box.coordinates").show(5, truncate=False)


+----------+---------+--------------------------------------------------------------------------------------------------+
|latitude  |longitude|coordinates                                                                                       |
+----------+---------+--------------------------------------------------------------------------------------------------+
|53.4569525|-2.23348 |[[[-2.319934, 53.343623], [-2.319934, 53.570282], [-2.147026, 53.570282], [-2.147026, 53.343623]]]|
+----------+---------+--------------------------------------------------------------------------------------------------+



In [None]:
from pyspark.sql.functions import expr, col, when

# Filter out invalid geometries using ST_IsValid
df = df.withColumn("is_valid", expr("ST_IsValid(location)"))

# Filter rows with invalid geometries
df_valid = df.filter(col("is_valid") == True)

# Show the valid data for verification
df_valid.select("latitude", "longitude", "location", "is_valid").show(5, truncate=False)


+----------+---------+---------------------------+--------+
|latitude  |longitude|location                   |is_valid|
+----------+---------+---------------------------+--------+
|53.4569525|-2.23348 |POINT (-2.23348 53.4569525)|true    |
+----------+---------+---------------------------+--------+



In [None]:

!pip install apache-sedona

from pyspark.sql import SparkSession
from sedona.register.geo_registrator import SedonaRegistrator
from sedona.core.SpatialRDD import SpatialRDD
from pyspark.sql.functions import expr, col, when, lit


#nitialize Spark Session with Sedona
spark = SparkSession.builder \
    .appName("TwitterDataRTree") \
    .config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.4.1-incubating,org.datasyslab:geotools-wrapper:1.4.0-25.2") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator") \
    .getOrCreate()

#Register Sedona SQL Functions
SedonaRegistrator.registerAll(spark)

#Load the Raw Data
file_path = "/content/2017-07-22_09-02-53.txt"  # Replace with actual dataset path

# Load the raw dataset as text and parse as JSON
raw_data = spark.read.text(file_path)
df = raw_data.selectExpr("CAST(value AS STRING) as json_string")
df = spark.read.json(df.rdd.map(lambda row: row.json_string))


# Calc. latitude (average of second element [1] of each point in bounding box)
df = df.withColumn(
    "latitude",
    when(
        col("place.bounding_box.coordinates").isNotNull(),
        expr("""
            aggregate(
                transform(place.bounding_box.coordinates[0], point -> point[1]),
                0.0D,
                (sum, value) -> sum + value
            ) / 4
        """)  # Average latitude from bounding box
    ).otherwise(lit(None))  # Set to null if bounding_box is null using lit(None)
)

# Calc. longitude (average of first element [0] of each point in bounding box)
df = df.withColumn(
    "longitude",
    when(
        col("place.bounding_box.coordinates").isNotNull(),
        expr("""
            aggregate(
                transform(place.bounding_box.coordinates[0], point -> point[0]),
                0.0D,
                (sum, value) -> sum + value
            ) / 4
        """)  # Average longitude from bounding box
    ).otherwise(lit(None))  # Set to null if bounding_box is null using lit(None)
)

# Create the location column as a geospatial point (ST_Point)
# Filter out rows where latitude or longitude is null to avoid invalid geometries
df = df.withColumn("location", expr("ST_Point(longitude, latitude)")).filter(col("latitude").isNotNull() & col("longitude").isNotNull())

# Verify the transformed dataset
df.select("location", "latitude", "longitude").show(5, truncate=False)



# Ensure only valid geometries are processed
df_clean = df.filter(expr("ST_IsValid(location)"))

# Convert DataFrame to SpatialRDD
spatial_rdd = SpatialRDD()
spatial_rdd.rawSpatialRDD = df_clean.rdd.map(lambda row: row["location"])

# Analyze the SpatialRDD to compute bounding boxes
spatial_rdd.analyze()

# Perform spatial partitioning using QuadTree
spatial_rdd.spatialPartitioning("QUADTREE")

# Build R-Tree index on partitions
spatial_rdd.buildIndex("RTREE", True)

# Confirm partitions and display spatially indexed data
print(f"Number of partitions: {spatial_rdd.getNumPartitions()}")
df_spatial = spatial_rdd.rawSpatialRDD.toDF(["geometry"])
df_spatial.show(5, truncate=False)


Collecting apache-sedona
  Downloading apache_sedona-1.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.0 kB)
Downloading apache_sedona-1.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (190 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m190.0/190.0 kB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: apache-sedona
Successfully installed apache-sedona-1.7.0


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
# Measure query performance
import time
start_time = time.time()

# Filter tweets based on time and spatial range
df_time_spatial_filtered = df_clean.filter(
    (col("timestamp_ms") >= t1) &
    (col("timestamp_ms") <= t2) &
    expr(f"ST_Contains(ST_PolygonFromEnvelope({xmin}, {ymin}, {xmax}, {ymax}), location)")
)

# Count the number of tweets in the specified range
num_tweets = df_time_spatial_filtered.count()

# End timing the query
query_runtime = time.time() - start_time

# Output results
# Corrected way to get the number of partitions
num_partitions = spatial_rdd.rawSpatialRDD.getNumPartitions()  # Retrieve the partition count

# Approximate total number of records
approx_total_records = df_clean.count()

# Print
print(f"No of Partitions: {num_partitions}")
print(f"Query Running time: {query_runtime:.2f} seconds")
print(f"Approximate total no. of records: {approx_total_records}")
print(f"No of tweets in the specified range: {num_tweets}")



NameError: name 'df_clean' is not defined

In [None]:
import numpy as np

# Hilbert Curve 2D Mapping function
def hilbert_curve_2d(x, y, n):
    """
    Convert 2D coordinates (x, y) to a Hilbert number (Hxy).
    x, y: coordinates (in the range [0, n-1]).
    n: the size of the grid (must be a power of 2).
    """
    H = 0
    for s in range(n.bit_length() - 1, -1, -1):
        rx = (x >> s) & 1
        ry = (y >> s) & 1
        H += (n ** 2 // 4) * ((3 * rx) ^ ry)
        x, y = rotate(x, y, rx, ry, s)
    return H

def rotate(x, y, rx, ry, s):
    """Rotate the coordinates based on Hilbert curve rotation."""
    if ry == 0:
        if rx == 1:
            x, y = (n - 1 - x, n - 1 - y)
        x, y = (y, x)
    return x, y


In [None]:
!pip install apache-sedona

import numpy as np
from pyspark.sql import SparkSession
from sedona.register.geo_registrator import SedonaRegistrator
from sedona.core.SpatialRDD import SpatialRDD
from pyspark.sql.functions import expr, col, when, lit, udf
from pyspark.sql.types import LongType

#Initialize Spark Session with Sedona
spark = SparkSession.builder \
    .appName("TwitterDataRTree") \
    .config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.4.1-incubating,org.datasyslab:geotools-wrapper:1.4.0-25.2") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator") \
    .getOrCreate()

#Register Sedona SQL Functions
SedonaRegistrator.registerAll(spark)

# Hilbert Curve 2D Mapping function
def hilbert_curve_2d(x, y, n):
    """
    Convert 2D coordinates (x, y) to a Hilbert number (Hxy).
    x, y: coordinates (in the range [0, n-1]).
    n: the size of the grid (must be a power of 2).
    """
    H = 0
    for s in range(n.bit_length() - 1, -1, -1):
        rx = (x >> s) & 1
        ry = (y >> s) & 1
        H += (n ** 2 // 4) * ((3 * rx) ^ ry)
        x, y = rotate(x, y, rx, ry, s, n)  # Pass n to rotate
    return H

def rotate(x, y, rx, ry, s, n):  # Add n as a parameter
    """Rotate the coordinates based on Hilbert curve rotation."""
    if ry == 0:
        if rx == 1:
            x, y = (n - 1 - x, n - 1 - y)
        x, y = (y, x)
    return x, y

# Find the min and max latitude and longitude values
min_lat = df_clean.selectExpr("min(latitude)").first()[0]
max_lat = df_clean.selectExpr("max(latitude)").first()[0]
min_lon = df_clean.selectExpr("min(longitude)").first()[0]
max_lon = df_clean.selectExpr("max(longitude)").first()[0]

# Now define the Hilbert transformation function with proper normalization
def spatial_to_hilbert(latitude, longitude, n=1024):
    # Normalize latitude and longitude to be in the range [0, n-1]
    x = int((latitude - min_lat) / (max_lat - min_lat) * (n - 1))  # Normalize latitude
    y = int((longitude - min_lon) / (max_lon - min_lon) * (n - 1))  # Normalize longitude
    return hilbert_curve_2d(x, y, n)

# Register the UDF (User Defined Function) for Hilbert transformation
spatial_to_hilbert_udf = udf(spatial_to_hilbert, LongType())

# Apply the UDF to the dataframe
df_hilbert = df_clean.withColumn("Hxy", spatial_to_hilbert_udf(col("latitude"), col("longitude")))

# Show result
df_hilbert.select("latitude", "longitude", "Hxy").show(5, truncate=False)



  SedonaRegistrator.registerAll(spark)


NameError: name 'df_clean' is not defined

In [None]:
from pyspark.sql.functions import struct, col

#Add Hilbert number (Hxy) and timestamp (t) to create the final (Hxy, t) data point
df_hilbert_time = df_hilbert.withColumn("Hxy_t", struct(col("Hxy"), col("timestamp_ms").alias("t")))

#Show the result
df_hilbert_time.select("latitude", "longitude", "Hxy", "timestamp_ms", "Hxy_t").show(5, truncate=False)


+------------------+-----------+-------+-------------+------------------------+
|latitude          |longitude  |Hxy    |timestamp_ms |Hxy_t                   |
+------------------+-----------+-------+-------------+------------------------+
|53.4569525        |-2.23348   |2621440|1500714173451|{2621440, 1500714173451}|
|28.362256000000002|129.532835 |4194304|1500714173496|{4194304, 1500714173496}|
|10.483539         |-66.930644 |4456448|1500714173590|{4456448, 1500714173590}|
|22.869936000000003|113.4197245|5767168|1500714172528|{5767168, 1500714172528}|
|35.701657         |139.7091805|5242880|1500714173418|{5242880, 1500714173418}|
+------------------+-----------+-------+-------------+------------------------+
only showing top 5 rows



In [None]:
!pip install apache-sedona

from pyspark.sql.functions import expr, lit, col
from sedona.core.enums import IndexType
from sedona.core.SpatialRDD import SpatialRDD, PointRDD
from pyspark.sql import SparkSession
from pyspark import StorageLevel
from sedona.core.geometry import Point

#Initialize Spark Session
spark = SparkSession.builder \
    .appName("Spatial Partitioning with R-Tree") \
    .getOrCreate()


# Convert DataFrame into SpatialRDD

# Convert the DataFrame into an RDD of Spatial Points (Hxy and timestamp_ms)
# Create Point objects from Hxy and timestamp_ms
spatial_rdd = PointRDD(
    df_hilbert_time.rdd.map(lambda row: Point(row['Hxy'], row['timestamp_ms'])),
    StorageLevel.MEMORY_ONLY
)

#Spatial Partitioning - Use QUADTREE partitioning strategy
spatial_rdd.spatialPartitioning("QUADTREE")

#Number of partitions after spatial partitioning
print(f"Number of Partitions: {spatial_rdd.getNumPartitions()}")

#Build R-Tree Index on the SpatialRDD
spatial_rdd.buildIndex(IndexType.RTREE, False)

#Check index
print("R-tree index built on SpatialRDD")



ModuleNotFoundError: No module named 'sedona.core.geometry'

In [None]:
#Enlarge the Query Range and Compute Hilbert Numbers

# Enlarge the query range to fit grid points (u1, v1, u2, v2)
def enlarge_query_range(x1, y1, x2, y2, grid_size=1024):
    """
    Enlarge the query range to the smallest enclosing rectangle whose corners are grid points.
    This means scaling the coordinates to fit the nearest grid boundary.
    """
    # Normalize the coords to the range [0, grid_size-1]
    u1 = int((x1 - min_lat) / (max_lat - min_lat) * (grid_size - 1))
    v1 = int((y1 - min_lon) / (max_lon - min_lon) * (grid_size - 1))
    u2 = int((x2 - min_lat) / (max_lat - min_lat) * (grid_size - 1))
    v2 = int((y2 - min_lon) / (max_lon - min_lon) * (grid_size - 1))

    # Return the enlarged rectangle coords
    return u1, v1, u2, v2

# Function to compute Hilbert number
def hilbert_number(x, y, grid_size=1024):
    """
    Compute the Hilbert number for the given (x, y) coordinate.
    """
    return hilbert_curve_2d(x, y, grid_size)

# Ex. query parameters: [(x1, y1), (x2, y2), (t1, t2)]
x1, y1 = -2.4, 53.2  # Bottom-left corner of bounding box
x2, y2 = -2.1, 53.6  # Top-right corner of bounding box
t1 = 1500714000000  # Start time (lower bound)
t2 = 1500715000000  # End time (upper bound)

min_lat = -90.0
max_lat = 90.0
min_lon = -180.0
max_lon = 180.0

#Enlarge the query range
u1, v1, u2, v2 = enlarge_query_range(x1, y1, x2, y2)

#Compute Hilbert numbers
Hu1v1 = hilbert_number(u1, v1)
Hu2v2 = hilbert_number(u2, v2)

print(f"Enlarged query range: ({u1}, {v1}) to ({u2}, {v2})")
print(f"Hilbert numbers: Hu1v1 = {Hu1v1}, Hu2v2 = {Hu2v2}")

Enlarged query range: (497, 662) to (499, 663)
Hilbert numbers: Hu1v1 = 4980736, Hu2v2 = 4980736


In [None]:
import time
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType

# Normalize the coords to [0, grid_size-1]
def normalize(x, min_val, max_val, grid_size):
    return int((x - min_val) / (max_val - min_val) * (grid_size - 1))

# Function to calculate Hilbert index using bit-level interleaving (based on normalized coords)
def hilbert_index(x, y, grid_size):
    """
    Calculate the Hilbert number for normalized (x, y) coordinates.
    :param x: Normalized x-coordinate
    :param y: Normalized y-coordinate
    :param grid_size: Grid size for Hilbert curve
    :return: Hilbert index
    """
    n = grid_size
    result = 0
    shift = 0

    # Calc. Hilbert index using bit-level operations
    while n > 0:
        # Extract the relevant bits from x and y
        result |= ((x >> shift) & 1) << (2 * shift)
        result |= ((y >> shift) & 1) << (2 * shift + 1)
        shift += 1
        n >>= 1

    return result

#normalizing coordinates & calc. the Hilbert number
min_lat, max_lat = 10.0, 60.0  # Latitude min/max range
min_lon, max_lon = -70.0, 140.0  # Longitude min/max range
grid_size = 1024  # Grid size for Hilbert transformation

# Function to normalize & compute Hilbert index
def spatial_to_hilbert(lat, lon, grid_size):
    # Normalize coordinates to [0, grid_size-1]
    x = normalize(lat, min_lat, max_lat, grid_size)
    y = normalize(lon, min_lon, max_lon, grid_size)

    # Use Hilbert index calculation function
    return hilbert_index(x, y, grid_size)

# Register the UDF to apply Hilbert calc.
spatial_to_hilbert_udf = udf(lambda lat, lon: spatial_to_hilbert(lat, lon, grid_size), LongType())

# Add the Hilbert index to your DataFrame
df_hilbert_time = df.withColumn("Hxy", spatial_to_hilbert_udf(col("latitude"), col("longitude")))

# Function to execute the query with temporal & spatial filters
def execute_query(df_hilbert_time, x1, y1, x2, y2, t1, t2, Hu1v1, Hu2v2):
    """
    Executes a range query with spatial and temporal filters.
    :param df_hilbert_time: DataFrame containing (Hxy, t) data
    :param x1, y1: Bottom-left corner of the original bounding box
    :param x2, y2: Top-right corner of the original bounding box
    :param t1, t2: Temporal range (start and end time in milliseconds)
    :param Hu1v1, Hu2v2: Hilbert numbers for enlarged query range corners
    :return: Refined DataFrame
    """
    start_time = time.time()

    #Retrieve data points lying between Hu1v1 & Hu2v2 within the time range (t1, t2)
    df_filtered = df_hilbert_time.filter(
        (col("Hxy") >= Hu1v1) & (col("Hxy") <= Hu2v2) &
        (col("timestamp_ms") >= t1) & (col("timestamp_ms") <= t2)
    )

    #onvert the results into (x, y, t) triples and filter by the exact query range
    df_refined = df_filtered.filter(
        (col("latitude") >= x1) & (col("latitude") <= x2) &
        (col("longitude") >= y1) & (col("longitude") <= y2)
    )

    #Calc.the fraction of spurious points
    num_spurious_points = df_filtered.subtract(df_refined).count()
    total_points = df_filtered.count()
    spurious_fraction = num_spurious_points / total_points if total_points > 0 else 0

    #O/P the running time and the fraction of spurious points
    running_time = time.time() - start_time
    print(f"Running time: {running_time:.4f} seconds")
    print(f"Fraction of spurious points: {spurious_fraction:.4f}")
    print(f"Number of points in the exact query range: {df_refined.count()}")

    return df_refined


#Query: [(x1, y1), (x2, y2), (t1, t2)]
x1, y1 = -2.8, 53.2  # Bottom-left corner of bounding box
x2, y2 = -2.1, 54.6  # Top-right corner of bounding box
t1 = 1500714000000  # Start time (lower bound)
t2 = 1500715000000  # End time (upper bound)

# Enlarge the query range for testing
u1, v1 = 490, 650  #Grid-aligned coordinates
u2, v2 = 510, 670  # Enlarge the query range corners

# Compute Hilbert numbers for enlarged query range corners
grid_size = 1024
Hu1v1 = spatial_to_hilbert(u1, v1, grid_size)
Hu2v2 = spatial_to_hilbert(u2, v2, grid_size)

print(f"Enlarged query range: ({u1}, {v1}) to ({u2}, {v2})")
print(f"Hilbert numbers: Hu1v1 = {Hu1v1}, Hu2v2 = {Hu2v2}")

# Execute query
df_result = execute_query(df_hilbert_time, x1, y1, x2, y2, t1, t2, Hu1v1, Hu2v2)

# Show the final filtered result
df_result.show(5, truncate=False)


Enlarged query range: (490, 650) to (510, 670)
Hilbert numbers: Hu1v1 = 3578714, Hu2v2 = 4020020
Running time: 1.9238 seconds
Fraction of spurious points: 0.0000
Number of points in the exact query range: 0
+------------+-----------+----------+------------------+--------+--------------+---------+------------+---+---+------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+-----+-------------+---------+------+----+------------+---------+----+------------------+--------+---------+--------+--------+---+
|contributors|coordinates|created_at|display_text_range|entities|favorite_count|favorited|filter_level|geo|id |id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote_status|lang|place|retweet_count|retweeted|source|text|timestamp_ms|truncated|user|parsed_coordinates|latitude|longitude|location|is_valid|Hxy|
+------------+-------

REFERENCES (in addition to previously cited mterial):

1. https://www.geeksforgeeks.org/hilbert-number/
2.https://www.youtube.com/watch?v=_T3cIK9KFdE
3.Coursework covered in class