Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Geometry data type #1463

Merged
merged 1 commit into from Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 47 additions & 0 deletions common/models/src/gis/data_type.rs
@@ -0,0 +1,47 @@
use std::fmt::Display;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone, Eq, Hash)]
#[non_exhaustive]
pub struct Geometry {
pub sub_type: GeometryType,
pub srid: i16,
}

impl Geometry {
pub fn new_with_srid(sub_type: GeometryType, srid: i16) -> Self {
Self { sub_type, srid }
}
}

impl Display for Geometry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Geometry({}, {})", self.sub_type, self.srid)
}
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone, Eq, Hash)]
pub enum GeometryType {
Point,
Linestring,
Polygon,
Multipoint,
Multilinestring,
Multipolygon,
Geometrycollection,
}

impl Display for GeometryType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Point => write!(f, "POINT"),
Self::Linestring => write!(f, "LINESTRING"),
Self::Polygon => write!(f, "POLYGON"),
Self::Multipoint => write!(f, "MULTIPOINT"),
Self::Multilinestring => write!(f, "MULTILINESTRING"),
Self::Multipolygon => write!(f, "MULTIPOLYGON"),
Self::Geometrycollection => write!(f, "GEOMETRYCOLLECTION"),
}
}
}
1 change: 1 addition & 0 deletions common/models/src/gis/mod.rs
@@ -0,0 +1 @@
pub mod data_type;
3 changes: 2 additions & 1 deletion common/models/src/lib.rs
Expand Up @@ -6,7 +6,7 @@ use parking_lot::RwLock;
pub use record_batch::*;
pub use series_info::SeriesKey;
pub use tag::Tag;
pub use value_type::ValueType;
pub use value_type::{PhysicalDType, ValueType};

pub mod codec;
pub mod consistency_level;
Expand All @@ -24,6 +24,7 @@ pub mod arrow_array;
pub mod arrow;
pub mod auth;
pub mod duration;
pub mod gis;
pub mod object_reference;
pub mod oid;
pub mod predicate;
Expand Down
109 changes: 60 additions & 49 deletions common/models/src/schema.rs
Expand Up @@ -7,6 +7,7 @@
//! - Column #3
//! - Column #4

use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::mem::size_of_val;
Expand All @@ -33,12 +34,14 @@ use derive_builder::Builder;
use serde::{Deserialize, Serialize};

use crate::codec::Encoding;
use crate::gis::data_type::Geometry;
use crate::oid::{Identifier, Oid};
use crate::utils::{
DAY_MICROS, DAY_MILLS, DAY_NANOS, HOUR_MICROS, HOUR_MILLS, HOUR_NANOS, MINUTES_MICROS,
MINUTES_MILLS, MINUTES_NANOS,
};
use crate::{ColumnId, Error, SchemaId, Timestamp, ValueType};
use crate::value_type::ValueType;
use crate::{ColumnId, Error, PhysicalDType, SchemaId, Timestamp};

pub type TskvTableSchemaRef = Arc<TskvTableSchema>;

Expand Down Expand Up @@ -171,11 +174,7 @@ impl Default for TskvTableSchema {

impl TskvTableSchema {
pub fn to_arrow_schema(&self) -> SchemaRef {
let fields: Vec<ArrowField> = self
.columns
.iter()
.map(|field| field.to_arrow_field())
.collect();
let fields: Vec<ArrowField> = self.columns.iter().map(|field| field.into()).collect();
Arc::new(Schema::new(fields))
}

Expand Down Expand Up @@ -373,18 +372,32 @@ pub struct TableColumn {
pub encoding: Encoding,
}

impl From<TableColumn> for ArrowField {
fn from(column: TableColumn) -> Self {
pub const SRID_META_KEY: &str = "srid";

impl From<&TableColumn> for ArrowField {
fn from(column: &TableColumn) -> Self {
let mut map = HashMap::new();
map.insert(FIELD_ID.to_string(), column.id.to_string());
map.insert(TAG.to_string(), column.column_type.is_tag().to_string());

// 通过 SRID_META_KEY 标记 Geometry 类型的列
if let ColumnType::Field(ValueType::Geometry(Geometry { srid, .. })) = column.column_type {
map.insert(SRID_META_KEY.to_string(), srid.to_string());
}

let nullable = column.nullable();
let mut f = ArrowField::new(&column.name, column.column_type.into(), nullable);
let mut f = ArrowField::new(&column.name, column.column_type.clone().into(), nullable);
f.set_metadata(map);
f
}
}

impl From<TableColumn> for ArrowField {
fn from(column: TableColumn) -> Self {
Self::from(&column)
}
}

impl From<TableColumn> for Column {
fn from(field: TableColumn) -> Self {
Column::from_name(field.name)
Expand Down Expand Up @@ -446,15 +459,6 @@ impl TableColumn {
Ok(column)
}

pub fn to_arrow_field(&self) -> ArrowField {
let mut f = ArrowField::new(&self.name, self.column_type.clone().into(), self.nullable());
let mut map = HashMap::new();
map.insert(FIELD_ID.to_string(), self.id.to_string());
map.insert(TAG.to_string(), self.column_type.is_tag().to_string());
f.set_metadata(map);
f
}

pub fn encoding_valid(&self) -> bool {
if let ColumnType::Field(ValueType::Float) = self.column_type {
return self.encoding.is_double_encoding();
Expand Down Expand Up @@ -486,26 +490,12 @@ impl From<ColumnType> for ArrowDataType {
ColumnType::Field(ValueType::Unsigned) => ArrowDataType::UInt64,
ColumnType::Field(ValueType::String) => ArrowDataType::Utf8,
ColumnType::Field(ValueType::Boolean) => ArrowDataType::Boolean,
ColumnType::Field(ValueType::Geometry(_)) => ArrowDataType::Utf8,
_ => ArrowDataType::Null,
}
}
}

impl TryFrom<ArrowDataType> for ColumnType {
type Error = &'static str;

fn try_from(value: ArrowDataType) -> Result<Self, Self::Error> {
match value {
ArrowDataType::Float64 => Ok(Self::Field(ValueType::Float)),
ArrowDataType::Int64 => Ok(Self::Field(ValueType::Integer)),
ArrowDataType::UInt64 => Ok(Self::Field(ValueType::Unsigned)),
ArrowDataType::Utf8 => Ok(Self::Field(ValueType::String)),
ArrowDataType::Boolean => Ok(Self::Field(ValueType::Boolean)),
_ => Err("Error field type not supported"),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnType {
Tag,
Expand All @@ -514,10 +504,6 @@ pub enum ColumnType {
}

impl ColumnType {
pub fn default_time() -> Self {
Self::Time(TimeUnit::Nanosecond)
}

pub fn as_str(&self) -> &'static str {
match self {
Self::Tag => "TAG",
Expand Down Expand Up @@ -566,22 +552,23 @@ impl ColumnType {
}
}

pub fn to_sql_type_str(&self) -> &'static str {
pub fn to_sql_type_str(&self) -> Cow<'static, str> {
match self {
Self::Tag => "STRING",
Self::Tag => "STRING".into(),
Self::Time(unit) => match unit {
TimeUnit::Second => "TIMESTAMP(SECOND)",
TimeUnit::Millisecond => "TIMESTAMP(MILLISECOND)",
TimeUnit::Microsecond => "TIMESTAMP(MICROSECOND)",
TimeUnit::Nanosecond => "TIMESTAMP(NANOSECOND)",
TimeUnit::Second => "TIMESTAMP(SECOND)".into(),
TimeUnit::Millisecond => "TIMESTAMP(MILLISECOND)".into(),
TimeUnit::Microsecond => "TIMESTAMP(MICROSECOND)".into(),
TimeUnit::Nanosecond => "TIMESTAMP(NANOSECOND)".into(),
},
Self::Field(value_type) => match value_type {
ValueType::String => "STRING",
ValueType::Integer => "BIGINT",
ValueType::Unsigned => "BIGINT UNSIGNED",
ValueType::Float => "DOUBLE",
ValueType::Boolean => "BOOLEAN",
ValueType::Unknown => "UNKNOWN",
ValueType::String => "STRING".into(),
ValueType::Integer => "BIGINT".into(),
ValueType::Unsigned => "BIGINT UNSIGNED".into(),
ValueType::Float => "DOUBLE".into(),
ValueType::Boolean => "BOOLEAN".into(),
ValueType::Unknown => "UNKNOWN".into(),
ValueType::Geometry(geo) => geo.to_string().into(),
},
}
}
Expand Down Expand Up @@ -1246,3 +1233,27 @@ impl From<ScalarValueForkDF> for ScalarValue {
unsafe { std::mem::transmute::<ScalarValueForkDF, Self>(value) }
}
}

/// column type for tskv
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub enum PhysicalCType {
Tag,
Time(TimeUnit),
Field(PhysicalDType),
}

impl PhysicalCType {
pub fn default_time() -> Self {
Self::Time(TimeUnit::Nanosecond)
}
}

impl ColumnType {
pub fn to_physical_type(&self) -> PhysicalCType {
match self {
Self::Tag => PhysicalCType::Tag,
Self::Time(unit) => PhysicalCType::Time(unit.clone()),
Self::Field(value_type) => PhysicalCType::Field(value_type.to_physical_type()),
}
}
}
77 changes: 46 additions & 31 deletions common/models/src/value_type.rs
Expand Up @@ -2,6 +2,8 @@ use std::fmt::{Display, Formatter};

use serde::{Deserialize, Serialize};

use crate::gis::data_type::Geometry;

#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone, Eq, Hash)]
pub enum ValueType {
Unknown,
Expand All @@ -10,35 +12,48 @@ pub enum ValueType {
Unsigned,
Boolean,
String,
Geometry(Geometry),
}

/// data type for tskv
#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone, Eq, Hash)]
pub enum PhysicalDType {
Unknown,
Float,
Integer,
Unsigned,
Boolean,
String,
}

impl ValueType {
pub fn to_fb_type(&self) -> protos::models::FieldType {
match *self {
ValueType::Float => protos::models::FieldType::Float,
ValueType::Integer => protos::models::FieldType::Integer,
ValueType::Unsigned => protos::models::FieldType::Unsigned,
ValueType::Boolean => protos::models::FieldType::Boolean,
ValueType::String => protos::models::FieldType::String,
ValueType::Unknown => protos::models::FieldType::Unknown,
pub fn to_physical_type(&self) -> PhysicalDType {
match self {
Self::Unknown => PhysicalDType::Unknown,
Self::Float => PhysicalDType::Float,
Self::Integer => PhysicalDType::Integer,
Self::Unsigned => PhysicalDType::Unsigned,
Self::Boolean => PhysicalDType::Boolean,
Self::String => PhysicalDType::String,
Self::Geometry(_) => PhysicalDType::String,
}
}
}

impl Display for ValueType {
impl Display for PhysicalDType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ValueType::Unknown => f.write_str("Unknown"),
ValueType::Float => f.write_str("Float"),
ValueType::Integer => f.write_str("Integer"),
ValueType::Unsigned => f.write_str("Unsigned"),
ValueType::Boolean => f.write_str("Boolean"),
ValueType::String => f.write_str("String"),
PhysicalDType::Unknown => f.write_str("Unknown"),
PhysicalDType::Float => f.write_str("Float"),
PhysicalDType::Integer => f.write_str("Integer"),
PhysicalDType::Unsigned => f.write_str("Unsigned"),
PhysicalDType::Boolean => f.write_str("Boolean"),
PhysicalDType::String => f.write_str("String"),
}
}
}

impl From<u8> for ValueType {
impl From<u8> for PhysicalDType {
fn from(value: u8) -> Self {
match value {
0 => Self::Float,
Expand All @@ -51,28 +66,28 @@ impl From<u8> for ValueType {
}
}

impl From<ValueType> for u8 {
fn from(value: ValueType) -> Self {
impl From<PhysicalDType> for u8 {
fn from(value: PhysicalDType) -> Self {
match value {
ValueType::Float => 0,
ValueType::Integer => 1,
ValueType::Boolean => 2,
ValueType::String => 3,
ValueType::Unsigned => 4,
ValueType::Unknown => 5,
PhysicalDType::Float => 0,
PhysicalDType::Integer => 1,
PhysicalDType::Boolean => 2,
PhysicalDType::String => 3,
PhysicalDType::Unsigned => 4,
PhysicalDType::Unknown => 5,
}
}
}

impl From<protos::models::FieldType> for ValueType {
impl From<protos::models::FieldType> for PhysicalDType {
fn from(t: protos::models::FieldType) -> Self {
match t {
protos::models::FieldType::Float => ValueType::Float,
protos::models::FieldType::Integer => ValueType::Integer,
protos::models::FieldType::Unsigned => ValueType::Unsigned,
protos::models::FieldType::Boolean => ValueType::Boolean,
protos::models::FieldType::String => ValueType::String,
_ => ValueType::Unknown,
protos::models::FieldType::Float => PhysicalDType::Float,
protos::models::FieldType::Integer => PhysicalDType::Integer,
protos::models::FieldType::Unsigned => PhysicalDType::Unsigned,
protos::models::FieldType::Boolean => PhysicalDType::Boolean,
protos::models::FieldType::String => PhysicalDType::String,
_ => PhysicalDType::Unknown,
}
}
}
6 changes: 3 additions & 3 deletions common/protocol_parser/src/lines_convert.rs
Expand Up @@ -6,8 +6,8 @@ use datafusion::arrow::array::{
};
use datafusion::arrow::datatypes::{SchemaRef, TimeUnit};
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use models::schema::{ColumnType, TskvTableSchemaRef};
use models::ValueType;
use models::schema::{PhysicalCType as ColumnType, TskvTableSchemaRef};
use models::PhysicalDType as ValueType;
use protos::models::{
Column as FbColumn, ColumnBuilder, ColumnType as FbColumnType, FieldType, PointsBuilder,
TableBuilder, ValuesBuilder,
Expand Down Expand Up @@ -368,7 +368,7 @@ pub fn arrow_array_to_points(
let column_schema = table_schema.column(col_name).ok_or_else(|| Error::Common {
content: format!("column {} not found in table {}", col_name, table_name),
})?;
let fb_column = match column_schema.column_type {
let fb_column = match column_schema.column_type.to_physical_type() {
ColumnType::Tag => build_string_column(column, col_name, FbColumnType::Tag, &mut fbb)?,
ColumnType::Time(ref time_unit) => {
build_timestamp_column(column, col_name, time_unit, &mut fbb)?
Expand Down