Storage Table Paging #83
Hello Alexei, something like:
let client = TableClient::new(&account, &master_key);
let table = CloudTable::new(client, "my_table");
// make sure to import futures::stream::StreamExt
// Pin is a Rust streaming async requirement, don't worry too much
// about it, just make sure to use it.
// Replace u64 with the type you want to receive.
let mut stream = Box::pin(table.stream_query::<u64>(Some("your query here")));
// stream.next() will handle the Continuation Token for you
while let Some(value) = stream.next().await {
// do what you want with the returned Vec
println!("{:?}", value);
}
I haven't actually run this code; I just made sure it compiles. If it's not working as expected, please tell us! |
Hi @MindFlavor, thanks for the very quick response! A bit more background: I'm writing a RESTful API that will present an interface like:
GET /things?limit=10&next=abcd
For example, the series of HTTP calls that would be typical:
The stream query interface looks excellent for backend operations that simply want all pages of data. However, to limit the amount of IO used, the frontend should query one page of data at a time, not all data at once. I've got a local implementation of the Here's the implementation (minus the error struct and tests) so that it's clear what I'm thinking of:
impl std::fmt::Display for Continuation {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(next) = &self.next {
fmt.write_str(&next.partition_key)?;
fmt.write_str(".")?;
fmt.write_str(&next.row_key)?;
}
Ok(())
}
}
impl std::str::FromStr for Continuation {
type Err = ContinuationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let keys: Vec<&str> = s.split('.').collect();
match keys.len() {
0 | 1 => Ok(Continuation::start()),
2 => Ok(Continuation {
fused: false,
next: Some(ContinuationCursor {
partition_key: keys[0].to_owned(),
row_key: keys[1].to_owned(),
}),
}),
_ => Err(Self::Err::InvalidContinuationFormat),
}
}
} |
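A minimal, self-contained sketch of how the `Display`/`FromStr` pair above round-trips, and where it does not. The `Continuation`, `ContinuationCursor`, and `ContinuationError` definitions here are hypothetical stand-ins reconstructed from the snippet (the real structs are not shown in this thread), so treat the field layout as an assumption.

```rust
use std::fmt;
use std::str::FromStr;

// Hypothetical stand-ins for the types used in the snippet above.
#[derive(Debug, Clone, PartialEq)]
pub struct ContinuationCursor {
    pub partition_key: String,
    pub row_key: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Continuation {
    pub fused: bool,
    pub next: Option<ContinuationCursor>,
}

#[derive(Debug, PartialEq)]
pub enum ContinuationError {
    InvalidContinuationFormat,
}

impl Continuation {
    /// A continuation that points at the first page.
    pub fn start() -> Self {
        Continuation { fused: false, next: None }
    }

    /// Convenience constructor for a continuation with a cursor.
    pub fn at(partition_key: &str, row_key: &str) -> Self {
        Continuation {
            fused: false,
            next: Some(ContinuationCursor {
                partition_key: partition_key.to_owned(),
                row_key: row_key.to_owned(),
            }),
        }
    }
}

impl fmt::Display for Continuation {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // An empty string represents "start from the beginning".
        if let Some(next) = &self.next {
            write!(f, "{}.{}", next.partition_key, next.row_key)?;
        }
        Ok(())
    }
}

impl FromStr for Continuation {
    type Err = ContinuationError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let keys: Vec<&str> = s.split('.').collect();
        match keys.len() {
            0 | 1 => Ok(Continuation::start()),
            2 => Ok(Continuation::at(keys[0], keys[1])),
            _ => Err(ContinuationError::InvalidContinuationFormat),
        }
    }
}
```

With these definitions, `Continuation::at("p1", "r42").to_string()` yields `p1.r42`, and parsing that string gives back an equal value. A key that itself contains a `.` does not survive the round trip: `Continuation::at("a.b", "c")` serializes to `a.b.c`, which `from_str` rejects, so a real implementation would likely need some escaping or encoding of the keys.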
wow that's great 👍 ! However I would rethink the I was thinking to split
pub async fn execute_query<T>(
&self,
query: &str,
) -> Result<Option<(Vec<TableEntity<T>>, Option<ContinuationToken>)>, AzureError>
where
T: DeserializeOwned + Serialize { ... }
pub async fn execute_get_all<T>(
&self,
) -> Result<Option<(Vec<TableEntity<T>>, Option<ContinuationToken>)>, AzureError>
where
T: DeserializeOwned + Serialize { ... }
pub async fn continue_query<T>(
&self,
continuation_token: ContinuationToken
) -> Result<Option<(Vec<TableEntity<T>>, Option<ContinuationToken>)>, AzureError>
where
T: DeserializeOwned + Serialize { ... }
This way the caller does not have to handle the continuation token themselves; it will be treated as an opaque type (as it should be). What do you think? |
I like that pattern 👍, I'll give it a shot and submit a PR and then we can figure out if it actually looks good there. |
Thank you very much! |
OK, I'm most of the way there, need to fix up tests.
pub async fn execute_query<T>(
&self,
query: &str,
) -> Result<(Vec<TableEntity<T>>, Option<ContinuationToken>), AzureError>
where
T: DeserializeOwned + Serialize,
{
self.continue_query(ContinuationToken::start(Some(query)))
.await
}
pub async fn execute_get_all<T>(
&self,
) -> Result<(Vec<TableEntity<T>>, Option<ContinuationToken>), AzureError>
where
T: DeserializeOwned + Serialize,
{
self.continue_query(ContinuationToken::start(None)).await
}
pub async fn continue_query<T>(
&self,
continuation_token: ContinuationToken,
) -> Result<(Vec<TableEntity<T>>, Option<ContinuationToken>), AzureError>
where
T: DeserializeOwned + Serialize,
{
log::debug!(
"query_entities(continuation_token = {:?})",
continuation_token
);
let mut path = self.table_name.to_owned();
path.push_str("?");
if let Some(ref query) = continuation_token.query {
path.push_str(&query);
}
if let Some(ref next) = continuation_token.next {
path.push_str("&NextPartitionKey=");
path.push_str(&next.partition_key);
path.push_str("&NextRowKey=");
path.push_str(&next.row_key);
}
let future_response = self.client.request_with_default_header(
path.as_str(),
&Method::GET,
None,
MetadataDetail::Full, // etag is provided through metadata only
&|req| req,
)?;
let (headers, body) =
check_status_extract_headers_and_body(future_response, StatusCode::OK).await?;
log::trace!("body == {:?}", std::str::from_utf8(&body));
let entities = serde_json::from_slice::<EntityCollection<T>>(&body)?;
let continuation = ContinuationToken::try_from((continuation_token.query, &headers))?;
match continuation.next {
Some(_) => Ok((entities.value, Some(continuation))),
None => Ok((entities.value, None)),
}
}
pub fn stream_query<'a, T>(
&'a self,
query: Option<&'a str>,
) -> impl Stream<Item = Result<Vec<TableEntity<T>>, AzureError>> + 'a
where
T: Serialize + DeserializeOwned + 'a,
{
futures::stream::unfold(ContinuationToken::start(query), move |cont| async move {
match self.continue_query(cont.clone()).await {
Ok((segment, continuation)) => match continuation {
Some(c) => Some((Ok(segment), c)),
None => None,
},
Err(err) => Some((Err(err), cont)),
}
})
}
I changed the signature to remove the Option around the Vec and token; it didn't seem necessary. Let me know if that seems like a bad decision to you. I haven't written much Rust yet, so I'm still feeling my way around. |
Hurmm, wait, that stream query clearly doesn't work lol. I'll think a bit longer about that in the morning :) |
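Reading the `stream_query` sketch above, two problems stand out: when `continue_query` returns no continuation token the closure returns `None`, so the final segment is never yielded, and on an error the same token is kept as the state, so the stream would retry it forever. Below is a hedged, synchronous analogue of one way to structure the paging state so that neither happens. It uses only the standard library; `pages` and the `fetch` callback are hypothetical names standing in for the stream and `continue_query`, and this is not the code that ended up in the PR.

```rust
/// Paging state: either another page should be fetched, or paging is over.
enum State<C> {
    /// `None` means "first page"; `Some(c)` means "continue from cursor c".
    Fetch(Option<C>),
    Done,
}

/// Builds an iterator over result pages.
///
/// `fetch` is a hypothetical callback standing in for `continue_query`:
/// given an optional cursor, it returns one segment plus the cursor for
/// the next segment, if there is one.
pub fn pages<T, C, E, F>(mut fetch: F) -> impl Iterator<Item = Result<Vec<T>, E>>
where
    F: FnMut(Option<C>) -> Result<(Vec<T>, Option<C>), E>,
{
    let mut state = State::Fetch(None);
    std::iter::from_fn(move || {
        // Take the current state and pessimistically mark paging as done.
        let cursor = match std::mem::replace(&mut state, State::Done) {
            State::Fetch(cursor) => cursor,
            State::Done => return None,
        };
        match fetch(cursor) {
            Ok((segment, next)) => {
                // Keep going only if the backend handed back a cursor.
                if next.is_some() {
                    state = State::Fetch(next);
                }
                // The segment is yielded even when it is the last one.
                Some(Ok(segment))
            }
            // Yield the error once; the state stays `Done`, so no retry loop.
            Err(err) => Some(Err(err)),
        }
    })
}
```

The same three-way state (first page, continue, done) should carry over to `futures::stream::unfold` by using it as the unfold state instead of the bare token, although that variant is not shown or tested here.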
I took a stab at it in the aforementioned PR. Can you please take a look at this and tell me what you think? Right now it goes like this:
let response = table_client.begin_get_all::<MyEntity>().await?;
println!("{:?}", response.entities);
println!("{:?}", response.continuation_token);
let mut response = table_client.begin_query::<MyEntity>("$top=2").await?;
println!("{:?}", response.entities);
println!("{:?}", response.continuation_token);
while let Some(continuation_token) = response.continuation_token {
println!("we have more data!");
response = table_client
.continue_execution::<MyEntity>(continuation_token)
.await?;
println!("{:?}", response.entities);
println!("{:?}", response.continuation_token);
}
I still do not like the fact we cannot pass parameters to |
I think that API would work. I think in the implementation of the derivative functions That PR doesn't include any way to serialize the continuation as far as I can tell, so it doesn't actually solve this issue for me. For example, you could do something like:
pub fn continue_from_token(token: &str, query: Option<&str>) -> Result<Self, ContinuationError> {
let keys: Vec<&str> = token.split('.').collect();
match keys.len() {
2 => Ok(ContinuationToken {
query_path: match query {
Some(q) => q.to_owned(),
None => "".to_owned(),
},
partition_key: keys[0].to_owned(),
row_key: keys[1].to_owned(),
}),
_ => Err(ContinuationError::InvalidContinuationFormat),
}
}
pub fn next_token(&self) -> String {
format!("{}.{}", self.partition_key, self.row_key)
} |
Yes, you are right, I've pushed a less copypasta version 🤦‍♂️.
Sorry I missed that. A simple derive from serde should cover your needs (pushed right now). The proposed |
That's a bit atypical compared with what I've seen for APIs around this previously. It would technically work for what I want, but only because I can deserialize the result into my own representation and pull out the data. It is expected that if someone passes a next token with a different query, they'd get back unreliable data. Here's an example of the continuation token class for the .NET Cosmos Table API: https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.cosmos.table.tablecontinuationtoken?view=azure-dotnet It simply exposes the next partition and row keys. I'd be happy with that implementation because then I could add whatever format I want in my own code. I could simply return the entire query + keys to the user, but it would be unusual as far as RESTful API design goes, so I'd almost certainly just end up serializing the data, deserializing it into my own struct, and then creating the token I need from that. That's especially true because the query language is database specific and is not something I should be exposing to my users. A good web API shouldn't care what database backs it, and shouldn't expose that to the caller. |
Yes you are right, but for this exact reason I have encapsulated the partition key, row key and the query parameters in the
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContinuationToken {
pub(crate) query_path: String,
pub(crate) partition_key: String,
pub(crate) row_key: String,
}
This way no one can accidentally pass the wrong token: they simply cannot construct it (it's opaque). But I have the feeling I am not understanding what you mean 🙇‍♂️ ... Do you want to create a |
Yup! I won't be presenting the raw OData query syntax to the user. For example, let's say we have entities that can be filtered by customer. I might expect an API call to find all for a given customer like: Finally, I'd pass back the data needed to get the next page of the query results. The next call would look something like: Given that that's the API, it would be super strange for me to expose the backend query syntax used by the database to the user. So there is no way I'd be returning that in the interface. I think there are probably tons of use cases for just keeping the entire token opaque, but in the case of a RESTful API, it provides the wrong abstraction. If the user provided a different query compared to the last call, thus constructing the continuation token wrong, I wouldn't expect the results to make a huge amount of sense. Given that that's how the storage API that this SDK abstracts works as well, I'm clearly not the only person willing to make that sacrifice. |
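To make the boundary described above concrete, here is a rough sketch of what the web-API side might do: build the backend OData filter from a public `customer` parameter, and hand the caller a `next` string that reveals neither the query nor the raw keys. Everything here is hypothetical illustration: the `Customer` property name, the `Cursor` type, and the hex-based token format are assumptions, not part of the SDK. Percent-encoding of the query string is deliberately left out.

```rust
/// Position of the next page, as reported by the storage backend.
#[derive(Debug, Clone, PartialEq)]
pub struct Cursor {
    pub partition_key: String,
    pub row_key: String,
}

/// Builds the backend query from the public `customer` parameter.
/// `Customer` is a hypothetical entity property name.
pub fn customer_filter(customer: &str) -> String {
    // OData string literals escape a single quote by doubling it.
    format!("$filter=Customer eq '{}'", customer.replace('\'', "''"))
}

/// Hex-encodes a key so that the `.` separator can never appear inside it.
fn to_hex(s: &str) -> String {
    s.bytes().map(|b| format!("{:02x}", b)).collect()
}

/// Reverses `to_hex`; returns `None` for anything that is not valid hex
/// or does not decode to UTF-8.
fn from_hex(s: &str) -> Option<String> {
    if s.len() % 2 != 0 || !s.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    // All bytes are ASCII here, so slicing by byte index is safe.
    let bytes: Option<Vec<u8>> = (0..s.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
        .collect();
    String::from_utf8(bytes?).ok()
}

/// Turns a backend cursor into the `next` string given to the API caller.
pub fn encode_next(cursor: &Cursor) -> String {
    format!("{}.{}", to_hex(&cursor.partition_key), to_hex(&cursor.row_key))
}

/// Parses a caller-supplied `next` string back into a backend cursor.
pub fn decode_next(token: &str) -> Option<Cursor> {
    let (pk, rk) = token.split_once('.')?;
    Some(Cursor {
        partition_key: from_hex(pk)?,
        row_key: from_hex(rk)?,
    })
}
```

In this arrangement the handler would rebuild the filter from the request's own parameters on every call and only take the position from `next`, which matches the point above that a caller who changes the query between calls should not expect meaningful results.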
Ok I've added the ability to construct Something like:
impl ContinuationToken {
pub fn new(previous_url: Url, next_partition_key: &str, next_row_key: &str) -> Self {
//...
You can get the old and new URL with |
That's a very comprehensive API and solves my problem 👍 |
Yay! 🥂 (sorry for taking so long to understand your needs 🙇‍♂️)! I'll fix some minor stuff and mark the PR as complete. We'll close the issue as soon as it's merged. Update: |
No that's fine! Pleasure working with you :) I've only really got weekends to work on this at the moment but I'll put it into practice as soon as I can! |
Support for paging results for data tables was added in #874 |
I can't seem to find a sensible way to page storage table query results.
There isn't an obvious way (to me) to create a `Continuation` at a given point, nor does there appear to be a good way to serialize a `Continuation`. I would like to provide a simple RESTful API that offers a single string `next` that can be given to the API again to continue a query. This SDK doesn't appear to let me do that.
I'm not all that good at Rust yet, but I think what I want is for `Continuation` to implement the traits `ToString` and `FromStr`, so that I can simply return a string to the API caller that they can pass back to me for the next page.