https://chatgpt.com/share/673b8458-f7e4-800a-8a78-6a36162618e6

In [1]:
import os

# The star import stays ahead of the explicit third-party imports so that the
# explicit names below win on any accidental name clash.
from sedona.spark import *
import numpy as np
import rasterio
from shapely.geometry import Point, Polygon

In [2]:
# Spark master to connect to. "local[*]" runs in-process; to use the standalone
# cluster instead, set this to "spark://192.168.18.110:7077".
spark_master_url = "local[*]"

# S3A settings are read from the environment so that credentials never live in
# the notebook source or its saved outputs. Values that are not set are simply
# not passed to Spark.
# NOTE(review): the access/secret key pair that used to be hardcoded here has
# been exposed in this file's history and should be rotated.
_s3a_options = {
    "spark.hadoop.fs.s3a.access.key": os.environ.get("AWS_ACCESS_KEY_ID"),
    "spark.hadoop.fs.s3a.secret.key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    "spark.hadoop.fs.s3a.endpoint": os.environ.get(
        "AWS_ENDPOINT_URL", "http://192.168.18.115:9000"
    ),
}

_builder = (
    SedonaContext.builder()
    .master(spark_master_url)
    .appName("RasterProcessingWithSedona")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.cores", "4")
    .config("spark.driver.memory", "8g")
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-3.5_2.12:1.6.1,"
        "org.datasyslab:geotools-wrapper:1.6.1-28.2",
    )
)
for _s3a_key, _s3a_value in _s3a_options.items():
    if _s3a_value:
        _builder = _builder.config(_s3a_key, _s3a_value)

# `config` is the underlying Spark session; `sedona` is the same session after
# SedonaContext.create() has been applied to it.
config = _builder.getOrCreate()

sedona = SedonaContext.create(config)

sc = sedona.sparkContext

# Report whether the notebook's SparkContext is reachable.
def check_spark_connection():
    """Print the master URL and application ID of the global ``sc``.

    Returns:
        bool: True when both attributes could be read and printed, False when
        any exception was raised along the way (the error is printed, not
        re-raised).
    """
    connected = False
    try:
        # Reading these attributes is the actual probe of the context.
        master, application = sc.master, sc.applicationId

        print(f"Connected to Spark Master at: {master}")
        print(f"Application ID: {application}")
        connected = True
    except Exception as error:
        print("Failed to connect to the Spark cluster.")
        print(f"Error: {error}")
    return connected
    
# Run the connection check; on failure, shut the session down and stop.
if check_spark_connection():
    print("Spark connection successful.")
else:
    print("Failed to connect to Spark. Exiting...")
    # The session created above is bound to `sedona`; calling `spark.stop()`
    # here would raise NameError because no `spark` name is defined.
    sedona.stop()
    exit(1)

Connected to Spark Master at: local[*]
Application ID: local-1732470437880
Spark connection successful.


In [3]:
# Step 1: Load Raster Data Using Rasterio
def raster_to_point_geometries(raster_path):
    """Convert band 1 of a raster into one ``(Point, value)`` tuple per valid pixel.

    A pixel is skipped when the dataset's validity mask marks it as no-data
    (declared nodata value, alpha band or mask band) or when its value is NaN.
    Checking NaN alone is not enough: integer rasters cannot hold NaN, so their
    no-data pixels would otherwise all be emitted as real observations.

    Args:
        raster_path: Path (or any source accepted by ``rasterio.open``) of the
            raster to read.

    Returns:
        list[tuple[Point, int | float]]: Pixel-centre points in the raster's
        coordinate system paired with the pixel value, in row-major order.
        Values are plain Python scalars rather than numpy scalars.
    """
    with rasterio.open(raster_path) as src:
        band1 = src.read(1)

        # read_masks() yields 0 for no-data pixels and a non-zero value for valid ones.
        valid = src.read_masks(1) > 0
        # Keep the original rule too: undeclared NaNs are still treated as missing.
        valid &= ~np.isnan(band1)

        # np.nonzero walks the array in row-major order, matching the original
        # nested row/col loops.
        rows, cols = np.nonzero(valid)
        if rows.size == 0:
            return []

        # One vectorised call instead of one transform lookup per pixel.
        xs, ys = rasterio.transform.xy(src.transform, rows, cols)
        values = band1[rows, cols].tolist()

    return [
        (Point(float(x), float(y)), value)
        for x, y, value in zip(xs, ys, values)
    ]

In [None]:
# Input raster, given relative to the notebook's working directory.
raster_path = "BFA_hist_land_cover_subindicator_2001_2010.tif"
# Materialise the raster's pixels as a list of (Point, value) tuples.
points_with_values = raster_to_point_geometries(raster_path=raster_path)

In [None]:
# Step 2: Convert to Spark DataFrame
# Column names for the (Point, value) tuples produced in Step 1.
schema = ["geometry", "value"]
# Use `sedona`, the session created by SedonaContext.create(); this notebook
# never defines a `spark` name, so `spark.sparkContext` raised NameError.
rdd = sedona.sparkContext.parallelize(points_with_values)
points_df = sedona.createDataFrame(rdd, schema=schema)

In [None]:
# Register as Spatial Table
# The column is referenced through the DataFrame itself rather than `col(...)`,
# because no cell in this notebook imports `col` explicitly.
points_df = points_df.withColumn("geometry", points_df["geometry"].cast(GeometryType()))
# Expose the points to Spark SQL under the name used by the query in Step 3.
points_df.createOrReplaceTempView("raster_points")

In [None]:
# Step 3: Perform Spatial Operations with Sedona
# Example: Query points within a bounding box. The box below spans
# -180..180 by -90..90 (presumably the full lon/lat extent — confirm the
# raster's CRS is in degrees before relying on it as a filter).
bounding_box_wkt = "POLYGON((-180 -90, -180 90, 180 90, 180 -90, -180 -90))"
# Run the query through `sedona`, the session this notebook actually defines;
# `spark.sql` raised NameError because no `spark` name exists here.
result = sedona.sql(f"""
    SELECT *
    FROM raster_points
    WHERE ST_Contains(ST_GeomFromText('{bounding_box_wkt}'), geometry)
""")

In [None]:
# Show the results: preview the filtered points in the cell output.
result.show()

In [None]:
# Step 4: Save Results
# CSV is a plain-text format, so the geometry column is serialised to WKT with
# ST_AsText before writing instead of handing Spark the raw geometry type.
# "overwrite" keeps the cell re-runnable: the default write mode fails as soon
# as the output path already exists (e.g. on Restart & Run All).
(
    result
    .selectExpr("ST_AsText(geometry) AS geometry", "value")
    .write
    .mode("overwrite")
    .csv("output/filtered_raster_points.csv")
)