Skip to content

Commit

Permalink
[SEDONA-470] Distinguish between missing or null crs from the result of geoparquet.metadata (#1211)
Browse files Browse the repository at this point in the history

* geoparquet.metadata should show distinct output for missing crs and null crs.

* Apply the patch to Spark 3.4 and Spark 3.5
  • Loading branch information
Kontinuation committed Jan 22, 2024
1 parent 33ace6f commit bedea9a
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package org.apache.spark.sql.execution.datasources.parquet

import org.json4s.jackson.JsonMethods.parse
import org.json4s.JValue
import org.json4s.{JNothing, JNull, JValue}

/**
* A case class that holds the metadata of geometry column in GeoParquet metadata
* @param encoding Name of the geometry encoding format. Currently only "WKB" is supported
* @param geometryTypes The geometry types of all geometries, or an empty array if they are not known.
* @param bbox Bounding Box of the geometries in the file, formatted according to RFC 7946, section 5.
* @param crs The CRS of the geometries in the file. None if crs metadata is absent, Some(JNull) if crs is null,
* Some(value) if the crs is present and not null.
*/
case class GeometryFieldMetaData(
encoding: String,
Expand Down Expand Up @@ -58,7 +60,16 @@ object GeoParquetMetaData {
/**
 * Parse the "geo" entry of a Parquet file's key-value metadata into [[GeoParquetMetaData]].
 *
 * @param keyValueMetaData key-value metadata taken from the Parquet file footer
 * @return None when no "geo" entry exists; otherwise the parsed GeoParquet metadata, with
 *         each column's crs normalized to None (field absent), Some(JNull) (explicit null),
 *         or Some(value) (present and non-null)
 */
def parseKeyValueMetaData(keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = {
  Option(keyValueMetaData.get("geo")).map { geoJson =>
    implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
    val parsedGeo = parse(geoJson)
    val extracted = parsedGeo.camelizeKeys.extract[GeoParquetMetaData]
    // json4s extraction maps both a JSON null and an absent field to None, so re-inspect
    // the raw JSON for each column's "crs" to encode the distinction in the Option.
    val normalizedColumns = extracted.columns.map { case (columnName, columnMeta) =>
      val normalizedMeta = parsedGeo \ "columns" \ columnName \ "crs" match {
        case JNothing => columnMeta.copy(crs = None)        // crs field is missing
        case JNull    => columnMeta.copy(crs = Some(JNull)) // crs is explicitly null
        case _        => columnMeta                         // crs present and non-null
      }
      columnName -> normalizedMeta
    }
    extracted.copy(columns = normalizedColumns)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
UTF8String.fromString(columnMetadata.encoding),
new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
.getOrElse(UTF8String.fromString("")))
val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._

class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"

describe("GeoParquet Metadata tests") {
it("Reading GeoParquet Metadata") {
Expand All @@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
columnMetadata.getAs[String]("encoding") == "WKB" &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) &&
columnMetadata.getAs[String]("crs").nonEmpty
columnMetadata.getAs[String]("crs").nonEmpty &&
columnMetadata.getAs[String]("crs") != "null"
}
})
}
Expand All @@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
assert(metadataArray.forall(_.getAs[String]("columns") == null))
}

it("Read GeoParquet without CRS") {
  // Writing with an empty "geoparquet.crs" option is the documented way to omit the crs
  // field; geoparquet.metadata should then surface it as an empty string (not "null").
  val sourceDf = sparkSession.read.format("geoparquet")
    .load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
  val savePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
  sourceDf.write.format("geoparquet")
    .option("geoparquet.crs", "")
    .mode("overwrite")
    .save(savePath)
  val metadataDf = sparkSession.read.format("geoparquet.metadata").load(savePath)
  val firstRow = metadataDf.collect()(0)
  val geometryColumn = firstRow.getJavaMap(firstRow.fieldIndex("columns"))
    .get("geometry").asInstanceOf[Row]
  assert(geometryColumn.getAs[String]("crs") == "")
}

it("Read GeoParquet with null CRS") {
  // Writing with "geoparquet.crs" set to "null" stores an explicit JSON null crs;
  // geoparquet.metadata should surface it as the string "null", distinct from "".
  val sourceDf = sparkSession.read.format("geoparquet")
    .load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
  val savePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
  sourceDf.write.format("geoparquet")
    .option("geoparquet.crs", "null")
    .mode("overwrite")
    .save(savePath)
  val metadataDf = sparkSession.read.format("geoparquet.metadata").load(savePath)
  val firstRow = metadataDf.collect()(0)
  val geometryColumn = firstRow.getJavaMap(firstRow.fieldIndex("columns"))
    .get("geometry").asInstanceOf[Row]
  assert(geometryColumn.getAs[String]("crs") == "null")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
UTF8String.fromString(columnMetadata.encoding),
new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
.getOrElse(UTF8String.fromString("")))
val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._

class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"

describe("GeoParquet Metadata tests") {
it("Reading GeoParquet Metadata") {
Expand All @@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
columnMetadata.getAs[String]("encoding") == "WKB" &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) &&
columnMetadata.getAs[String]("crs").nonEmpty
columnMetadata.getAs[String]("crs").nonEmpty &&
columnMetadata.getAs[String]("crs") != "null"
}
})
}
Expand All @@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
assert(metadataArray.forall(_.getAs[String]("columns") == null))
}

it("Read GeoParquet without CRS") {
  // Writing with an empty "geoparquet.crs" option is the documented way to omit the crs
  // field; geoparquet.metadata should then surface it as an empty string (not "null").
  val sourceDf = sparkSession.read.format("geoparquet")
    .load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
  val savePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
  sourceDf.write.format("geoparquet")
    .option("geoparquet.crs", "")
    .mode("overwrite")
    .save(savePath)
  val metadataDf = sparkSession.read.format("geoparquet.metadata").load(savePath)
  val firstRow = metadataDf.collect()(0)
  val geometryColumn = firstRow.getJavaMap(firstRow.fieldIndex("columns"))
    .get("geometry").asInstanceOf[Row]
  assert(geometryColumn.getAs[String]("crs") == "")
}

it("Read GeoParquet with null CRS") {
  // Writing with "geoparquet.crs" set to "null" stores an explicit JSON null crs;
  // geoparquet.metadata should surface it as the string "null", distinct from "".
  val sourceDf = sparkSession.read.format("geoparquet")
    .load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
  val savePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
  sourceDf.write.format("geoparquet")
    .option("geoparquet.crs", "null")
    .mode("overwrite")
    .save(savePath)
  val metadataDf = sparkSession.read.format("geoparquet.metadata").load(savePath)
  val firstRow = metadataDf.collect()(0)
  val geometryColumn = firstRow.getJavaMap(firstRow.fieldIndex("columns"))
    .get("geometry").asInstanceOf[Row]
  assert(geometryColumn.getAs[String]("crs") == "null")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
UTF8String.fromString(columnMetadata.encoding),
new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
.getOrElse(UTF8String.fromString("")))
val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._

class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"

describe("GeoParquet Metadata tests") {
it("Reading GeoParquet Metadata") {
Expand All @@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
columnMetadata.getAs[String]("encoding") == "WKB" &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) &&
columnMetadata.getAs[String]("crs").nonEmpty
columnMetadata.getAs[String]("crs").nonEmpty &&
columnMetadata.getAs[String]("crs") != "null"
}
})
}
Expand All @@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
assert(metadataArray.forall(_.getAs[String]("columns") == null))
}

it("Read GeoParquet without CRS") {
  // Writing with an empty "geoparquet.crs" option is the documented way to omit the crs
  // field; geoparquet.metadata should then surface it as an empty string (not "null").
  val sourceDf = sparkSession.read.format("geoparquet")
    .load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
  val savePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
  sourceDf.write.format("geoparquet")
    .option("geoparquet.crs", "")
    .mode("overwrite")
    .save(savePath)
  val metadataDf = sparkSession.read.format("geoparquet.metadata").load(savePath)
  val firstRow = metadataDf.collect()(0)
  val geometryColumn = firstRow.getJavaMap(firstRow.fieldIndex("columns"))
    .get("geometry").asInstanceOf[Row]
  assert(geometryColumn.getAs[String]("crs") == "")
}

it("Read GeoParquet with null CRS") {
  // Writing with "geoparquet.crs" set to "null" stores an explicit JSON null crs;
  // geoparquet.metadata should surface it as the string "null", distinct from "".
  val sourceDf = sparkSession.read.format("geoparquet")
    .load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
  val savePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
  sourceDf.write.format("geoparquet")
    .option("geoparquet.crs", "null")
    .mode("overwrite")
    .save(savePath)
  val metadataDf = sparkSession.read.format("geoparquet.metadata").load(savePath)
  val firstRow = metadataDf.collect()(0)
  val geometryColumn = firstRow.getJavaMap(firstRow.fieldIndex("columns"))
    .get("geometry").asInstanceOf[Row]
  assert(geometryColumn.getAs[String]("crs") == "null")
}
}
}

0 comments on commit bedea9a

Please sign in to comment.