Skip to content

Commit

Permalink
v2/{catalog,tags}: use async_stream and yield
Browse files Browse the repository at this point in the history
The motivation is to reduce code complexity.

It removes the necessity of Box'ing the Stream, and moves the
responsibility of doing so if necessary to the consumer.
  • Loading branch information
steveej committed Jan 29, 2020
1 parent 82275b5 commit 4410140
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 75 deletions.
54 changes: 19 additions & 35 deletions src/v2/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,41 @@
use crate::errors::{Error, Result};
use crate::v2;
use futures::{
self,
stream::{self, BoxStream, StreamExt},
};
use async_stream::try_stream;
use futures::stream::Stream;
use futures::{self};
use reqwest::{RequestBuilder, StatusCode};
use std::pin::Pin;

/// Convenience alias for a stream of `String` repos.
pub type StreamCatalog<'a> = BoxStream<'a, Result<String>>;

#[derive(Debug, Default, Deserialize, Serialize)]
struct Catalog {
pub repositories: Vec<String>,
}

impl v2::Client {
pub fn get_catalog<'a, 'b: 'a>(&'b self, paginate: Option<u32>) -> StreamCatalog<'a> {
pub fn get_catalog<'a, 'b: 'a>(
&'b self,
paginate: Option<u32>,
) -> impl Stream<Item = Result<String>> + 'a {
let url = {
let suffix = if let Some(n) = paginate {
format!("?n={}", n)
} else {
"".to_string()
};
let ep = format!("{}/v2/_catalog{}", self.base_url.clone(), suffix);
match reqwest::Url::parse(&ep) {
Ok(url) => url,
Err(e) => {
let b = Box::new(stream::iter(vec![Err(Error::from(format!(
"failed to parse url from string '{}': {}",
ep, e
)))]));
return unsafe { Pin::new_unchecked(b) };
}
}

reqwest::Url::parse(&ep)
.chain_err(|| format!("failed to parse url from string '{}'", ep))
};

let req = self.build_reqwest(reqwest::Client::new().get(url));
let inner = stream::once(fetch_catalog(req))
.map(|r| match r {
Ok(catalog) => stream::iter(
catalog
.repositories
.into_iter()
.map(|t| Ok(t))
.collect::<Vec<_>>(),
),
Err(err) => stream::iter(vec![Err(err)]),
})
.flatten();
try_stream! {
let req = self.build_reqwest(reqwest::Client::new().get(url?));

let b = Box::new(inner);
unsafe { Pin::new_unchecked(b) }
let catalog = fetch_catalog(req).await?;

for repo in catalog.repositories {
yield repo;
}
}
}
}

Expand All @@ -70,5 +54,5 @@ async fn fetch_catalog(req: RequestBuilder) -> Result<Catalog> {
}
Err(err) => Err(format!("{}", err)),
}
.map_err(|e| Error::from(e))
.map_err(Error::from)
}
2 changes: 0 additions & 2 deletions src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ mod config;
pub use self::config::Config;

mod catalog;
pub use self::catalog::StreamCatalog;

mod auth;
pub use self::auth::TokenAuth;

pub mod manifest;

mod tags;
pub use self::tags::StreamTags;

mod blobs;

Expand Down
56 changes: 20 additions & 36 deletions src/v2/tags.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use crate::errors::{Error, Result};
use crate::v2::*;
use futures::stream::{self, BoxStream, StreamExt};
use async_stream::try_stream;
use reqwest::{self, header, Url};
use std::fmt::Debug;
use std::pin::Pin;

/// Convenience alias for a stream of `String` tags.
pub type StreamTags<'a> = BoxStream<'a, Result<String>>;

/// A chunk of tags for an image.
///
Expand All @@ -26,42 +22,30 @@ impl Client {
&'b self,
name: &'c str,
paginate: Option<u32>,
) -> StreamTags<'a> {
let inner = stream::unfold(Some(String::new()), move |last| async move {
let base_url = format!("{}/v2/{}/tags/list", self.base_url, name);

// Stream ends when response has no `Link` header.
let link = match last {
None => return None,
Some(ref s) if s == "" => None,
s => s,
};

match self.fetch_tag(paginate, &base_url, &link).await {
Ok((tags_chunk, next)) => Some((Ok(tags_chunk), next)),
Err(err) => Some((Err(err), None)),
) -> impl Stream<Item = Result<String>> + 'a {
let base_url = format!("{}/v2/{}/tags/list", self.base_url, name);
let mut link: Option<String> = None;

try_stream! {
loop {
let (tags_chunk, last) = self.fetch_tags_chunk(paginate, &base_url, &link).await?;
for tag in tags_chunk.tags {
yield tag;
}

link = match last {
None => break,
Some(ref s) if s == "" => None,
s => s,
};
}
})
.map(|r| match r {
Ok(tags_chunk) => stream::iter(
tags_chunk
.tags
.into_iter()
.map(|t| Ok(t))
.collect::<Vec<_>>(),
),
Err(err) => stream::iter(vec![Err(err)]),
})
.flatten();

let b = Box::new(inner);
unsafe { Pin::new_unchecked(b) }
}
}

async fn fetch_tag(
async fn fetch_tags_chunk(
&self,
paginate: Option<u32>,
base_url: &String,
base_url: &str,
link: &Option<String>,
) -> Result<(TagsChunk, Option<String>)> {
let url_paginated = match (paginate, link) {
Expand Down
2 changes: 1 addition & 1 deletion tests/mock/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ fn test_catalog_paginate() {
.build()
.unwrap();

let next = dclient.get_catalog(Some(1));
let next = Box::pin(dclient.get_catalog(Some(1)));

let (page1, next) = runtime.block_on(next.into_future());
assert_eq!(page1.unwrap().unwrap(), "r1/i1".to_owned());
Expand Down
2 changes: 1 addition & 1 deletion tests/mock/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn test_tags_paginate() {
.build()
.unwrap();

let next = dclient.get_tags(name, Some(1));
let next = Box::pin(dclient.get_tags(name, Some(1)));

let (first_tag, stream_rest) = runtime.block_on(next.into_future());
assert_eq!(first_tag.unwrap().unwrap(), "t1".to_owned());
Expand Down

0 comments on commit 4410140

Please sign in to comment.