Stream mailFolder messages along with '@odata.deltaLink' #44

Closed
madanbisht opened this issue Aug 26, 2020 · 26 comments
Labels
enhancement (New feature or request), question (Further information is requested)

Comments

@madanbisht

madanbisht commented Aug 26, 2020

Hi David,
In our use case we are trying to stream all the messages from the 'inbox' mailFolder (using delta()) to an agent application, as shown below:

Stream<Message> messages = OdataFactory
                    .request()
                    .users("test@mydomain.com")
                    .mailFolders('inbox')
                    .messages()
                    .delta()
                    .stream();

We want to append '@odata.deltaLink' to the final data sent to the agent, so that next time the agent can use the '@odata.deltaLink' to get incremental changes in the mailFolder. Can you please help us fulfill this requirement using a stream?

Thanks
Madan

@davidmoten
Owner

Here's the documentation https://github.com/davidmoten/odata-client#delta-collections.

CollectionPage<Message> delta = OdataFactory
                    .request()
                    .users("test@mydomain.com")
                    .mailFolders('inbox')
                    .messages()
                    .delta()
                    .get();

delta.stream()...

// a while later
delta = delta.nextDelta();
...

@davidmoten
Owner

BTW, I think the first call to delta() lists all messages, so you might want to use .deltaTokenLatest() as per the documentation.

davidmoten added the question label Aug 27, 2020
@davidmoten
Owner

Is this question about serialization? If you serialize the CollectionPage<Message> to json it should have the @odata.deltaLink field in it.

@madanbisht
Author

Thanks for the response.
We are expecting millions of messages in a mail folder, and the service will not be able to hold all of them locally, so we are streaming the messages as they are received from the Exchange server. Initially the service pulls k records (where k is the page size) and sends them to the agent, then fetches the next k records and sends those to the agent, and so on until it gets a 'deltaLink' in the response. As soon as the service receives the 'deltaLink', it sends the records from that last page to the agent and, since the 'deltaLink' has been received, stops pulling data.

Now we need to send the 'deltaLink' to the agent along with the last set of records (from the page containing the deltaLink). Can you please suggest how to fulfill this requirement?
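A minimal sketch of the page-by-page flow described above, in plain Java with no library dependency. `SimplePage`, `PageForwarder`, `fetchNext` and `sendToAgent` are hypothetical stand-ins (not odata-client's `CollectionPage` API), assuming each page exposes its items plus an optional nextLink and deltaLink. Only one page is held in memory at a time, and the final page travels to the agent together with its deltaLink.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical page shape (items plus the two OData links); not odata-client's CollectionPage API.
final class SimplePage<T> {
    final List<T> items;
    final Optional<String> nextLink;
    final Optional<String> deltaLink;

    SimplePage(List<T> items, Optional<String> nextLink, Optional<String> deltaLink) {
        this.items = items;
        this.nextLink = nextLink;
        this.deltaLink = deltaLink;
    }
}

final class PageForwarder {
    /**
     * Sends pages to the agent one at a time, so at most one page (k items) is held in memory.
     * Stops at the page carrying the deltaLink (or when no nextLink remains) and returns that deltaLink.
     */
    static <T> Optional<String> forwardAll(SimplePage<T> first,
                                           Function<String, SimplePage<T>> fetchNext,
                                           Consumer<SimplePage<T>> sendToAgent) {
        SimplePage<T> page = first;
        while (true) {
            // The final page is sent whole, so its items and its deltaLink reach the agent together.
            sendToAgent.accept(page);
            if (page.deltaLink.isPresent() || !page.nextLink.isPresent()) {
                return page.deltaLink;
            }
            page = fetchNext.apply(page.nextLink.get());
        }
    }
}
```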

@davidmoten
Owner

You can use the CollectionPage.deltaLink() method. Note also that when you serialize a CollectionPage to JSON it will include the deltaLink field if present.

@davidmoten
Owner

BTW in terms of page size you should also read https://github.com/davidmoten/odata-client#your-own-page-size.

@madanbisht
Author

madanbisht commented Aug 31, 2020

Thanks, David, for looking into this.

We are using stream() (as shown below), and it seems the response is serialized from the message records only, like collectionPage.currentPage(), since the data received by the agent doesn't show the nextLink or deltaLink. Any suggestions?


public Flux<Stream<Message>> GetMails()
{
       Stream<Message> collectionPage = OdataFactory
                    .request()
                    .users("test@mydomain.com")
                    .mailFolders('inbox')
                    .messages()
                    .delta()
                    .maxPageSize(20)
                    .stream();

       return Flux.just(collectionPage);
}

@davidmoten
Owner

davidmoten commented Aug 31, 2020

Ah well, this boils down to how a Stream serializes, doesn't it? A Stream is not a List, but it behaves like one for serialization purposes (the library you are using presumably does something predictable with Streams). Just imagine you take all the Message objects out of a CollectionPage object and add them to an ArrayList. What makes you think the metadata will go across with them? It won't!

What's the maximum number of messages you want to go across in one call to getMails? It certainly won't be 20 with your current code, because the stream keeps fetching more pages. You could just return Flux<CollectionPage<Message>> if you were happy returning one page per call. Then the serialization would include the nextLink, and the deltaLink (if at the last page). It's ugly to couple an API to an internal library's classes, so you could always create your own Page object and return Flux<Page<Message>>. Of course, if you only care about the JSON representation across the network, then it doesn't matter.

BTW, do you realize that the Message object won't include email attachments, especially large ones, and that there is weirdness with special attachments like reference attachments? If you want to capture the whole SMTP message reliably, you can get it in MIME format as a stream from the Graph API using odata-client-msgraph. I can show you how if it is of interest.

@madanbisht
Copy link
Author

Apologies, '.maxPageSize(20)' is not relevant in the function ('GetMails') shared above, as the stream automatically pulls the pages until the deltaLink is received.

Agreed, we could pull all the records using 'nextPage().get()' until the deltaLink is received and finally add these records to a CustomPage along with the deltaLink. With this approach the response to the agent would contain the records as well as the deltaLink. However, we would need to store all the records in a local list, and if a mail folder has millions of messages, this would hurt performance, and the service might not have enough memory to hold them all.

This is why we are using a stream; the only challenge is how to send the 'deltaLink' in the response to the agent, so that next time the agent can request only incremental changes.

@davidmoten
Owner

Thanks for the detail, I'll give you an example of what to do shortly.

davidmoten added a commit that referenced this issue Sep 2, 2020
@davidmoten
Owner

I'm adding a method with the signature

Stream<ObjectOrDeltaLink<T>> streamWithDeltaLink()

to CollectionPage<T>.

The approach to solve your problem is to take a stream of n Message objects and convert that to a stream of n+1 wrapper objects where the first n objects contain a Message and the last object has an Optional deltaLink (after all, not every stream has a deltaLink at the end).
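The n to n+1 wrapping idea can be sketched without the library. This is an illustration under assumptions, not odata-client's implementation: `LinkedPage`, `ItemOrDeltaLink` and `DeltaStreams` are hypothetical names, `fetchNext` stands in for following a nextLink, and `Stream.iterate` with a predicate needs Java 9 or later. The trailing element is built lazily, so the deltaLink is read only after the last page has been consumed.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical page: nextLink is null on the last page; deltaLink is non-null only on the last page of a delta query.
final class LinkedPage<T> {
    final List<T> items;
    final String nextLink;
    final String deltaLink;

    LinkedPage(List<T> items, String nextLink, String deltaLink) {
        this.items = items;
        this.nextLink = nextLink;
        this.deltaLink = deltaLink;
    }
}

// Hypothetical wrapper: either an object or the trailing (optional) deltaLink.
final class ItemOrDeltaLink<T> {
    final T object;                    // null for the trailing element
    final Optional<String> deltaLink;  // empty for ordinary elements

    private ItemOrDeltaLink(T object, Optional<String> deltaLink) {
        this.object = object;
        this.deltaLink = deltaLink;
    }

    static <T> ItemOrDeltaLink<T> ofObject(T object) {
        return new ItemOrDeltaLink<>(object, Optional.empty());
    }

    static <T> ItemOrDeltaLink<T> ofDeltaLink(Optional<String> deltaLink) {
        return new ItemOrDeltaLink<>(null, deltaLink);
    }
}

final class DeltaStreams {
    // Turns a paged stream of n objects into n + 1 wrappers, the last one holding the (optional) deltaLink.
    static <T> Stream<ItemOrDeltaLink<T>> streamWithDeltaLink(LinkedPage<T> first,
                                                              Function<String, LinkedPage<T>> fetchNext) {
        AtomicReference<String> lastDeltaLink = new AtomicReference<>();
        // Stream.iterate (Java 9+) requests page n + 1 only after page n has been pushed downstream.
        Stream<LinkedPage<T>> pages = Stream.iterate(first, Objects::nonNull,
                p -> p.nextLink == null ? null : fetchNext.apply(p.nextLink));
        Stream<ItemOrDeltaLink<T>> items = pages.flatMap(p -> {
            if (p.deltaLink != null) {
                lastDeltaLink.set(p.deltaLink);
            }
            return p.items.stream().map(ItemOrDeltaLink::ofObject);
        });
        // map is lazy, so the trailer reads lastDeltaLink only after every page has been consumed.
        Stream<ItemOrDeltaLink<T>> trailer = Stream.of(0)
                .map(ignored -> ItemOrDeltaLink.<T>ofDeltaLink(Optional.ofNullable(lastDeltaLink.get())));
        return Stream.concat(items, trailer);
    }
}
```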

When Stream<ObjectOrDeltaLink<Message>> is serialized you should see:

[
{ "object": MESSAGE_JSON, "deltaLink": null}, 
...
{ "object": MESSAGE_JSON, "deltaLink": null},
{ "object": null, "deltaLink": "https://blahblah" }
]

I'll finish tests and then let you try it out.
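On the receiving side, an agent could consume the JSON array above element by element. A sketch, assuming each element has already been deserialized into a hypothetical `Envelope` class whose `object` and `deltaLink` fields mirror the JSON (the classes are illustrative only, not part of odata-client). The deltaLink is saved only when the trailing element arrives, so a broken connection leaves the previously saved link in place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical mirror of one serialized element: {"object": ..., "deltaLink": ...}
final class Envelope<T> {
    final T object;
    final String deltaLink;

    Envelope(T object, String deltaLink) {
        this.object = object;
        this.deltaLink = deltaLink;
    }
}

// Hypothetical agent-side consumer that archives objects and remembers the deltaLink for the next sync.
final class AgentConsumer<T> {
    final List<T> stored = new ArrayList<>();
    private Optional<String> savedDeltaLink = Optional.empty();

    void onElement(Envelope<T> element) {
        if (element.object != null) {
            stored.add(element.object);
        } else if (element.deltaLink != null) {
            // Only the trailing element carries a link and it arrives after every object,
            // so if the connection drops earlier, the previously saved link is left untouched.
            savedDeltaLink = Optional.of(element.deltaLink);
        }
    }

    Optional<String> deltaLinkForNextSync() {
        return savedDeltaLink;
    }
}
```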

@davidmoten
Owner

davidmoten commented Sep 2, 2020

I've merged the change into master. So now you can do this:

public Flux<ObjectOrDeltaLink<Message>> getMails()
{
       Stream<ObjectOrDeltaLink<Message>> stream = 
               client
                    .users("test@mydomain.com")
                    .mailFolders('inbox')
                    .messages()
                    .delta()
                    .deltaTokenLatest()
                    .streamWithDeltaLink();

       return Flux.fromStream(stream);
}

Note the use of deltaTokenLatest (are you familiar with the effects of that option?). Also, you should not need to rebuild a client from a factory on every call; a client built once can be used for the lifetime of an application and is thread-safe. Note also that you were returning a Flux using Flux.just when you should be using Flux.fromStream.

What library are you using to expose a Flux across the network (what library is doing the serialization of a Flux into JSON)? Is it WebFlux?

@madanbisht
Author

madanbisht commented Sep 2, 2020

Thank you, David; yes, we are using WebFlux.
I assume the above code changes will be part of release 0.1.35. When will it be released?

@davidmoten
Owner

I'll look at a release in the next day or so. In the meantime you can just do this to use the SNAPSHOT version locally:

git clone https://github.com/davidmoten/odata-client.git
cd odata-client
mvn clean install

@madanbisht
Author

Hi David,
Thanks a lot; we have tested it and the new code changes produce output as per our expectations.

Just a small request: could the method 'streamWithDeltaLink' also be part of 'CollectionPageNonEntityRequest' and 'CollectionEntityRequestOptionsBuilder'?

Otherwise we are OK with the current code changes as well; below is the invocation:

public Flux<ObjectOrDeltaLink<Message>> getMails()
{
       Stream<ObjectOrDeltaLink<Message>> stream = 
               client
                    .users("test@mydomain.com")
                    .mailFolders("inbox")
                    .messages()
                    .delta()
                    .get()
                    .streamWithDeltaLink();

       return Flux.fromStream(stream);
}

@davidmoten
Owner

Glad to hear it works @madanbisht, thanks for testing the change. I've added the extra methods as requested and I'll build a release shortly.

@davidmoten
Owner

0.1.35 is on Maven Central now.

@madanbisht
Author

madanbisht commented Sep 4, 2020

Thank you, we are now using build 0.1.35 and it's working perfectly.

One more small request: like the deltaLink, is it possible to add the nextLink to the response? It would be useful for handling the failure scenario below.

Consider a mailbox that has, say, 2 million mails. Using the stream we will be able to pull all the mails, including the deltaLink (required for incremental messages).
If the connection breaks partway through, say after pulling 1 million mails, the agent doesn't know what data it has received, and I think in this situation the agent has to make the request again and pull all the data, including mails it has already received.

Adding the nextLink to the response would enable the agent to request only the remaining mails (which it hasn't received yet) using the nextLink.

@davidmoten
Owner

davidmoten commented Sep 4, 2020

Ha, it's getting complicated for you! To solve that problem I would call nextDelta more often: if you get a failure, you have fewer messages to repeat processing for. The other reason you should call nextDelta more often is that the delta tokens themselves have limited lifetimes; they expire!
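A sketch of the "call nextDelta more often" advice, assuming a hypothetical `DeltaSource` that returns the changes since a saved deltaLink together with the next deltaLink (null meaning the initial sync). The checkpoint advances only after a whole round is processed, so a failure repeats just that small round, and anything processed before the failure may be seen again.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical result of one delta round: the changed message ids plus the link for the next round.
final class DeltaRound {
    final List<String> changedIds;
    final String nextDeltaLink;

    DeltaRound(List<String> changedIds, String nextDeltaLink) {
        this.changedIds = changedIds;
        this.nextDeltaLink = nextDeltaLink;
    }
}

// Hypothetical source: returns everything changed since deltaLink (null means the initial sync).
interface DeltaSource {
    DeltaRound changesSince(String deltaLink);
}

final class IncrementalSync {
    private String checkpoint; // last deltaLink whose round was fully processed

    IncrementalSync(String initialCheckpoint) {
        this.checkpoint = initialCheckpoint;
    }

    // Run this often (e.g. on a schedule) so each round is small and the token stays fresh.
    void runOnce(DeltaSource source, Consumer<String> process) {
        DeltaRound round = source.changesSince(checkpoint);
        for (String id : round.changedIds) {
            process.accept(id); // if this throws, the checkpoint is not advanced
        }
        checkpoint = round.nextDeltaLink; // advance only after the whole round succeeded
    }

    String checkpoint() {
        return checkpoint;
    }
}
```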

Are you trying to guarantee processing of every email? If so, then I doubt using deltas is an appropriate method, especially as delta tokens expire. At my work we guarantee processing of emails by pulling down unread emails from the mailbox and only marking them as read once they have been persisted to a queue for local processing. There appears to be an index on the read/unread status, because performance is still good for this request even though the mailbox has grown a lot (in our case, 2000 emails a day). Good luck getting O365 to scale to millions of messages per day in one mailbox; have you tested this? What's the plan for removing messages from the mailbox? If you are doing that anyway, why not just stream all messages from the mailbox and delete them once they are processed successfully?
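The unread/mark-as-read pattern described above gives at-least-once processing. A sketch using a hypothetical in-memory `FakeMailbox` in place of real Graph API calls; the essential part is the ordering: persist to the queue first, mark read second. A crash between the two steps means the message is picked up again (a duplicate), never lost.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory stand-in for a mailbox; a real version would call the Graph API.
final class FakeMailbox {
    private final Map<String, Boolean> isReadById = new LinkedHashMap<>();

    void deliver(String id) {
        isReadById.put(id, false);
    }

    List<String> unreadIds() {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : isReadById.entrySet()) {
            if (!entry.getValue()) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }

    void markRead(String id) {
        isReadById.put(id, true);
    }
}

final class UnreadPoller {
    // Persist first, mark read second: a crash in between causes a redelivery, never a loss.
    static int pollOnce(FakeMailbox mailbox, Queue<String> durableQueue) {
        int processed = 0;
        for (String id : mailbox.unreadIds()) {
            durableQueue.add(id);  // step 1: hand off to the (assumed durable) local queue
            mailbox.markRead(id);  // step 2: only now flag the message as read
            processed++;
        }
        return processed;
    }
}
```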

madanbisht reopened this Sep 4, 2020
@madanbisht
Author

madanbisht commented Sep 4, 2020

Hi David,
Actually, we are not assuming millions of messages per day in one mailbox; in fact that's impossible. Below is the use case I was talking about.

The intent of our application is to retrieve mails from an account's mailbox and store them on disk, so that the mails can be restored to the mailbox whenever required. If an account is 5 to 10 years old, it can have millions of messages, and our application needs to pull all of them.

Once all mails for an account are retrieved successfully, the application will use the deltaLink to pull incremental mails in the future. If the next (incremental) pull happens a year after the last (very first) pull, the account could again have millions of new mails.

@madanbisht
Author

One more concern (please correct me if I'm wrong): since, as you said, delta tokens have limited lifetimes, I think a deltaLink will no longer be valid after a week or a month.

@davidmoten
Owner

There's not much out there, but here's something that talks about delta links expiring:

https://stackoverflow.com/questions/51933002/syncstatenotfound-error-how-to-fix-or-avoid

and this says that they expire within 7 days:

http://www.msfttoday.com/duration-of-change-tracking-tokens-for-identity-and-education-resources/
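Given that delta tokens expire, a sync job probably needs a fallback path. A sketch, assuming the client surfaces an expired or rejected token as the hypothetical `DeltaTokenExpiredException` (the Stack Overflow question above reports a SyncStateNotFound error); on expiry it falls back to a full resync, whose results overlap what is already archived and so need de-duplicating.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical signal that the service rejected an expired delta token.
final class DeltaTokenExpiredException extends RuntimeException {
    DeltaTokenExpiredException(String message) {
        super(message);
    }
}

// Hypothetical sync outcome: message ids seen plus the deltaLink to use next time.
final class SyncResult {
    final List<String> ids;
    final String deltaLink;

    SyncResult(List<String> ids, String deltaLink) {
        this.ids = ids;
        this.deltaLink = deltaLink;
    }
}

final class ResilientSync {
    // Incremental sync when a saved link exists and is still valid; otherwise a full resync.
    static SyncResult sync(String savedDeltaLink,
                           Function<String, SyncResult> incremental,
                           Supplier<SyncResult> full) {
        if (savedDeltaLink == null) {
            return full.get();
        }
        try {
            return incremental.apply(savedDeltaLink);
        } catch (DeltaTokenExpiredException expired) {
            return full.get(); // overlaps earlier data; de-duplicate downstream
        }
    }
}
```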

@madanbisht
Author

madanbisht commented Sep 7, 2020

Thank you, David, for sharing this; it has an impact on our use case.

Your suggestions on the above use case would also be appreciated, i.e. error handling (for connection errors) in scenarios where the application uses a stream to process millions of messages for an account that is 5 to 10 years old.

@davidmoten
Owner

> The intent of our application is to retrieve mails from an account's mailbox and store them on disk, so that the mails can be restored to the mailbox whenever required. If an account is 5 to 10 years old, it can have millions of messages, and our application needs to pull all of them.

Use the raw SMTP (MIME) format, as I've already suggested, not Message JSON. That way you retain everything about the email, including the SMTP headers and all attachments no matter how big. You'll need to confirm that this is practical for restoring an email to an account.

> Once all mails for an account are retrieved successfully, the application will use the deltaLink to pull incremental mails in the future. If the next (incremental) pull happens a year after the last (very first) pull, the account could again have millions of new mails.

The solution for your error-handling scenario is to pull more often to reduce the stream size, to account for expiry of the deltaLink, and to handle duplicates sensibly. Duplication is inevitable, so make sure you account for it. Microsoft suggests using a webhook for notification of changes so you don't have to poll for them; that's worth looking at too.
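One sensible way to handle duplicates is to make storage idempotent by keying on the message id, so replaying a round or running a full resync does no harm. A sketch with a hypothetical in-memory `IdempotentArchive`; a real archive would use a persistent store, and whether a later copy should replace an earlier one is a design choice (here the later copy wins, and content is assumed non-null).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical archive keyed by message id: replayed rounds and overlapping resyncs become harmless.
final class IdempotentArchive {
    private final Map<String, String> contentById = new LinkedHashMap<>();
    private int duplicatesSeen;

    // Upsert: a later copy of the same id replaces the earlier one; returns true if the id was new.
    // Assumes content is non-null, since Map.put returning null is used to detect a new id.
    boolean store(String messageId, String content) {
        boolean isNew = contentById.put(messageId, content) == null;
        if (!isNew) {
            duplicatesSeen++;
        }
        return isNew;
    }

    int size() {
        return contentById.size();
    }

    int duplicatesSeen() {
        return duplicatesSeen;
    }
}
```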

davidmoten added the enhancement label Sep 8, 2020
@madanbisht
Author

Thanks, David, for your kind support and understanding.

We are also interested in streaming SMTP messages from the Graph API using odata-client-msgraph. If required, we will create a new thread for it.

@davidmoten
Owner

Re SMTP MIME format, see https://github.com/davidmoten/odata-client#download-an-email-in-smtp-mime-format

davidmoten added a commit that referenced this issue Sep 21, 2020