diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
index a3f23b9c4b..7a88a111ba 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
@@ -14,13 +14,15 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.json4s.jackson.JsonMethods.parse
-import org.json4s.JValue
+import org.json4s.{JNothing, JNull, JValue}
 
 /**
  * A case class that holds the metadata of geometry column in GeoParquet metadata
  * @param encoding Name of the geometry encoding format. Currently only "WKB" is supported
  * @param geometryTypes The geometry types of all geometries, or an empty array if they are not known.
  * @param bbox Bounding Box of the geometries in the file, formatted according to RFC 7946, section 5.
+ * @param crs The CRS of the geometries in the file. None if crs metadata is absent, Some(JNull) if crs is null,
+ *            Some(value) if the crs is present and not null.
  */
 case class GeometryFieldMetaData(
     encoding: String,
@@ -58,7 +60,16 @@ object GeoParquetMetaData {
   def parseKeyValueMetaData(keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = {
     Option(keyValueMetaData.get("geo")).map { geo =>
       implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
-      parse(geo).camelizeKeys.extract[GeoParquetMetaData]
+      val geoObject = parse(geo)
+      val metadata = geoObject.camelizeKeys.extract[GeoParquetMetaData]
+      metadata.copy(columns = metadata.columns.map { case (name, column) =>
+        // Postprocess to distinguish between null (JNull) and missing field (JNothing).
+        geoObject \ "columns" \ name \ "crs" match {
+          case JNothing => name -> column.copy(crs = None)
+          case JNull => name -> column.copy(crs = Some(JNull))
+          case _ => name -> column
+        }
+      })
     }
   }
 }
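Note, as reviewer context rather than part of the patch: json4s only distinguishes a missing key from an explicit null at the AST level, and `extract` collapses both to `None` for an `Option` field, which is why the raw `geoObject` is re-inspected after extraction. A minimal sketch of that behavior, assuming only json4s-jackson on the classpath:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    val json = parse("""{"a": 1, "b": null}""")
    json \ "b"  // JNull:    the key is present with a null value
    json \ "c"  // JNothing: the key is absent entirely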
+ geoObject \ "columns" \ name \ "crs" match { + case JNothing => name -> column.copy(crs = None) + case JNull => name -> column.copy(crs = Some(JNull)) + case _ => name -> column + } + }) } } } diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala index fe1b4f8c5f..545bbfb31e 100644 --- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala @@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory { UTF8String.fromString(columnMetadata.encoding), new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray), new GenericArrayData(columnMetadata.bbox.toArray), - columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull) + columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))) + .getOrElse(UTF8String.fromString(""))) val columnMetadataStruct = new GenericInternalRow(columnMetadataFields) UTF8String.fromString(columnName) -> columnMetadataStruct } diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala index 71e1595422..44359defb7 100644 --- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala +++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll { val geoparquetdatalocation: String = resourceFolder + "geoparquet/" + val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/" describe("GeoParquet Metadata tests") { it("Reading GeoParquet Metadata") { @@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll { columnMetadata.getAs[String]("encoding") == "WKB" && columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) && columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) && - columnMetadata.getAs[String]("crs").nonEmpty + columnMetadata.getAs[String]("crs").nonEmpty && + columnMetadata.getAs[String]("crs") != "null" } }) } @@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll { assert(metadataArray.forall(_.getAs[String]("primary_column") == null)) assert(metadataArray.forall(_.getAs[String]("columns") == null)) } + + it("Read GeoParquet without CRS") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet") + val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", "") + .mode("overwrite").save(geoParquetSavePath) + val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath) + val row = dfMeta.collect()(0) + val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row] + assert(metadata.getAs[String]("crs") 
== "") + } + + it("Read GeoParquet with null CRS") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet") + val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", "null") + .mode("overwrite").save(geoParquetSavePath) + val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath) + val row = dfMeta.collect()(0) + val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row] + assert(metadata.getAs[String]("crs") == "null") + } } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala index 874d2e743d..c192a7a94f 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala @@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory { UTF8String.fromString(columnMetadata.encoding), new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray), new GenericArrayData(columnMetadata.bbox.toArray), - columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull) + columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))) + .getOrElse(UTF8String.fromString(""))) val columnMetadataStruct = new GenericInternalRow(columnMetadataFields) UTF8String.fromString(columnName) -> columnMetadataStruct } diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala index 71e1595422..44359defb7 100644 --- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala +++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll { val geoparquetdatalocation: String = resourceFolder + "geoparquet/" + val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/" describe("GeoParquet Metadata tests") { it("Reading GeoParquet Metadata") { @@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll { columnMetadata.getAs[String]("encoding") == "WKB" && columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) && columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) && - columnMetadata.getAs[String]("crs").nonEmpty + columnMetadata.getAs[String]("crs").nonEmpty && + columnMetadata.getAs[String]("crs") != "null" } }) } @@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll { assert(metadataArray.forall(_.getAs[String]("primary_column") == null)) assert(metadataArray.forall(_.getAs[String]("columns") == null)) } + + it("Read GeoParquet without CRS") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet") + val 
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 874d2e743d..c192a7a94f 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
         UTF8String.fromString(columnMetadata.encoding),
         new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
         new GenericArrayData(columnMetadata.bbox.toArray),
-        columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
+        columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
+          .getOrElse(UTF8String.fromString("")))
       val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
       UTF8String.fromString(columnName) -> columnMetadataStruct
     }
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 71e1595422..44359defb7 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
 
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
+  val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
 
   describe("GeoParquet Metadata tests") {
     it("Reading GeoParquet Metadata") {
@@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
           columnMetadata.getAs[String]("encoding") == "WKB" &&
             columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) &&
             columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) &&
-            columnMetadata.getAs[String]("crs").nonEmpty
+            columnMetadata.getAs[String]("crs").nonEmpty &&
+            columnMetadata.getAs[String]("crs") != "null"
         }
       })
     }
@@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
       assert(metadataArray.forall(_.getAs[String]("columns") == null))
     }
+
+    it("Read GeoParquet without CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "")
+    }
+
+    it("Read GeoParquet with null CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "null")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "null")
+    }
   }
 }
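For downstream consumers of `geoparquet.metadata`, the same convention can be branched on directly. A minimal usage sketch, reusing the session, paths, and row accessors from the tests above:

    import org.apache.spark.sql.Row

    val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
    val row = dfMeta.collect()(0)
    val column = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
    val crsKind = column.getAs[String]("crs") match {
      case ""     => "omitted"          // crs metadata absent
      case "null" => "explicitly null"  // crs was JSON null
      case _      => "PROJJSON"         // compact PROJJSON document
    }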