Spark 4.1: Map geo Spark types#16851
Conversation
szehon-ho
left a comment
There was a problem hiding this comment.
Thanks for splitting this out — the type mapping is clean and the CRS round-trip coverage is good. One semantic-loss concern on the geography edge algorithm, plus a couple of optional follow-ups. Nothing blocking.
| .put(TypeID.FIXED, ImmutableSet.of(BinaryType$.class)) | ||
| .put(TypeID.BINARY, ImmutableSet.of(BinaryType$.class)) | ||
| .put(TypeID.GEOMETRY, ImmutableSet.of(GeometryType.class)) | ||
| .put(TypeID.GEOGRAPHY, ImmutableSet.of(GeographyType.class)) |
There was a problem hiding this comment.
Unlike DECIMAL, geo types get no additional CRS/SRID compatibility check in primitive(). Since primitive() returns the table's own type there's no corruption, but a requested geo column with a mismatched CRS passes pruning silently. Is deferring SRID validation intentional (consistent with UUID->String), or worth a follow-up?
There was a problem hiding this comment.
Good point. I am leaving CRS/SRID validation out of this split for now: pruning only validates the requested Spark type class and returns the table Iceberg type, so it cannot change the table CRS. The CRS/SRID compatibility check belongs with the read/write path where Spark values are materialized.
There was a problem hiding this comment.
Revisiting this — I think it's worth adding CRS/algorithm checks here now, mirroring DECIMAL. Unlikely in normal reads (Spark copies types from the table schema), but cheap defensive validation.
Example for primitive():
case GEOMETRY:
Types.GeometryType geometry = (Types.GeometryType) primitive;
GeometryType requestedGeometry = (GeometryType) current;
Preconditions.checkArgument(
geometry.crs().equalsIgnoreCase(requestedGeometry.crs()),
"Cannot project geometry with incompatible CRS: %s != %s",
geometry.crs(),
requestedGeometry.crs());
break;
case GEOGRAPHY:
Types.GeographyType geography = (Types.GeographyType) primitive;
GeographyType requestedGeography = (GeographyType) current;
Preconditions.checkArgument(
geography.crs().equalsIgnoreCase(requestedGeography.crs()),
"Cannot project geography with incompatible CRS: %s != %s",
geography.crs(),
requestedGeography.crs());
Preconditions.checkArgument(
geography.algorithm() == requestedGeography.algorithm(),
"Cannot project geography with incompatible edge algorithm: %s != %s",
geography.algorithm(),
requestedGeography.algorithm());
break;And a test, e.g. table geometry(EPSG:3857) with requested GeometryType(EPSG:4326) → Cannot project geometry with incompatible CRS.
There was a problem hiding this comment.
Done, added the CRS/algorithm checks plus a test. The algorithm compare is cross-type (Iceberg vs Spark enum), so I reused convertAlgorithm to translate first instead of ==. Thanks!
szehon-ho
left a comment
There was a problem hiding this comment.
Thanks for splitting this out — the type mapping is clean and the CRS round-trip coverage is good. One semantic-loss concern on the geography edge algorithm, plus a couple of optional follow-ups. Nothing blocking.
|
discussed offline, I think we should align on what metadata to pass through from Spark (probably we should write out the crs/algorithm for clarity eventually, but currently it nullifies it if it is equal to default) |
szehon-ho
left a comment
There was a problem hiding this comment.
A few optional follow-ups on the geo type mapping. Nothing blocking — the mapping itself is correct and the CRS round-trip coverage is good.
| return GeometryType$.MODULE$.apply(geometry.crs()); | ||
| case GEOGRAPHY: | ||
| Types.GeographyType geography = (Types.GeographyType) primitive; | ||
| if (geography.algorithm() != EdgeAlgorithm.SPHERICAL) { |
There was a problem hiding this comment.
Nit: consider an explicit translation layer between Iceberg's EdgeAlgorithm and Spark's EdgeInterpolationAlgorithm rather than the inline != SPHERICAL guard. It would make the supported set self-documenting and could also validate the CRS here (Spark only accepts OGC:CRS84 for geography, so other CRS values currently surface a raw Spark error instead of a clear Iceberg one). Optional.
| // Spark only supports the spherical edge-interpolation algorithm, which matches the Iceberg | ||
| // default, so the Spark algorithm is intentionally not propagated here. Revisit if Spark | ||
| // starts supporting additional algorithms. | ||
| return Types.GeographyType.of(((GeographyType) atomic).crs()); |
There was a problem hiding this comment.
Reverse of the translation-layer suggestion on TypeToSparkType: Spark exposes algorithm(), so we could pass it through to Iceberg instead of dropping it and defaulting to SPHERICAL. Functionally identical today (Spark only has SPHERICAL), but keeps both directions symmetric and fails loudly if Spark ever adds an algorithm Iceberg lacks. The comment already documents the trade-off, so purely optional.
There was a problem hiding this comment.
Nit (still optional): TypeToSparkType now has an explicit convertAlgorithm that throws UnsupportedOperationException. The reverse path uses EdgeAlgorithm.fromName(...), which throws IllegalArgumentException for an unknown algorithm. Consider a symmetric reverse helper so both directions fail the same way:
private static EdgeAlgorithm convertAlgorithm(EdgeInterpolationAlgorithm algorithm) {
switch (algorithm.toString().toUpperCase(Locale.ROOT)) {
case "SPHERICAL":
return EdgeAlgorithm.SPHERICAL;
default:
throw new UnsupportedOperationException(
"Iceberg does not support Spark geography edge algorithm: " + algorithm);
}
}
// call site:
return Types.GeographyType.of(geography.crs(), convertAlgorithm(geography.algorithm()));Functionally identical today (Spark only has SPHERICAL), but self-documents the supported set and fails loudly if Spark adds an algorithm Iceberg doesn't know about yet.
There was a problem hiding this comment.
Done — added a matching reverse convertAlgorithm so both directions fail loudly on an unknown algorithm.
|
|
||
| @Test | ||
| public void testGeospatialTypeConversion() { | ||
| Types.GeometryType geometry = Types.GeometryType.of("EPSG:3857"); |
There was a problem hiding this comment.
Nit: this only exercises explicit CRS values. Adding a default-CRS geometry round-trip (Types.GeometryType.crs84()) would lock in the null <-> OGC:CRS84 normalization, which is the subtlest part of the mapping.
| StructType requestedType = SparkSchemaUtil.convert(schema); | ||
| Schema pruned = SparkSchemaUtil.prune(schema, requestedType); | ||
|
|
||
| assertThat(pruned.asStruct()).isEqualTo(schema.asStruct()); |
There was a problem hiding this comment.
Nit: this covers only the happy path. A negative case (requesting an incompatible Spark type for a geo column, triggering the Cannot project ... incompatible type precondition) would round out coverage, though it mirrors how other primitives are tested so it's optional.
|
Thanks @szehon-ho! Pushed 8d4a020 with the two test follow-ups; left the type-mapping code as-is for now and want to explain why:
|
Add schema conversion support between Iceberg geometry/geography types and Spark 4.1 geospatial DataTypes, and allow pruning projections over those types.
Avoid silently coercing Iceberg geography types with non-spherical edge algorithms when converting to Spark's geography type.
Spark 4.1 only accepts OGC:CRS84 as the geography CRS, so the schema conversion tests now use it instead of EPSG:4326. Also document why the Spark edge algorithm is not propagated back to Iceberg.
8d4a020 to
55af6ce
Compare
Iceberg permits any non-empty CRS, but Spark 4.1 recognizes only a fixed set (geometry: SRID:0 / EPSG:3857 / OGC:CRS84; geography: OGC:CRS84) and throws an opaque ST_INVALID_CRS_VALUE for the rest. Translate that into a clear UnsupportedOperationException when converting an Iceberg geo type to Spark, mirroring the existing edge-algorithm rejection. In the reverse direction a Spark mixed-SRID geometry/geography carries the sentinel CRS "SRID:ANY"; reject it instead of persisting a spec-invalid Iceberg CRS, since an Iceberg geo CRS is column-level.
f509540 to
58fadd0
Compare
Replace the inline algorithm != SPHERICAL guard with an explicit EdgeAlgorithm -> EdgeInterpolationAlgorithm translation, so the supported set (only spherical) is self-documenting, and route both geo types through the conversion helpers.
58fadd0 to
6632e21
Compare
szehon-ho
left a comment
There was a problem hiding this comment.
A few optional follow-ups with concrete examples — nothing blocking.
| // Spark's only edge-interpolation algorithm. Spark exposes no Java-accessible constant for it, so | ||
| // it is resolved once by name through the companion's case-insensitive parser. | ||
| private static final EdgeInterpolationAlgorithm SPARK_SPHERICAL = | ||
| EdgeInterpolationAlgorithm$.MODULE$.fromString("SPHERICAL").get(); |
There was a problem hiding this comment.
Nit: .get() on the Option will throw NoSuchElementException if Spark ever renames this value. That's probably fine as intentional fail-fast at class-load time, but a one-line note (or orElseThrow with a clearer message) would make that intent obvious:
private static final EdgeInterpolationAlgorithm SPARK_SPHERICAL =
EdgeInterpolationAlgorithm$.MODULE$
.fromString("SPHERICAL")
.getOrElse(() -> {
throw new IllegalStateException("Spark EdgeInterpolationAlgorithm.SPHERICAL not found");
});There was a problem hiding this comment.
Done, switched to getOrElse with a clear message.
| // CRS into a clear Iceberg error. | ||
| private static DataType convertAndValidate( | ||
| String kind, String crs, Supplier<DataType> conversion) { | ||
| try { |
There was a problem hiding this comment.
Nit: this only catches SparkIllegalArgumentException. If Spark throws a different exception type for an invalid CRS, users may still see an opaque Spark error. Probably fine for now given test coverage, but worth keeping in mind if Spark's error types change. E.g. today GeometryType.of("EPSG:4269") → UnsupportedOperationException, but a future Spark change could bypass this wrapper.
There was a problem hiding this comment.
Maybe we could leave as-is for now? toSrid() only throws SparkIllegalArgumentException for a bad CRS, so this covers it — happy to revisit if Spark's error types change. Thanks!
Mirror the DECIMAL parameter check in PruneColumnsWithoutReordering for geometry and geography: reject a requested Spark geo type whose CRS (both types) or edge algorithm (geography) disagrees with the table's, instead of letting the mismatch pass pruning silently. Spark normally copies the type from the table schema so this is defensive, but it matches how other parameterized primitives are validated. Algorithm compares across type systems (Iceberg EdgeAlgorithm vs Spark EdgeInterpolationAlgorithm), so reuse TypeToSparkType.convertAlgorithm to translate the table's algorithm into Spark's type before comparing; make that helper package-private and add a symmetric reverse helper in SparkTypeToType so both directions fail loudly on an algorithm the other side lacks. Also harden the SPARK_SPHERICAL lookup to fail fast with a clear message if Spark renames the value.
szehon-ho
left a comment
There was a problem hiding this comment.
Follow-up on SPARK_SPHERICAL lookup.
| // Spark's only edge-interpolation algorithm. Spark exposes no Java-accessible constant for it, so | ||
| // it is resolved once by name through the companion's case-insensitive parser. If Spark ever | ||
| // renames this value the lookup fails fast at class-load time with a clear message. | ||
| private static final EdgeInterpolationAlgorithm SPARK_SPHERICAL = |
There was a problem hiding this comment.
Actually, Spark does expose a constant here — same pattern as BooleanType$.MODULE$ etc. SPHERICAL is a Scala case object, so from Java:
import org.apache.spark.sql.types.EdgeInterpolationAlgorithm.SPHERICAL$;
private static final EdgeInterpolationAlgorithm SPARK_SPHERICAL = SPHERICAL$.MODULE$;JavaDoc. That makes the comment above ("no Java-accessible constant") inaccurate — fromString("SPHERICAL") works but isn't needed.
- Use EdgeInterpolationAlgorithm.SPHERICAL$.MODULE$ directly instead of resolving it by name via fromString(...).getOrElse(...). Spark exposes the case object as a Java-accessible constant (same pattern as BooleanType$ etc.), so the name lookup and its fallback are unnecessary. - Drop the convertAndValidate wrapper that caught SparkIllegalArgumentException and re-threw UnsupportedOperationException. SparkIllegalArgumentException is already an IllegalArgumentException carrying a clear ST_INVALID_CRS_VALUE message, and catching an unchecked type not on apply()'s signature coupled us to a Spark internal. Let it propagate; the error can be refined later if needed.
|
Merged, thanks @huan233usc ! |
The Spark type mapping (apache#16851) and Iceberg's own Parquet value path (apache#16982) are in place, but the Spark Parquet reader/writer did not handle geo values: geometry/geography carry a LogicalTypeAnnotation with no legacy OriginalType, so the reader fell through to a raw byte[] (mis-typed for a GeometryVal/GeographyVal column) and the writer threw on the unsupported-logical-type path. Read a geo WKB BINARY column into Spark's GeometryVal/GeographyVal and write those values back as their WKB bytes, mirroring the existing binary handling. Enable the shared geospatial DataTest coverage for the Spark Parquet reader and add a Spark writer round-trip test, including null values.
Summary
geometry/geographyto SparkGeometryType/GeographyTypewhile preserving CRS.PruneColumnsWithoutReorderingto project geospatial columns.This is split out from #16650 and intentionally does not include Parquet read/write or end-to-end SQL tests.
Test plan
./gradlew :iceberg-spark:iceberg-spark-4.1_2.13:test --tests org.apache.iceberg.spark.TestSparkSchemaUtil.testGeospatialTypeConversion --tests org.apache.iceberg.spark.TestSparkSchemaUtil.testPruneGeospatialTypes