
Commit

just syncing
AsafMah committed Jan 3, 2024
1 parent 2d4966c commit 2031f7f
Showing 3 changed files with 164 additions and 1 deletion.
27 changes: 27 additions & 0 deletions azure-kusto-data/src/models/v2/known_tables.rs
@@ -0,0 +1,27 @@
use serde::{Deserialize, Serialize};
use crate::{KustoDateTime, KustoDynamic, KustoGuid, KustoInt, KustoString};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct QueryProperties {
    table_id: KustoInt,
    key: KustoString,
    value: KustoDynamic,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct QueryCompletionInformation {
    timestamp: KustoDateTime,
    client_request_id: KustoString,
    activity_id: KustoGuid,
    sub_activity_id: KustoGuid,
    parent_activity_id: KustoGuid,
    level: KustoInt,
    level_name: KustoString,
    status_code: KustoInt,
    status_code_name: KustoString,
    event_type: KustoInt,
    event_type_name: KustoString,
    payload: KustoString,
}
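
A minimal sketch of how a QueryProperties row could map onto this struct: in the v2 protocol a row arrives as a JSON array in column order, and serde_json can deserialize a derived struct from such a sequence. The literal values, and the assumption that the Kusto* aliases deserialize from plain JSON primitives, are illustrative only.

#[test]
fn query_properties_row_sketch() {
    // Hypothetical row: [TableId, Key, Value] in the table's column order.
    let row = serde_json::json!([1, "Visualization", { "Visualization": null }]);
    let props: QueryProperties = serde_json::from_value(row)
        .expect("row shape should match the struct's field order");
    let _ = props;
}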
32 changes: 32 additions & 0 deletions azure-kusto-data/src/models/v2/mod.rs
@@ -4,10 +4,13 @@ use serde::{Deserialize, Serialize};
mod consts;
mod errors;
mod frames;
mod known_tables;

pub use consts::*;
pub use errors::*;
pub use frames::*;
pub use known_tables::*;
use crate::error::Error;

/// A result of a V2 query.
/// Could be a table, a part of a table, or metadata about the dataset.
@@ -65,4 +68,33 @@ impl Row {
}
}

impl DataTable {
    /// Collects every row into a single JSON array, accumulating per-row errors
    /// instead of failing the whole table.
    pub fn collect_values(&self) -> (serde_json::Value, Vec<OneApiError>) {
        let mut errors = vec![];
        let mut values = vec![];
        for row in &self.rows {
            match row.clone().into_result() {
                Ok(v) => values.push(serde_json::Value::Array(v)),
                Err(e) => errors.push(e),
            }
        }
        (serde_json::Value::Array(values), errors)
    }

    /// Deserializes every row into `T`, accumulating per-row errors instead of
    /// failing the whole table.
    pub fn deserialize_values<T: serde::de::DeserializeOwned>(&self) -> (Vec<T>, Vec<Error>) {
        let mut errors = vec![];
        let mut values = vec![];
        for row in &self.rows {
            match row.clone().into_result() {
                Ok(v) => match serde_json::from_value::<T>(serde_json::Value::Array(v)) {
                    Ok(v) => values.push(v),
                    Err(e) => errors.push(e.into()),
                },
                Err(e) => errors.push(e.into()),
            }
        }
        (values, errors)
    }
}

pub type DataSet = Vec<Frame>;
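
A hedged usage sketch for the two new helpers; the `table` argument is hypothetical (e.g. one of the DataTable frames pulled out of a parsed DataSet), and both calls report per-row failures instead of aborting:

fn summarize(table: &DataTable) {
    // Typed path: deserialize each row into a struct, collecting per-row errors.
    let (props, errors) = table.deserialize_values::<QueryProperties>();
    println!("{} rows deserialized, {} failed", props.len(), errors.len());

    // Untyped path: every valid row as a JSON array, plus any OneApi errors.
    let (json_rows, api_errors) = table.collect_values();
    println!("json rows: {json_rows}, api errors: {}", api_errors.len());
}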
106 changes: 105 additions & 1 deletion azure-kusto-data/src/operations/v2.rs
@@ -1,6 +1,10 @@
use std::sync::Arc;
use crate::error::{Error::JsonError, Result};
use crate::models::v2;
use futures::{stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt};
use futures::lock::Mutex;
use tokio::spawn;
use crate::models::v2::{DataTable, Frame, QueryCompletionInformation, QueryProperties, TableKind};

pub fn parse_frames_iterative(
reader: impl AsyncBufRead + Unpin,
@@ -30,3 +34,103 @@ pub async fn parse_frames_full(
reader.read_to_end(&mut buf).await?;
return Ok(serde_json::from_slice(&buf)?);
}

struct Dataset {
    header: Option<v2::DataSetHeader>,
    completion: Option<v2::DataSetCompletion>,
    query_properties: Option<Vec<QueryProperties>>,
    query_completion_information: Option<Vec<QueryCompletionInformation>>,
    results: Vec<DataTable>,
}

impl Dataset {
    async fn from_stream(mut stream: impl Stream<Item = Result<Frame>> + Unpin) -> Result<Self> {
        let mut dataset = Dataset {
            header: None,
            completion: None,
            query_properties: None,
            query_completion_information: None,
            results: Vec::new(),
        };
        // Table currently being assembled from TableHeader/TableFragment frames.
        let mut current_table: Option<DataTable> = None;

        while let Some(frame) = stream.try_next().await? {
            match frame {
                v2::Frame::DataSetHeader(header) => {
                    dataset.header = Some(header);
                }
                v2::Frame::DataSetCompletion(completion) => {
                    dataset.completion = Some(completion);
                }
                // TODO: properly handle errors/missing
                v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
                    let (values, errors) = table.deserialize_values::<QueryProperties>();
                    assert!(errors.is_empty(), "failed to deserialize query properties");
                    dataset.query_properties.replace(values);
                }
                v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryCompletionInformation => {
                    let (values, errors) = table.deserialize_values::<QueryCompletionInformation>();
                    assert!(errors.is_empty(), "failed to deserialize query completion information");
                    dataset.query_completion_information.replace(values);
                }
                v2::Frame::DataTable(table) => {
                    dataset.results.push(table);
                }
                // TODO - handle errors
                v2::Frame::TableHeader(table_header) => {
                    current_table = Some(DataTable {
                        table_id: table_header.table_id,
                        table_name: table_header.table_name.clone(),
                        table_kind: table_header.table_kind,
                        columns: table_header.columns.clone(),
                        rows: Vec::new(),
                    });
                }
                v2::Frame::TableFragment(table_fragment) => {
                    if let Some(table) = &mut current_table {
                        table.rows.extend(table_fragment.rows);
                    }
                }
                v2::Frame::TableCompletion(_) => {
                    if let Some(table) = current_table.take() {
                        dataset.results.push(table);
                    }
                }
                v2::Frame::TableProgress(_) => {}
            }
        }
        Ok(dataset)
    }
}
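
A hedged sketch of driving Dataset::from_stream; the input frames here are placeholders, and a real caller would plug in the frame stream produced while reading the response body (e.g. from parse_frames_iterative, assuming it yields Result<Frame> items):

async fn collect_example(frames: Vec<Frame>) -> Result<Dataset> {
    // Wrap already-parsed frames in an in-memory stream for illustration.
    let frame_stream = stream::iter(frames.into_iter().map(Ok));
    Dataset::from_stream(frame_stream).await
}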


/// Shorthand for `Arc<Mutex<T>>`.
type M<T> = Arc<Mutex<T>>;
/// Shorthand for `Arc<Mutex<Option<T>>>`.
type OM<T> = M<Option<T>>;

struct StreamingDataset {
    header: OM<v2::DataSetHeader>,
    completion: OM<v2::DataSetCompletion>,
    query_properties: OM<Vec<QueryProperties>>,
    query_completion_information: OM<Vec<QueryCompletionInformation>>,
    results: M<Vec<DataTable>>,
    stream: M<stream::BoxStream<'static, Result<Frame>>>,
}

impl StreamingDataset {
    fn new(stream: impl Stream<Item = Result<Frame>> + Send + 'static) -> Self {
        let dataset = StreamingDataset {
            header: Arc::new(Mutex::new(None)),
            completion: Arc::new(Mutex::new(None)),
            query_properties: Arc::new(Mutex::new(None)),
            query_completion_information: Arc::new(Mutex::new(None)),
            results: Arc::new(Mutex::new(Vec::new())),
            stream: Arc::new(Mutex::new(stream.boxed())),
        };
        // TODO: to spawn a task we have to have a runtime. We wanted to be runtime
        // independent, and that may still be a desire, but currently azure core isn't,
        // so we might as well use tokio here. The task that drains `stream` into the
        // fields above still needs to be spawned.
        dataset
    }
}
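
One possible shape for the still-missing background task, as a sketch only: the helper name spawn_reader is made up, only frames that map directly onto the shared fields are handled, and full table assembly would mirror Dataset::from_stream above.

impl StreamingDataset {
    // Hypothetical helper: clone the shared handles, move them into a tokio task,
    // and fill them in as frames arrive; stream errors simply end the loop here.
    fn spawn_reader(&self) {
        let header = self.header.clone();
        let completion = self.completion.clone();
        let results = self.results.clone();
        let stream = self.stream.clone();
        tokio::spawn(async move {
            let mut stream = stream.lock().await;
            while let Ok(Some(frame)) = stream.try_next().await {
                match frame {
                    v2::Frame::DataSetHeader(h) => *header.lock().await = Some(h),
                    v2::Frame::DataSetCompletion(c) => *completion.lock().await = Some(c),
                    v2::Frame::DataTable(t) => results.lock().await.push(t),
                    // TableHeader/TableFragment/TableCompletion and the
                    // QueryProperties/QueryCompletionInformation tables are skipped here.
                    _ => {}
                }
            }
        });
    }
}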
