Skip to content

Commit

Permalink
Working Further
Browse files Browse the repository at this point in the history
  • Loading branch information
AsafMah committed Jan 4, 2024
1 parent 71aaff7 commit 5b3e4e0
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 50 deletions.
81 changes: 78 additions & 3 deletions azure-kusto-data/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@

use crate::authorization_policy::AuthorizationPolicy;
use crate::connection_string::{ConnectionString, ConnectionStringAuth};
use crate::error::{Error, Result};
use crate::error::{Error, ParseError, Partial, Result};
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};

use azure_core::{ClientOptions, Pipeline};
use azure_core::{ClientOptions, Context, CustomHeaders, Method, Pipeline, Request, Response, ResponseBody};

Check warning on line 9 in azure-kusto-data/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `Response`

warning: unused import: `Response` --> azure-kusto-data/src/client.rs:9:71 | 9 | ClientOptions, Context, CustomHeaders, Method, Pipeline, Request, Response, ResponseBody, | ^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default
use crate::client_details::ClientDetails;
use crate::models::v2::Row;
use crate::prelude::ClientRequestProperties;
use crate::prelude::{ClientRequestProperties, ClientRequestPropertiesBuilder, OptionsBuilder};
use azure_core::headers::Headers;
use azure_core::prelude::{Accept, AcceptEncoding, ClientVersion, ContentType};

Check warning on line 14 in azure-kusto-data/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `crate::operations`

warning: unused import: `crate::operations` --> azure-kusto-data/src/client.rs:14:5 | 14 | use crate::operations; | ^^^^^^^^^^^^^^^^^
use serde::de::DeserializeOwned;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;

Check warning on line 18 in azure-kusto-data/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `crate::request_options::Options`

warning: unused import: `crate::request_options::Options` --> azure-kusto-data/src/client.rs:18:5 | 18 | use crate::request_options::Options; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
use futures::TryStreamExt;
use crate::operations;
use crate::operations::v2::{FullDataset, IterativeDataset};
use crate::query::QueryBody;
use crate::request_options::Options;

/// Options for specifying how a Kusto client will behave
#[derive(Clone, Default)]
Expand Down Expand Up @@ -285,6 +290,76 @@ impl KustoClient {
) -> V1QueryRunner {
V1QueryRunner(self.execute_with_options(database, query, QueryKind::Management, options))
}

#[must_use]
pub async fn query(&self, database: impl Into<String>, query: impl Into<String>, options: impl Into<Option<ClientRequestProperties>>) -> Partial<FullDataset> {
let body = self.execute(QueryKind::Query, database.into(), query.into(), options.into()).await.map_err(|e| (None, e))?;

FullDataset::from_async_buf_read(body.into_stream().map_err(|e| std::io::Error::other(e)).into_async_read()).await
}

#[must_use]
pub async fn iterative_query(&self, database: impl Into<String>, query: impl Into<String>, options: impl Into<Option<ClientRequestProperties>>) -> Result<Arc<IterativeDataset>> {
let iterative_options = ClientRequestPropertiesBuilder::default().with_options(
OptionsBuilder::default()
.with_results_v2_newlines_between_frames(true)
.with_results_v2_fragment_primary_tables(true)
.with_error_reporting_placement("end_of_table").build().expect("Failed to build options"))
.build()
.expect("Failed to build options");

//TODO merge options

let body = self.execute(QueryKind::Query, database.into(), query.into(), iterative_options.into()).await?;
Ok(IterativeDataset::from_async_buf_read(body.into_stream().map_err(|e| std::io::Error::other(e)).into_async_read()).await)
}

async fn execute(&self, kind: QueryKind, database: String, query: String, options: Option<ClientRequestProperties>) -> Result<ResponseBody> {
let url = match kind {
QueryKind::Management => self.management_url(),
QueryKind::Query => self.query_url(),
};
let mut context = Context::new();

let mut request = Request::new(url.parse().map_err(ParseError::from)?, Method::Post);

let mut headers = (*self.default_headers).clone();


if let Some(client_request_properties) = &options {
if let Some(client_request_id) = &client_request_properties.client_request_id {
headers.insert("x-ms-client-request-id", client_request_id);
}

if let Some(application) = &client_request_properties.application {
headers.insert("x-ms-app", application);
}
}
context.insert(CustomHeaders::from(headers));

let body = QueryBody {
db: database,
csl: query,
properties: options,
};

let bytes = bytes::Bytes::from(serde_json::to_string(&body)?);
request.set_body(bytes);

let response = self
.pipeline()
.send(&mut context, &mut request)
.await?;

let status = response.status();
if !status.is_success() {
let body = response.into_body().collect_string().await;

return Err(Error::HttpError(status, body.unwrap_or_else(|e| format!("{:?}", e))));
}

Check failure on line 359 in azure-kusto-data/src/client.rs

View workflow job for this annotation

GitHub Actions / clippy

`std::sync::Arc<operations::v2::IterativeDataset>` is not a future

error[E0277]: `std::sync::Arc<operations::v2::IterativeDataset>` is not a future --> azure-kusto-data/src/client.rs:359:10 | 354 | Ok(IterativeDataset::from_async_buf_read( | ____________- 355 | | body.into_stream() 356 | | .map_err(|e| std::io::Error::other(e)) 357 | | .into_async_read(), 358 | | ) | |_________- this call returns `std::sync::Arc<operations::v2::IterativeDataset>` 359 | .await) | -^^^^^ | || | |`std::sync::Arc<operations::v2::IterativeDataset>` is not a future | help: remove the `.await` | = help: the trait `futures::Future` is not implemented for `std::sync::Arc<operations::v2::IterativeDataset>` = note: std::sync::Arc<operations::v2::IterativeDataset> must be a future or must implement `IntoFuture` to be awaited = note: required for `std::sync::Arc<operations::v2::IterativeDataset>` to implement `std::future::IntoFuture`

Ok(response.into_body())
}
}

impl TryFrom<ConnectionString> for KustoClient {
Expand Down
14 changes: 14 additions & 0 deletions azure-kusto-data/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Defines [Error] for representing failures in various operations.
use azure_core::StatusCode;
use std::fmt::Debug;

Check failure on line 3 in azure-kusto-data/src/error.rs

View workflow job for this annotation

GitHub Actions / clippy

unresolved import `oauth2`

error[E0432]: unresolved import `oauth2` --> azure-kusto-data/src/error.rs:3:5 | 3 | use oauth2::url; | ^^^^^^ use of undeclared crate or module `oauth2`
use oauth2::url;

use crate::models::v2::OneApiError;
use thiserror;
Expand Down Expand Up @@ -70,6 +71,16 @@ impl From<Vec<Error>> for Error {
}
}

impl From<Vec<OneApiError>> for Error {
fn from(errors: Vec<OneApiError>) -> Self {
if errors.len() == 1 {
Error::from(errors.into_iter().next().map(Error::QueryApiError).expect("Should be one"))
} else {
Error::MultipleErrors(errors.into_iter().map(Error::QueryApiError).collect())
}
}
}

/// Errors raised when parsing values.
#[derive(thiserror::Error, Debug)]
pub enum ParseError {
Expand Down Expand Up @@ -100,6 +111,9 @@ pub enum ParseError {
/// Raised when a dynamic value is failed to be parsed.
#[error("Error parsing dynamic: {0}")]
Dynamic(#[from] serde_json::Error),

#[error("Error parsing url: {0}")]
Url(#[from] url::ParseError),
}

/// Errors raised when parsing connection strings.
Expand Down
4 changes: 2 additions & 2 deletions azure-kusto-data/src/models/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ fn expected_v2_partial_error_full_dataset() -> Vec<Frame> {
serde_json::Value::String("Visualization".to_string()),
serde_json::Value::String("{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string()),
]),
Row::Error((OneApiErrors {
Row::Error(OneApiErrors {
errors: vec![OneApiError {
error_message: crate::models::v2::ErrorMessage {
code: "LimitsExceeded".to_string(),
Expand All @@ -588,7 +588,7 @@ fn expected_v2_partial_error_full_dataset() -> Vec<Frame> {
is_permanent: false,
},
}]
})),
}),
],
}),
Frame::DataSetCompletion(DataSetCompletion {
Expand Down
2 changes: 1 addition & 1 deletion azure-kusto-data/src/models/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Into<Result<Vec<serde_json::Value>, Error>> for Row {
fn into(self) -> Result<Vec<serde_json::Value>, Error> {
match self {
Row::Values(v) => Ok(v),
Row::Error(e) => Err(e.errors.into_iter().map(Error::QueryApiError).collect::<Vec<_>>().into()),
Row::Error(e) => Err(e.errors.into()),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion azure-kusto-data/src/operations/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod query;
mod v2;
pub(crate) mod v2;
Loading

0 comments on commit 5b3e4e0

Please sign in to comment.