Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Duration Field & FieldType support #1374

Merged
merged 41 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
5e5bf51
feat: added u128, i128 FieldType and Field
chloeminkyung Mar 27, 2023
8ec242c
feat: added u128, i128 FieldType and Field
chloeminkyung Mar 27, 2023
00d0314
feat: added u128, i128 FieldType and Field
chloeminkyung Mar 27, 2023
4f0809f
feat: added u128, i128 FieldType and Field
chloeminkyung Mar 27, 2023
4897779
feat: added u128, i128 FieldType and Field
chloeminkyung Mar 27, 2023
547261b
feat: added u128, i128 FieldType and Field
chloeminkyung Mar 27, 2023
4388f87
feat: support u128 and i128 in sql ops
chloeminkyung Mar 27, 2023
cee5c80
feat: support u128 and i128 in sql ops
chloeminkyung Mar 27, 2023
de0db26
feat: support u128 and i128 in sql ops
chloeminkyung Mar 27, 2023
afc0fe2
feat: support u128 and i128 in sql ops
chloeminkyung Mar 27, 2023
77f81f5
merge with main
chloeminkyung Mar 30, 2023
4bb292f
fix: lint
chloeminkyung Mar 30, 2023
4f47990
fix: added division, modulo error handler in mathematical.rs
chloeminkyung Mar 30, 2023
933569f
fix: added division, modulo error handler in mathematical.rs
chloeminkyung Mar 30, 2023
045c6ca
Merge branch 'main' into feat/u128_i128
chloeminkyung Mar 31, 2023
3890d6e
Merge branch 'main' into feat/u128_i128
chloeminkyung Apr 2, 2023
fff473b
added unit tests
chloeminkyung Apr 2, 2023
eafbdda
added unit tests
chloeminkyung Apr 2, 2023
4802678
Merge branch 'main' into feat/u128_i128
chloeminkyung Apr 4, 2023
0763c69
addressed comments
chloeminkyung Apr 4, 2023
d4b6985
addressed comments
chloeminkyung Apr 4, 2023
5ee727e
addressed comments
chloeminkyung Apr 4, 2023
2f251e7
Merge remote-tracking branch 'chloe/feat/u128_i128' into feat/duration
chloeminkyung Apr 4, 2023
d475c70
feat: `Duration` support
chloeminkyung Apr 4, 2023
182f3ab
feat: `Duration` support
chloeminkyung Apr 4, 2023
3c26445
feat: `Duration` support
chloeminkyung Apr 4, 2023
ee7452f
feat: `Duration` support
chloeminkyung Apr 5, 2023
b52e037
feat: `Duration` support
chloeminkyung Apr 5, 2023
9a1dad9
feat: `Duration` support
chloeminkyung Apr 5, 2023
866023d
feat: `Duration` support
chloeminkyung Apr 5, 2023
cca73ea
feat: `Duration` support
chloeminkyung Apr 5, 2023
91196e2
feat: `Duration` support
chloeminkyung Apr 5, 2023
d9a3407
Merge remote-tracking branch 'origin/main' into feat/duration
chloeminkyung Apr 5, 2023
28cc4b3
feat: `Duration` support
chloeminkyung Apr 5, 2023
9000b3a
feat: `Duration` support
chloeminkyung Apr 5, 2023
1b1fcd5
feat: `Duration` support
chloeminkyung Apr 5, 2023
81d0ce1
feat: `Duration` support
chloeminkyung Apr 5, 2023
f2f90e3
feat: `Duration` mathematical support
chloeminkyung Apr 6, 2023
f3579fa
feat: `Duration` mathematical support
chloeminkyung Apr 6, 2023
3a9dca7
Merge branch 'main' into feat/duration
chloeminkyung Apr 6, 2023
68217b7
feat: `Duration` mathematical support
chloeminkyung Apr 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dozer-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Following table shows how Dozer type is converted to JSON type in REST API.
| date | string | "%Y-%m-%d" format |
| bson | array of number | Every number is between 0-255 and represents a byte. |
| point | object | { x: number, y: number } |
| duration | object | { value: string, time_unit: string } |

## gRPC

Expand All @@ -41,3 +42,4 @@ Following table shows how Dozer type is converted to gRPC type in gRPC API.
| date | string | "%Y-%m-%d" format |
| bson | bytes | |
| point | Point | { x: double, y: double } |
| duration | Duration | { value: string, time_unit: string } |
3 changes: 2 additions & 1 deletion dozer-api/src/cache_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ fn generate_secondary_indexes(fields: &[FieldDefinition]) -> Vec<IndexDefinition
| FieldType::Decimal
| FieldType::Timestamp
| FieldType::Date
| FieldType::Point => vec![IndexDefinition::SortedInverted(vec![idx])],
| FieldType::Point
| FieldType::Duration => vec![IndexDefinition::SortedInverted(vec![idx])],

// Create sorted inverted and full text indexes for string fields.
FieldType::String => vec![
Expand Down
11 changes: 10 additions & 1 deletion dozer-api/src/generator/oapi/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::utils::{
use dozer_types::indexmap::{self, IndexMap};
use dozer_types::serde_json;
use dozer_types::serde_json::Map;
use dozer_types::types::IndexDefinition;
use dozer_types::types::{IndexDefinition, TimeUnit};
use dozer_types::{models::api_endpoint::ApiEndpoint, types::FieldType};
use openapiv3::*;
use serde_json::{json, Value};
Expand Down Expand Up @@ -50,6 +50,15 @@ impl<'a> OpenApiGenerator<'a> {
m.insert("y".to_string(), Value::from(4.4));
Value::Object(m)
}
FieldType::Duration => {
let mut m = Map::new();
m.insert("val".to_string(), Value::from("3.3i128"));
m.insert(
"unit".to_string(),
Value::from(TimeUnit::Nanoseconds.to_string()),
);
Value::Object(m)
}
};
json!({ name: val })
} else {
Expand Down
1 change: 1 addition & 0 deletions dozer-api/src/generator/oapi/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ fn convert_cache_type_to_schema_type(field_type: dozer_types::types::FieldType)
max_properties: None,
})
}
FieldType::Duration => todo!(),
}
}

Expand Down
49 changes: 30 additions & 19 deletions dozer-api/src/generator/protoc/generator/implementation.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::errors::GenerationError;
use crate::errors::GenerationError::ServiceNotFound;
use crate::generator::protoc::generator::{
CountMethodDesc, DecimalDesc, EventDesc, OnEventMethodDesc, PointDesc, QueryMethodDesc,
RecordWithIdDesc, TokenMethodDesc, TokenResponseDesc,
CountMethodDesc, DecimalDesc, DurationDesc, EventDesc, OnEventMethodDesc, PointDesc,
QueryMethodDesc, RecordWithIdDesc, TokenMethodDesc, TokenResponseDesc,
};
use dozer_types::log::error;
use dozer_types::models::api_security::ApiSecurity;
Expand All @@ -17,6 +17,7 @@ use std::path::{Path, PathBuf};
use super::{CountResponseDesc, QueryResponseDesc, RecordDesc, ServiceDesc};

const POINT_TYPE_CLASS: &str = "dozer.types.PointType";
const DURATION_TYPE_CLASS: &str = "dozer.types.DurationType";
const DECIMAL_TYPE_CLASS: &str = "dozer.types.RustDecimal";
const TIMESTAMP_TYPE_CLASS: &str = "google.protobuf.Timestamp";

Expand Down Expand Up @@ -172,23 +173,32 @@ impl<'a> ProtoGeneratorImpl<'a> {
let pv = point_values;
if let Some(decimal_values) = descriptor.get_message_by_name(DECIMAL_TYPE_CLASS)
{
let dv = decimal_values;
Ok(RecordDesc {
message,
version_field,
point_field: PointDesc {
message: pv.clone(),
x: get_field(&pv, "x")?,
y: get_field(&pv, "y")?,
},
decimal_field: DecimalDesc {
message: dv.clone(),
flags: get_field(&dv, "flags")?,
lo: get_field(&dv, "lo")?,
mid: get_field(&dv, "mid")?,
hi: get_field(&dv, "hi")?,
},
})
if let Some(dv) = descriptor.get_message_by_name(DURATION_TYPE_CLASS) {
let durv = dv;
Ok(RecordDesc {
message,
version_field,
point_field: PointDesc {
message: pv.clone(),
x: get_field(&pv, "x")?,
y: get_field(&pv, "y")?,
},
decimal_field: DecimalDesc {
message: decimal_values.clone(),
flags: get_field(&decimal_values, "flags")?,
lo: get_field(&decimal_values, "lo")?,
mid: get_field(&decimal_values, "mid")?,
hi: get_field(&decimal_values, "hi")?,
},
duration_field: DurationDesc {
message: durv.clone(),
value: get_field(&durv, "value")?,
time_unit: get_field(&durv, "time_unit")?,
},
})
} else {
Err(ServiceNotFound(DURATION_TYPE_CLASS.to_string()))
}
} else {
Err(ServiceNotFound(DECIMAL_TYPE_CLASS.to_string()))
}
Expand Down Expand Up @@ -373,5 +383,6 @@ fn convert_dozer_type_to_proto_type(field_type: FieldType) -> Result<String, Gen
FieldType::Date => Ok("string".to_owned()),
FieldType::Bson => Ok("bytes".to_owned()),
FieldType::Point => Ok(POINT_TYPE_CLASS.to_owned()),
FieldType::Duration => Ok("string".to_owned()),
}
}
8 changes: 8 additions & 0 deletions dozer-api/src/generator/protoc/generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct RecordDesc {
pub version_field: FieldDescriptor,
pub point_field: PointDesc,
pub decimal_field: DecimalDesc,
pub duration_field: DurationDesc,
}

#[derive(Debug, Clone)]
Expand All @@ -65,6 +66,13 @@ pub struct PointDesc {
pub y: FieldDescriptor,
}

#[derive(Debug, Clone)]
pub struct DurationDesc {
pub message: MessageDescriptor,
pub value: FieldDescriptor,
pub time_unit: FieldDescriptor,
}

#[derive(Debug, Clone)]
pub struct DecimalDesc {
pub message: MessageDescriptor,
Expand Down
12 changes: 12 additions & 0 deletions dozer-api/src/grpc/typed/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ fn interval_value_to_pb(
point.set_field(y_field_desc, prost_reflect::Value::F64(p.y));
Value::Message(point)
}
GrpcTypes::value::Value::DurationValue(d) => {
let duration_type_desc = descriptor.duration_field.message.clone();
let value_field_desc = &descriptor.duration_field.value;
let time_unit_field_desc = &descriptor.duration_field.time_unit;
let mut duration = DynamicMessage::new(duration_type_desc);
duration.set_field(value_field_desc, prost_reflect::Value::String(d.value));
duration.set_field(
time_unit_field_desc,
prost_reflect::Value::String(d.time_unit),
);
Value::Message(duration)
}
GrpcTypes::value::Value::DecimalValue(d) => {
let decimal_type_desc = descriptor.decimal_field.message.clone();
let flags_field_desc = &descriptor.decimal_field.flags;
Expand Down
Binary file modified dozer-api/src/grpc/typed/tests/generated_films.bin
Binary file not shown.
16 changes: 14 additions & 2 deletions dozer-api/src/grpc/types_helper.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use dozer_cache::cache::RecordWithId as CacheRecordWithId;
use dozer_types::grpc_types::types::{
value, Operation, OperationType, PointType, Record, RecordWithId, RustDecimal, Type, Value,
value, DurationType, Operation, OperationType, PointType, Record, RecordWithId, RustDecimal,
Type, Value,
};
use dozer_types::ordered_float::OrderedFloat;
use dozer_types::rust_decimal::Decimal;
use dozer_types::types::{Field, FieldType, Record as DozerRecord, DATE_FORMAT};
use dozer_types::types::{DozerDuration, Field, FieldType, Record as DozerRecord, DATE_FORMAT};
use prost_reflect::prost_types::Timestamp;

pub fn map_insert_operation(endpoint_name: String, record: DozerRecord, id: u64) -> Operation {
Expand Down Expand Up @@ -69,6 +70,15 @@ fn map_x_y_to_prost_coord_map((x, y): (OrderedFloat<f64>, OrderedFloat<f64>)) ->
}
}

fn map_duration_to_prost_coord_map(d: DozerDuration) -> Value {
Value {
value: Some(value::Value::DurationValue(DurationType {
value: d.0.as_nanos().to_string(),
time_unit: d.1.to_string(),
})),
}
}

fn map_decimal(d: Decimal) -> Value {
Value {
value: Some(value::Value::DecimalValue(RustDecimal {
Expand Down Expand Up @@ -126,6 +136,7 @@ fn field_to_prost_value(f: Field) -> Value {
)),
},
Field::Point(point) => map_x_y_to_prost_coord_map(point.0.x_y()),
Field::Duration(d) => map_duration_to_prost_coord_map(d),
}
}

Expand Down Expand Up @@ -158,5 +169,6 @@ fn field_type_to_internal_type(typ: FieldType) -> Type {
FieldType::Bson => Type::Bson,
FieldType::Date => Type::String,
FieldType::Point => Type::Point,
FieldType::Duration => Type::Duration,
}
}
19 changes: 18 additions & 1 deletion dozer-api/src/rest/api_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use dozer_types::indexmap::IndexMap;
use dozer_types::log::warn;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::ordered_float::OrderedFloat;
use dozer_types::types::{Field, Schema, DATE_FORMAT};
use dozer_types::types::{DozerDuration, Field, Schema, DATE_FORMAT};
use openapiv3::OpenAPI;

use crate::api_helper::{get_record, get_records, get_records_count};
Expand Down Expand Up @@ -191,6 +191,13 @@ fn convert_x_y_to_object((x, y): &(OrderedFloat<f64>, OrderedFloat<f64>)) -> Val
Value::Object(m)
}

fn convert_duration_to_object(d: &DozerDuration) -> Value {
let mut m = Map::new();
m.insert("value".to_string(), Value::from(d.0.as_nanos().to_string()));
m.insert("time_unit".to_string(), Value::from(d.1.to_string()));
Value::Object(m)
}

/// Used in REST APIs for converting raw value back and forth.
///
/// Should be consistent with `convert_cache_type_to_schema_type`.
Expand All @@ -210,19 +217,22 @@ fn field_to_json_value(field: Field) -> Value {
Field::Date(n) => Value::String(n.format(DATE_FORMAT).to_string()),
Field::Bson(b) => Value::from(b),
Field::Point(point) => convert_x_y_to_object(&point.0.x_y()),
Field::Duration(d) => convert_duration_to_object(&d),
Field::Null => Value::Null,
}
}

#[cfg(test)]
mod tests {
use dozer_types::types::TimeUnit;
use dozer_types::{
chrono::{NaiveDate, Offset, TimeZone, Utc},
json_value_to_field,
ordered_float::OrderedFloat,
rust_decimal::Decimal,
types::{DozerPoint, Field, FieldType},
};
use std::time::Duration;

use super::*;

Expand Down Expand Up @@ -266,6 +276,13 @@ mod tests {
FieldType::Point,
Field::Point(DozerPoint::from((3.234, 4.567))),
),
(
FieldType::Duration,
Field::Duration(DozerDuration(
Duration::from_nanos(123_u64),
TimeUnit::Nanoseconds,
)),
),
];
for (field_type, field) in fields {
test_field_conversion(field_type, field);
Expand Down
1 change: 1 addition & 0 deletions dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
FieldType::Date => debug_assert!(value.as_date().is_some()),
FieldType::Bson => debug_assert!(value.as_bson().is_some()),
FieldType::Point => debug_assert!(value.as_point().is_some()),
FieldType::Duration => debug_assert!(value.as_duration().is_some()),
}
}
}
1 change: 1 addition & 0 deletions dozer-ingestion/tests/test_suite/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ fn assert_record_matches_schema(record: &Record, schema: &Schema, only_match_pk:
FieldType::Date => assert!(value.as_date().is_some()),
FieldType::Bson => assert!(value.as_bson().is_some()),
FieldType::Point => assert!(value.as_point().is_some()),
FieldType::Duration => assert!(value.as_duration().is_some()),
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions dozer-ingestion/tests/test_suite/connectors/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use dozer_types::arrow::array::{
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
};

use dozer_types::{
arrow,
chrono::Datelike,
Expand Down Expand Up @@ -284,6 +285,9 @@ fn field_type_to_arrow(field_type: FieldType) -> Option<arrow::datatypes::DataTy
FieldType::Date => Some(arrow::datatypes::DataType::Date32),
FieldType::Bson => None,
FieldType::Point => None,
FieldType::Duration => Some(arrow::datatypes::DataType::Duration(
arrow::datatypes::TimeUnit::Nanosecond,
)),
}
}

Expand Down Expand Up @@ -419,6 +423,17 @@ fn fields_to_arrow<'a, F: IntoIterator<Item = &'a Field>>(
}
FieldType::Bson => panic!("Bson not supported"),
FieldType::Point => panic!("Point not supported"),
FieldType::Duration => {
let mut builder = arrow::array::DurationNanosecondArray::builder(count);
for field in fields {
match field {
Field::Duration(value) => builder.append_value(value.0.as_nanos() as i64),
Field::Null => builder.append_null(),
_ => panic!("Unexpected field type"),
}
}
Arc::new(builder.finish())
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions dozer-ingestion/tests/test_suite/connectors/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ fn field_type_to_sql(field_type: FieldType) -> Option<String> {
FieldType::Date => Some("DATE".to_string()),
FieldType::Bson => Some("JSONB".to_string()),
FieldType::Point => Some("POINT".to_string()),
FieldType::Duration => Some("DURATION".to_string()),
}
}

Expand Down Expand Up @@ -215,6 +216,7 @@ fn field_to_sql(field: &Field) -> String {
format!("'{}'::json", json)
}
Field::Point(p) => format!("'({},{})'", p.0.x(), p.0.y()),
Field::Duration(d) => d.to_string(),
Field::Null => "NULL".to_string(),
}
}
Expand Down
10 changes: 6 additions & 4 deletions dozer-sql/src/pipeline/aggregation/avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pub fn validate_avg(args: &[Expression], schema: &Schema) -> Result<ExpressionTy
| FieldType::Timestamp
| FieldType::Binary
| FieldType::Bson
| FieldType::Point => {
| FieldType::Point
| FieldType::Duration => {
return Err(PipelineError::InvalidFunctionArgumentType(
Avg.to_string(),
arg.return_type,
Expand Down Expand Up @@ -194,9 +195,10 @@ fn get_average(
| FieldType::Timestamp
| FieldType::Binary
| FieldType::Bson
| FieldType::Point => Err(PipelineError::InternalExecutionError(InvalidType(format!(
"Not supported return type {typ} for {Avg}"
)))),
| FieldType::Point
| FieldType::Duration => Err(PipelineError::InternalExecutionError(InvalidType(
format!("Not supported return type {typ} for {Avg}"),
))),
},
None => Err(PipelineError::InternalExecutionError(InvalidType(format!(
"Not supported None return type for {Avg}"
Expand Down
7 changes: 4 additions & 3 deletions dozer-sql/src/pipeline/aggregation/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ fn get_count(count: u64, return_type: Option<FieldType>) -> Result<Field, Pipeli
| FieldType::Timestamp
| FieldType::Binary
| FieldType::Bson
| FieldType::Point => Err(PipelineError::InternalExecutionError(InvalidType(format!(
"Not supported return type {typ} for {Count}"
)))),
| FieldType::Point
| FieldType::Duration => Err(PipelineError::InternalExecutionError(InvalidType(
format!("Not supported return type {typ} for {Count}"),
))),
},
None => Err(PipelineError::InternalExecutionError(InvalidType(format!(
"Not supported None return type for {Count}"
Expand Down
Loading