Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions sdk/core/azure_core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,23 @@

- Added `get_async_runtime()` and `set_async_runtime()` to allow customers to replace the asynchronous runtime used by the Azure SDK.
- Added `PageIterator::continuation_token()` and `PageIterator::with_continuation_token()` to support reconstructing a `PageIterator` in another process or on another machine to continue paging.
- Added `Poller<T>` for long-running operations (LROs).
- Added `Request::set_method()` to allow changing the HTTP method of a request.
- Added `StatusMonitor` for long-running operations.

### Breaking Changes

- Added `http::PollerOptions` parameter to `http::poller::get_retry_after`.
- Implemented `FromStr` where `FromStr::Err = Infallible` for `PollerStatus` instead of `From<&str>`.
- Minimum supported Rust version (MSRV) is now 1.85.
- `azure_core::http::Pipeline::new` now takes an `azure_core::http::ClientOptions` which is defined in `azure_core`, but convertible to `typespec_client_core::http::ClientOptions`.
- Moved `process::Executor` to `azure_identity`.
- Removed `Pipeline::replace_policy`.
- Removed unused `location` and `body` modules from `http::poller`.
- Renamed `azure_core::date` to `azure_core::time` and added `azure_core::time::Duration` as the standard "duration" type for the SDK.
- Renamed `http::poller::body_content` to `http::poller::body`.
- Renamed `PagerResult::More { next }` to `continuation`.
- Renamed `PollerStatus::Other` to `PollerStatus::UnknownValue` following [guidelines](https://azure.github.io/azure-sdk/rust_introduction.html#rust-enum-extensible).
- Renamed `TelemetryOptions` to `UserAgentOptions`.
- Renamed `TelemetryPolicy` to `UserAgentPolicy`.

Expand Down
1 change: 1 addition & 0 deletions sdk/core/azure_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ rustc_version.workspace = true
[dev-dependencies]
azure_core_test.workspace = true
azure_identity.workspace = true
azure_security_keyvault_certificates.path = "../../keyvault/azure_security_keyvault_certificates"
azure_security_keyvault_secrets.path = "../../keyvault/azure_security_keyvault_secrets"
criterion.workspace = true
thiserror.workspace = true
Expand Down
116 changes: 115 additions & 1 deletion sdk/core/azure_core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

### Consuming service methods returning `Pager<T>`

If a service call returns multiple values in pages, it would return `Result<Pager<T>>` as a result. You can iterate all items from all pages.
If a service call returns multiple values in pages, it should return `Result<Pager<T>>` as a result. You can iterate all items from all pages.

```rust no_run
use azure_identity::DefaultAzureCredential;
Expand Down Expand Up @@ -246,6 +246,120 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
```

### Consuming service methods returning `Poller<T>`

If a service call may take a while to process, it should return `Result<Poller<T>>` as a result, representing a long-running operation (LRO).
The `Poller<T>` implements `futures::Stream` so you can asynchronously iterate over each status monitor update:

```rust no_run
use azure_identity::DefaultAzureCredential;
use azure_security_keyvault_certificates::{
CertificateClient, CertificateClientExt,
models::{CreateCertificateParameters, CertificatePolicy, X509CertificateProperties, IssuerParameters},
};
use futures::stream::TryStreamExt as _;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let credential = DefaultAzureCredential::new()?;
let client = CertificateClient::new(
"https://your-key-vault-name.vault.azure.net/",
credential.clone(),
None,
)?;

// Create a self-signed certificate.
let policy = CertificatePolicy {
x509_certificate_properties: Some(X509CertificateProperties {
subject: Some("CN=DefaultPolicy".into()),
..Default::default()
}),
issuer_parameters: Some(IssuerParameters {
name: Some("Self".into()),
..Default::default()
}),
..Default::default()
};
let body = CreateCertificateParameters {
certificate_policy: Some(policy),
..Default::default()
};

// Wait for the certificate operation to complete.
// The Poller implements futures::Stream and automatically waits between polls.
let mut poller = client.begin_create_certificate("certificate-name", body.try_into()?, None)?;
while let Some(operation) = poller.try_next().await? {
let operation = operation.into_body().await?;
match operation.status.as_deref().unwrap_or("unknown") {
"inProgress" => continue,
"completed" => {
let target = operation.target.ok_or("expected target")?;
println!("Created certificate {}", target);
break;
},
status => Err(format!("operation terminated with status {status}"))?,
}
}

Ok(())
}
```

If you just want to wait until the `Poller<T>` is complete and get the last status monitor, you can await `wait()`:

```rust no_run
use azure_identity::DefaultAzureCredential;
use azure_security_keyvault_certificates::{
CertificateClient, CertificateClientExt,
models::{CreateCertificateParameters, CertificatePolicy, X509CertificateProperties, IssuerParameters},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let credential = DefaultAzureCredential::new()?;
let client = CertificateClient::new(
"https://your-key-vault-name.vault.azure.net/",
credential.clone(),
None,
)?;

// Create a self-signed certificate.
let policy = CertificatePolicy {
x509_certificate_properties: Some(X509CertificateProperties {
subject: Some("CN=DefaultPolicy".into()),
..Default::default()
}),
issuer_parameters: Some(IssuerParameters {
name: Some("Self".into()),
..Default::default()
}),
..Default::default()
};
let body = CreateCertificateParameters {
certificate_policy: Some(policy),
..Default::default()
};

// Wait for the certificate operation to complete and get the last status monitor.
let operation = client
.begin_create_certificate("certificate-name", body.try_into()?, None)?
.wait()
.await?
// Deserialize the CertificateOperation:
.into_body()
.await?;

if matches!(operation.status, Some(status) if status == "completed") {
let target = operation.target.ok_or("expected target")?;
println!("Created certificate {}", target);
}

Ok(())
}
```

Awaiting `wait()` will only fail if the HTTP status code does not indicate successfully fetching the status monitor.

### Replacing the async runtime

Internally, the Azure SDK uses either the `tokio` async runtime (with the `tokio` feature), or it implements asynchronous functionality using functions in the `std` namespace.
Expand Down
5 changes: 3 additions & 2 deletions sdk/core/azure_core/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ pub use models::*;
pub use options::*;
pub use pager::*;
pub use pipeline::*;
pub use poller::{Poller, PollerStatus};
pub use request::{Body, Request, RequestContent};
pub use response::{RawResponse, Response};

pub use typespec_client_core::http::response;
pub use typespec_client_core::http::{
new_http_client, AppendToUrlQuery, Context, Format, HttpClient, JsonFormat, Method, NoFormat,
StatusCode, Url,
new_http_client, AppendToUrlQuery, Context, DeserializeWith, Format, HttpClient, JsonFormat,
Method, NoFormat, StatusCode, Url,
};

#[cfg(feature = "xml")]
Expand Down
36 changes: 17 additions & 19 deletions sdk/core/azure_core/src/http/pager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

use crate::http::{headers::HeaderName, response::Response};
use crate::http::{headers::HeaderName, response::Response, DeserializeWith, Format, JsonFormat};
use async_trait::async_trait;
use futures::{stream::unfold, FutureExt, Stream};
use std::{
Expand All @@ -12,8 +12,6 @@ use std::{
sync::{Arc, Mutex},
task,
};
use typespec::Error;
use typespec_client_core::http::{DeserializeWith, Format, JsonFormat};

/// The result of fetching a single page from a [`Pager`], whether there are more pages or paging is done.
pub enum PagerResult<P, C: AsRef<str>> {
Expand Down Expand Up @@ -85,10 +83,10 @@ where
pub type Pager<P, F = JsonFormat> = ItemIterator<Response<P, F>>;

#[cfg(not(target_arch = "wasm32"))]
type BoxedStream<P> = Box<dyn Stream<Item = Result<P, Error>> + Send>;
type BoxedStream<P> = Box<dyn Stream<Item = crate::Result<P>> + Send>;

#[cfg(target_arch = "wasm32")]
type BoxedStream<P> = Box<dyn Stream<Item = Result<P, Error>>>;
type BoxedStream<P> = Box<dyn Stream<Item = crate::Result<P>>>;

/// Iterates over a collection of items or individual pages of items from a service.
///
Expand Down Expand Up @@ -214,23 +212,23 @@ impl<P: Page> ItemIterator<P> {
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] F: Fn(Option<C>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
#[cfg(target_arch = "wasm32")] C: AsRef<str> + 'static,
#[cfg(target_arch = "wasm32")] F: Fn(Option<C>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
>(
make_request: F,
) -> Self {
Self::from_stream(iter_from_callback(make_request, || None, |_| {}))
}

/// Creates a [`ItemIterator<P>`] from a raw stream of [`Result<P>`](typespec::Result<P>) values.
/// Creates a [`ItemIterator<P>`] from a raw stream of [`Result<P>`](crate::Result<P>) values.
///
/// This constructor is used when you are implementing a completely custom stream and want to use it as a pager.
pub fn from_stream<
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
#[cfg(not(target_arch = "wasm32"))] S: Stream<Item = Result<P, Error>> + Send + 'static,
#[cfg(target_arch = "wasm32")] S: Stream<Item = Result<P, Error>> + 'static,
#[cfg(not(target_arch = "wasm32"))] S: Stream<Item = crate::Result<P>> + Send + 'static,
#[cfg(target_arch = "wasm32")] S: Stream<Item = crate::Result<P>> + 'static,
>(
stream: S,
) -> Self {
Expand All @@ -254,7 +252,7 @@ impl<P: Page> ItemIterator<P> {
}

impl<P: Page> futures::Stream for ItemIterator<P> {
type Item = Result<P::Item, Error>;
type Item = crate::Result<P::Item>;

fn poll_next(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -398,10 +396,10 @@ impl<P> PageIterator<P> {
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + FromStr + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] F: Fn(Option<C>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
#[cfg(target_arch = "wasm32")] C: AsRef<str> + FromStr + 'static,
#[cfg(target_arch = "wasm32")] F: Fn(Option<C>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
>(
make_request: F,
) -> Self
Expand Down Expand Up @@ -441,8 +439,8 @@ impl<P> PageIterator<P> {
/// This constructor is used when you are implementing a completely custom stream and want to use it as a pager.
pub fn from_stream<
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
#[cfg(not(target_arch = "wasm32"))] S: Stream<Item = Result<P, Error>> + Send + 'static,
#[cfg(target_arch = "wasm32")] S: Stream<Item = Result<P, Error>> + 'static,
#[cfg(not(target_arch = "wasm32"))] S: Stream<Item = crate::Result<P>> + Send + 'static,
#[cfg(target_arch = "wasm32")] S: Stream<Item = crate::Result<P>> + 'static,
>(
stream: S,
) -> Self {
Expand Down Expand Up @@ -509,7 +507,7 @@ impl<P> PageIterator<P> {
}

impl<P> futures::Stream for PageIterator<P> {
type Item = Result<P, Error>;
type Item = crate::Result<P>;

fn poll_next(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -537,19 +535,19 @@ fn iter_from_callback<
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] F: Fn(Option<C>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] G: Fn() -> Option<C> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] S: Fn(Option<&str>) + Send + 'static,
#[cfg(target_arch = "wasm32")] C: AsRef<str> + 'static,
#[cfg(target_arch = "wasm32")] F: Fn(Option<C>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
#[cfg(target_arch = "wasm32")] G: Fn() -> Option<C> + 'static,
#[cfg(target_arch = "wasm32")] S: Fn(Option<&str>) + 'static,
>(
make_request: F,
get_next: G,
set_next: S,
) -> impl Stream<Item = Result<P, Error>> + 'static {
) -> impl Stream<Item = crate::Result<P>> + 'static {
unfold(
// We flow the `make_request` callback, 'get_next', and `set_next` through the state value so that we can avoid cloning.
(State::Init, make_request, get_next, set_next),
Expand Down
Loading