From cb6141bb1deffafcc46b94ec78f57fe9f91cfb96 Mon Sep 17 00:00:00 2001 From: Marc Schreiber Date: Fri, 3 Jan 2020 17:14:00 +0100 Subject: [PATCH] Update tokio and use async/await --- Cargo.toml | 10 +- examples/checkregistry.rs | 12 +- examples/common/mod.rs | 57 +++---- examples/image-labels.rs | 17 +-- examples/image.rs | 48 +++--- examples/login.rs | 22 +-- examples/tags.rs | 34 ++--- examples/trace.rs | 47 ++---- src/errors.rs | 1 - src/lib.rs | 17 +-- src/mediatypes.rs | 6 +- src/v2/auth.rs | 194 ++++++++++-------------- src/v2/blobs.rs | 148 ++++++++---------- src/v2/catalog.rs | 71 +++++---- src/v2/content_digest.rs | 2 +- src/v2/manifest/manifest_schema1.rs | 1 - src/v2/manifest/manifest_schema2.rs | 58 +++---- src/v2/manifest/mod.rs | 226 +++++++++++++--------------- src/v2/mod.rs | 93 ++++-------- src/v2/tags.rs | 149 +++++++++--------- tests/mock/api_version.rs | 2 +- tests/mock/base_client.rs | 2 +- tests/mock/blobs_download.rs | 4 +- tests/mock/catalog.rs | 22 +-- tests/mock/tags.rs | 29 ++-- 25 files changed, 559 insertions(+), 713 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f50aa29..3174139 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,27 +25,27 @@ tag-prefix = "" [dependencies] base64 = "0.11" error-chain = { version = "0.12", default-features = false } -futures = "0.1" +futures = "0.3" http = "0.2" libflate = "0.1" log = "0.4" mime = "0.3" mockito = { version = "0.22", optional = true } regex = "^1.1.0" -serde = "1" -serde_derive = "1" +serde = { version = "1", features = ["derive"] } serde_json = "1" strum = "0.16" strum_macros = "0.16" tar = "0.4" -tokio = "0.1" +tokio = "0.2" dirs = "2.0" -reqwest = { version = "^0.9.6", default-features = false } +reqwest = { version = "0.10", default-features = false, features = ["json"] } sha2 = "^0.8.0" [dev-dependencies] env_logger = "0.7" spectral = "0.6" +tokio = { version = "0.2", features = ["macros"] } [features] default = ["reqwest-default-tls"] diff --git a/examples/checkregistry.rs b/examples/checkregistry.rs index 46cc245..f825377 100644 --- a/examples/checkregistry.rs +++ b/examples/checkregistry.rs @@ -1,15 +1,15 @@ extern crate tokio; use std::{boxed, error}; -use tokio::runtime::current_thread::Runtime; -fn main() { +#[tokio::main] +async fn main() { let registry = match std::env::args().nth(1) { Some(x) => x, None => "registry-1.docker.io".into(), }; - let res = run(®istry); + let res = run(®istry).await; if let Err(e) = res { println!("[{}] {}", registry, e); @@ -17,15 +17,13 @@ fn main() { }; } -fn run(host: &str) -> Result> { - let mut runtime = Runtime::new()?; +async fn run(host: &str) -> Result> { let dclient = dkregistry::v2::Client::configure() .registry(host) .insecure_registry(false) .build()?; - let futcheck = dclient.is_v2_supported(); - let supported = runtime.block_on(futcheck)?; + let supported = dclient.is_v2_supported().await?; if supported { println!("{} supports v2", host); } else { diff --git a/examples/common/mod.rs b/examples/common/mod.rs index 3426177..e4ce128 100644 --- a/examples/common/mod.rs +++ b/examples/common/mod.rs @@ -1,44 +1,23 @@ extern crate futures; -use futures::prelude::*; - -pub fn authenticate_client( +pub async fn authenticate_client( mut client: dkregistry::v2::Client, login_scope: String, -) -> impl Future { - futures::future::ok::<_, dkregistry::errors::Error>(client.clone()) - .and_then(|dclient| { - dclient.is_v2_supported().and_then(|v2_supported| { - if !v2_supported { - Err("API v2 not supported".into()) - } else { - Ok(dclient) - } - }) - }) - .and_then(|dclient| { - dclient.is_auth(None).and_then(move |is_auth| { - if is_auth { - Ok(dclient) - } else { - Err("login required".into()) - } - }) - }) - .or_else(move |_| { - client - .login(&[login_scope.as_str()]) - .and_then(move |token| { - client - .is_auth(Some(token.token())) - .and_then(move |is_auth| { - if !is_auth { - Err("login failed".into()) - } else { - println!("logged in!"); - Ok(client.set_token(Some(token.token())).clone()) - } - }) - }) - }) +) -> Result { + if !client.is_v2_supported().await? { + return Err("API v2 not supported".into()); + } + + if client.is_auth(None).await? { + return Ok(client); + } + + let token = client.login(&[login_scope.as_str()]).await?; + + if !client.is_auth(Some(token.token())).await? { + Err("login failed".into()) + } else { + println!("logged in!"); + Ok(client.set_token(Some(token.token())).clone()) + } } diff --git a/examples/image-labels.rs b/examples/image-labels.rs index 4ad80f3..208cfe8 100644 --- a/examples/image-labels.rs +++ b/examples/image-labels.rs @@ -5,15 +5,14 @@ extern crate tokio; use dkregistry::reference; use dkregistry::v2::manifest::Manifest; -use futures::prelude::*; use std::result::Result; use std::str::FromStr; use std::{env, fs, io}; -use tokio::runtime::current_thread::Runtime; mod common; -fn main() { +#[tokio::main] +async fn main() { let dkr_ref = match std::env::args().nth(1) { Some(ref x) => reference::Reference::from_str(x), None => reference::Reference::from_str("quay.io/steveej/cincinnati-test-labels:0.0.0"), @@ -46,7 +45,7 @@ fn main() { } }; - let res = run(&dkr_ref, user, password); + let res = run(&dkr_ref, user, password).await; if let Err(e) = res { println!("[{}] {}", registry, e); @@ -54,13 +53,11 @@ fn main() { }; } -fn run( +async fn run( dkr_ref: &reference::Reference, user: Option, passwd: Option, ) -> Result<(), dkregistry::errors::Error> { - let mut runtime = Runtime::new()?; - let client = dkregistry::v2::Client::configure() .registry(&dkr_ref.registry()) .insecure_registry(false) @@ -72,10 +69,8 @@ fn run( let login_scope = format!("repository:{}:pull", image); let version = dkr_ref.version(); - let futures = common::authenticate_client(client, login_scope) - .and_then(|dclient| dclient.get_manifest(&image, &version)); - - let manifest = match runtime.block_on(futures) { + let dclient = common::authenticate_client(client, login_scope).await?; + let manifest = match dclient.get_manifest(&image, &version).await { Ok(manifest) => Ok(manifest), Err(e) => Err(format!("Got error {}", e)), }?; diff --git a/examples/image.rs b/examples/image.rs index d78b65a..21895bd 100644 --- a/examples/image.rs +++ b/examples/image.rs @@ -4,13 +4,14 @@ extern crate serde_json; extern crate tokio; use dkregistry::render; -use futures::prelude::*; +use futures::future::join_all; use std::result::Result; use std::{boxed, env, error, fs, io}; mod common; -fn main() { +#[tokio::main] +async fn main() { let registry = match std::env::args().nth(1) { Some(x) => x, None => "quay.io".into(), @@ -51,7 +52,7 @@ fn main() { } }; - let res = run(®istry, &image, &version, user, password); + let res = run(®istry, &image, &version, user, password).await; if let Err(e) = res { println!("[{}] {}", registry, e); @@ -59,7 +60,7 @@ fn main() { }; } -fn run( +async fn run( registry: &str, image: &str, version: &str, @@ -80,28 +81,23 @@ fn run( let login_scope = format!("repository:{}:pull", image); - let futures = common::authenticate_client(client, login_scope) - .and_then(|dclient| { - dclient - .get_manifest(&image, &version) - .and_then(|manifest| Ok((dclient, manifest.layers_digests(None)?))) - }) - .and_then(|(dclient, layers_digests)| { - println!("{} -> got {} layer(s)", &image, layers_digests.len(),); - - futures::stream::iter_ok::<_, dkregistry::errors::Error>(layers_digests) - .and_then(move |layer_digest| { - let get_blob_future = dclient.get_blob(&image, &layer_digest); - get_blob_future.inspect(move |blob| { - println!("Layer {}, got {} bytes.\n", layer_digest, blob.len()); - }) - }) - .collect() - }); - - let blobs = tokio::runtime::current_thread::Runtime::new() - .unwrap() - .block_on(futures)?; + let dclient = common::authenticate_client(client, login_scope).await?; + let manifest = dclient.get_manifest(&image, &version).await?; + let layers_digests = manifest.layers_digests(None)?; + + println!("{} -> got {} layer(s)", &image, layers_digests.len(),); + + let blob_futures = layers_digests + .iter() + .map(|layer_digest| dclient.get_blob(&image, &layer_digest)) + .collect::>(); + + let (blobs, errors): (Vec<_>, Vec<_>) = join_all(blob_futures) + .await + .into_iter() + .partition(Result::is_ok); + let blobs: Vec<_> = blobs.into_iter().map(Result::unwrap).collect(); + let _errors: Vec<_> = errors.into_iter().map(Result::unwrap_err).collect(); println!("Downloaded {} layers", blobs.len()); diff --git a/examples/login.rs b/examples/login.rs index f1741b9..834cf7f 100644 --- a/examples/login.rs +++ b/examples/login.rs @@ -3,12 +3,11 @@ extern crate tokio; mod common; -use futures::prelude::*; use std::result::Result; use std::{boxed, error}; -use tokio::runtime::current_thread::Runtime; -fn main() { +#[tokio::main] +async fn main() { let registry = match std::env::args().nth(1) { Some(x) => x, None => "registry-1.docker.io".into(), @@ -28,7 +27,7 @@ fn main() { println!("[{}] no $DKREG_PASSWD for login password", registry); } - let res = run(®istry, user, password, login_scope); + let res = run(®istry, user, password, login_scope).await; if let Err(e) = res { println!("[{}] {}", registry, e); @@ -36,7 +35,7 @@ fn main() { }; } -fn run( +async fn run( host: &str, user: Option, passwd: Option, @@ -47,8 +46,6 @@ fn run( .filter(Some("trace"), log::LevelFilter::Trace) .try_init()?; - let mut runtime = Runtime::new()?; - let client = dkregistry::v2::Client::configure() .registry(host) .insecure_registry(false) @@ -56,12 +53,7 @@ fn run( .password(passwd) .build()?; - let futures = common::authenticate_client(client, login_scope) - .and_then(|dclient| dclient.is_v2_supported()); - - match runtime.block_on(futures) { - Ok(login_successful) if login_successful => Ok(()), - Err(e) => Err(Box::new(e)), - _ => Err("Login unsucessful".into()), - } + let dclient = common::authenticate_client(client, login_scope).await?; + dclient.is_v2_supported().await?; + Ok(()) } diff --git a/examples/tags.rs b/examples/tags.rs index 8607f1e..917ee3b 100644 --- a/examples/tags.rs +++ b/examples/tags.rs @@ -3,12 +3,12 @@ extern crate tokio; mod common; -use futures::prelude::*; +use futures::stream::StreamExt; use std::result::Result; use std::{boxed, error}; -use tokio::runtime::current_thread::Runtime; -fn main() { +#[tokio::main] +async fn main() { let registry = match std::env::args().nth(1) { Some(x) => x, None => "registry-1.docker.io".into(), @@ -29,7 +29,7 @@ fn main() { println!("[{}] no $DKREG_PASSWD for login password", registry); } - let res = run(®istry, user, password, &image); + let res = run(®istry, user, password, &image).await; if let Err(e) = res { println!("[{}] {}", registry, e); @@ -37,7 +37,7 @@ fn main() { }; } -fn run( +async fn run( host: &str, user: Option, passwd: Option, @@ -48,7 +48,6 @@ fn run( .filter(Some("trace"), log::LevelFilter::Trace) .try_init()?; - let mut runtime = Runtime::new()?; let client = dkregistry::v2::Client::configure() .registry(host) .insecure_registry(false) @@ -58,17 +57,18 @@ fn run( let login_scope = format!("repository:{}:pull", image); - let futures = common::authenticate_client(client, login_scope) - .and_then(|dclient| dclient.get_tags(&image, Some(7)).collect()) - .and_then(|tags| { - for tag in tags { - println!("{:?}", tag); - } - Ok(()) - }); + let dclient = common::authenticate_client(client, login_scope).await?; - match runtime.block_on(futures) { - Ok(_) => Ok(()), - Err(e) => Err(Box::new(e)), + let (tags, _errors): (Vec<_>, Vec<_>) = dclient + .get_tags(&image, Some(7)) + .collect::>() + .await + .into_iter() + .partition(Result::is_ok); + + for tag in tags { + println!("{:?}", tag); } + + Ok(()) } diff --git a/examples/trace.rs b/examples/trace.rs index 945eab5..6d1f7e6 100644 --- a/examples/trace.rs +++ b/examples/trace.rs @@ -6,12 +6,11 @@ extern crate tokio; mod common; use dkregistry::reference; -use futures::prelude::*; use std::str::FromStr; use std::{boxed, env, error, fs, io}; -use tokio::runtime::current_thread::Runtime; -fn main() { +#[tokio::main] +async fn main() { let dkr_ref = match std::env::args().nth(1) { Some(ref x) => reference::Reference::from_str(x), None => reference::Reference::from_str("quay.io/coreos/etcd"), @@ -44,7 +43,7 @@ fn main() { } }; - let res = run(&dkr_ref, user, password); + let res = run(&dkr_ref, user, password).await; if let Err(e) = res { println!("[{}] {:?}", registry, e); @@ -52,7 +51,7 @@ fn main() { }; } -fn run( +async fn run( dkr_ref: &reference::Reference, user: Option, passwd: Option, @@ -64,7 +63,6 @@ fn run( let image = dkr_ref.repository(); let version = dkr_ref.version(); - let mut runtime = Runtime::new()?; let client = dkregistry::v2::Client::configure() .registry(&dkr_ref.registry()) @@ -75,33 +73,18 @@ fn run( let login_scope = ""; - let futures = common::authenticate_client(client, login_scope.to_string()) - .and_then(|dclient| { - dclient - .get_manifest(&image, &version) - .and_then(|manifest| Ok((dclient, manifest.layers_digests(None)?))) - }) - .and_then(|(dclient, layers_digests)| { - let image = image.clone(); - - println!("{} -> got {} layer(s)", &image, layers_digests.len(),); - - futures::stream::iter_ok::<_, dkregistry::errors::Error>(layers_digests) - .and_then(move |layer_digest| { - let get_blob_future = dclient.get_blob(&image, &layer_digest); - get_blob_future.inspect(move |blob| { - println!("Layer {}, got {} bytes.\n", layer_digest, blob.len()); - }) - }) - .collect() - }); - - let blobs = match runtime.block_on(futures) { - Ok(blobs) => blobs, - Err(e) => return Err(Box::new(e)), - }; + let dclient = common::authenticate_client(client, login_scope.to_string()).await?; + let manifest = dclient.get_manifest(&image, &version).await?; + + let layers_digests = manifest.layers_digests(None)?; + println!("{} -> got {} layer(s)", &image, layers_digests.len(),); + + for layer_digest in &layers_digests { + let blob = dclient.get_blob(&image, &layer_digest).await?; + println!("Layer {}, got {} bytes.\n", layer_digest, blob.len()); + } - println!("Downloaded {} layers", blobs.len()); + println!("Downloaded {} layers", layers_digests.len()); Ok(()) } diff --git a/src/errors.rs b/src/errors.rs index 3887841..b7009ae 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -4,7 +4,6 @@ use base64; use http; use regex; use reqwest; -use serde_json; use std::{io, string}; error_chain! { diff --git a/src/lib.rs b/src/lib.rs index 21ef3e2..9518898 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,37 +8,36 @@ //! ```rust,no_run //! # extern crate dkregistry; //! # extern crate tokio; -//! # fn main() { -//! # fn run() -> dkregistry::errors::Result<()> { +//! # #[tokio::main] +//! # async fn main() { +//! # async fn run() -> dkregistry::errors::Result<()> { //! # -//! use tokio::runtime::current_thread::Runtime; //! use dkregistry::v2::Client; //! //! // Check whether a registry supports API v2. //! let host = "quay.io"; -//! let mut runtime = Runtime::new()?; //! let dclient = Client::configure() //! .insecure_registry(false) //! .registry(host) //! .build()?; -//! let check = dclient.is_v2_supported(); -//! match runtime.block_on(check)? { +//! match dclient.is_v2_supported().await? { //! false => println!("{} does NOT support v2", host), //! true => println!("{} supports v2", host), //! }; //! # //! # Ok(()) //! # }; -//! # run().unwrap(); +//! # run().await.unwrap(); //! # } //! ``` +#![feature(async_closure)] #![deny(missing_debug_implementations)] #[macro_use] -extern crate error_chain; +extern crate serde; #[macro_use] -extern crate serde_derive; +extern crate error_chain; #[macro_use] extern crate log; #[macro_use] diff --git a/src/mediatypes.rs b/src/mediatypes.rs index 9c76cad..4225b81 100644 --- a/src/mediatypes.rs +++ b/src/mediatypes.rs @@ -1,13 +1,9 @@ //! Media-types for API objects. -use crate::errors::*; -use futures; +use crate::errors::{Error, Result}; use mime; use strum::EnumProperty; -pub type FutureMediaType = - Box, Error = Error> + Send>; - // For schema1 types, see https://docs.docker.com/registry/spec/manifest-v2-1/ // For schema2 types, see https://docs.docker.com/registry/spec/manifest-v2-2/ diff --git a/src/v2/auth.rs b/src/v2/auth.rs index d634641..5a46920 100644 --- a/src/v2/auth.rs +++ b/src/v2/auth.rs @@ -1,10 +1,7 @@ +use crate::errors::{Error, Result}; use crate::v2::*; -use futures::future; use reqwest::{StatusCode, Url}; -/// Convenience alias for future `TokenAuth` result. -pub type FutureTokenAuth = Box + Send>; - #[derive(Debug, Default, Deserialize, Serialize)] pub struct TokenAuth { token: String, @@ -19,48 +16,43 @@ impl TokenAuth { } } -type FutureString = Box + Send>; - impl Client { - fn get_token_provider(&self) -> FutureString { + async fn get_token_provider(&self) -> Result { let url = { let ep = format!("{}/v2/", self.base_url.clone(),); match reqwest::Url::parse(&ep) { Ok(url) => url, Err(e) => { - return Box::new(future::err::<_, _>(Error::from(format!( + return Err(Error::from(format!( "failed to parse url from string '{}': {}", ep, e - )))); + ))); } } }; - let fres = self - .build_reqwest(reqwest::r#async::Client::new().get(url.clone())) + let r = self + .build_reqwest(reqwest::Client::new().get(url.clone())) .send() .map_err(|e| Error::from(format!("{}", e))) - .and_then(move |r| { - trace!("GET '{}' status: {:?}", r.url(), r.status()); - let a = r - .headers() - .get(reqwest::header::WWW_AUTHENTICATE) - .ok_or_else(|| Error::from("get_token: missing Auth header"))?; - let chal = String::from_utf8(a.as_bytes().to_vec())?; - Ok(chal) - }) - .and_then(move |hdr| { - let (mut auth_ep, service) = parse_hdr_bearer(hdr.trim_start_matches("Bearer "))?; - - trace!("Token provider: {}", auth_ep); - if let Some(sv) = service { - auth_ep += &format!("?service={}", sv); - trace!("Service identity: {}", sv); - } - Ok(auth_ep) - }); + .await?; + + trace!("GET '{}' status: {:?}", r.url(), r.status()); + let a = r + .headers() + .get(reqwest::header::WWW_AUTHENTICATE) + .ok_or_else(|| Error::from("get_token: missing Auth header"))?; + let chal = String::from_utf8(a.as_bytes().to_vec())?; + + let (mut auth_ep, service) = parse_hdr_bearer(chal.trim_start_matches("Bearer "))?; + + trace!("Token provider: {}", auth_ep); + if let Some(sv) = service { + auth_ep += &format!("?service={}", sv); + trace!("Service identity: {}", sv); + } - Box::new(fres) + Ok(auth_ep) } /// Set the token to be used for further registry requests. @@ -74,91 +66,77 @@ impl Client { /// Perform registry authentication and return an authenticated token. /// /// On success, the returned token will be valid for all requested scopes. - pub fn login(&self, scopes: &[&str]) -> FutureTokenAuth { + pub async fn login(&self, scopes: &[&str]) -> Result { let subclient = self.clone(); let creds = self.credentials.clone(); let scope = scopes .iter() .fold("".to_string(), |acc, &s| acc + "&scope=" + s); - let auth = self - .get_token_provider() - .and_then(move |token_ep| { - let auth_ep = token_ep + scope.as_str(); - trace!("login: token endpoint: {}", auth_ep); - - reqwest::Url::parse(&auth_ep).map_err(|e| { - Error::from(format!( - "failed to parse url from string '{}': {}", - auth_ep, e - )) - }) - }) - .and_then(move |u| { - let auth_req = { - let auth_req = subclient.build_reqwest(reqwest::r#async::Client::new().get(u)); - if let Some(creds) = creds { - auth_req.basic_auth(creds.0, Some(creds.1)) - } else { - auth_req - } - }; - auth_req.send().map_err(|e| e.into()) - }) - .and_then(|r| { - let status = r.status(); - trace!("login: got status {}", status); - match status { - StatusCode::OK => Ok(r), - _ => Err(format!("login: wrong HTTP status '{}'", status).into()), - } - }) - .and_then(|r| { - r.into_body() - .concat2() - .map_err(|e| format!("login: failed to fetch the whole body: {}", e).into()) - }) - .and_then(|body| { - let s = String::from_utf8(body.to_vec())?; - serde_json::from_slice(s.as_bytes()).map_err(|e| e.into()) - }) - .and_then(|token_auth: TokenAuth| { - let mut t = token_auth.token().to_string(); - - if t == "unauthenticated" { - bail!("received token with value '{}'", t) - } else if t.is_empty() { - bail!("received an empty token") - }; - - // mask the token before logging it - let chars_count = t.chars().count(); - let mask_start = std::cmp::min(1, chars_count - 1); - let mask_end = std::cmp::max(chars_count - 1, 1); - t.replace_range(mask_start..mask_end, &"*".repeat(mask_end - mask_start)); - - trace!("login: got token: {:?}", t); - - Ok(token_auth) - }); - Box::new(auth) + + let token_ep = self.get_token_provider().await?; + let auth_ep = token_ep + scope.as_str(); + trace!("login: token endpoint: {}", auth_ep); + + let url = reqwest::Url::parse(&auth_ep).map_err(|e| { + Error::from(format!( + "failed to parse url from string '{}': {}", + auth_ep, e + )) + })?; + + let auth_req = { + let auth_req = subclient.build_reqwest(reqwest::Client::new().get(url)); + if let Some(creds) = creds { + auth_req.basic_auth(creds.0, Some(creds.1)) + } else { + auth_req + } + }; + + let r = auth_req.send().await?; + let status = r.status(); + trace!("login: got status {}", status); + match status { + StatusCode::OK => {} + _ => return Err(format!("login: wrong HTTP status '{}'", status).into()), + } + + let token_auth = r.json::().await?; + let mut t = token_auth.token().to_string(); + + if t == "unauthenticated" { + bail!("received token with value '{}'", t) + } else if t.is_empty() { + bail!("received an empty token") + }; + + // mask the token before logging it + let chars_count = t.chars().count(); + let mask_start = std::cmp::min(1, chars_count - 1); + let mask_end = std::cmp::max(chars_count - 1, 1); + t.replace_range(mask_start..mask_end, &"*".repeat(mask_end - mask_start)); + + trace!("login: got token: {:?}", t); + + Ok(token_auth) } /// Check whether the client is authenticated with the registry. - pub fn is_auth(&self, token: Option<&str>) -> FutureBool { + pub async fn is_auth(&self, token: Option<&str>) -> Result { let url = { let ep = format!("{}/v2/", self.base_url.clone(),); match Url::parse(&ep) { Ok(url) => url, Err(e) => { - return Box::new(future::err::<_, _>(Error::from(format!( + return Err(Error::from(format!( "failed to parse url from string '{}': {}", ep, e - )))); + ))); } } }; - let req = self.build_reqwest(reqwest::r#async::Client::new().get(url.clone())); + let req = self.build_reqwest(reqwest::Client::new().get(url.clone())); let req = if let Some(t) = token { req.bearer_auth(t) } else { @@ -168,21 +146,15 @@ impl Client { trace!("Sending request to '{}'", url); - let fres = req - .send() - .map_err(|e| Error::from(format!("{}", e))) - .and_then(move |resp| { - trace!("GET '{:?}'", resp); - - let status = resp.status(); - match status { - reqwest::StatusCode::OK => Ok(true), - reqwest::StatusCode::UNAUTHORIZED => Ok(false), - _ => Err(format!("is_auth: wrong HTTP status '{}'", status).into()), - } - }); + let resp = req.send().await?; + trace!("GET '{:?}'", resp); - Box::new(fres) + let status = resp.status(); + match status { + reqwest::StatusCode::OK => Ok(true), + reqwest::StatusCode::UNAUTHORIZED => Ok(false), + _ => Err(format!("is_auth: wrong HTTP status '{}'", status).into()), + } } } diff --git a/src/v2/blobs.rs b/src/v2/blobs.rs index c226813..9850818 100644 --- a/src/v2/blobs.rs +++ b/src/v2/blobs.rs @@ -1,114 +1,92 @@ +use crate::errors::{Error, Result}; use crate::v2::*; -use futures::Stream; use reqwest; use reqwest::StatusCode; -/// Convenience alias for future binary blob. -pub type FutureBlob = Box, Error = Error> + Send>; - impl Client { /// Check if a blob exists. - pub fn has_blob(&self, name: &str, digest: &str) -> FutureBool { + pub async fn has_blob(&self, name: &str, digest: &str) -> Result { let url = { let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest); match reqwest::Url::parse(&ep) { Ok(url) => url, Err(e) => { - return Box::new(futures::future::err::<_, _>(Error::from(format!( + return Err(Error::from(format!( "failed to parse url from string: {}", e - )))); + ))); } } }; - let fres = self - .build_reqwest(reqwest::r#async::Client::new().head(url)) + let res = self + .build_reqwest(reqwest::Client::new().head(url)) .send() - .inspect(|res| trace!("Blob HEAD status: {:?}", res.status())) - .and_then(|res| match res.status() { - StatusCode::OK => Ok(true), - _ => Ok(false), - }) - .map_err(|e| format!("{}", e).into()); - Box::new(fres) + .await?; + + trace!("Blob HEAD status: {:?}", res.status()); + + match res.status() { + StatusCode::OK => Ok(true), + _ => Ok(false), + } } /// Retrieve blob. - pub fn get_blob(&self, name: &str, digest: &str) -> FutureBlob { - let fres_digest = futures::future::result(ContentDigest::try_new(digest.to_string())); + pub async fn get_blob(&self, name: &str, digest: &str) -> Result> { + let digest = ContentDigest::try_new(digest.to_string())?; - let fres_blob = { + let blob = { let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest); - reqwest::Url::parse(&ep).map_err(|e|{ - crate::errors::Error::from(format!( - "failed to parse url from string: {}", - e - )) - }) - .map(|url|{ - self.build_reqwest(reqwest::r#async::Client::new() - .get(url)) - .send() - .map_err(|e| crate::errors::Error::from(format!("{}", e))) - }) - .into_future() - .flatten() - .and_then(|res| { - trace!("GET {} status: {}", res.url(), res.status()); - let status = res.status(); + let url = reqwest::Url::parse(&ep) + .map_err(|e| Error::from(format!("failed to parse url from string: {}", e)))?; - if status.is_success() - // Let client errors through to populate them with the body - || status.is_client_error() - { - Ok(res) - } else { - Err(crate::errors::Error::from(format!( - "GET request failed with status '{}'", - status - ))) - } - }).and_then(|mut res| { - std::mem::replace(res.body_mut(), reqwest::r#async::Decoder::empty()) - .concat2() - .map_err(|e| crate::errors::Error::from(format!("{}", e))) - .join(futures::future::ok(res)) - }).map_err(|e| crate::errors::Error::from(format!("{}", e))) - .and_then(|(body, res)| { - let body_vec = body.to_vec(); - let len = body_vec.len(); - let status = res.status(); + let res = self + .build_reqwest(reqwest::Client::new().get(url)) + .send() + .await?; - if status.is_success() { - trace!("Successfully received blob with {} bytes ", len); - Ok(body_vec) - } else if status.is_client_error() { - Err(Error::from(format!( - "GET request failed with status '{}' and body of size {}: {:#?}", - status, - len, - String::from_utf8_lossy(&body_vec) - ))) - } else { - // We only want to handle success and client errors here - error!( - "Received unexpected HTTP status '{}' after fetching the body. Please submit a bug report.", - status - ); - Err(Error::from(format!( - "GET request failed with status '{}'", - status - ))) - } - }) - }; + trace!("GET {} status: {}", res.url(), res.status()); + let status = res.status(); + + if !(status.is_success() + // Let client errors through to populate them with the body + || status.is_client_error()) + { + return Err(Error::from(format!( + "GET request failed with status '{}'", + status + ))); + } + + let status = res.status(); + let body_vec = res.bytes().await?.to_vec(); + let len = body_vec.len(); - let fres = fres_digest.join(fres_blob).and_then(|(digest, body)| { - digest.try_verify(&body)?; - Ok(body) - }); + if status.is_success() { + trace!("Successfully received blob with {} bytes ", len); + Ok(body_vec) + } else if status.is_client_error() { + Err(Error::from(format!( + "GET request failed with status '{}' and body of size {}: {:#?}", + status, + len, + String::from_utf8_lossy(&body_vec) + ))) + } else { + // We only want to handle success and client errors here + error!( + "Received unexpected HTTP status '{}' after fetching the body. Please submit a bug report.", + status + ); + Err(Error::from(format!( + "GET request failed with status '{}'", + status + ))) + } + }?; - Box::new(fres) + digest.try_verify(&blob)?; + Ok(blob) } } diff --git a/src/v2/catalog.rs b/src/v2/catalog.rs index f474f59..b1bc484 100644 --- a/src/v2/catalog.rs +++ b/src/v2/catalog.rs @@ -1,11 +1,11 @@ use crate::errors::{Error, Result}; use crate::v2; -use futures::{self, stream, Future, Stream}; -use reqwest::StatusCode; -use serde_json; +use futures::{self, stream::{self, BoxStream, StreamExt}}; +use reqwest::{RequestBuilder, StatusCode}; +use std::pin::Pin; /// Convenience alias for a stream of `String` repos. -pub type StreamCatalog = Box>; +pub type StreamCatalog<'a> = BoxStream<'a, Result>; #[derive(Debug, Default, Deserialize, Serialize)] struct Catalog { @@ -13,7 +13,7 @@ struct Catalog { } impl v2::Client { - pub fn get_catalog(&self, paginate: Option) -> StreamCatalog { + pub fn get_catalog<'a, 'b: 'a>(&'b self, paginate: Option) -> StreamCatalog<'a> { let url = { let suffix = if let Some(n) = paginate { format!("?n={}", n) @@ -24,37 +24,48 @@ impl v2::Client { match reqwest::Url::parse(&ep) { Ok(url) => url, Err(e) => { - return Box::new(stream::once::<_, _>(Err(Error::from(format!( + let b = Box::new(stream::iter(vec![Err(Error::from(format!( "failed to parse url from string '{}': {}", ep, e - ))))); + )))])); + return unsafe { Pin::new_unchecked(b) }; } } }; - let req = self.build_reqwest(reqwest::r#async::Client::new().get(url)); - - let fres = req - .send() - .from_err() - .and_then(|r| { - let status = r.status(); - trace!("Got status: {:?}", status); - match status { - StatusCode::OK => Ok(r), - _ => Err(format!("get_catalog: wrong HTTP status '{}'", status).into()), - } - }) - .and_then(|r| { - r.into_body().concat2().map_err(|e| { - format!("get_catalog: failed to fetch the whole body: {}", e).into() - }) + let req = self.build_reqwest(reqwest::Client::new().get(url)); + let inner = stream::once(fetch_catalog(req)) + .map(|r| match r { + Ok(catalog) => stream::iter( + catalog + .repositories + .into_iter() + .map(|t| Ok(t)) + .collect::>(), + ), + Err(err) => stream::iter(vec![Err(err)]), }) - .and_then(|body| -> Result { - serde_json::from_slice(&body).map_err(|e| e.into()) - }) - .map(|cat| futures::stream::iter_ok(cat.repositories.into_iter())) - .flatten_stream(); - Box::new(fres) + .flatten(); + + let b = Box::new(inner); + unsafe { Pin::new_unchecked(b) } + } +} + +async fn fetch_catalog(req: RequestBuilder) -> Result { + match req.send().await { + Ok(r) => { + let status = r.status(); + trace!("Got status: {:?}", status); + match status { + StatusCode::OK => r + .json::() + .await + .map_err(|e| format!("get_catalog: failed to fetch the whole body: {}", e)), + _ => Err(format!("get_catalog: wrong HTTP status '{}'", status)), + } + } + Err(err) => Err(format!("{}", err)), } + .map_err(|e| Error::from(e)) } diff --git a/src/v2/content_digest.rs b/src/v2/content_digest.rs index cbffd39..0daa155 100644 --- a/src/v2/content_digest.rs +++ b/src/v2/content_digest.rs @@ -1,4 +1,4 @@ -use crate::v2::*; +use crate::errors::Result; /// Implements types and methods for content verification use sha2::{self, Digest}; diff --git a/src/v2/manifest/manifest_schema1.rs b/src/v2/manifest/manifest_schema1.rs index 9e48a11..645b042 100644 --- a/src/v2/manifest/manifest_schema1.rs +++ b/src/v2/manifest/manifest_schema1.rs @@ -1,4 +1,3 @@ -use crate::v2::*; use std::collections::HashMap; /// Manifest version 2 schema 1, signed. diff --git a/src/v2/manifest/manifest_schema2.rs b/src/v2/manifest/manifest_schema2.rs index 6a8ac9c..27dcb58 100644 --- a/src/v2/manifest/manifest_schema2.rs +++ b/src/v2/manifest/manifest_schema2.rs @@ -1,7 +1,4 @@ -use crate::v2::{Error, FutureResult}; -use futures::future::{self, Future}; -use futures::stream::Stream; -use serde_json; +use crate::errors::{Error, Result}; /// Manifest version 2 schema 2. /// @@ -90,11 +87,11 @@ impl ManifestSchema2Spec { } /// Fetch the config blob for this manifest - pub(crate) fn fetch_config_blob( + pub(crate) async fn fetch_config_blob( self, client: crate::v2::Client, repo: String, - ) -> FutureResult { + ) -> Result { let url = { let ep = format!( "{}/v2/{}/blobs/{}", @@ -105,43 +102,32 @@ impl ManifestSchema2Spec { match reqwest::Url::parse(&ep) { Ok(url) => url, Err(e) => { - return Box::new(future::err::<_, _>(Error::from(format!( + return Err(Error::from(format!( "failed to parse url from string '{}': {}", ep, e - )))); + ))); } } }; - let manifest_future = client - .build_reqwest(reqwest::r#async::Client::new().get(url.clone())) + let r = client + .build_reqwest(reqwest::Client::new().get(url.clone())) .send() - .map_err(|e| crate::v2::Error::from(format!("{}", e))) - .and_then(move |r| { - let status = r.status(); - trace!("GET {:?}: {}", url, &status); - - if status.is_success() { - Ok(r) - } else { - Err(format!("wrong HTTP status '{}'", status).into()) - } - }) - .and_then(|r| { - r.into_body() - .concat2() - .map_err(|e| Error::from(format!("{}", e))) - }) - .and_then(|body| { - let config_blob = serde_json::from_slice::(&body)?; - - Ok(ManifestSchema2 { - manifest_spec: self, - config_blob, - }) - }); - - Box::new(manifest_future) + .await?; + + let status = r.status(); + trace!("GET {:?}: {}", url, &status); + + if !status.is_success() { + return Err(format!("wrong HTTP status '{}'", status).into()); + } + + let config_blob = r.json::().await?; + + Ok(ManifestSchema2 { + manifest_spec: self, + config_blob, + }) } } diff --git a/src/v2/manifest/mod.rs b/src/v2/manifest/mod.rs index 98a13b0..a85c114 100644 --- a/src/v2/manifest/mod.rs +++ b/src/v2/manifest/mod.rs @@ -1,9 +1,6 @@ -//! Manifest types. +use crate::errors::{Error, Result}; use crate::mediatypes; use crate::v2::*; - -use futures::future::Either; -use futures::{future, Stream}; use mime; use reqwest::{self, header, StatusCode, Url}; use std::iter::FromIterator; @@ -20,20 +17,21 @@ impl Client { /// /// The name and reference parameters identify the image. /// The reference may be either a tag or digest. - pub fn get_manifest(&self, name: &str, reference: &str) -> FutureManifest { - Box::new( - self.get_manifest_and_ref(name, reference) - .map(|(manifest, _)| manifest), - ) + pub async fn get_manifest(&self, name: &str, reference: &str) -> Result { + self.get_manifest_and_ref(name, reference) + .await + .map(|(manifest, _)| manifest) } /// Fetch an image manifest and return it with its digest. /// /// The name and reference parameters identify the image. /// The reference may be either a tag or digest. - pub fn get_manifest_and_ref(&self, name: &str, reference: &str) -> FutureManifestAndRef { - let name = name.to_string(); - + pub async fn get_manifest_and_ref( + &self, + name: &str, + reference: &str, + ) -> Result<(Manifest, Option)> { let url = { let ep = format!( "{}/v2/{}/manifests/{}", @@ -44,10 +42,10 @@ impl Client { match reqwest::Url::parse(&ep) { Ok(url) => url, Err(e) => { - return Box::new(future::err::<_, _>(Error::from(format!( + return Err(Error::from(format!( "failed to parse url from string '{}': {}", ep, e - )))); + ))); } } }; @@ -76,100 +74,83 @@ impl Client { let client_spare0 = self.clone(); - let fres = self + let res = self .build_reqwest( - reqwest::r#async::Client::new() - .get(url) + reqwest::Client::new() + .get(url.clone()) .headers(accept_headers), ) .send() - .map_err(|e| Error::from(format!("{}", e))) - .and_then(|res| { - let status = res.status(); - trace!("GET '{}' status: {:?}", res.url(), status); + .await?; - match status { - StatusCode::OK => Ok(res), - _ => Err(format!("GET {}: wrong HTTP status '{}'", res.url(), status).into()), - } - }) - .and_then(|res| { - future::ok((res.headers().clone(), res.url().clone())).join( - res.into_body() - .concat2() - .map_err(|e| Error::from(format!("{}", e))), - ) - }) - .map_err(|e| Error::from(format!("{}", e))) - .and_then(|((headers, url), body)| { - let content_digest = match headers.get("docker-content-digest") { - Some(content_digest_value) => Some( - content_digest_value - .to_str() - .map_err(|e| Error::from(format!("{}", e)))? - .to_string(), - ), - None => { - debug!("cannot find manifestref in headers"); - None - } - }; + let status = res.status(); + trace!("GET '{}' status: {:?}", res.url(), status); - let header_content_type = headers.get(header::CONTENT_TYPE); - let media_type = evaluate_media_type(header_content_type, &url)?; + match status { + StatusCode::OK => {} + _ => return Err(format!("GET {}: wrong HTTP status '{}'", res.url(), status).into()), + } - trace!( - "content-type: {:?}, media-type: {:?}", - header_content_type, - media_type - ); + let headers = res.headers(); + let content_digest = match headers.get("docker-content-digest") { + Some(content_digest_value) => Some( + content_digest_value + .to_str() + .map_err(|e| Error::from(format!("{}", e)))? + .to_string(), + ), + None => { + debug!("cannot find manifestref in headers"); + None + } + }; - Ok((body, content_digest, media_type)) - }) - .and_then(move |(body, content_digest, media_type)| { - match media_type { - mediatypes::MediaTypes::ManifestV2S1Signed => { - Either::A(futures::future::result( - serde_json::from_slice::(&body) - .map_err(|e| Error::from(format!("{}", e))) - .map(Manifest::S1Signed), - )) - } - mediatypes::MediaTypes::ManifestV2S2 => Either::B( - futures::future::result(serde_json::from_slice::( - &body, - )) - .map_err(|e| Error::from(format!("{}", e))) - .and_then(move |m| { - m.fetch_config_blob(client_spare0, name.to_string()) - .map(Manifest::S2) - }), - ), - mediatypes::MediaTypes::ManifestList => Either::A(futures::future::result( - serde_json::from_slice::(&body) - .map_err(|e| Error::from(format!("{}", e))) - .map(Manifest::ML), - )), - unsupported => Either::A(future::err(Error::from(format!( - "unsupported mediatype '{:?}'", - unsupported - )))), - } - .and_then(|manifest| Ok((manifest, content_digest))) - }); - Box::new(fres) + let header_content_type = headers.get(header::CONTENT_TYPE); + let media_type = evaluate_media_type(header_content_type, &url)?; + + trace!( + "content-type: {:?}, media-type: {:?}", + header_content_type, + media_type + ); + + match media_type { + mediatypes::MediaTypes::ManifestV2S1Signed => Ok(( + res.json::() + .await + .map(Manifest::S1Signed)?, + content_digest, + )), + mediatypes::MediaTypes::ManifestV2S2 => { + let m = res.json::().await?; + Ok(( + m.fetch_config_blob(client_spare0, name.to_string()) + .await + .map(Manifest::S2)?, + content_digest, + )) + } + mediatypes::MediaTypes::ManifestList => Ok(( + res.json::().await.map(Manifest::ML)?, + content_digest, + )), + unsupported => Err(Error::from(format!( + "unsupported mediatype '{:?}'", + unsupported + ))), + } } /// Check if an image manifest exists. /// /// The name and reference parameters identify the image. /// The reference may be either a tag or digest. - pub fn has_manifest( + pub async fn has_manifest( &self, name: &str, reference: &str, mediatypes: Option<&[&str]>, - ) -> mediatypes::FutureMediaType { + ) -> Result> { let url = { let ep = format!( "{}/v2/{}/manifests/{}", @@ -180,10 +161,10 @@ impl Client { match Url::parse(&ep) { Ok(url) => url, Err(e) => { - return Box::new(future::err::<_, _>(Error::from(format!( + return Err(Error::from(format!( "failed to parse url from string '{}': {}", ep, e - )))); + ))); } } }; @@ -201,10 +182,10 @@ impl Client { } { Ok(x) => x, Err(e) => { - return Box::new(future::err::<_, _>(Error::from(format!( + return Err(Error::from(format!( "failed to match mediatypes: {}", e - )))); + ))); } }; @@ -213,45 +194,44 @@ impl Client { match header::HeaderValue::from_str(&accept_type.to_string()) { Ok(header_value) => accept_headers.insert(header::ACCEPT, header_value), Err(e) => { - return Box::new(future::err::<_, _>(Error::from(format!( + return Err(Error::from(format!( "failed to parse mime '{}' as accept_header: {}", accept_type, e - )))); + ))); } }; } - let fres = self - .build_reqwest(reqwest::r#async::Client::new().get(url.clone())) + + trace!("HEAD {:?}", url); + + let r = self + .build_reqwest(reqwest::Client::new().get(url.clone())) .headers(accept_headers) .send() - .map_err(Error::from) - .inspect(move |_| { - trace!("HEAD {:?}", url); - }) - .and_then(|r| { - let status = r.status(); - let media_type = - evaluate_media_type(r.headers().get(header::CONTENT_TYPE), &r.url())?; + .await + .map_err(Error::from)?; - trace!( - "Manifest check status '{:?}', headers '{:?}, media-type: {:?}", - r.status(), - r.headers(), - media_type - ); + let status = r.status(); + let media_type = + evaluate_media_type(r.headers().get(header::CONTENT_TYPE), &r.url())?; - let res = match status { - StatusCode::MOVED_PERMANENTLY - | StatusCode::TEMPORARY_REDIRECT - | StatusCode::FOUND - | StatusCode::OK => Some(media_type), - StatusCode::NOT_FOUND => None, - _ => bail!("has_manifest: wrong HTTP status '{}'", &status), - }; - Ok(res) - }); - Box::new(fres) + trace!( + "Manifest check status '{:?}', headers '{:?}, media-type: {:?}", + r.status(), + r.headers(), + media_type + ); + + let res = match status { + StatusCode::MOVED_PERMANENTLY + | StatusCode::TEMPORARY_REDIRECT + | StatusCode::FOUND + | StatusCode::OK => Some(media_type), + StatusCode::NOT_FOUND => None, + _ => bail!("has_manifest: wrong HTTP status '{}'", &status), + }; + Ok(res) } } diff --git a/src/v2/mod.rs b/src/v2/mod.rs index cc3e96b..3630a21 100644 --- a/src/v2/mod.rs +++ b/src/v2/mod.rs @@ -9,31 +9,27 @@ //! ```rust,no_run //! # extern crate dkregistry; //! # extern crate tokio; -//! # fn main() { -//! # fn run() -> dkregistry::errors::Result<()> { +//! # #[tokio::main] +//! # async fn main() { +//! # async fn run() -> dkregistry::errors::Result<()> { //! # -//! use tokio::runtime::current_thread::Runtime; //! use dkregistry::v2::Client; //! //! // Retrieve an image manifest. -//! let mut runtime = Runtime::new()?; //! let dclient = Client::configure() //! .registry("quay.io") //! .build()?; -//! let fetch = dclient.get_manifest("coreos/etcd", "v3.1.0"); -//! let manifest = runtime.block_on(fetch)?; +//! let manifest = dclient.get_manifest("coreos/etcd", "v3.1.0").await?; //! # //! # Ok(()) //! # }; -//! # run().unwrap(); +//! # run().await.unwrap(); //! # } //! ``` -use super::errors::*; -use crate::v2::manifest::Manifest; +use crate::errors::*; use futures::prelude::*; use reqwest::StatusCode; -use serde_json; mod config; pub use self::config::Config; @@ -42,7 +38,7 @@ mod catalog; pub use self::catalog::StreamCatalog; mod auth; -pub use self::auth::{FutureTokenAuth, TokenAuth}; +pub use self::auth::{TokenAuth}; pub mod manifest; @@ -50,7 +46,6 @@ mod tags; pub use self::tags::StreamTags; mod blobs; -pub use self::blobs::FutureBlob; mod content_digest; pub(crate) use self::content_digest::ContentDigest; @@ -65,74 +60,52 @@ pub struct Client { token: Option, } -/// Convenience alias for an arbitrary future type -pub type FutureResult = Box + Send>; - -/// Convenience alias for a future boolean result. -pub type FutureBool = Box + Send>; - -/// Convenience alias for a future manifest blob. -pub type FutureManifest = Box + Send>; - -/// Convenience alias for a future manifest blob and ref. -pub type FutureManifestAndRef = - Box), Error = Error> + Send>; - impl Client { pub fn configure() -> Config { Config::default() } /// Ensure remote registry supports v2 API. - pub fn ensure_v2_registry(self) -> impl Future + Send { - self.is_v2_supported() - .map(move |ok| (ok, self)) - .and_then(|(ok, client)| { - if !ok { - bail!("remote server does not support docker-registry v2 API") - } else { - Ok(client) - } - }) + pub async fn ensure_v2_registry(self) -> Result { + if !self.is_v2_supported().await? { + bail!("remote server does not support docker-registry v2 API") + } else { + Ok(self) + } } /// Check whether remote registry supports v2 API. - pub fn is_v2_supported(&self) -> impl Future { + pub async fn is_v2_supported(&self) -> Result { let api_header = "Docker-Distribution-API-Version"; let api_version = "registry/2.0"; // GET request to bare v2 endpoint. let v2_endpoint = format!("{}/v2/", self.base_url); - let get_v2 = reqwest::Url::parse(&v2_endpoint) + let request = reqwest::Url::parse(&v2_endpoint) .chain_err(|| format!("failed to parse url string '{}'", &v2_endpoint)) .map(|url| { trace!("GET {:?}", url); - self.build_reqwest(reqwest::r#async::Client::new().get(url)) - }) - .into_future() - .and_then(|req| req.send().from_err()); - - // Check status code and API headers according to spec: - // https://docs.docker.com/registry/spec/api/#api-version-check - get_v2 - .and_then(move |r| match (r.status(), r.headers().get(api_header)) { - (StatusCode::OK, Some(x)) => Ok(x == api_version), - (StatusCode::UNAUTHORIZED, Some(x)) => Ok(x == api_version), - (s, v) => { - trace!("Got unexpected status {}, header version {:?}", s, v); - Ok(false) - } - }) - .inspect(|b| { - trace!("v2 API supported: {}", b); - }) + self.build_reqwest(reqwest::Client::new().get(url)) + })?; + + let response = request.send().await?; + + let b = match (response.status(), response.headers().get(api_header)) { + (StatusCode::OK, Some(x)) => Ok(x == api_version), + (StatusCode::UNAUTHORIZED, Some(x)) => Ok(x == api_version), + (s, v) => { + trace!("Got unexpected status {}, header version {:?}", s, v); + Ok(false) + } + }; + + trace!("v2 API supported: {:?}", b); + + b } /// Takes reqwest's async RequestBuilder and injects an authentication header if a token is present - fn build_reqwest( - &self, - req_builder: reqwest::r#async::RequestBuilder, - ) -> reqwest::r#async::RequestBuilder { + fn build_reqwest(&self, req_builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder { let mut builder = req_builder; if let Some(token) = &self.token { diff --git a/src/v2/tags.rs b/src/v2/tags.rs index 574e0d8..262a22b 100644 --- a/src/v2/tags.rs +++ b/src/v2/tags.rs @@ -1,8 +1,12 @@ +use crate::errors::{Error, Result}; use crate::v2::*; +use futures::stream::{self, BoxStream, StreamExt}; use reqwest::{self, header, Url}; +use std::fmt::Debug; +use std::pin::Pin; /// Convenience alias for a stream of `String` tags. -pub type StreamTags = Box + Send>; +pub type StreamTags<'a> = BoxStream<'a, Result>; /// A chunk of tags for an image. /// @@ -18,12 +22,13 @@ struct TagsChunk { impl Client { /// List existing tags for an image. - pub fn get_tags(&self, name: &str, paginate: Option) -> StreamTags { - let client = self.clone(); - let base_url = format!("{}/v2/{}/tags/list", self.base_url, name); - - let fres = futures::stream::unfold(Some(String::new()), move |last| { - let client = client.clone(); + pub fn get_tags<'a, 'b: 'a, 'c: 'a>( + &'b self, + name: &'c str, + paginate: Option, + ) -> StreamTags<'a> { + let inner = stream::unfold(Some(String::new()), async move |last| { + let base_url = format!("{}/v2/{}/tags/list", self.base_url, name); // Stream ends when response has no `Link` header. let link = match last { @@ -32,80 +37,80 @@ impl Client { s => s, }; - let url_paginated = match (paginate, link) { - (Some(p), None) => format!("{}?n={}", base_url, p), - (None, Some(l)) => format!("{}?next_page={}", base_url, l), - (Some(p), Some(l)) => format!("{}?n={}&next_page={}", base_url, p, l), - _ => base_url.to_string(), - }; - - let freq = futures::future::result(Url::parse(&url_paginated)) - .map_err(|e| Error::from(format!("{}", e))) - .inspect(|url| trace!("GET {}", url)) - .map(move |url| { - // receive the next page of tags - - client - .build_reqwest(reqwest::r#async::Client::new().get(url.clone())) - .header(header::ACCEPT, "application/json") - .send() - // ensure the status is OK - .map_err(|e| Error::from(format!("{}", e))) - }) - .flatten() - .and_then(|resp| { - resp.error_for_status() - .map_err(|e| Error::from(format!("{}", e))) - }) - .and_then(|resp| { - // ensure the CONTENT_TYPE header is application/json - - let ct_hdr = resp.headers().get(header::CONTENT_TYPE).cloned(); - let ok = match ct_hdr { - None => false, - Some(ref ct) => ct.to_str().map_err(|e| Error::from(format!("{}", e)))?.starts_with("application/json"), - }; - if !ok { - // TODO:(steveeJ): Make this an error once Satellite - // returns the content type correctly - debug!("get_tags: wrong content type '{:?}', ignoring...", ct_hdr); - } - Ok(resp) - }) - .and_then(|resp| { - // extract the response body and parse the LINK header - - let hdr = resp.headers().get(header::LINK).cloned(); - trace!("next_page {:?}", hdr); - resp.into_body() - .concat2() - .map_err(|e| { - format!("get_tags: failed to fetch the whole body: {}", e).into() - }) - .and_then(move |body| Ok((body, parse_link(hdr)))) - }) - .and_then(|(body, hdr)| -> Result<(TagsChunk, Option)> { - serde_json::from_slice(&body) - .map_err(|e| e.into()) - .map(|tags_chunk| (tags_chunk, hdr)) - }) - .map(|(tags_chunk, link)| { - (futures::stream::iter_ok(tags_chunk.tags.into_iter()), link) - }) - .map_err(|e| format!("{}", e)); - - Some(freq) + match self.fetch_tag(paginate, &base_url, &link).await { + Ok((tags_chunk, next)) => Some((Ok(tags_chunk), next)), + Err(err) => Some((Err(err), None)), + } + }) + .map(|r| match r { + Ok(tags_chunk) => stream::iter( + tags_chunk + .tags + .into_iter() + .map(|t| Ok(t)) + .collect::>(), + ), + Err(err) => stream::iter(vec![Err(err)]), }) .flatten(); - Box::new(fres) + let b = Box::new(inner); + unsafe { Pin::new_unchecked(b) } + } + + async fn fetch_tag( + &self, + paginate: Option, + base_url: &String, + link: &Option, + ) -> Result<(TagsChunk, Option)> { + let url_paginated = match (paginate, link) { + (Some(p), None) => format!("{}?n={}", base_url, p), + (None, Some(l)) => format!("{}?next_page={}", base_url, l), + (Some(p), Some(l)) => format!("{}?n={}&next_page={}", base_url, p, l), + _ => base_url.to_string(), + }; + let url = Url::parse(&url_paginated).map_err(|e| Error::from(format!("{}", e)))?; + + let resp = self + .build_reqwest(reqwest::Client::new().get(url.clone())) + .header(header::ACCEPT, "application/json") + .send() + .await? + .error_for_status() + .map_err(|e| Error::from(format!("{}", e)))?; + + // ensure the CONTENT_TYPE header is application/json + let ct_hdr = resp.headers().get(header::CONTENT_TYPE).cloned(); + + trace!("page url {:?}", ct_hdr); + + let ok = match ct_hdr { + None => false, + Some(ref ct) => ct + .to_str() + .map_err(|e| Error::from(format!("{}", e)))? + .starts_with("application/json"), + }; + if !ok { + // TODO:(steveeJ): Make this an error once Satellite + // returns the content type correctly + debug!("get_tags: wrong content type '{:?}', ignoring...", ct_hdr); + } + + // extract the response body and parse the LINK header + let next = parse_link(resp.headers().get(header::LINK)); + trace!("next_page {:?}", next); + + let tags_chunk = resp.json::().await?; + Ok((tags_chunk, next)) } } /// Parse a `Link` header. /// /// Format is described at https://docs.docker.com/registry/spec/api/#listing-image-tags#pagination. -fn parse_link(hdr: Option) -> Option { +fn parse_link(hdr: Option<&header::HeaderValue>) -> Option { // TODO(lucab): this a brittle string-matching parser. Investigate // whether there is a a common library to do this, in the future. diff --git a/tests/mock/api_version.rs b/tests/mock/api_version.rs index 474cae0..a0eb48c 100644 --- a/tests/mock/api_version.rs +++ b/tests/mock/api_version.rs @@ -3,7 +3,7 @@ extern crate mockito; extern crate tokio; use self::mockito::mock; -use self::tokio::runtime::current_thread::Runtime; +use self::tokio::runtime::Runtime; static API_VERSION_K: &'static str = "Docker-Distribution-API-Version"; static API_VERSION_V: &'static str = "registry/2.0"; diff --git a/tests/mock/base_client.rs b/tests/mock/base_client.rs index 5941c7e..11c0b62 100644 --- a/tests/mock/base_client.rs +++ b/tests/mock/base_client.rs @@ -3,7 +3,7 @@ extern crate mockito; extern crate tokio; use self::mockito::mock; -use self::tokio::runtime::current_thread::Runtime; +use self::tokio::runtime::Runtime; static API_VERSION_K: &'static str = "Docker-Distribution-API-Version"; static API_VERSION_V: &'static str = "registry/2.0"; diff --git a/tests/mock/blobs_download.rs b/tests/mock/blobs_download.rs index 6ac62c3..51332c2 100644 --- a/tests/mock/blobs_download.rs +++ b/tests/mock/blobs_download.rs @@ -4,10 +4,10 @@ extern crate sha2; extern crate tokio; use self::mockito::mock; -use self::tokio::runtime::current_thread::Runtime; +use self::tokio::runtime::Runtime; use crate::mock::blobs_download::sha2::Digest; -type Fallible = Result>; +type Fallible = Result>; #[test] fn test_blobs_has_layer() { diff --git a/tests/mock/catalog.rs b/tests/mock/catalog.rs index 88f3d48..c22368a 100644 --- a/tests/mock/catalog.rs +++ b/tests/mock/catalog.rs @@ -3,9 +3,9 @@ extern crate futures; extern crate mockito; extern crate tokio; -use self::futures::Stream; +use self::futures::StreamExt; use self::mockito::mock; -use self::tokio::runtime::current_thread::Runtime; +use self::tokio::runtime::Runtime; #[test] fn test_catalog_simple() { @@ -29,7 +29,11 @@ fn test_catalog_simple() { let futcheck = dclient.get_catalog(None); - let res = runtime.block_on(futcheck.collect()).unwrap(); + let (res, _errors): (Vec<_>, Vec<_>) = runtime + .block_on(futcheck.collect::>()) + .into_iter() + .partition(Result::is_ok); + let res: Vec<_> = res.into_iter().map(Result::unwrap).collect(); assert_eq!(res, vec!["r1/i1", "r2"]); mockito::reset(); @@ -70,15 +74,15 @@ fn test_catalog_paginate() { let next = dclient.get_catalog(Some(1)); - let (page1, next) = runtime.block_on(next.into_future()).ok().unwrap(); - assert_eq!(page1, Some("r1/i1".to_owned())); + let (page1, next) = runtime.block_on(next.into_future()); + assert_eq!(page1.unwrap().unwrap(), "r1/i1".to_owned()); - let (page2, next) = runtime.block_on(next.into_future()).ok().unwrap(); + let (page2, next) = runtime.block_on(next.into_future()); // TODO(lucab): implement pagination - assert_eq!(page2, None); + assert!(page2.is_none()); - let (end, _) = runtime.block_on(next.into_future()).ok().unwrap(); - assert_eq!(end, None); + let (end, _) = runtime.block_on(next.into_future()); + assert!(end.is_none()); mockito::reset(); } diff --git a/tests/mock/tags.rs b/tests/mock/tags.rs index 01fcd80..74b221b 100644 --- a/tests/mock/tags.rs +++ b/tests/mock/tags.rs @@ -3,9 +3,9 @@ extern crate futures; extern crate mockito; extern crate tokio; -use self::futures::Stream; +use self::futures::StreamExt; use self::mockito::mock; -use self::tokio::runtime::current_thread::Runtime; +use self::tokio::runtime::Runtime; #[test] fn test_tags_simple() { @@ -31,8 +31,9 @@ fn test_tags_simple() { let futcheck = dclient.get_tags(name, None); - let res = runtime.block_on(futcheck.collect()).unwrap(); - assert_eq!(res, vec!["t1", "t2"]); + let res = runtime.block_on(futcheck.collect::>()); + assert_eq!(res.get(0).unwrap().as_ref().unwrap(), &String::from("t1")); + assert_eq!(res.get(1).unwrap().as_ref().unwrap(), &String::from("t2")); mockito::reset(); } @@ -75,14 +76,14 @@ fn test_tags_paginate() { let next = dclient.get_tags(name, Some(1)); - let (first_tag, stream_rest) = runtime.block_on(next.into_future()).ok().unwrap(); - assert_eq!(first_tag, Some("t1".to_owned())); + let (first_tag, stream_rest) = runtime.block_on(next.into_future()); + assert_eq!(first_tag.unwrap().unwrap(), "t1".to_owned()); - let (second_tag, stream_rest) = runtime.block_on(stream_rest.into_future()).ok().unwrap(); - assert_eq!(second_tag, Some("t2".to_owned())); + let (second_tag, stream_rest) = runtime.block_on(stream_rest.into_future()); + assert_eq!(second_tag.unwrap().unwrap(), "t2".to_owned()); - let (end, _) = runtime.block_on(stream_rest.into_future()).ok().unwrap(); - assert_eq!(end, None); + let (end, _) = runtime.block_on(stream_rest.into_future()); + assert!(end.is_none()); mockito::reset(); } @@ -108,8 +109,8 @@ fn test_tags_404() { let futcheck = dclient.get_tags(name, None); - let res = runtime.block_on(futcheck.collect()); - assert!(res.is_err()); + let res = runtime.block_on(futcheck.collect::>()); + assert!(res.get(0).unwrap().as_ref().is_err()); mockito::reset(); } @@ -137,8 +138,8 @@ fn test_tags_missing_header() { let futcheck = dclient.get_tags(name, None); - let res = runtime.block_on(futcheck.collect()); - assert!(!res.is_err()); + let res = runtime.block_on(futcheck.collect::>()); + assert!(!res.get(0).unwrap().as_ref().is_err()); mockito::reset(); }