Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support gis function #1465

Merged
merged 4 commits into from Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
170 changes: 168 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Expand Up @@ -152,6 +152,8 @@ opentelemetry_sdk = { version = "0.19.0" }
opentelemetry_api = { version = "0.19.0" }
dateparser = "0.1.7"
run_script = "0.10.1"
geo = "0.26.0"
geozero = "0.11.0"

[workspace.package]
edition = "2021"
Expand Down
40 changes: 30 additions & 10 deletions common/models/src/gis/data_type.rs
@@ -1,4 +1,5 @@
use std::fmt::Display;
use std::str::FromStr;

use serde::{Deserialize, Serialize};

Expand All @@ -24,24 +25,43 @@ impl Display for Geometry {
#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone, Eq, Hash)]
pub enum GeometryType {
Point,
Linestring,
LineString,
Polygon,
Multipoint,
Multilinestring,
Multipolygon,
Geometrycollection,
MultiPoint,
MultiLineString,
MultiPolygon,
GeometryCollection,
}

impl Display for GeometryType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Point => write!(f, "POINT"),
Self::Linestring => write!(f, "LINESTRING"),
Self::LineString => write!(f, "LINESTRING"),
Self::Polygon => write!(f, "POLYGON"),
Self::Multipoint => write!(f, "MULTIPOINT"),
Self::Multilinestring => write!(f, "MULTILINESTRING"),
Self::Multipolygon => write!(f, "MULTIPOLYGON"),
Self::Geometrycollection => write!(f, "GEOMETRYCOLLECTION"),
Self::MultiPoint => write!(f, "MULTIPOINT"),
Self::MultiLineString => write!(f, "MULTILINESTRING"),
Self::MultiPolygon => write!(f, "MULTIPOLYGON"),
Self::GeometryCollection => write!(f, "GEOMETRYCOLLECTION"),
}
}
}

impl FromStr for GeometryType {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"POINT" => Ok(Self::Point),
"LINESTRING" => Ok(Self::LineString),
"POLYGON" => Ok(Self::Polygon),
"MULTIPOINT" => Ok(Self::MultiPoint),
"MULTILINESTRING" => Ok(Self::MultiLineString),
"MULTIPOLYGON" => Ok(Self::MultiPolygon),
"GEOMETRYCOLLECTION" => Ok(Self::GeometryCollection),
other => {
Err(format!("Invalid geometry type: {}, excepted: POINT | LINESTRING | POLYGON | MULTIPOINT | MULTILINESTRING | MULTIPOLYGON | GEOMETRYCOLLECTION", other))
}
}
}
}
12 changes: 8 additions & 4 deletions common/models/src/schema.rs
Expand Up @@ -372,7 +372,8 @@ pub struct TableColumn {
pub encoding: Encoding,
}

pub const SRID_META_KEY: &str = "srid";
pub const GIS_SRID_META_KEY: &str = "gis.srid";
pub const GIS_SUB_TYPE_META_KEY: &str = "gis.sub_type";

impl From<&TableColumn> for ArrowField {
fn from(column: &TableColumn) -> Self {
Expand All @@ -381,8 +382,11 @@ impl From<&TableColumn> for ArrowField {
map.insert(TAG.to_string(), column.column_type.is_tag().to_string());

// 通过 SRID_META_KEY 标记 Geometry 类型的列
if let ColumnType::Field(ValueType::Geometry(Geometry { srid, .. })) = column.column_type {
map.insert(SRID_META_KEY.to_string(), srid.to_string());
if let ColumnType::Field(ValueType::Geometry(Geometry { srid, sub_type })) =
column.column_type
{
map.insert(GIS_SUB_TYPE_META_KEY.to_string(), sub_type.to_string());
map.insert(GIS_SRID_META_KEY.to_string(), srid.to_string());
}

let nullable = column.nullable();
Expand Down Expand Up @@ -536,7 +540,7 @@ impl ColumnType {
Self::Field(ValueType::Integer) => 1,
Self::Field(ValueType::Unsigned) => 2,
Self::Field(ValueType::Boolean) => 3,
Self::Field(ValueType::String) => 4,
Self::Field(ValueType::String) | Self::Field(ValueType::Geometry(_)) => 4,
_ => 0,
}
}
Expand Down
2 changes: 2 additions & 0 deletions query_server/query/Cargo.toml
Expand Up @@ -48,6 +48,8 @@ async-backtrace = { workspace = true, optional = true }
bincode = { workspace = true }
dirs = { workspace = true }
once_cell = { workspace = true }
geo = { workspace = true }
geozero = { workspace = true, features = ["with-wkb"]}

[features]
default = []
Expand Down
2 changes: 2 additions & 0 deletions query_server/query/src/data_source/mod.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result as DFResult;
use datafusion::execution::context::{SessionState, TaskContext};
Expand Down Expand Up @@ -48,6 +49,7 @@ pub trait RecordBatchSink: Send + Sync {
}

pub trait RecordBatchSinkProvider: Send + Sync {
fn schema(&self) -> SchemaRef;
fn create_batch_sink(
&self,
context: Arc<TaskContext>,
Expand Down