Skip to content

Commit

Permalink
feat: check geometry column data format
Browse files Browse the repository at this point in the history
  • Loading branch information
yukkit committed Sep 4, 2023
1 parent 253b72e commit 83e19e8
Show file tree
Hide file tree
Showing 16 changed files with 853 additions and 18 deletions.
40 changes: 30 additions & 10 deletions common/models/src/gis/data_type.rs
@@ -1,4 +1,5 @@
use std::fmt::Display;
use std::str::FromStr;

use serde::{Deserialize, Serialize};

Expand All @@ -24,24 +25,43 @@ impl Display for Geometry {
/// The geometry sub-types a geometry column can be declared with.
///
/// Each variant has an upper-case textual keyword (see the `Display` impl),
/// e.g. `POINT` or `MULTILINESTRING`.
///
/// NOTE(review): this listing contains two spellings for several variants
/// (e.g. `Linestring` and `LineString`). It looks like the removed and added
/// lines of a diff rendered side by side — confirm which set is current.
#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone, Eq, Hash)]
pub enum GeometryType {
Point,
Linestring,
LineString,
Polygon,
Multipoint,
Multilinestring,
Multipolygon,
Geometrycollection,
MultiPoint,
MultiLineString,
MultiPolygon,
GeometryCollection,
}

impl Display for GeometryType {
    /// Writes the upper-case keyword of the geometry sub-type
    /// (for example `POINT` or `GEOMETRYCOLLECTION`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the keyword first, then emit it with a single write.
        let keyword = match self {
            Self::Point => "POINT",
            Self::Linestring => "LINESTRING",
            Self::LineString => "LINESTRING",
            Self::Polygon => "POLYGON",
            Self::Multipoint => "MULTIPOINT",
            Self::Multilinestring => "MULTILINESTRING",
            Self::Multipolygon => "MULTIPOLYGON",
            Self::Geometrycollection => "GEOMETRYCOLLECTION",
            Self::MultiPoint => "MULTIPOINT",
            Self::MultiLineString => "MULTILINESTRING",
            Self::MultiPolygon => "MULTIPOLYGON",
            Self::GeometryCollection => "GEOMETRYCOLLECTION",
        };

        f.write_str(keyword)
    }
}

impl FromStr for GeometryType {
    type Err = String;

    /// Parses the upper-case keyword of a geometry sub-type.
    ///
    /// The match is exact and case-sensitive: only `POINT`, `LINESTRING`,
    /// `POLYGON`, `MULTIPOINT`, `MULTILINESTRING`, `MULTIPOLYGON` and
    /// `GEOMETRYCOLLECTION` are accepted.
    ///
    /// # Errors
    ///
    /// Returns a message naming the rejected input and listing the accepted
    /// keywords when `s` is not one of them.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "POINT" => Ok(Self::Point),
            "LINESTRING" => Ok(Self::LineString),
            "POLYGON" => Ok(Self::Polygon),
            "MULTIPOINT" => Ok(Self::MultiPoint),
            "MULTILINESTRING" => Ok(Self::MultiLineString),
            "MULTIPOLYGON" => Ok(Self::MultiPolygon),
            "GEOMETRYCOLLECTION" => Ok(Self::GeometryCollection),
            // The message previously read "excepted"; "expected" is what is meant.
            other => Err(format!(
                "Invalid geometry type: {}, expected: POINT | LINESTRING | POLYGON | MULTIPOINT | MULTILINESTRING | MULTIPOLYGON | GEOMETRYCOLLECTION",
                other
            )),
        }
    }
}
10 changes: 7 additions & 3 deletions common/models/src/schema.rs
Expand Up @@ -372,7 +372,8 @@ pub struct TableColumn {
pub encoding: Encoding,
}

/// Arrow field metadata key for a geometry column's SRID.
///
/// NOTE(review): `GIS_SRID_META_KEY` below stores the same value under a
/// namespaced key — confirm whether this constant is still needed.
pub const SRID_META_KEY: &str = "srid";
/// Arrow field metadata key holding a geometry column's SRID.
pub const GIS_SRID_META_KEY: &str = "gis.srid";
/// Arrow field metadata key holding a geometry column's sub-type keyword
/// (the `Display` form of `GeometryType`).
pub const GIS_SUB_TYPE_META_KEY: &str = "gis.sub_type";

impl From<&TableColumn> for ArrowField {
fn from(column: &TableColumn) -> Self {
Expand All @@ -381,8 +382,11 @@ impl From<&TableColumn> for ArrowField {
map.insert(TAG.to_string(), column.column_type.is_tag().to_string());

// Mark Geometry-typed columns via field metadata (SRID and geometry sub-type)
if let ColumnType::Field(ValueType::Geometry(Geometry { srid, .. })) = column.column_type {
map.insert(SRID_META_KEY.to_string(), srid.to_string());
if let ColumnType::Field(ValueType::Geometry(Geometry { srid, sub_type })) =
column.column_type
{
map.insert(GIS_SUB_TYPE_META_KEY.to_string(), sub_type.to_string());
map.insert(GIS_SRID_META_KEY.to_string(), srid.to_string());
}

let nullable = column.nullable();
Expand Down
2 changes: 2 additions & 0 deletions query_server/query/src/data_source/mod.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result as DFResult;
use datafusion::execution::context::{SessionState, TaskContext};
Expand Down Expand Up @@ -48,6 +49,7 @@ pub trait RecordBatchSink: Send + Sync {
}

pub trait RecordBatchSinkProvider: Send + Sync {
fn schema(&self) -> SchemaRef;
fn create_batch_sink(
&self,
context: Arc<TaskContext>,
Expand Down
7 changes: 7 additions & 0 deletions query_server/query/src/data_source/sink/obj_store/mod.rs
Expand Up @@ -62,6 +62,7 @@ pub struct ObjectStoreSinkProvider {
object_store: Arc<DynObjectStore>,
serializer: Arc<DynRecordBatchSerializer>,
file_extension: String,
schema: SchemaRef,
}

impl ObjectStoreSinkProvider {
Expand All @@ -70,17 +71,23 @@ impl ObjectStoreSinkProvider {
object_store: Arc<DynObjectStore>,
serializer: Arc<DynRecordBatchSerializer>,
file_extension: String,
schema: SchemaRef,
) -> Self {
Self {
location,
object_store,
serializer,
file_extension,
schema,
}
}
}

impl RecordBatchSinkProvider for ObjectStoreSinkProvider {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn create_batch_sink(
&self,
context: Arc<TaskContext>,
Expand Down
5 changes: 5 additions & 0 deletions query_server/query/src/data_source/sink/tskv.rs
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use coordinator::service::CoordinatorRef;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{self, Count, ExecutionPlanMetricsSet, MetricBuilder};
Expand Down Expand Up @@ -77,6 +78,10 @@ impl TskvRecordBatchSinkProvider {
}

impl RecordBatchSinkProvider for TskvRecordBatchSinkProvider {
/// Returns the Arrow representation of this provider's schema, as produced
/// by the schema's `to_arrow_schema` conversion.
fn schema(&self) -> SchemaRef {
self.schema.to_arrow_schema()
}

fn create_batch_sink(
&self,
context: Arc<TaskContext>,
Expand Down
Expand Up @@ -57,6 +57,7 @@ impl WriteExecExt for ListingTable {
object_store,
serializer,
file_extension,
input.schema(),
));

Ok(Arc::new(TableWriterExec::new(
Expand Down
104 changes: 104 additions & 0 deletions query_server/query/src/extension/physical/optimizer_rule/add_assert.rs
@@ -0,0 +1,104 @@
use std::str::FromStr;
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result as DFResult;
use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use models::gis::data_type::GeometryType;
use models::schema::GIS_SUB_TYPE_META_KEY;
use spi::QueryError;

use crate::extension::physical::plan_node::assert::geom_write::AssertGeomType;
use crate::extension::physical::plan_node::assert::AssertExec;
use crate::extension::physical::plan_node::table_writer::TableWriterExec;
use crate::extension::utils::downcast_execution_plan;

/// Physical optimizer rule that inserts assertion nodes beneath table writers.
///
/// The rule carries no state; `new` and `Default::default` are equivalent.
#[derive(Default)]
#[non_exhaustive]
pub struct AddAssertExec {}

impl AddAssertExec {
    /// Creates the rule.
    pub fn new() -> Self {
        Self::default()
    }
}

impl PhysicalOptimizerRule for AddAssertExec {
    /// Walks the plan top-down and, for every `TableWriterExec` whose sink
    /// needs it, replaces the writer's input with an asserting wrapper.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        plan.transform_down(&|node| {
            // Step 1: only table writers are candidates; compute the
            // replacement input (if any) while the node is merely borrowed.
            let asserted_input = match downcast_execution_plan::<TableWriterExec>(node.as_ref()) {
                Some(writer) => add_table_write_asserter_if_necessary(writer)?,
                None => None,
            };

            // Step 2: rebuild the node only when a replacement was produced.
            match asserted_input {
                Some(input) => node.with_new_children(vec![input]).map(Transformed::Yes),
                None => Ok(Transformed::No(node)),
            }
        })
    }

    /// Name under which this rule is registered/reported.
    fn name(&self) -> &str {
        "add_assert_exec"
    }

    /// Requests the optimizer's schema check for this rule.
    fn schema_check(&self) -> bool {
        true
    }
}

/// Builds an asserting replacement for the input of `exec` when the sink
/// schema contains geometry columns.
///
/// A field counts as a geometry column when its metadata carries
/// `GIS_SUB_TYPE_META_KEY`. Returns `Ok(None)` when no field does.
///
/// # Errors
///
/// Returns `QueryError::InvalidGeometryType` (wrapped as an external
/// DataFusion error) for the first field whose sub-type metadata cannot be
/// parsed as a `GeometryType`.
fn add_table_write_asserter_if_necessary(
    exec: &TableWriterExec,
) -> DFResult<Option<Arc<dyn ExecutionPlan>>> {
    let sink_schema = exec.sink_schema();
    // Taken before inspecting the schema, matching the original evaluation order.
    let input = exec.children()[0].clone();

    // Collect (expected sub-type, column index) for every geometry column.
    let mut geoms_with_idx = Vec::new();
    for (idx, field) in sink_schema.fields().iter().enumerate() {
        // Fields without the sub-type metadata are not geometry columns.
        if let Some(raw_sub_type) = field.metadata().get(GIS_SUB_TYPE_META_KEY) {
            // The metadata marks a geometry column; an unrecognized keyword is an error.
            let sub_type = GeometryType::from_str(raw_sub_type).map_err(|reason| {
                DataFusionError::External(Box::new(QueryError::InvalidGeometryType { reason }))
            })?;
            geoms_with_idx.push((sub_type, idx));
        }
    }

    if geoms_with_idx.is_empty() {
        return Ok(None);
    }

    let assert_expr = Arc::new(AssertGeomType::new(geoms_with_idx));
    let asserted_input: Arc<dyn ExecutionPlan> = Arc::new(AssertExec::new(assert_expr, input));

    Ok(Some(asserted_input))
}
@@ -1,3 +1,4 @@
//! physical plan optimizer rule
pub mod add_assert;
pub mod add_state_store;
pub mod add_traced_proxy;
@@ -0,0 +1,92 @@
use std::fmt::{Debug, Display};

use datafusion::arrow::array::{downcast_array, Array, StringArray};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result as DFResult;
use datafusion::error::DataFusionError;
use geo::Geometry;
use geozero::wkt::WktStr;
use geozero::ToGeo;
use models::gis::data_type::GeometryType;
use spi::QueryError;

use super::AssertExpr;

/// Assertion that checks the WKT strings in selected columns of a record
/// batch against an expected geometry sub-type.
#[derive(Debug)]
pub struct AssertGeomType {
// Pairs of (expected geometry sub-type, index of the column in the batch).
geom_with_idx: Vec<(GeometryType, usize)>,
}

impl AssertGeomType {
/// Creates the assertion from `(expected sub-type, column index)` pairs.
pub fn new(geom_with_idx: Vec<(GeometryType, usize)>) -> Self {
Self { geom_with_idx }
}
}

impl AssertExpr for AssertGeomType {
    /// Checks every registered geometry column of `batch`, stopping at the
    /// first column that fails validation.
    fn assert(&self, batch: &RecordBatch) -> DFResult<()> {
        self.geom_with_idx
            .iter()
            .try_for_each(|(sub_type, idx)| check_wkt(sub_type, batch.column(*idx).as_ref()))
    }
}

impl Display for AssertGeomType {
    /// Renders as `AssertGeomType(i, j, ...)`, listing the checked column indices.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Assemble the comma-separated index list by hand.
        let mut indices = String::new();
        for (pos, (_, idx)) in self.geom_with_idx.iter().enumerate() {
            if pos > 0 {
                indices.push_str(", ");
            }
            indices.push_str(&idx.to_string());
        }

        write!(f, "AssertGeomType({})", indices)
    }
}

/// Generates `check_wkt`, with one match arm per listed geometry sub-type.
///
/// Each identifier must name both a `GeometryType` variant and a
/// `geo::Geometry` variant, because the same token is used for both.
macro_rules! define_check_wkt {
    ($($sub_type:ident),* $(,)?) => {
        /// Validates that every non-null value of `array` is well-formed WKT
        /// whose geometry kind matches `sub_type`.
        ///
        /// # Errors
        ///
        /// * `DataFusionError::Internal` when `array` is not a `StringArray`.
        /// * `QueryError::InvalidGeometryType` (as an external error) when a
        ///   value fails to parse as WKT or parses to a different geometry kind.
        fn check_wkt(sub_type: &GeometryType, array: &dyn Array) -> DFResult<()> {
            // Checked downcast: a mismatched array type becomes an error
            // instead of a panic, and the array data is borrowed, not copied.
            let str_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| {
                    DataFusionError::Internal(format!(
                        "geometry column must be a Utf8 array, got {:?}",
                        array.data_type()
                    ))
                })?;

            match sub_type {
                $(GeometryType::$sub_type => {
                    // `flatten` skips nulls: only present values are validated.
                    for ele in str_array.iter().flatten() {
                        let geom = WktStr(ele).to_geo().map_err(|err| {
                            DataFusionError::External(Box::new(QueryError::InvalidGeometryType {
                                reason: format!("{}, expect {}, got {:?}", err, stringify!($sub_type), ele),
                            }))
                        })?;

                        if let Geometry::$sub_type(_) = &geom {
                            continue;
                        }

                        return Err(DataFusionError::External(Box::new(
                            QueryError::InvalidGeometryType {
                                reason: format!("expect {}, got {:?}", stringify!($sub_type), geom),
                            },
                        )));
                    }
                }),*
            }

            Ok(())
        }
    };
}

define_check_wkt!(
    Point,
    LineString,
    Polygon,
    MultiPoint,
    MultiLineString,
    MultiPolygon,
    GeometryCollection
);

0 comments on commit 83e19e8

Please sign in to comment.