Skip to content

Commit

Permalink
Update tokio and use async/await
Browse files Browse the repository at this point in the history
  • Loading branch information
schrieveslaach committed Jan 6, 2020
1 parent ca37718 commit cb6141b
Show file tree
Hide file tree
Showing 25 changed files with 559 additions and 713 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@ tag-prefix = ""
[dependencies]
base64 = "0.11"
error-chain = { version = "0.12", default-features = false }
futures = "0.1"
futures = "0.3"
http = "0.2"
libflate = "0.1"
log = "0.4"
mime = "0.3"
mockito = { version = "0.22", optional = true }
regex = "^1.1.0"
serde = "1"
serde_derive = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
strum = "0.16"
strum_macros = "0.16"
tar = "0.4"
tokio = "0.1"
tokio = "0.2"
dirs = "2.0"
reqwest = { version = "^0.9.6", default-features = false }
reqwest = { version = "0.10", default-features = false, features = ["json"] }
sha2 = "^0.8.0"

[dev-dependencies]
env_logger = "0.7"
spectral = "0.6"
tokio = { version = "0.2", features = ["macros"] }

[features]
default = ["reqwest-default-tls"]
Expand Down
12 changes: 5 additions & 7 deletions examples/checkregistry.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
extern crate tokio;

use std::{boxed, error};
use tokio::runtime::current_thread::Runtime;

fn main() {
#[tokio::main]
async fn main() {
let registry = match std::env::args().nth(1) {
Some(x) => x,
None => "registry-1.docker.io".into(),
};

let res = run(&registry);
let res = run(&registry).await;

if let Err(e) = res {
println!("[{}] {}", registry, e);
std::process::exit(1);
};
}

fn run(host: &str) -> Result<bool, boxed::Box<dyn error::Error>> {
let mut runtime = Runtime::new()?;
async fn run(host: &str) -> Result<bool, boxed::Box<dyn error::Error>> {
let dclient = dkregistry::v2::Client::configure()
.registry(host)
.insecure_registry(false)
.build()?;
let futcheck = dclient.is_v2_supported();

let supported = runtime.block_on(futcheck)?;
let supported = dclient.is_v2_supported().await?;
if supported {
println!("{} supports v2", host);
} else {
Expand Down
57 changes: 18 additions & 39 deletions examples/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,23 @@
extern crate futures;

use futures::prelude::*;

pub fn authenticate_client(
pub async fn authenticate_client(
mut client: dkregistry::v2::Client,
login_scope: String,
) -> impl Future<Item = dkregistry::v2::Client, Error = dkregistry::errors::Error> {
futures::future::ok::<_, dkregistry::errors::Error>(client.clone())
.and_then(|dclient| {
dclient.is_v2_supported().and_then(|v2_supported| {
if !v2_supported {
Err("API v2 not supported".into())
} else {
Ok(dclient)
}
})
})
.and_then(|dclient| {
dclient.is_auth(None).and_then(move |is_auth| {
if is_auth {
Ok(dclient)
} else {
Err("login required".into())
}
})
})
.or_else(move |_| {
client
.login(&[login_scope.as_str()])
.and_then(move |token| {
client
.is_auth(Some(token.token()))
.and_then(move |is_auth| {
if !is_auth {
Err("login failed".into())
} else {
println!("logged in!");
Ok(client.set_token(Some(token.token())).clone())
}
})
})
})
) -> Result<dkregistry::v2::Client, dkregistry::errors::Error> {
if !client.is_v2_supported().await? {
return Err("API v2 not supported".into());
}

if client.is_auth(None).await? {
return Ok(client);
}

let token = client.login(&[login_scope.as_str()]).await?;

if !client.is_auth(Some(token.token())).await? {
Err("login failed".into())
} else {
println!("logged in!");
Ok(client.set_token(Some(token.token())).clone())
}
}
17 changes: 6 additions & 11 deletions examples/image-labels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ extern crate tokio;

use dkregistry::reference;
use dkregistry::v2::manifest::Manifest;
use futures::prelude::*;
use std::result::Result;
use std::str::FromStr;
use std::{env, fs, io};
use tokio::runtime::current_thread::Runtime;

mod common;

fn main() {
#[tokio::main]
async fn main() {
let dkr_ref = match std::env::args().nth(1) {
Some(ref x) => reference::Reference::from_str(x),
None => reference::Reference::from_str("quay.io/steveej/cincinnati-test-labels:0.0.0"),
Expand Down Expand Up @@ -46,21 +45,19 @@ fn main() {
}
};

let res = run(&dkr_ref, user, password);
let res = run(&dkr_ref, user, password).await;

if let Err(e) = res {
println!("[{}] {}", registry, e);
std::process::exit(1);
};
}

fn run(
async fn run(
dkr_ref: &reference::Reference,
user: Option<String>,
passwd: Option<String>,
) -> Result<(), dkregistry::errors::Error> {
let mut runtime = Runtime::new()?;

let client = dkregistry::v2::Client::configure()
.registry(&dkr_ref.registry())
.insecure_registry(false)
Expand All @@ -72,10 +69,8 @@ fn run(
let login_scope = format!("repository:{}:pull", image);
let version = dkr_ref.version();

let futures = common::authenticate_client(client, login_scope)
.and_then(|dclient| dclient.get_manifest(&image, &version));

let manifest = match runtime.block_on(futures) {
let dclient = common::authenticate_client(client, login_scope).await?;
let manifest = match dclient.get_manifest(&image, &version).await {
Ok(manifest) => Ok(manifest),
Err(e) => Err(format!("Got error {}", e)),
}?;
Expand Down
48 changes: 22 additions & 26 deletions examples/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ extern crate serde_json;
extern crate tokio;

use dkregistry::render;
use futures::prelude::*;
use futures::future::join_all;
use std::result::Result;
use std::{boxed, env, error, fs, io};

mod common;

fn main() {
#[tokio::main]
async fn main() {
let registry = match std::env::args().nth(1) {
Some(x) => x,
None => "quay.io".into(),
Expand Down Expand Up @@ -51,15 +52,15 @@ fn main() {
}
};

let res = run(&registry, &image, &version, user, password);
let res = run(&registry, &image, &version, user, password).await;

if let Err(e) = res {
println!("[{}] {}", registry, e);
std::process::exit(1);
};
}

fn run(
async fn run(
registry: &str,
image: &str,
version: &str,
Expand All @@ -80,28 +81,23 @@ fn run(

let login_scope = format!("repository:{}:pull", image);

let futures = common::authenticate_client(client, login_scope)
.and_then(|dclient| {
dclient
.get_manifest(&image, &version)
.and_then(|manifest| Ok((dclient, manifest.layers_digests(None)?)))
})
.and_then(|(dclient, layers_digests)| {
println!("{} -> got {} layer(s)", &image, layers_digests.len(),);

futures::stream::iter_ok::<_, dkregistry::errors::Error>(layers_digests)
.and_then(move |layer_digest| {
let get_blob_future = dclient.get_blob(&image, &layer_digest);
get_blob_future.inspect(move |blob| {
println!("Layer {}, got {} bytes.\n", layer_digest, blob.len());
})
})
.collect()
});

let blobs = tokio::runtime::current_thread::Runtime::new()
.unwrap()
.block_on(futures)?;
let dclient = common::authenticate_client(client, login_scope).await?;
let manifest = dclient.get_manifest(&image, &version).await?;
let layers_digests = manifest.layers_digests(None)?;

println!("{} -> got {} layer(s)", &image, layers_digests.len(),);

let blob_futures = layers_digests
.iter()
.map(|layer_digest| dclient.get_blob(&image, &layer_digest))
.collect::<Vec<_>>();

let (blobs, errors): (Vec<_>, Vec<_>) = join_all(blob_futures)
.await
.into_iter()
.partition(Result::is_ok);
let blobs: Vec<_> = blobs.into_iter().map(Result::unwrap).collect();
let _errors: Vec<_> = errors.into_iter().map(Result::unwrap_err).collect();

println!("Downloaded {} layers", blobs.len());

Expand Down
22 changes: 7 additions & 15 deletions examples/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ extern crate tokio;

mod common;

use futures::prelude::*;
use std::result::Result;
use std::{boxed, error};
use tokio::runtime::current_thread::Runtime;

fn main() {
#[tokio::main]
async fn main() {
let registry = match std::env::args().nth(1) {
Some(x) => x,
None => "registry-1.docker.io".into(),
Expand All @@ -28,15 +27,15 @@ fn main() {
println!("[{}] no $DKREG_PASSWD for login password", registry);
}

let res = run(&registry, user, password, login_scope);
let res = run(&registry, user, password, login_scope).await;

if let Err(e) = res {
println!("[{}] {}", registry, e);
std::process::exit(1);
};
}

fn run(
async fn run(
host: &str,
user: Option<String>,
passwd: Option<String>,
Expand All @@ -47,21 +46,14 @@ fn run(
.filter(Some("trace"), log::LevelFilter::Trace)
.try_init()?;

let mut runtime = Runtime::new()?;

let client = dkregistry::v2::Client::configure()
.registry(host)
.insecure_registry(false)
.username(user)
.password(passwd)
.build()?;

let futures = common::authenticate_client(client, login_scope)
.and_then(|dclient| dclient.is_v2_supported());

match runtime.block_on(futures) {
Ok(login_successful) if login_successful => Ok(()),
Err(e) => Err(Box::new(e)),
_ => Err("Login unsucessful".into()),
}
let dclient = common::authenticate_client(client, login_scope).await?;
dclient.is_v2_supported().await?;
Ok(())
}
34 changes: 17 additions & 17 deletions examples/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ extern crate tokio;

mod common;

use futures::prelude::*;
use futures::stream::StreamExt;
use std::result::Result;
use std::{boxed, error};
use tokio::runtime::current_thread::Runtime;

fn main() {
#[tokio::main]
async fn main() {
let registry = match std::env::args().nth(1) {
Some(x) => x,
None => "registry-1.docker.io".into(),
Expand All @@ -29,15 +29,15 @@ fn main() {
println!("[{}] no $DKREG_PASSWD for login password", registry);
}

let res = run(&registry, user, password, &image);
let res = run(&registry, user, password, &image).await;

if let Err(e) = res {
println!("[{}] {}", registry, e);
std::process::exit(1);
};
}

fn run(
async fn run(
host: &str,
user: Option<String>,
passwd: Option<String>,
Expand All @@ -48,7 +48,6 @@ fn run(
.filter(Some("trace"), log::LevelFilter::Trace)
.try_init()?;

let mut runtime = Runtime::new()?;
let client = dkregistry::v2::Client::configure()
.registry(host)
.insecure_registry(false)
Expand All @@ -58,17 +57,18 @@ fn run(

let login_scope = format!("repository:{}:pull", image);

let futures = common::authenticate_client(client, login_scope)
.and_then(|dclient| dclient.get_tags(&image, Some(7)).collect())
.and_then(|tags| {
for tag in tags {
println!("{:?}", tag);
}
Ok(())
});
let dclient = common::authenticate_client(client, login_scope).await?;

match runtime.block_on(futures) {
Ok(_) => Ok(()),
Err(e) => Err(Box::new(e)),
let (tags, _errors): (Vec<_>, Vec<_>) = dclient
.get_tags(&image, Some(7))
.collect::<Vec<_>>()
.await
.into_iter()
.partition(Result::is_ok);

for tag in tags {
println!("{:?}", tag);
}

Ok(())
}
Loading

0 comments on commit cb6141b

Please sign in to comment.