Skip to content

Commit

Permalink
feat: add paginated query support
Browse files Browse the repository at this point in the history
  • Loading branch information
dtantsur committed Apr 13, 2020
1 parent e3a1509 commit a8bfdee
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 1 deletion.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ edition = "2018"

[features]

default = ["native-tls"]
default = ["native-tls", "stream"]
native-tls = ["reqwest/default-tls"]
rustls = ["reqwest/rustls-tls"]
stream = ["async-stream", "futures"]

[dependencies]

async-stream = { version = "^0.2", optional = true }
async-trait = "^0.1"
chrono = { version = "^0.4", features = ["serde"] }
dirs = "^2.0"
futures = { version = "^0.3", optional = true }
log = "^0.4"
osproto = "^0.2.0"
reqwest = { version = "^0.10", default-features = false, features = ["gzip", "json", "stream"] }
Expand Down
71 changes: 71 additions & 0 deletions examples/list-servers-paginated.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2019 Dmitry Tantsur <divius.inside@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;
use std::str::FromStr;

use futures::pin_mut;
use futures::stream::TryStreamExt;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct Server {
pub id: String,
pub name: String,
}

#[derive(Debug, Deserialize)]
pub struct ServersRoot {
pub servers: Vec<Server>,
}

impl From<ServersRoot> for Vec<Server> {
fn from(value: ServersRoot) -> Vec<Server> {
value.servers
}
}

impl osauth::stream::Resource for Server {
type Id = String;
type Root = ServersRoot;
fn resource_id(&self) -> Self::Id {
self.id.clone()
}
}

#[tokio::main]
async fn main() {
env_logger::init();
let limit = env::args()
.nth(1)
.map(|s| FromStr::from_str(&s).expect("Expected a number"));

let session =
osauth::from_env().expect("Failed to create an identity provider from the environment");
let adapter = session.adapter(osauth::services::COMPUTE);

let sstream = adapter
.get_json_paginated::<_, Server>(&["servers"], None, limit, None)
.await
.expect("Failed to start a GET request");
pin_mut!(sstream);
while let Some(srv) = sstream
.try_next()
.await
.expect("Failed to fetch the next chunk")
{
println!("ID = {}, Name = {}", srv.id, srv.name);
}
println!("Done listing");
}
104 changes: 104 additions & 0 deletions src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@

//! Adapter for a specific service.

#[cfg(feature = "stream")]
use futures::Stream;
use reqwest::{Method, RequestBuilder, Response, Url};
use serde::de::DeserializeOwned;
use serde::Serialize;

use super::config;
use super::request;
use super::services::ServiceType;
#[cfg(feature = "stream")]
use super::stream::{paginated, Resource};
use super::{ApiVersion, AuthType, Error, Session};

/// Adapter for a specific service.
Expand Down Expand Up @@ -349,6 +353,78 @@ impl<Srv: ServiceType + Send + Clone> Adapter<Srv> {
request::fetch_json(self.request(Method::GET, path, api_version).await?).await
}

/// Fetch a paginated list of JSON objects using the GET request.
///
/// ```rust,no_run
/// # async fn example() -> Result<(), osauth::Error> {
/// use futures::pin_mut;
/// use futures::stream::TryStreamExt;
/// use serde::Deserialize;
///
/// #[derive(Debug, Deserialize)]
/// pub struct Server {
/// pub id: String,
/// pub name: String,
/// }
///
/// #[derive(Debug, Deserialize)]
/// pub struct ServersRoot {
/// pub servers: Vec<Server>,
/// }
///
/// // This implementatin defines the relationship between the root resource and its items.
/// impl osauth::stream::Resource for Server {
/// type Id = String;
/// type Root = ServersRoot;
/// fn resource_id(&self) -> Self::Id {
/// self.id.clone()
/// }
/// }
///
/// // This is another required part of the pagination contract.
/// impl From<ServersRoot> for Vec<Server> {
/// fn from(value: ServersRoot) -> Vec<Server> {
/// value.servers
/// }
/// }
///
/// let adapter = osauth::from_env()
/// .expect("Failed to create an identity provider from the environment")
/// .into_adapter(osauth::services::COMPUTE);
///
/// let servers = adapter
/// .get_json_paginated::<_, Server>(&["servers"], None, None, None)
/// .await?;
///
/// pin_mut!(servers);
/// while let Some(srv) = servers.try_next().await? {
/// println!("ID = {}, Name = {}", srv.id, srv.name);
/// }
/// # Ok(()) }
/// # #[tokio::main]
/// # async fn main() { example().await.unwrap(); }
/// ```
///
/// See [request](#method.request) for an explanation of the parameters.
#[cfg(feature = "stream")]
pub async fn get_json_paginated<I, T>(
&self,
path: I,
api_version: Option<ApiVersion>,
limit: Option<usize>,
starting_with: Option<<T as Resource>::Id>,
) -> Result<impl Stream<Item = Result<T, Error>>, Error>
where
I: IntoIterator,
I::Item: AsRef<str>,
I::IntoIter: Send,
T: Resource + Unpin,
<T as Resource>::Root: Into<Vec<T>> + Send,
{
let builder = self.request(Method::GET, path, api_version).await?;
Ok(paginated(builder, limit, starting_with))
}

/// Fetch a JSON using the GET request with a query.
///
/// See `reqwest` crate documentation for how to define a query.
Expand All @@ -375,6 +451,34 @@ impl<Srv: ServiceType + Send + Clone> Adapter<Srv> {
.await
}

/// Fetch a paginated list of JSON objects using the GET request with a query.
///
/// See `reqwest` crate documentation for how to define a query.
/// See [request](#method.request) for an explanation of the parameters.
#[cfg(feature = "stream")]
pub async fn get_json_query_paginated<I, Q, T>(
&self,
path: I,
query: Q,
api_version: Option<ApiVersion>,
limit: Option<usize>,
starting_with: Option<<T as Resource>::Id>,
) -> Result<impl Stream<Item = Result<T, Error>>, Error>
where
I: IntoIterator,
I::Item: AsRef<str>,
I::IntoIter: Send,
Q: Serialize + Send,
T: Resource + Unpin,
<T as Resource>::Root: Into<Vec<T>> + Send,
{
let builder = self
.request(Method::GET, path, api_version)
.await?
.query(&query);
Ok(paginated(builder, limit, starting_with))
}

/// Issue a GET request with a query
///
/// See `reqwest` crate documentation for how to define a query.
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ mod protocol;
pub mod request;
pub mod services;
mod session;
#[cfg(feature = "stream")]
pub mod stream;
mod url;

pub use crate::adapter::Adapter;
Expand Down
115 changes: 115 additions & 0 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

use std::sync::Arc;

#[cfg(feature = "stream")]
use futures::Stream;
use log::{debug, trace};
use reqwest::header::HeaderMap;
use reqwest::{Method, RequestBuilder, Response, Url};
Expand All @@ -26,6 +28,8 @@ use super::cache;
use super::protocol::ServiceInfo;
use super::request;
use super::services::ServiceType;
#[cfg(feature = "stream")]
use super::stream::{paginated, Resource};
use super::url;
use super::{Adapter, ApiVersion, AuthType, Error};

Expand Down Expand Up @@ -422,6 +426,87 @@ impl Session {
.await
}

/// Fetch a paginated list of JSON objects using the GET request.
///
/// ```rust,no_run
/// # async fn example() -> Result<(), osauth::Error> {
/// use futures::pin_mut;
/// use futures::stream::TryStreamExt;
/// use serde::Deserialize;
///
/// #[derive(Debug, Deserialize)]
/// pub struct Server {
/// pub id: String,
/// pub name: String,
/// }
///
/// #[derive(Debug, Deserialize)]
/// pub struct ServersRoot {
/// pub servers: Vec<Server>,
/// }
///
/// // This implementatin defines the relationship between the root resource and its items.
/// impl osauth::stream::Resource for Server {
/// type Id = String;
/// type Root = ServersRoot;
/// fn resource_id(&self) -> Self::Id {
/// self.id.clone()
/// }
/// }
///
/// // This is another required part of the pagination contract.
/// impl From<ServersRoot> for Vec<Server> {
/// fn from(value: ServersRoot) -> Vec<Server> {
/// value.servers
/// }
/// }
///
/// let session =
/// osauth::from_env().expect("Failed to create an identity provider from the environment");
///
/// let servers = session
/// .get_json_paginated::<_, _, Server>(
/// osauth::services::COMPUTE,
/// &["servers"],
/// None,
/// None,
/// None
/// )
/// .await?;
///
/// pin_mut!(servers);
/// while let Some(srv) = servers.try_next().await? {
/// println!("ID = {}, Name = {}", srv.id, srv.name);
/// }
/// # Ok(()) }
/// # #[tokio::main]
/// # async fn main() { example().await.unwrap(); }
/// ```
///
/// See [request](#method.request) for an explanation of the parameters.
#[cfg(feature = "stream")]
pub async fn get_json_paginated<Srv, I, T>(
&self,
service: Srv,
path: I,
api_version: Option<ApiVersion>,
limit: Option<usize>,
starting_with: Option<<T as Resource>::Id>,
) -> Result<impl Stream<Item = Result<T, Error>>, Error>
where
Srv: ServiceType + Send + Clone,
I: IntoIterator,
I::Item: AsRef<str>,
I::IntoIter: Send,
T: Resource + Unpin,
<T as Resource>::Root: Into<Vec<T>> + Send,
{
let builder = self
.request(service, Method::GET, path, api_version)
.await?;
Ok(paginated(builder, limit, starting_with))
}

/// Fetch a JSON using the GET request with a query.
///
/// See `reqwest` crate documentation for how to define a query.
Expand Down Expand Up @@ -450,6 +535,36 @@ impl Session {
.await
}

/// Fetch a paginated list of JSON objects using the GET request with a query.
///
/// See `reqwest` crate documentation for how to define a query.
/// See [request](#method.request) for an explanation of the parameters.
#[cfg(feature = "stream")]
pub async fn get_json_query_paginated<Srv, I, Q, T>(
&self,
service: Srv,
path: I,
query: Q,
api_version: Option<ApiVersion>,
limit: Option<usize>,
starting_with: Option<<T as Resource>::Id>,
) -> Result<impl Stream<Item = Result<T, Error>>, Error>
where
Srv: ServiceType + Send + Clone,
I: IntoIterator,
I::Item: AsRef<str>,
I::IntoIter: Send,
Q: Serialize + Send,
T: Resource + Unpin,
<T as Resource>::Root: Into<Vec<T>> + Send,
{
let builder = self
.request(service, Method::GET, path, api_version)
.await?
.query(&query);
Ok(paginated(builder, limit, starting_with))
}

/// Issue a GET request with a query
///
/// See `reqwest` crate documentation for how to define a query.
Expand Down
Loading

0 comments on commit a8bfdee

Please sign in to comment.