diff --git a/azure-kusto-data/src/models/v2/known_tables.rs b/azure-kusto-data/src/models/v2/known_tables.rs
new file mode 100644
index 0000000..63335ee
--- /dev/null
+++ b/azure-kusto-data/src/models/v2/known_tables.rs
@@ -0,0 +1,31 @@
+use serde::{Deserialize, Serialize};
+use crate::{KustoDateTime, KustoDynamic, KustoGuid, KustoInt, KustoString};
+
+/// A single row of the well-known `QueryProperties` table that the service
+/// emits alongside the primary results of a V2 query.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct QueryProperties {
+    table_id: KustoInt,
+    key: KustoString,
+    value: KustoDynamic,
+}
+
+/// A single row of the well-known `QueryCompletionInformation` table,
+/// describing how the query executed.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct QueryCompletionInformation {
+    timestamp: KustoDateTime,
+    client_request_id: KustoString,
+    activity_id: KustoGuid,
+    sub_activity_id: KustoGuid,
+    parent_activity_id: KustoGuid,
+    level: KustoInt,
+    level_name: KustoString,
+    status_code: KustoInt,
+    status_code_name: KustoString,
+    event_type: KustoInt,
+    event_type_name: KustoString,
+    payload: KustoString,
+}
diff --git a/azure-kusto-data/src/models/v2/mod.rs b/azure-kusto-data/src/models/v2/mod.rs
index c18b7d1..5eaa1a3 100644
--- a/azure-kusto-data/src/models/v2/mod.rs
+++ b/azure-kusto-data/src/models/v2/mod.rs
@@ -4,10 +4,13 @@ use serde::{Deserialize, Serialize};
 mod consts;
 mod errors;
 mod frames;
+mod known_tables;
 
 pub use consts::*;
 pub use errors::*;
 pub use frames::*;
+pub use known_tables::*;
+use crate::error::Error;
 
 /// A result of a V2 query.
 /// Could be a table, a part of a table, or metadata about the dataset.
@@ -65,4 +68,39 @@ impl Row {
     }
 }
 
+impl DataTable {
+    /// Collects the raw values of every row into a single JSON array.
+    ///
+    /// Rows that fail to convert are reported in the returned error list
+    /// instead of aborting the whole conversion.
+    pub fn collect_values(&self) -> (serde_json::Value, Vec<Error>) {
+        let mut errors = vec![];
+        let mut values = vec![];
+        for row in &self.rows {
+            match row.clone().into_result() {
+                Ok(v) => values.push(serde_json::Value::Array(v)),
+                Err(e) => errors.push(e),
+            }
+        }
+        (serde_json::Value::Array(values), errors)
+    }
+
+    /// Deserializes every row into `T`, collecting per-row errors instead
+    /// of aborting on the first failure.
+    pub fn deserialize_values<T: serde::de::DeserializeOwned>(&self) -> (Vec<T>, Vec<Error>) {
+        let mut errors = vec![];
+        let mut values = vec![];
+        for row in &self.rows {
+            match row.clone().into_result() {
+                Ok(v) => match serde_json::from_value::<T>(serde_json::Value::Array(v)) {
+                    Ok(v) => values.push(v),
+                    Err(e) => errors.push(e.into()),
+                },
+                Err(e) => errors.push(e.into()),
+            }
+        }
+        (values, errors)
+    }
+}
+
 pub type DataSet = Vec<Frame>;
diff --git a/azure-kusto-data/src/operations/v2.rs b/azure-kusto-data/src/operations/v2.rs
index 4afb7ba..3c59fcc 100644
--- a/azure-kusto-data/src/operations/v2.rs
+++ b/azure-kusto-data/src/operations/v2.rs
@@ -1,6 +1,9 @@
+use std::sync::Arc;
 use crate::error::{Error::JsonError, Result};
 use crate::models::v2;
-use futures::{stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream};
+use futures::{stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, TryStreamExt};
+use futures::lock::Mutex;
+use crate::models::v2::{DataTable, Frame, QueryCompletionInformation, QueryProperties, TableKind};
 
 pub fn parse_frames_iterative(
     reader: impl AsyncBufRead + Unpin,
@@ -30,3 +33,124 @@ pub async fn parse_frames_full(
     reader.read_to_end(&mut buf).await?;
     return Ok(serde_json::from_slice(&buf)?);
 }
+
+/// A fully materialized V2 dataset, assembled by draining a frame stream.
+struct Dataset {
+    header: Option<v2::DataSetHeader>,
+    completion: Option<v2::DataSetCompletion>,
+    query_properties: Option<Vec<QueryProperties>>,
+    query_completion_information: Option<Vec<QueryCompletionInformation>>,
+    results: Vec<DataTable>,
+}
+
+impl Dataset {
+    /// Consumes the whole frame stream and assembles the dataset.
+    ///
+    /// Streamed tables (header/fragment/completion frames) are stitched
+    /// back together; the well-known `QueryProperties` and
+    /// `QueryCompletionInformation` tables are deserialized eagerly.
+    async fn from_stream(stream: impl Stream<Item = Result<Frame>>) -> Result<Dataset> {
+        let mut dataset = Dataset {
+            header: None,
+            completion: None,
+            query_properties: None,
+            query_completion_information: None,
+            results: Vec::new(),
+        };
+        // Table currently being stitched together from streamed frames;
+        // `None` while no table is in flight.
+        let mut current_table: Option<DataTable> = None;
+        // `try_next` needs `Unpin`, so pin the stream on the heap.
+        let mut stream = Box::pin(stream);
+
+        while let Some(frame) = stream.try_next().await? {
+            match frame {
+                Frame::DataSetHeader(header) => {
+                    dataset.header = Some(header);
+                }
+                Frame::DataSetCompletion(completion) => {
+                    dataset.completion = Some(completion);
+                }
+                Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
+                    let (values, errors) = table.deserialize_values::<QueryProperties>();
+                    // Surface the first malformed row as an error instead of panicking.
+                    if let Some(e) = errors.into_iter().next() {
+                        return Err(e);
+                    }
+                    dataset.query_properties.replace(values);
+                }
+                Frame::DataTable(table)
+                    if table.table_kind == TableKind::QueryCompletionInformation =>
+                {
+                    let (values, errors) =
+                        table.deserialize_values::<QueryCompletionInformation>();
+                    if let Some(e) = errors.into_iter().next() {
+                        return Err(e);
+                    }
+                    dataset.query_completion_information.replace(values);
+                }
+                Frame::DataTable(table) => {
+                    dataset.results.push(table);
+                }
+                Frame::TableHeader(table_header) => {
+                    // Start a fresh table per header so tables after the
+                    // first completion are not silently dropped.
+                    current_table = Some(DataTable {
+                        table_id: table_header.table_id,
+                        table_name: table_header.table_name,
+                        table_kind: table_header.table_kind,
+                        columns: table_header.columns,
+                        rows: Vec::new(),
+                    });
+                }
+                Frame::TableFragment(table_fragment) => {
+                    if let Some(table) = &mut current_table {
+                        table.rows.extend(table_fragment.rows);
+                    }
+                }
+                Frame::TableCompletion(_) => {
+                    if let Some(table) = current_table.take() {
+                        dataset.results.push(table);
+                    }
+                }
+                Frame::TableProgress(_) => {}
+            }
+        }
+        Ok(dataset)
+    }
+}
+
+/// Shorthand for `Arc<Mutex<T>>`.
+type M<T> = Arc<Mutex<T>>;
+/// Shorthand for `Arc<Mutex<Option<T>>>`.
+type OM<T> = M<Option<T>>;
+
+/// A dataset whose parts become observable while the underlying frame
+/// stream is still being consumed.
+struct StreamingDataset {
+    header: OM<v2::DataSetHeader>,
+    completion: OM<v2::DataSetCompletion>,
+    query_properties: OM<Vec<QueryProperties>>,
+    query_completion_information: OM<Vec<QueryCompletionInformation>>,
+    results: M<Vec<DataTable>>,
+    stream: M<std::pin::Pin<Box<dyn Stream<Item = Result<Frame>> + Send>>>,
+}
+
+impl StreamingDataset {
+    /// Creates an empty dataset holding the (pinned) frame stream.
+    fn new(stream: impl Stream<Item = Result<Frame>> + Send + 'static) -> Self {
+        let dataset = StreamingDataset {
+            header: Arc::new(Mutex::new(None)),
+            completion: Arc::new(Mutex::new(None)),
+            query_properties: Arc::new(Mutex::new(None)),
+            query_completion_information: Arc::new(Mutex::new(None)),
+            results: Arc::new(Mutex::new(Vec::new())),
+            stream: Arc::new(Mutex::new(Box::pin(stream))),
+        };
+        // TODO: spawn a background task that drains `stream` and fills the
+        // fields above. Spawning needs a runtime; we wanted to stay
+        // runtime-independent, but azure_core already depends on tokio, so
+        // `tokio::spawn` would be acceptable here.
+        dataset
+    }
+}