diff --git a/Cargo.toml b/Cargo.toml index 739a289..303c71e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,3 @@ [workspace] members = ["azure-kusto-data"] +resolver = "2" diff --git a/azure-kusto-data/Cargo.toml b/azure-kusto-data/Cargo.toml index 0146a6e..a2eb98c 100644 --- a/azure-kusto-data/Cargo.toml +++ b/azure-kusto-data/Cargo.toml @@ -5,7 +5,7 @@ description = "Rust wrappers around Microsoft Azure REST APIs - Azure Data Explo readme = "README.md" license = "MIT" edition = "2021" -rust-version = "1.75" +rust-version = "1.74" repository = "https://github.com/azure/azure-sdk-for-rust" homepage = "https://github.com/azure/azure-sdk-for-rust" documentation = "https://docs.rs/azure_kusto_data" @@ -26,7 +26,7 @@ bytes = "1.4" futures = "0.3" serde = { version = "1", features = ["derive"] } serde_json = "1" -serde_with = { version = "3.4.0"} +serde_with = { version = "3.4.0" } thiserror = "1.0.38" hashbrown = { version = "0.14", features = ["serde"] } regex = "1.7.1" @@ -38,16 +38,17 @@ time = { version = "0.3", features = [ "serde-well-known", ] } derive_builder = "0.12" -derive_more = { version = "1.0.0-beta.6" , features = ["from", "into", "display", "from_str"] } +derive_more = { version = "1.0.0-beta.6", features = ["from", "into", "display", "from_str"] } once_cell = "1" rust_decimal = { version = "1.33.1", features = ["serde-str"] } uuid = { version = "1.3.0", features = ["serde"] } +tokio = { version = "1.35.1", features = ["macros", "rt"] } + [dev-dependencies] arrow = { version = "49.0.0", features = ["prettyprint"] } dotenv = "0.15.0" env_logger = "0.10.0" -tokio = { version = "1.25.0", features = ["macros"] } oauth2 = "4.3.0" criterion = "0.5" clap = { version = "4.1.6", features = ["derive", "env"] } diff --git a/azure-kusto-data/src/arrow.rs b/azure-kusto-data/src/arrow.rs index 541ed43..8ce1c76 100644 --- a/azure-kusto-data/src/arrow.rs +++ b/azure-kusto-data/src/arrow.rs @@ -94,7 +94,7 @@ pub fn convert_column(data: Vec, column: &Column) -> Result<(Field, Array .map(|data| (Field::new(column_name, DataType::Int64, true), data)), ColumnType::Real => convert_array_float(data) .map(|data| (Field::new(column_name, DataType::Float64, true), data)), - ColumnType::Datetime => convert_array_datetime(data).map(|data| { + ColumnType::DateTime => convert_array_datetime(data).map(|data| { ( Field::new( column_name, diff --git a/azure-kusto-data/src/client.rs b/azure-kusto-data/src/client.rs index f42de2e..328f093 100644 --- a/azure-kusto-data/src/client.rs +++ b/azure-kusto-data/src/client.rs @@ -13,7 +13,6 @@ use crate::prelude::ClientRequestProperties; use azure_core::headers::Headers; use azure_core::prelude::{Accept, AcceptEncoding, ClientVersion, ContentType}; use serde::de::DeserializeOwned; -use serde_json::Value; use std::convert::TryFrom; use std::fmt::Debug; use std::sync::Arc; @@ -250,10 +249,8 @@ impl KustoClient { .ok_or_else(|| Error::QueryError("No primary results found".into()))? .rows .into_iter() - .map(|row| match row { - Row::Values(v) => serde_json::from_value(Value::Array(v)).map_err(Error::from), - Row::Error(e) => Err(Error::QueryApiError(e)), - }) + .map(Row::into_result) + .map(|r| r.and_then(|v| serde_json::from_value::(serde_json::Value::Array(v)).map_err(Error::from))) .collect::>>()?; Ok(results) diff --git a/azure-kusto-data/src/error.rs b/azure-kusto-data/src/error.rs index 1cfaddb..5554f3c 100644 --- a/azure-kusto-data/src/error.rs +++ b/azure-kusto-data/src/error.rs @@ -54,26 +54,50 @@ pub enum Error { /// Errors raised from the api calls to kusto #[error("Query API error: {0}")] QueryApiError(OneApiError), + + /// Multiple errors + #[error("Multiple errors: {0:?}")] + MultipleErrors(Vec), +} + +impl From> for Error { + fn from(errors: Vec) -> Self { + if errors.len() == 1 { + Error::from(errors.into_iter().next().unwrap()) + } else { + Error::MultipleErrors(errors) + } + } } +/// Errors raised when parsing values. #[derive(thiserror::Error, Debug)] pub enum ParseError { + /// Raised when a value is null, but the type is not nullable. #[error("Error parsing null value for {0}")] ValueNull(String), + /// Raised when an int value is failed to be parsed. #[error("Error parsing int: {0}")] Int(#[from] std::num::ParseIntError), + /// Raised when a long value is failed to be parsed. #[error("Error parsing float: {0}")] Float(#[from] std::num::ParseFloatError), + /// Raised when a bool value is failed to be parsed. #[error("Error parsing bool: {0}")] Bool(#[from] std::str::ParseBoolError), + /// Raised when a timespan value is failed to be parsed. #[error("Error parsing timespan: {0}")] Timespan(String), + /// Raised when a datetime value is failed to be parsed. #[error("Error parsing datetime: {0}")] DateTime(#[from] time::error::Parse), + /// Raised when a guid value is failed to be parsed. #[error("Error parsing guid: {0}")] Guid(#[from] uuid::Error), + /// Raised when a decimal value is failed to be parsed. #[error("Error parsing decimal")] Decimal(#[from] rust_decimal::Error), + /// Raised when a dynamic value is failed to be parsed. #[error("Error parsing dynamic: {0}")] Dynamic(#[from] serde_json::Error), } @@ -115,3 +139,4 @@ impl ConnectionStringError { /// Result type for kusto operations. pub type Result = std::result::Result; +pub type Partial = std::result::Result, Error)>; diff --git a/azure-kusto-data/src/models/mod.rs b/azure-kusto-data/src/models/mod.rs index 6748d42..a80cd5e 100644 --- a/azure-kusto-data/src/models/mod.rs +++ b/azure-kusto-data/src/models/mod.rs @@ -1,6 +1,9 @@ pub mod v1; pub mod v2; +#[cfg(test)] +pub(crate) mod test_helpers; + use serde::{Deserialize, Serialize}; /// Represents the scalar data types of ADX. see [the docs for more information](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/scalar-data-types/) @@ -11,7 +14,7 @@ pub enum ColumnType { Bool, /// Datetime, represents a specific point in time. #[serde(rename = "datetime")] - Datetime, + DateTime, /// A complex type, that is either an array or a dictionary of other values. #[serde(rename = "dynamic")] Dynamic, diff --git a/azure-kusto-data/src/models/one_api_error.rs b/azure-kusto-data/src/models/one_api_error.rs deleted file mode 100644 index 322571e..0000000 --- a/azure-kusto-data/src/models/one_api_error.rs +++ /dev/null @@ -1 +0,0 @@ -use serde::{Deserialize, Serialize}; diff --git a/azure-kusto-data/src/models/test_helpers.rs b/azure-kusto-data/src/models/test_helpers.rs new file mode 100644 index 0000000..5d11162 --- /dev/null +++ b/azure-kusto-data/src/models/test_helpers.rs @@ -0,0 +1,643 @@ +use crate::models::ColumnType; +use crate::models::v2::{Column, DataSetCompletion, DataSetHeader, DataTable, Frame, OneApiError, OneApiErrors, Row, TableCompletion, TableFragment, TableFragmentType, TableHeader, TableKind}; +use crate::models::v2::ErrorReportingPlacement::EndOfTable; + +const V2_VALID_FRAMES: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/validFrames.json")); +const V2_TWO_TABLES: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/twoTables.json")); +const V2_PARTIAL_ERROR: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/partialError.json")); +const V2_PARTIAL_ERROR_FULL_DATASET: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/partialErrorFullDataset.json")); + + +fn expected_v2_valid_frames() -> Vec { + vec![ + Frame::DataSetHeader(DataSetHeader { + is_progressive: false, + version: "v2.0".to_string(), + is_fragmented: Some(true), + error_reporting_placement: Some(EndOfTable), + }), + Frame::DataTable(DataTable { + table_id: 0, + table_name: "@ExtendedProperties".to_string(), + table_kind: TableKind::QueryProperties, + columns: vec![ + Column { + column_name: "TableId".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "Key".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "Value".to_string(), + column_type: ColumnType::Dynamic, + }, + ], + rows: vec![ + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(1)), + serde_json::Value::String("Visualization".to_string()), + serde_json::Value::String("{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string()), + ]), + ], + }), + Frame::TableHeader(TableHeader { + table_id: 1, + table_name: "AllDataTypes".to_string(), + table_kind: TableKind::PrimaryResult, + columns: vec![ + Column { + column_name: "vnum".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "vdec".to_string(), + column_type: ColumnType::Decimal, + }, + Column { + column_name: "vdate".to_string(), + column_type: ColumnType::DateTime, + }, + Column { + column_name: "vspan".to_string(), + column_type: ColumnType::Timespan, + }, + Column { + column_name: "vobj".to_string(), + column_type: ColumnType::Dynamic, + }, + Column { + column_name: "vb".to_string(), + column_type: ColumnType::Bool, + }, + Column { + column_name: "vreal".to_string(), + column_type: ColumnType::Real, + }, + Column { + column_name: "vstr".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "vlong".to_string(), + column_type: ColumnType::Long, + }, + Column { + column_name: "vguid".to_string(), + column_type: ColumnType::Guid, + }, + ], + }), + Frame::TableFragment(TableFragment { + table_fragment_type: TableFragmentType::DataAppend, + table_id: 1, + rows: vec![ + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(1)), + serde_json::Value::String("2.00000000000001".to_string()), + serde_json::Value::String("2020-03-04T14:05:01.3109965Z".to_string()), + serde_json::Value::String("01:23:45.6789000".to_string()), + serde_json::Value::Object(vec![("moshe".to_string(), serde_json::Value::String("value".to_string()))].into_iter().collect()), + serde_json::Value::Bool(true), + serde_json::Value::Number(serde_json::Number::from_f64(0.01).unwrap()), + serde_json::Value::String("asdf".to_string()), + serde_json::Value::Number(serde_json::Number::from(9223372036854775807i64)), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + ]), + ], + }), + Frame::TableCompletion(TableCompletion { + table_id: 1, + row_count: 1, + one_api_errors: None, + }), + Frame::DataTable(DataTable { + table_id: 2, + table_name: "QueryCompletionInformation".to_string(), + table_kind: TableKind::QueryCompletionInformation, + columns: vec![ + Column { + column_name: "Timestamp".to_string(), + column_type: ColumnType::DateTime, + }, + Column { + column_name: "ClientRequestId".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "ActivityId".to_string(), + column_type: ColumnType::Guid, + }, + Column { + column_name: "SubActivityId".to_string(), + column_type: ColumnType::Guid, + }, + Column { + column_name: "ParentActivityId".to_string(), + column_type: ColumnType::Guid, + }, + Column { + column_name: "Level".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "LevelName".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "StatusCode".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "StatusCodeName".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "EventType".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "EventTypeName".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "Payload".to_string(), + column_type: ColumnType::String, + }, + ], + rows: vec![ + Row::Values(vec![ + serde_json::Value::String("2023-11-26T13:34:17.0731478Z".to_string()), + serde_json::Value::String("blab6".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::Number(serde_json::Number::from(4)), + serde_json::Value::String("Info".to_string()), + serde_json::Value::Number(serde_json::Number::from(0)), + serde_json::Value::String("S_OK (0)".to_string()), + serde_json::Value::Number(serde_json::Number::from(4)), + serde_json::Value::String("QueryInfo".to_string()), + serde_json::Value::String("{\"Count\":1,\"Text\":\"Query completed successfully\"}".to_string()), + ]), + Row::Values(vec![ + serde_json::Value::String("2023-11-26T13:34:17.0731478Z".to_string()), + serde_json::Value::String("blab6".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::Number(serde_json::Number::from(4)), + serde_json::Value::String("Info".to_string()), + serde_json::Value::Number(serde_json::Number::from(0)), + serde_json::Value::String("S_OK (0)".to_string()), + serde_json::Value::Number(serde_json::Number::from(5)), + serde_json::Value::String("WorkloadGroup".to_string()), + serde_json::Value::String("{\"Count\":1,\"Text\":\"default\"}".to_string()), + ]), + ], + }), + Frame::DataSetCompletion(DataSetCompletion { + has_errors: false, + cancelled: false, + one_api_errors: None, + }), + ] +} + + +fn expected_v2_two_tables() -> Vec { + vec![ + Frame::DataSetHeader(DataSetHeader { + is_progressive: false, + version: "v2.0".to_string(), + is_fragmented: Some(true), + error_reporting_placement: Some(EndOfTable), + }), + Frame::DataTable(DataTable { + table_id: 0, + table_name: "@ExtendedProperties".to_string(), + table_kind: TableKind::QueryProperties, + columns: vec![ + Column { + column_name: "TableId".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "Key".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "Value".to_string(), + column_type: ColumnType::Dynamic, + }, + ], + rows: vec![ + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(1)), + serde_json::Value::String("Visualization".to_string()), + serde_json::Value::String("{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string()), + ]), + Row::Values(vec![2.into(), "Visualization".to_string().into(), "{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string().into()]), + ], + }), + Frame::TableHeader(TableHeader { + table_id: 1, + table_name: "PrimaryResult".to_string(), + table_kind: TableKind::PrimaryResult, + columns: vec![ + Column { + column_name: "A".to_string(), + column_type: ColumnType::Int, + }, + ], + }), + Frame::TableFragment(TableFragment { + table_fragment_type: TableFragmentType::DataAppend, + table_id: 1, + rows: vec![ + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(1)), + ]), + ], + }), + Frame::TableFragment(TableFragment { + table_fragment_type: TableFragmentType::DataAppend, + table_id: 1, + rows: vec![ + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(2)), + ]), + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(3)), + ]), + ], + }), + Frame::TableCompletion(TableCompletion { + table_id: 1, + row_count: 3, + one_api_errors: None, + }), + Frame::TableHeader(TableHeader { + table_id: 2, + table_name: "PrimaryResult".to_string(), + table_kind: TableKind::PrimaryResult, + columns: vec![ + Column { + column_name: "A".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "B".to_string(), + column_type: ColumnType::Int, + }, + ], + }), + Frame::TableFragment(TableFragment { + table_fragment_type: TableFragmentType::DataAppend, + table_id: 2, + rows: vec![ + Row::Values(vec![ + serde_json::Value::String("a".to_string()), + serde_json::Value::Number(serde_json::Number::from(1)), + ]), + ], + }), + Frame::TableFragment(TableFragment { + table_fragment_type: TableFragmentType::DataAppend, + table_id: 2, + rows: vec![ + Row::Values(vec![ + serde_json::Value::String("b".to_string()), + serde_json::Value::Number(serde_json::Number::from(2)), + ]), + Row::Values(vec![ + serde_json::Value::String("c".to_string()), + serde_json::Value::Number(serde_json::Number::from(3)), + ]), + ], + }), + Frame::TableCompletion(TableCompletion { + table_id: 2, + row_count: 3, + one_api_errors: None, + }), + Frame::DataTable(DataTable { + table_id: 3, + table_name: "QueryCompletionInformation".to_string(), + table_kind: TableKind::QueryCompletionInformation, + columns: vec![ + Column { + column_name: "Timestamp".to_string(), + column_type: ColumnType::DateTime, + }, + Column { + column_name: "ClientRequestId".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "ActivityId".to_string(), + column_type: ColumnType::Guid, + }, + Column { + column_name: "SubActivityId".to_string(), + column_type: ColumnType::Guid, + }, + Column { + column_name: "ParentActivityId".to_string(), + column_type: ColumnType::Guid, + }, + Column { + column_name: "Level".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "LevelName".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "StatusCode".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "StatusCodeName".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "EventType".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "EventTypeName".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "Payload".to_string(), + column_type: ColumnType::String, + }, + ], + rows: vec![ + Row::Values(vec![ + serde_json::Value::String("2023-11-28T11:13:43.2514779Z".to_string()), + serde_json::Value::String("blab6".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::Number(serde_json::Number::from(4)), + serde_json::Value::String("Info".to_string()), + serde_json::Value::Number(serde_json::Number::from(0)), + serde_json::Value::String("S_OK (0)".to_string()), + serde_json::Value::Number(serde_json::Number::from(4)), + serde_json::Value::String("QueryInfo".to_string()), + serde_json::Value::String("{\"Count\":1,\"Text\":\"Query completed successfully\"}".to_string()), + ]), + Row::Values(vec![ + serde_json::Value::String("2023-11-28T11:13:43.2514779Z".to_string()), + serde_json::Value::String("blab6".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), + serde_json::Value::Number(serde_json::Number::from(4)), + serde_json::Value::String("Info".to_string()), + serde_json::Value::Number(serde_json::Number::from(0)), + serde_json::Value::String("S_OK (0)".to_string()), + serde_json::Value::Number(serde_json::Number::from(5)), + serde_json::Value::String("WorkloadGroup".to_string()), + serde_json::Value::String("{\"Count\":1,\"Text\":\"default\"}".to_string()), + ]), + Row::Values(vec![serde_json::Value::String("2023-11-28T11:13:43.2514779Z".to_string()), serde_json::Value::String("blab6".to_string()), serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), serde_json::Value::from(4), serde_json::Value::String("Info".to_string()), serde_json::Value::from(0), serde_json::Value::String("S_OK (0)".to_string()), serde_json::Value::from(6), serde_json::Value::String("EffectiveRequestOptions".to_string()), serde_json::Value::String("{\"Count\":1,\"Text\":\"{\\\"DataScope\\\":\\\"All\\\",\\\"QueryConsistency\\\":\\\"strongconsistency\\\",\\\"MaxMemoryConsumptionPerIterator\\\":5368709120,\\\"MaxMemoryConsumptionPerQueryPerNode\\\":8589346816,\\\"QueryFanoutNodesPercent\\\":100,\\\"QueryFanoutThreadsPercent\\\":100}\"}".to_string())]), + Row::Values(vec![serde_json::Value::String("2023-11-28T11:13:43.2514779Z".to_string()), serde_json::Value::String("blab6".to_string()), serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), serde_json::Value::String("123e27de-1e4e-49d9-b579-fe0b331d3642".to_string()), serde_json::Value::from(6), serde_json::Value::String("Stats".to_string()), serde_json::Value::from(0), serde_json::Value::String("S_OK (0)".to_string()), serde_json::Value::from(0), serde_json::Value::String("QueryResourceConsumption".to_string()), serde_json::Value::String("{\"ExecutionTime\":0.0,\"resource_usage\":{\"cache\":{\"memory\":{\"hits\":0,\"misses\":0,\"total\":0},\"disk\":{\"hits\":0,\"misses\":0,\"total\":0},\"shards\":{\"hot\":{\"hitbytes\":0,\"missbytes\":0,\"retrievebytes\":0},\"cold\":{\"hitbytes\":0,\"missbytes\":0,\"retrievebytes\":0},\"bypassbytes\":0}},\"cpu\":{\"user\":\"00:00:00\",\"kernel\":\"00:00:00\",\"total cpu\":\"00:00:00\"},\"memory\":{\"peak_per_node\":524384},\"network\":{\"inter_cluster_total_bytes\":1099,\"cross_cluster_total_bytes\":0}},\"input_dataset_statistics\":{\"extents\":{\"total\":0,\"scanned\":0,\"scanned_min_datetime\":\"0001-01-01T00:00:00.0000000Z\",\"scanned_max_datetime\":\"0001-01-01T00:00:00.0000000Z\"},\"rows\":{\"total\":0,\"scanned\":0},\"rowstores\":{\"scanned_rows\":0,\"scanned_values_size\":0},\"shards\":{\"queries_generic\":0,\"queries_specialized\":0}},\"dataset_statistics\":[{\"table_row_count\":3,\"table_size\":15},{\"table_row_count\":3,\"table_size\":43}],\"cross_cluster_resource_usage\":{}}".to_string())]), + ], + }), + Frame::DataSetCompletion(DataSetCompletion { + has_errors: false, + cancelled: false, + one_api_errors: None, + }), + ] +} + +fn expected_v2_partial_error() -> Vec { + vec![ + Frame::DataSetHeader(DataSetHeader { + is_progressive: false, + version: "v2.0".to_string(), + is_fragmented: Some(true), + error_reporting_placement: Some(EndOfTable), + }), + Frame::DataTable(DataTable { + table_id: 0, + table_name: "@ExtendedProperties".to_string(), + table_kind: TableKind::QueryProperties, + columns: vec![ + Column { + column_name: "TableId".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "Key".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "Value".to_string(), + column_type: ColumnType::Dynamic, + }, + ], + rows: vec![ + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(1)), + serde_json::Value::String("Visualization".to_string()), + serde_json::Value::String("{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string()), + ]), + ], + }), + Frame::TableHeader(TableHeader { + table_id: 1, + table_name: "PrimaryResult".to_string(), + table_kind: TableKind::PrimaryResult, + columns: vec![ + Column { + column_name: "A".to_string(), + column_type: ColumnType::Int, + }, + ], + }), + Frame::TableFragment(TableFragment { + table_fragment_type: TableFragmentType::DataAppend, + table_id: 1, + rows: vec![ + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(1)), + ]), + ], + }), + Frame::TableCompletion(TableCompletion { + table_id: 1, + row_count: 1, + one_api_errors: Some(vec![ + OneApiError { + error_message: crate::models::v2::ErrorMessage { + code: "LimitsExceeded".to_string(), + message: "Request is invalid and cannot be executed.".to_string(), + r#type: "Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException".to_string(), + description: "Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 1 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..".to_string(), + context: crate::models::v2::ErrorContext { + timestamp: "2023-11-28T08:30:06.4085369Z".to_string(), + service_alias: "".to_string(), + machine_name: "KSEngine000000".to_string(), + process_name: "Kusto.WinSvc.Svc".to_string(), + process_id: 4900, + thread_id: 6828, + client_request_id: "blab6".to_string(), + activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + sub_activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + activity_type: "GW.Http.CallContext".to_string(), + parent_activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + activity_stack: "(Activity stack: CRID=blab6 ARID=123e27de-1e4e-49d9-b579-fe0b331d3642 > GW.Http.CallContext/123e27de-1e4e-49d9-b579-fe0b331d3642)".to_string(), + }, + is_permanent: false, + }, + }, + ]), + }), + Frame::DataSetCompletion(DataSetCompletion { + has_errors: true, + cancelled: false, + one_api_errors: Some(vec![ + OneApiError { + error_message: crate::models::v2::ErrorMessage { + code: "LimitsExceeded".to_string(), + message: "Request is invalid and cannot be executed.".to_string(), + r#type: "Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException".to_string(), + description: "Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 1 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..".to_string(), + r#context: crate::models::v2::ErrorContext { + timestamp: "2023-11-28T08:30:06.4085369Z".to_string(), + service_alias: "".to_string(), + machine_name: "KSEngine000000".to_string(), + process_name: "Kusto.WinSvc.Svc".to_string(), + process_id: 4900, + thread_id: 6828, + client_request_id: "blab6".to_string(), + activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + sub_activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + activity_type: "GW.Http.CallContext".to_string(), + parent_activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + activity_stack: "(Activity stack: CRID=blab6 ARID=123e27de-1e4e-49d9-b579-fe0b331d3642 > GW.Http.CallContext/123e27de-1e4e-49d9-b579-fe0b331d3642)".to_string(), + }, + is_permanent: false, + }, + }, + ]), + }), + ] +} + +fn expected_v2_partial_error_full_dataset() -> Vec { + vec![ + Frame::DataSetHeader(DataSetHeader { + is_progressive: false, + version: "v2.0".to_string(), + is_fragmented: Some(false), + error_reporting_placement: Some(crate::models::v2::ErrorReportingPlacement::InData), + }), + Frame::DataTable(DataTable { + table_id: 0, + table_name: "@ExtendedProperties".to_string(), + table_kind: TableKind::QueryProperties, + columns: vec![ + Column { + column_name: "TableId".to_string(), + column_type: ColumnType::Int, + }, + Column { + column_name: "Key".to_string(), + column_type: ColumnType::String, + }, + Column { + column_name: "Value".to_string(), + column_type: ColumnType::Dynamic, + }, + ], + rows: vec![ + Row::Values(vec![ + serde_json::Value::Number(serde_json::Number::from(1)), + serde_json::Value::String("Visualization".to_string()), + serde_json::Value::String("{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string()), + ]), + Row::Error((OneApiErrors { + errors: vec![OneApiError { + error_message: crate::models::v2::ErrorMessage { + code: "LimitsExceeded".to_string(), + message: "Request is invalid and cannot be executed.".to_string(), + r#type: "Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException".to_string(), + description: "Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 1 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..".to_string(), + context: crate::models::v2::ErrorContext { + timestamp: "2023-12-18T08:25:05.8871389Z".to_string(), + service_alias: "ASAF".to_string(), + machine_name: "KSEngine000000".to_string(), + process_name: "Kusto.WinSvc.Svc".to_string(), + process_id: 4900, + thread_id: 4852, + client_request_id: "blab6".to_string(), + activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + sub_activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + activity_type: "GW.Http.CallContext".to_string(), + parent_activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + activity_stack: "(Activity stack: CRID=blab6 ARID=d6a331d8-4b0e-498b-b72f-9f842b86e0b2 > GW.Http.CallContext/d6a331d8-4b0e-498b-b72f-9f842b86e0b2)".to_string(), + }, + is_permanent: false, + }, + }] + })), + ], + }), + Frame::DataSetCompletion(DataSetCompletion { + has_errors: true, + cancelled: false, + one_api_errors: Some(vec![ + OneApiError { + error_message: crate::models::v2::ErrorMessage { + code: "LimitsExceeded".to_string(), + message: "Request is invalid and cannot be executed.".to_string(), + r#type: "Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException".to_string(), + description: "Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 1 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..".to_string(), + context: crate::models::v2::ErrorContext { + timestamp: "2023-12-18T08:25:05.8871389Z".to_string(), + service_alias: "ASAF".to_string(), + machine_name: "KSEngine000000".to_string(), + process_name: "Kusto.WinSvc.Svc".to_string(), + process_id: 4900, + thread_id: 4852, + client_request_id: "blab6".to_string(), + activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + sub_activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + activity_type: "GW.Http.CallContext".to_string(), + parent_activity_id: "123e27de-1e4e-49d9-b579-fe0b331d3642".to_string(), + activity_stack: "(Activity stack: CRID=blab6 ARID=d6a331d8-4b0e-498b-b72f-9f842b86e0b2 > GW.Http.CallContext/d6a331d8-4b0e-498b-b72f-9f842b86e0b2)".to_string(), + }, + is_permanent: false, + }, + }, + ]), + }), + ] +} + + +pub fn v2_files_full() -> Vec<(&'static str, Vec)> { + vec![ + (V2_VALID_FRAMES, expected_v2_valid_frames()), + (V2_TWO_TABLES, expected_v2_two_tables()), + (V2_PARTIAL_ERROR, expected_v2_partial_error()), + (V2_PARTIAL_ERROR_FULL_DATASET, expected_v2_partial_error_full_dataset()), + ] +} + + +pub fn v2_files_iterative() -> Vec<(&'static str, Vec)> { + vec![ + (V2_VALID_FRAMES, expected_v2_valid_frames()), + (V2_TWO_TABLES, expected_v2_two_tables()), + (V2_PARTIAL_ERROR, expected_v2_partial_error()), + ] +} diff --git a/azure-kusto-data/src/models/v2/errors.rs b/azure-kusto-data/src/models/v2/errors.rs index a53841f..d6aeda8 100644 --- a/azure-kusto-data/src/models/v2/errors.rs +++ b/azure-kusto-data/src/models/v2/errors.rs @@ -1,11 +1,18 @@ use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct OneApiErrors { + #[serde(rename = "OneApiErrors")] + pub errors: Vec, +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] #[serde(rename_all = "PascalCase")] pub struct OneApiError { #[serde(rename = "error")] - error_message: ErrorMessage, + pub(crate) error_message: ErrorMessage, } impl Display for OneApiError { @@ -21,29 +28,31 @@ impl Display for OneApiError { #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] #[serde(rename_all = "camelCase")] pub struct ErrorMessage { - code: String, - message: String, + pub code: String, + pub message: String, + #[serde(rename = "@message")] + pub description: String, #[serde(rename = "@type")] - r#type: String, + pub r#type: String, #[serde(rename = "@context")] - context: ErrorContext, + pub context: ErrorContext, #[serde(rename = "@permanent")] - is_permanent: bool, + pub is_permanent: bool, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] #[serde(rename_all = "camelCase")] pub struct ErrorContext { - timestamp: String, - service_alias: String, - machine_name: String, - process_name: String, - process_id: i32, - thread_id: i32, - client_request_id: String, - activity_id: String, - sub_activity_id: String, - activity_type: String, - parent_activity_id: String, - activity_stack: String, + pub timestamp: String, + pub service_alias: String, + pub machine_name: String, + pub process_name: String, + pub process_id: i32, + pub thread_id: i32, + pub client_request_id: String, + pub activity_id: String, + pub sub_activity_id: String, + pub activity_type: String, + pub parent_activity_id: String, + pub activity_stack: String, } diff --git a/azure-kusto-data/src/models/v2/frames.rs b/azure-kusto-data/src/models/v2/frames.rs index 6b31c96..5f31cd5 100644 --- a/azure-kusto-data/src/models/v2/frames.rs +++ b/azure-kusto-data/src/models/v2/frames.rs @@ -54,8 +54,6 @@ pub struct TableHeader { pub struct TableFragment { /// Table id - unique identifier of the table. Corresponds to the table_id in the TableHeader. pub table_id: i32, - /// The amount of fields - pub field_count: Option, /// The type of the fragment, instructs to how to use it. pub table_fragment_type: TableFragmentType, /// Rows in the table. Each row is a list of values, corresponding to the columns in the TableHeader, or an error. diff --git a/azure-kusto-data/src/models/v2/known_tables.rs b/azure-kusto-data/src/models/v2/known_tables.rs index 63335ee..13198d4 100644 --- a/azure-kusto-data/src/models/v2/known_tables.rs +++ b/azure-kusto-data/src/models/v2/known_tables.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use crate::{KustoInt, KustoString, KustoDateTime}; +use crate::{KustoInt, KustoString, KustoDateTime, KustoDynamic, KustoGuid}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] diff --git a/azure-kusto-data/src/models/v2/mod.rs b/azure-kusto-data/src/models/v2/mod.rs index 5eaa1a3..0c5b60d 100644 --- a/azure-kusto-data/src/models/v2/mod.rs +++ b/azure-kusto-data/src/models/v2/mod.rs @@ -10,7 +10,7 @@ pub use consts::*; pub use errors::*; pub use frames::*; pub use known_tables::*; -use crate::error::Error; +use crate::error::{Error, Partial}; /// A result of a V2 query. /// Could be a table, a part of a table, or metadata about the dataset. @@ -50,38 +50,45 @@ pub enum Row { /// A row in a table. Values(Vec), /// An error in a table. - Error(OneApiError), + Error(OneApiErrors), } -impl Into, OneApiError>> for Row { - fn into(self) -> Result, OneApiError> { +impl Into, Error>> for Row { + fn into(self) -> Result, Error> { match self { Row::Values(v) => Ok(v), - Row::Error(e) => Err(e), + Row::Error(e) => Err(e.errors.into_iter().map(Error::QueryApiError).collect::>().into()), } } } impl Row { - pub fn into_result(self) -> Result, OneApiError> { + pub fn into_result(self) -> Result, Error> { self.into() } } impl DataTable { - pub fn collect_values(&self) -> (serde_json::Value, Vec) { + pub fn collect_values(&self) -> Partial { let mut errors = vec![]; let mut values = vec![]; for row in &self.rows { match row.clone().into_result() { Ok(v) => values.push(serde_json::Value::Array(v)), - Err(e) => errors.push(e), + Err(e) => match e { + Error::MultipleErrors(e) => errors.extend(e), + _ => errors.push(e), + } } } - (serde_json::Value::Array(values), errors) + match (values.len(), errors.len()) { + (0, _) => Err((None, errors.into())), + (_, 0) => Ok(serde_json::Value::Array(values)), + (_, _) => Err((Some(serde_json::Value::Array(values)), errors.into())), + } } - pub fn deserialize_values(&self) -> (Vec, Vec) { + pub fn deserialize_values(&self) -> Partial> { let mut errors = vec![]; let mut values = vec![]; for row in &self.rows { @@ -90,10 +97,18 @@ impl DataTable { Ok(v) => values.push(v), Err(e) => errors.push(e.into()), }, - Err(e) => errors.push(e.into()), + Err(e) => match e { + Error::MultipleErrors(e) => errors.extend(e), + _ => errors.push(e), + } } } - (values, errors) + + match (values.len(), errors.len()) { + (0, _) => Err((None, errors.into())), + (_, 0) => Ok(values), + (_, _) => Err((Some(values), errors.into())), + } } } diff --git a/azure-kusto-data/src/operations/v2.rs b/azure-kusto-data/src/operations/v2.rs index 3c59fcc..e78e2a3 100644 --- a/azure-kusto-data/src/operations/v2.rs +++ b/azure-kusto-data/src/operations/v2.rs @@ -1,9 +1,9 @@ use std::sync::Arc; use crate::error::{Error::JsonError, Result}; use crate::models::v2; -use futures::{stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt}; +use futures::{stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt, pin_mut}; use futures::lock::Mutex; -use tokio::spawn; +use tokio::sync::mpsc::{Receiver, Sender}; use crate::models::v2::{DataTable, Frame, QueryCompletionInformation, QueryProperties, TableKind}; pub fn parse_frames_iterative( @@ -11,11 +11,14 @@ pub fn parse_frames_iterative( ) -> impl Stream> { let buf = Vec::with_capacity(4096); stream::unfold((reader, buf), |(mut reader, mut buf)| async move { - let size = reader.read_until(b'\n', &mut buf).await.ok()?; - if size == 0 { + buf.clear(); + let size = reader.read_until(b'\n', &mut buf).await.ok()? - 1; + if size <= 0 { return None; } + dbg!(String::from_utf8_lossy(&buf[1..size])); + if buf[0] == b']' { return None; } @@ -35,23 +38,43 @@ pub async fn parse_frames_full( return Ok(serde_json::from_slice(&buf)?); } -struct Dataset { - header : Option, - completion : Option, - query_properties : Option>, - query_completion_information : Option>, - results : Vec, + +/// Arc Mutex +type M = Arc>; +/// Arc Mutex Option +type OM = M>; + +struct StreamingDataset { + header : OM, + completion : OM, + query_properties : OM>, + query_completion_information : OM>, + results : Receiver } -impl Dataset { - async fn from_stream(mut stream: impl Stream>) -> Result { - let mut dataset = Dataset { - header : None, - completion : None, - query_properties : None, - query_completion_information : None, - results : Vec::new(), +impl StreamingDataset { + fn new(stream: impl Stream> + Send + 'static) -> Arc { + let (tx, rx) = tokio::sync::mpsc::channel(1); + let res = StreamingDataset { + header: Arc::new(Mutex::new(None)), + completion: Arc::new(Mutex::new(None)), + query_properties: Arc::new(Mutex::new(None)), + query_completion_information: Arc::new(Mutex::new(None)), + results: rx, }; + let res = Arc::new(res); + let tokio_res = res.clone(); + // TODO: to spawn a task we have to have a runtime. We wanted to be runtime independent, and that may still be a desire, but currently azure core isn't, so we might as well use tokio here. + tokio::spawn(async move { + tokio_res.populate_with_stream(stream, tx).await; + }); + + res + } + + async fn populate_with_stream(&self, stream: impl Stream>, tx: Sender) { + pin_mut!(stream); + let mut current_table = Some(DataTable { table_id: 0, table_name: "".to_string(), @@ -60,23 +83,25 @@ impl Dataset { rows: Vec::new(), }); - while let Some(frame) = stream.try_next().await? { + while let Some(frame) = stream.try_next().await.transpose() { + // TODO: handle errors + let frame = frame.expect("failed to read frame"); match frame { v2::Frame::DataSetHeader(header) => { - dataset.header = Some(header); + self.header.lock().await.replace(header); }, v2::Frame::DataSetCompletion(completion) => { - dataset.completion = Some(completion); + self.completion.lock().await.replace(completion); }, // TODO: properly handle errors/missing v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => { - dataset.query_properties.replace(table.deserialize_values::().expect("failed to deserialize query properties")); + self.query_properties.lock().await.replace(table.deserialize_values::().expect("failed to deserialize query properties")); }, v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryCompletionInformation => { - dataset.query_completion_information.replace(table.deserialize_values::().expect("failed to deserialize query completion information")); + self.query_completion_information.lock().await.replace(table.deserialize_values::().expect("failed to deserialize query completion information")); }, v2::Frame::DataTable(table) => { - dataset.results.push(table); + tx.send(table).await.expect("failed to send table"); }, // TODO - handle errors v2::Frame::TableHeader(table_header) => { @@ -94,43 +119,45 @@ impl Dataset { } v2::Frame::TableCompletion(table_completion) => { if let Some(table) = current_table.take() { - dataset.results.push(table); + // TODO - handle errors + + tx.send(table).await.expect("failed to send table"); } } Frame::TableProgress(_) => {} } } - Ok(dataset) } } +// test -/// Arc Mutex -type M = Arc>; -/// Arc Mutex Option -type OM = M>; - -struct StreamingDataset { - header : OM, - completion : OM, - query_properties : OM>, - query_completion_information : OM>, - results : M>, - stream : M>>, -} +#[cfg(test)] +mod tests { + use futures::io::Cursor; + use futures::StreamExt; + use crate::models::test_helpers::{v2_files_full, v2_files_iterative}; -impl StreamingDataset { - fn new(stream: impl Stream> + Send + 'static) -> Self { - StreamingDataset { - header: Arc::new(Mutex::new(None)), - completion: Arc::new(Mutex::new(None)), - query_properties: Arc::new(Mutex::new(None)), - query_completion_information: Arc::new(Mutex::new(None)), - results: Arc::new(Mutex::new(Vec::new())), - stream: Arc::new(Mutex::new(stream)), - }; - // TODO: to spawn a task we have to have a runtime. We wanted to be runtime independent, and that may still be a desire, but currently azure core isn't, so we might as well use tokio here. - tokio::spawn( + #[tokio::test] + async fn test_parse_frames_full() { + for (contents ,frames) in v2_files_full() { + println!("testing: {}", contents); + let reader = Cursor::new(contents.as_bytes()); + let parsed_frames = super::parse_frames_full(reader).await.unwrap(); + assert_eq!(parsed_frames, frames); + } } + #[tokio::test] + async fn test_parse_frames_iterative() { + for (contents ,frames) in v2_files_iterative() { + println!("testing: {}", contents); + let reader = Cursor::new(contents.as_bytes()); + let parsed_frames = super::parse_frames_iterative(reader) + .map(|f| f.expect("failed to parse frame")) + .collect::>().await; + assert_eq!(parsed_frames, frames); + } + } } + diff --git a/azure-kusto-data/src/types/mod.rs b/azure-kusto-data/src/types/mod.rs index 86b55e0..1d67c36 100644 --- a/azure-kusto-data/src/types/mod.rs +++ b/azure-kusto-data/src/types/mod.rs @@ -90,6 +90,20 @@ kusto_from_str!(KustoReal, f64, ParseError::Float); kusto_from_str!(KustoDecimal, Decimal, ParseError::Decimal); kusto_from_str!(KustoGuid, uuid::Uuid, ParseError::Guid); +enum KustoValue { + Bool(KustoBool), + Int(KustoInt), + Long(KustoLong), + Real(KustoReal), + Decimal(KustoDecimal), + String(KustoString), + Guid(KustoGuid), + DateTime(KustoDateTime), + TimeSpan(KustoTimespan), + Dynamic(KustoDynamic), +} + + impl FromStr for KustoString { type Err = Infallible; diff --git a/azure-kusto-data/src/types/timespan.rs b/azure-kusto-data/src/types/timespan.rs index 1e9bd31..2ed248c 100644 --- a/azure-kusto-data/src/types/timespan.rs +++ b/azure-kusto-data/src/types/timespan.rs @@ -26,7 +26,7 @@ pub struct KustoTimespan(pub Option); impl KustoTimespan { /// Creates a new `KustoTimespan` from a `std::time::Duration`. - fn new(duration: Duration) -> Self { + pub fn new(duration: Duration) -> Self { Self(Some(duration)) } diff --git a/azure-kusto-data/tests/inputs/v2/partialError.json b/azure-kusto-data/tests/inputs/v2/partialError.json new file mode 100644 index 0000000..c915ad9 --- /dev/null +++ b/azure-kusto-data/tests/inputs/v2/partialError.json @@ -0,0 +1,7 @@ +[{"FrameType":"DataSetHeader","IsProgressive":false,"Version":"v2.0","IsFragmented":true,"ErrorReportingPlacement":"EndOfTable"} +,{"FrameType":"DataTable","TableId":0,"TableKind":"QueryProperties","TableName":"@ExtendedProperties","Columns":[{"ColumnName":"TableId","ColumnType":"int"},{"ColumnName":"Key","ColumnType":"string"},{"ColumnName":"Value","ColumnType":"dynamic"}],"Rows":[[1,"Visualization","{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}"]]} +,{"FrameType":"TableHeader","TableId":1,"TableKind":"PrimaryResult","TableName":"PrimaryResult","Columns":[{"ColumnName":"A","ColumnType":"int"}]} +,{"FrameType":"TableFragment","TableFragmentType":"DataAppend","TableId":1,"Rows":[[1]]} +,{"FrameType":"TableCompletion","TableId":1,"RowCount":1,"OneApiErrors":[{"error":{"code":"LimitsExceeded","message":"Request is invalid and cannot be executed.","@type":"Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException","@message":"Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 1 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..","@context":{"timestamp":"2023-11-28T08:30:06.4085369Z","serviceAlias":"","machineName":"KSEngine000000","processName":"Kusto.WinSvc.Svc","processId":4900,"threadId":6828,"clientRequestId":"blab6","activityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","subActivityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","activityType":"GW.Http.CallContext","parentActivityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","activityStack":"(Activity stack: CRID=blab6 ARID=123e27de-1e4e-49d9-b579-fe0b331d3642 > GW.Http.CallContext/123e27de-1e4e-49d9-b579-fe0b331d3642)"},"@permanent":false}}]} +,{"FrameType":"DataSetCompletion","HasErrors":true,"Cancelled":false,"OneApiErrors":[{"error":{"code":"LimitsExceeded","message":"Request is invalid and cannot be executed.","@type":"Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException","@message":"Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 1 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..","@context":{"timestamp":"2023-11-28T08:30:06.4085369Z","serviceAlias":"","machineName":"KSEngine000000","processName":"Kusto.WinSvc.Svc","processId":4900,"threadId":6828,"clientRequestId":"blab6","activityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","subActivityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","activityType":"GW.Http.CallContext","parentActivityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","activityStack":"(Activity stack: CRID=blab6 ARID=123e27de-1e4e-49d9-b579-fe0b331d3642 > GW.Http.CallContext/123e27de-1e4e-49d9-b579-fe0b331d3642)"},"@permanent":false}}]} +] diff --git a/azure-kusto-data/tests/inputs/v2/partialErrorFullDataset.json b/azure-kusto-data/tests/inputs/v2/partialErrorFullDataset.json new file mode 100644 index 0000000..696ed71 --- /dev/null +++ b/azure-kusto-data/tests/inputs/v2/partialErrorFullDataset.json @@ -0,0 +1 @@ +[{"FrameType":"DataSetHeader","IsProgressive":false,"Version":"v2.0","IsFragmented":false,"ErrorReportingPlacement":"InData"},{"FrameType":"DataTable","TableId":0,"TableKind":"QueryProperties","TableName":"@ExtendedProperties","Columns":[{"ColumnName":"TableId","ColumnType":"int"},{"ColumnName":"Key","ColumnType":"string"},{"ColumnName":"Value","ColumnType":"dynamic"}],"Rows":[[1,"Visualization","{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}"],{"OneApiErrors":[{"error":{"code":"LimitsExceeded","message":"Request is invalid and cannot be executed.","@type":"Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException","@message":"Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 1 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..","@context":{"timestamp":"2023-12-18T08:25:05.8871389Z","serviceAlias":"ASAF","machineName":"KSEngine000000","processName":"Kusto.WinSvc.Svc","processId":4900,"threadId":4852,"clientRequestId":"blab6","activityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","subActivityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","activityType":"GW.Http.CallContext","parentActivityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","activityStack":"(Activity stack: CRID=blab6 ARID=d6a331d8-4b0e-498b-b72f-9f842b86e0b2 > GW.Http.CallContext/d6a331d8-4b0e-498b-b72f-9f842b86e0b2)"},"@permanent":false}}]}]},{"FrameType":"DataSetCompletion","HasErrors":true,"Cancelled":false,"OneApiErrors":[{"error":{"code":"LimitsExceeded","message":"Request is invalid and cannot be executed.","@type":"Kusto.Data.Exceptions.KustoServicePartialQueryFailureLimitsExceededException","@message":"Query execution has exceeded the allowed limits (80DA0003): The results of this query exceed the set limit of 1 records, so not all records were returned (E_QUERY_RESULT_SET_TOO_LARGE, 0x80DA0003). See https://aka.ms/kustoquerylimits for more information and possible solutions..","@context":{"timestamp":"2023-12-18T08:25:05.8871389Z","serviceAlias":"ASAF","machineName":"KSEngine000000","processName":"Kusto.WinSvc.Svc","processId":4900,"threadId":4852,"clientRequestId":"blab6","activityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","subActivityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","activityType":"GW.Http.CallContext","parentActivityId":"123e27de-1e4e-49d9-b579-fe0b331d3642","activityStack":"(Activity stack: CRID=blab6 ARID=d6a331d8-4b0e-498b-b72f-9f842b86e0b2 > GW.Http.CallContext/d6a331d8-4b0e-498b-b72f-9f842b86e0b2)"},"@permanent":false}}]}] \ No newline at end of file diff --git a/azure-kusto-data/tests/inputs/v2/twoTables.json b/azure-kusto-data/tests/inputs/v2/twoTables.json new file mode 100644 index 0000000..3d4a925 --- /dev/null +++ b/azure-kusto-data/tests/inputs/v2/twoTables.json @@ -0,0 +1,13 @@ +[{"FrameType":"DataSetHeader","IsProgressive":false,"Version":"v2.0","IsFragmented":true,"ErrorReportingPlacement":"EndOfTable"} +,{"FrameType":"DataTable","TableId":0,"TableKind":"QueryProperties","TableName":"@ExtendedProperties","Columns":[{"ColumnName":"TableId","ColumnType":"int"},{"ColumnName":"Key","ColumnType":"string"},{"ColumnName":"Value","ColumnType":"dynamic"}],"Rows":[[1,"Visualization","{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}"],[2,"Visualization","{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}"]]} +,{"FrameType":"TableHeader","TableId":1,"TableKind":"PrimaryResult","TableName":"PrimaryResult","Columns":[{"ColumnName":"A","ColumnType":"int"}]} +,{"FrameType":"TableFragment","TableFragmentType":"DataAppend","TableId":1,"Rows":[[1]]} +,{"FrameType":"TableFragment","TableFragmentType":"DataAppend","TableId":1,"Rows":[[2], [3]]} +,{"FrameType":"TableCompletion","TableId":1,"RowCount":3} +,{"FrameType":"TableHeader","TableId":2,"TableKind":"PrimaryResult","TableName":"PrimaryResult","Columns":[{"ColumnName":"A","ColumnType":"string"},{"ColumnName":"B","ColumnType":"int"}]} +,{"FrameType":"TableFragment","TableFragmentType":"DataAppend","TableId":2,"Rows":[["a",1]]} +,{"FrameType":"TableFragment","TableFragmentType":"DataAppend","TableId":2,"Rows":[["b",2], ["c",3]]} +,{"FrameType":"TableCompletion","TableId":2,"RowCount":3} +,{"FrameType":"DataTable","TableId":3,"TableKind":"QueryCompletionInformation","TableName":"QueryCompletionInformation","Columns":[{"ColumnName":"Timestamp","ColumnType":"datetime"},{"ColumnName":"ClientRequestId","ColumnType":"string"},{"ColumnName":"ActivityId","ColumnType":"guid"},{"ColumnName":"SubActivityId","ColumnType":"guid"},{"ColumnName":"ParentActivityId","ColumnType":"guid"},{"ColumnName":"Level","ColumnType":"int"},{"ColumnName":"LevelName","ColumnType":"string"},{"ColumnName":"StatusCode","ColumnType":"int"},{"ColumnName":"StatusCodeName","ColumnType":"string"},{"ColumnName":"EventType","ColumnType":"int"},{"ColumnName":"EventTypeName","ColumnType":"string"},{"ColumnName":"Payload","ColumnType":"string"}],"Rows":[["2023-11-28T11:13:43.2514779Z","blab6","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642",4,"Info",0,"S_OK (0)",4,"QueryInfo","{\"Count\":1,\"Text\":\"Query completed successfully\"}"],["2023-11-28T11:13:43.2514779Z","blab6","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642",4,"Info",0,"S_OK (0)",5,"WorkloadGroup","{\"Count\":1,\"Text\":\"default\"}"],["2023-11-28T11:13:43.2514779Z","blab6","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642",4,"Info",0,"S_OK (0)",6,"EffectiveRequestOptions","{\"Count\":1,\"Text\":\"{\\\"DataScope\\\":\\\"All\\\",\\\"QueryConsistency\\\":\\\"strongconsistency\\\",\\\"MaxMemoryConsumptionPerIterator\\\":5368709120,\\\"MaxMemoryConsumptionPerQueryPerNode\\\":8589346816,\\\"QueryFanoutNodesPercent\\\":100,\\\"QueryFanoutThreadsPercent\\\":100}\"}"],["2023-11-28T11:13:43.2514779Z","blab6","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642",6,"Stats",0,"S_OK (0)",0,"QueryResourceConsumption","{\"ExecutionTime\":0.0,\"resource_usage\":{\"cache\":{\"memory\":{\"hits\":0,\"misses\":0,\"total\":0},\"disk\":{\"hits\":0,\"misses\":0,\"total\":0},\"shards\":{\"hot\":{\"hitbytes\":0,\"missbytes\":0,\"retrievebytes\":0},\"cold\":{\"hitbytes\":0,\"missbytes\":0,\"retrievebytes\":0},\"bypassbytes\":0}},\"cpu\":{\"user\":\"00:00:00\",\"kernel\":\"00:00:00\",\"total cpu\":\"00:00:00\"},\"memory\":{\"peak_per_node\":524384},\"network\":{\"inter_cluster_total_bytes\":1099,\"cross_cluster_total_bytes\":0}},\"input_dataset_statistics\":{\"extents\":{\"total\":0,\"scanned\":0,\"scanned_min_datetime\":\"0001-01-01T00:00:00.0000000Z\",\"scanned_max_datetime\":\"0001-01-01T00:00:00.0000000Z\"},\"rows\":{\"total\":0,\"scanned\":0},\"rowstores\":{\"scanned_rows\":0,\"scanned_values_size\":0},\"shards\":{\"queries_generic\":0,\"queries_specialized\":0}},\"dataset_statistics\":[{\"table_row_count\":3,\"table_size\":15},{\"table_row_count\":3,\"table_size\":43}],\"cross_cluster_resource_usage\":{}}"]]} +,{"FrameType":"DataSetCompletion","HasErrors":false,"Cancelled":false} +] diff --git a/azure-kusto-data/tests/inputs/v2/validFrames.json b/azure-kusto-data/tests/inputs/v2/validFrames.json new file mode 100644 index 0000000..0ec1b12 --- /dev/null +++ b/azure-kusto-data/tests/inputs/v2/validFrames.json @@ -0,0 +1,8 @@ +[{"FrameType":"DataSetHeader","IsProgressive":false,"Version":"v2.0","IsFragmented":true,"ErrorReportingPlacement":"EndOfTable"} +,{"FrameType":"DataTable","TableId":0,"TableKind":"QueryProperties","TableName":"@ExtendedProperties","Columns":[{"ColumnName":"TableId","ColumnType":"int"},{"ColumnName":"Key","ColumnType":"string"},{"ColumnName":"Value","ColumnType":"dynamic"}],"Rows":[[1,"Visualization","{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}"]]} +,{"FrameType":"TableHeader","TableId":1,"TableKind":"PrimaryResult","TableName":"AllDataTypes","Columns":[{"ColumnName":"vnum","ColumnType":"int"},{"ColumnName":"vdec","ColumnType":"decimal"},{"ColumnName":"vdate","ColumnType":"datetime"},{"ColumnName":"vspan","ColumnType":"timespan"},{"ColumnName":"vobj","ColumnType":"dynamic"},{"ColumnName":"vb","ColumnType":"bool"},{"ColumnName":"vreal","ColumnType":"real"},{"ColumnName":"vstr","ColumnType":"string"},{"ColumnName":"vlong","ColumnType":"long"},{"ColumnName":"vguid","ColumnType":"guid"}]} +,{"FrameType":"TableFragment","TableFragmentType":"DataAppend","TableId":1,"Rows":[[1,"2.00000000000001","2020-03-04T14:05:01.3109965Z","01:23:45.6789000",{"moshe":"value"},true,0.01,"asdf",9223372036854775807,"123e27de-1e4e-49d9-b579-fe0b331d3642"]]} +,{"FrameType":"TableCompletion","TableId":1,"RowCount":1} +,{"FrameType":"DataTable","TableId":2,"TableKind":"QueryCompletionInformation","TableName":"QueryCompletionInformation","Columns":[{"ColumnName":"Timestamp","ColumnType":"datetime"},{"ColumnName":"ClientRequestId","ColumnType":"string"},{"ColumnName":"ActivityId","ColumnType":"guid"},{"ColumnName":"SubActivityId","ColumnType":"guid"},{"ColumnName":"ParentActivityId","ColumnType":"guid"},{"ColumnName":"Level","ColumnType":"int"},{"ColumnName":"LevelName","ColumnType":"string"},{"ColumnName":"StatusCode","ColumnType":"int"},{"ColumnName":"StatusCodeName","ColumnType":"string"},{"ColumnName":"EventType","ColumnType":"int"},{"ColumnName":"EventTypeName","ColumnType":"string"},{"ColumnName":"Payload","ColumnType":"string"}],"Rows":[["2023-11-26T13:34:17.0731478Z","blab6","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642",4,"Info",0,"S_OK (0)",4,"QueryInfo","{\"Count\":1,\"Text\":\"Query completed successfully\"}"],["2023-11-26T13:34:17.0731478Z","blab6","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642","123e27de-1e4e-49d9-b579-fe0b331d3642",4,"Info",0,"S_OK (0)",5,"WorkloadGroup","{\"Count\":1,\"Text\":\"default\"}"]]} +,{"FrameType":"DataSetCompletion","HasErrors":false,"Cancelled":false} +]