Skip to content

[GH-2830] Add broadcast spatial-join support for the Geography type with ST_Contains#2864

Merged
zhangfengcdt merged 4 commits into
apache:masterfrom
zhangfengcdt:feature/geography.spatialjoin.bc
Apr 29, 2026
Merged

[GH-2830] Add broadcast spatial-join support for the Geography type with ST_Contains#2864
zhangfengcdt merged 4 commits into
apache:masterfrom
zhangfengcdt:feature/geography.spatialjoin.bc

Conversation

@zhangfengcdt
Copy link
Copy Markdown
Member

Did you read the Contributor Guide?

Is this PR related to a ticket?

  • Yes, and the PR name follows the format [GH-XXX] my subject. Closes #<issue_number>

What changes were proposed in this PR?

Adds broadcast spatial-join support for the Geography type with ST_Contains. JoinQueryDetector now plans BroadcastIndexJoinExec for ST_Contains(geog, geog) with a broadcast() hint (previously fell back to a row-by-row BroadcastNestedLoopJoin). The build side is indexed by each Geography's lat/lng bounding rect (full-longitude fallback at the antimeridian) and refined with the existing S2 Functions.contains. Non-broadcast Geography joins still fall through to row-by-row.

Also fixes a Geography ST_Contains correctness bug uncovered during testing.

Out of scope: range/partition joins, distance/KNN joins, and true split-at-±180 indexing for antimeridian polygons.

EXPLAIN on SELECT … JOIN /*+ BROADCAST(b) */ b ON ST_Contains(a.geog, b.geog) now shows BroadcastIndexJoinExec.

How was this patch tested?

New tests:

  • common/.../WkbContainsRoundtripTest — 4/4
  • spark/common/.../BroadcastIndexJoinGeographySuite — 5/5 (both broadcast sides, antimeridian polygon, plan-choice + correctness)

Regression (Spark 3.4 / Scala 2.12), 363/363 total:

  • S2Geography.*Test 83/83, Geography.*Test 33/33
  • BroadcastIndexJoinSuite 65/65, SpatialJoinSuite 160/160, GeographyFunctionTest 17/17

Did this PR include necessary documentation updates?

  • No, this PR does not affect any public API so no need to change the documentation.

@zhangfengcdt zhangfengcdt marked this pull request as ready for review April 28, 2026 18:10
@zhangfengcdt zhangfengcdt requested a review from jiayuasu as a code owner April 28, 2026 18:10
@jiayuasu jiayuasu requested a review from Copilot April 28, 2026 22:18
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds broadcast spatial-join planning for ST_Contains over GeographyUDT, enabling BroadcastIndexJoinExec (index on coarse lat/lng envelope + S2 refine) instead of falling back to BroadcastNestedLoopJoin. Also includes a fix for a Geography ST_Contains correctness issue in the WKB fast-path, plus new regression and planning tests.

Changes:

  • Plan BroadcastIndexJoinExec for ST_Contains(geog, geog) when a broadcast hint (or Sedona auto-broadcast) applies, using a Geography-aware index/refine path.
  • Index Geography rows by their S2 lat/lng bounding rectangle (full-longitude fallback for antimeridian-wrapping rects) and refine with org.apache.sedona.common.geography.Functions.contains.
  • Fix WKB polygon ring handling in WkbS2Shape (open vs closed rings) and add tests covering the regression + join planning/correctness.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
spark/common/src/test/scala/org/apache/sedona/sql/geography/BroadcastIndexJoinGeographySuite.scala Adds planning + correctness tests for broadcast Geography ST_Contains, including antimeridian spanning polygons.
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryBase.scala Adds toGeographySpatialRDD to build broadcast-side index entries from Geography bytes with coarse envelopes + attached payload.
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala Routes broadcast index build through toGeographySpatialRDD when geographyShape is set.
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinedGeometry.scala Introduces GeographyJoinShape payload and envelope construction from S2 rect bounds (antimeridian full-longitude fallback).
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala Detects Geography ST_Contains and plans broadcast index join with a Geography-specific execution path; keeps non-broadcast fallback row-by-row.
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala Adds Geography refiner (S2 contains) and Geography stream-shape extraction; refactors JTS vs Geography refinement into JoinRefiner.
common/src/test/java/org/apache/sedona/common/S2Geography/WkbContainsRoundtripTest.java Adds regression coverage for WKB round-trip contains correctness.
common/src/main/java/org/apache/sedona/common/S2Geography/WkbS2Shape.java Fixes polygon ring edge counting for open vs closed rings; adds helper to detect closing duplicate coordinates.
Comments suppressed due to low confidence (1)

common/src/main/java/org/apache/sedona/common/S2Geography/WkbS2Shape.java:155

  • WkbS2Shape computes containsOriginValue by calling computeContainsOrigin() unconditionally, but for an empty polygon WKB (numRings == 0) the vertexOffsets/chainLengths arrays are length 0 and computeContainsOrigin() will throw ArrayIndexOutOfBoundsException. Add a fast-path for numRings == 0 (and possibly for rings with < 3 unique vertices) to set containsOriginValue = false without calling computeContainsOrigin().
      case 3: // Polygon
        {
          this.dim = 2;
          int numRings = buf.getInt(payloadOffset);
          this.chainStarts = new int[numRings];
          this.chainLengths = new int[numRings];
          this.vertexOffsets = new int[numRings];

          // First pass: count total vertices and compute offsets. Sedona's WKBWriter writes
          // open rings (n unique vertices, no closing duplicate); standard WKB writes closed
          // rings (n+1 coords with last == first). Detect the closing-duplicate case by
          // comparing the first and last (lon, lat) pair so we get the right edge count
          // either way: edges = uniqueVertices = closed ? ringCoords - 1 : ringCoords.
          int totalVerts = 0;
          int edgeCount = 0;
          int byteOffset = payloadOffset + 4;
          int[] ringCoordCounts = new int[numRings];
          int[] ringByteOffsets = new int[numRings];
          boolean[] ringClosed = new boolean[numRings];
          for (int r = 0; r < numRings; r++) {
            int ringCoords = buf.getInt(byteOffset);
            ringCoordCounts[r] = ringCoords;
            ringByteOffsets[r] = byteOffset + 4;
            boolean closed =
                ringCoords >= 2 && firstAndLastEqual(buf, ringByteOffsets[r], ringCoords);
            ringClosed[r] = closed;
            byteOffset += 4 + ringCoords * 16;

            int ringEdges = closed ? Math.max(0, ringCoords - 1) : ringCoords;
            int storedVerts = closed ? ringCoords : ringCoords;
            chainStarts[r] = edgeCount;
            chainLengths[r] = ringEdges;
            vertexOffsets[r] = totalVerts;
            edgeCount += ringEdges;
            totalVerts += storedVerts + (closed ? 0 : 1); // append closing duplicate for open rings
          }
          this.totalEdges = edgeCount;

          // Second pass: read all vertices, appending a closing duplicate for open rings so
          // the rest of the shape interface (getEdge, getChainEdge, computeContainsOrigin)
          // can index `vertexOffsets[r] + (i % chainLengths[r])` uniformly.
          this.vertices = new S2Point[totalVerts];
          int vi = 0;
          for (int r = 0; r < numRings; r++) {
            S2Point[] ringVerts = readVertices(buf, ringByteOffsets[r], ringCoordCounts[r]);
            System.arraycopy(ringVerts, 0, vertices, vi, ringVerts.length);
            vi += ringVerts.length;
            if (!ringClosed[r] && ringVerts.length > 0) {
              vertices[vi++] = ringVerts[0];
            }
          }

          // Eagerly compute containsOrigin from first ring
          this.containsOriginValue = computeContainsOrigin();
          break;
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jiayuasu
Copy link
Copy Markdown
Member

@zhangfengcdt Please fix these simple comments otherwise this PR is read to be merged

.selectExpr("pt_id", "ST_GeogFromWKT(wkt, 4326) AS pt_geog")
}

private def planUsesBroadcastIndexJoin(df: org.apache.spark.sql.DataFrame): Boolean =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume left join / right join (broadcast version) will work as well?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — both LEFT OUTER and RIGHT OUTER broadcast variants are supported. The planner only matches them when the build side aligns with the join type:

LEFT OUTER ⇒ broadcast right
RIGHT OUTER ⇒ broadcast left

@zhangfengcdt zhangfengcdt merged commit 62b7406 into apache:master Apr 29, 2026
44 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants