Prestwich/super pending #566

Merged
gakonst merged 15 commits into master from prestwich/super-pending on Nov 12, 2021

Conversation

prestwich
Collaborator

@prestwich prestwich commented Nov 8, 2021

Motivation

Provide a reasonable and working alternative to gas escalator middleware.

Solution

  • Introduce a new Future EscalatingPending that broadcasts a series of transactions
    • functions similarly to PendingTransaction but with a simpler (but less informative) state machine
    • it returns the first receipt to confirm.
    • it also returns the first RPC error to occur
    • it allows users to configure polling and broadcast intervals
  • Add a middleware method send_escalating
    • allows users to instantiate with a TypedTransaction and an escalation policy
    • may be better as a middleware extension trait?
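
As a rough illustration of what "an escalation policy" could look like, the sketch below precomputes a series of gas prices from an initial price. The names `escalated_prices` and `linear_bump`, the `u64` price type, and the closure signature are assumptions made for this example; they are not taken from the PR's code.

```rust
/// Hypothetical helper: apply `policy` to the initial gas price once per
/// escalation step and collect the resulting prices, lowest index first.
pub fn escalated_prices(
    initial: u64,
    escalations: usize,
    policy: impl Fn(u64, usize) -> u64,
) -> Vec<u64> {
    (0..escalations).map(|index| policy(initial, index)).collect()
}

/// Hypothetical policy: add a fixed `step` per escalation index.
pub fn linear_bump(step: u64) -> impl Fn(u64, usize) -> u64 {
    move |initial, index| initial + step * index as u64
}
```

With `linear_bump(10)` and three escalations starting from 100, this yields 100, 110, 120. Each price would correspond to one signed transaction in the series.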

things I don't like

  • Invariant enforcement is done in the Middleware method rather than the constructor.
    • This is brittle
    • it's hard to see how to sign and nonce correctly otherwise
  • Naming

PR Checklist

  • Added Tests
  • Added Documentation
  • Updated the changelog

{
// then if we have a TX to broadcast, start
// broadcasting it
if let Some(next_to_broadcast) = this.txns.pop() {
Contributor

So if we have already broadcast the most escalated tx, we will just wait on that one indefinitely?

Collaborator Author

yes
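
The behavior confirmed above can be modeled without any async machinery. This is a simplified sketch, not the PR's state machine: `BroadcastQueue`, `Step`, and the `u64` stand-in for a signed transaction are hypothetical. Transactions are stored in reverse so that `pop` yields the least escalated one first; once the queue is empty there is nothing left to broadcast and the caller just keeps waiting on receipts.

```rust
/// What the (hypothetical) driver should do next.
#[derive(Debug, PartialEq)]
pub enum Step {
    /// Broadcast this transaction (a `u64` stands in for the signed bytes).
    Broadcast(u64),
    /// Nothing left to send; keep checking the ones already broadcast.
    WaitForReceipts,
}

pub struct BroadcastQueue {
    // Stored in reverse escalation order so `pop` returns the next one.
    txns: Vec<u64>,
}

impl BroadcastQueue {
    /// `escalation_order` lists transactions from least to most escalated.
    pub fn new(mut escalation_order: Vec<u64>) -> Self {
        escalation_order.reverse();
        Self { txns: escalation_order }
    }

    pub fn next_step(&mut self) -> Step {
        match self.txns.pop() {
            Some(tx) => Step::Broadcast(tx),
            None => Step::WaitForReceipts,
        }
    }
}
```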

completed!(this, Ok(receipt));
}
// rewake until drained
Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(),
Contributor

Is this cx.waker().wake_by_ref() necessary? Won't the executor automatically put the EscalatingPending future back in the queue? Other paths here that return Poll::Pending don't call wake.

Collaborator Author

the goal is to explicitly notify the executor that this is ready to be polled again immediately. i.e. override the executor's scheduling and jump to the front of the queue as much as we have the ability to, as we know that there's no async operation happening to wait on
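
To make the effect of the explicit rewake concrete, here is a std-only sketch. `Drain`, `WakeCounter`, and `run_until_stalled` are hypothetical names, and the toy executor is deliberately strict: it only polls again if the future asked to be woken since the last poll. Real executors differ in their scheduling, so treat this as an illustration of the `Future::poll` contract rather than of any particular runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that handles one item per poll until drained.
pub struct Drain {
    remaining: usize,
    polls: usize,
    rewake: bool,
}

impl Drain {
    pub fn new(remaining: usize, rewake: bool) -> Self {
        Self { remaining, polls: 0, rewake }
    }
}

impl Future for Drain {
    /// Resolves to the number of times it was polled.
    type Output = usize;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        self.polls += 1;
        if self.remaining == 0 {
            return Poll::Ready(self.polls);
        }
        self.remaining -= 1;
        if self.rewake {
            // No async operation is in flight, so ask to be polled again.
            cx.waker().wake_by_ref();
        }
        Poll::Pending
    }
}

/// Waker that only counts how often it was invoked.
pub struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy executor: returns `None` if the future went `Pending` without
/// requesting a wake. The second value is the number of wakes observed.
pub fn run_until_stalled<F: Future>(fut: F) -> (Option<F::Output>, usize) {
    let mut fut = Box::pin(fut);
    let counter = Arc::new(WakeCounter(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut seen = 0;
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return (Some(out), seen),
            Poll::Pending => {
                let wakes = counter.0.load(Ordering::SeqCst);
                if wakes == seen {
                    return (None, seen); // nobody asked for another poll
                }
                seen = wakes;
            }
        }
    }
}
```

`run_until_stalled(Drain::new(3, true))` completes after four polls and three wakes, while `Drain::new(3, false)` stalls after the first poll under this executor.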

Collaborator

this section looks like it could be wrapped in a loop over all futures? So that Poll::Ready(Ok(None)) => merely results in a continue?
Is the order in which they should be polled here important, if not then Vec::swap_remove could be useful.
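
One possible reading of this suggestion, as a std-only sketch: poll every outstanding receipt lookup in a single pass, drop the ones that resolve to `None` with `Vec::swap_remove`, and return the first `Some`. The names `ReceiptFut`, `boxed`, `poll_first_receipt`, and `poll_once`, and the `Option<u64>` stand-in for a receipt, are hypothetical and not taken from the PR.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical receipt lookup: resolves to `Some(block)` once mined.
pub type ReceiptFut = Pin<Box<dyn Future<Output = Option<u64>>>>;

pub fn boxed(fut: impl Future<Output = Option<u64>> + 'static) -> ReceiptFut {
    Box::pin(fut)
}

/// Poll each lookup once. Lookups that resolve to `None` are removed
/// with `swap_remove`, which does not preserve order.
pub fn poll_first_receipt(
    futs: &mut Vec<ReceiptFut>,
    cx: &mut Context<'_>,
) -> Poll<Option<u64>> {
    let mut i = 0;
    while i < futs.len() {
        let polled = futs[i].as_mut().poll(cx);
        match polled {
            Poll::Ready(Some(receipt)) => return Poll::Ready(Some(receipt)),
            Poll::Ready(None) => {
                // The last element moves into slot `i`, so do not advance.
                futs.swap_remove(i);
            }
            Poll::Pending => i += 1,
        }
    }
    if futs.is_empty() {
        Poll::Ready(None)
    } else {
        Poll::Pending
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Convenience for experiments: one pass with a waker that does nothing.
pub fn poll_once(futs: &mut Vec<ReceiptFut>) -> Poll<Option<u64>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    poll_first_receipt(futs, &mut cx)
}
```

The trade-off is that a single `poll` call now does all the work in one go, instead of yielding to the executor between lookups.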

Collaborator Author

I mean it kinda is right now, it's just ensuring that it allows the executor to schedule each step of the loop and interleave other work

Owner

Looping over all futures is an anti-pattern I think, the state machine may be slightly easier to write but as @prestwich says it's better to do it on a per-fut basis

Collaborator

Isn't the CheckingReceipts state essentially a FuturesOrdered but we only poll one future at a time?

Owner

@gakonst gakonst left a comment

some minor qs, love the direction

ethers-providers/src/lib.rs (three outdated, resolved threads)
r
})
.map(|req| async move {
self.sign(req.rlp(chain_id), from).await.map(|sig| req.rlp_signed(chain_id, &sig))
Owner

Is this really bad UX? Should we change it to something better?

Collaborator Author

a shortcut function for signing transactions might be good?

broadcast_checks!(cx, this, fut);
}
Sleeping(instant) => {
if instant.elapsed() > *this.polling_interval {
Collaborator Author

@gakonst please check this pattern out, as I am unclear why we use Delay futures elsewhere. is there some significant behavior difference I'm missing?

Collaborator

the futures_timer Delay type uses an additional global timer-handle thread that makes sure the delayed task gets woken up at the right time. however, futures_timer hasn't been updated in a while. there's also tokio::time::Sleep now, which I guess works similarly but probably uses some runtime handle instead.

looking at the Sleeping state, there might be a case where the future returns Pending and does not get woken up again (elapsed < polling_interval)

Collaborator Author

tokio::time::Sleep won't work in WASM iirc

I believe all common executors schedule futures for polling in frames regardless of whether the waker has explicitly requested waking

Collaborator Author

didn't finish the previous thought before submitting the tx

given that this could cause issues with uncommon executors, should we audit all our futures to ensure that all polls either request rewaking or poll another future?
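
For reference, the documented `Future::poll` contract is that a future returning `Poll::Pending` should arrange for the current task's waker to be called once progress is possible. A timer-style future can do that by handing a clone of the waker to something that fires later. The sketch below is a toy, std-only version of that idea: `SleepUntil` and `block_on` are hypothetical names, it spawns one thread per sleep, and it is not how futures_timer or tokio implement their timers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Hypothetical sleep future: ready once `deadline` has passed.
pub struct SleepUntil {
    deadline: Instant,
    timer_started: bool,
}

impl SleepUntil {
    pub fn after(delay: Duration) -> Self {
        Self { deadline: Instant::now() + delay, timer_started: false }
    }
}

impl Future for SleepUntil {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let now = Instant::now();
        if now >= self.deadline {
            return Poll::Ready(());
        }
        if !self.timer_started {
            self.timer_started = true;
            // Simplification: the waker from the first poll is kept even
            // if a later poll supplies a different one.
            let waker = cx.waker().clone();
            let remaining = self.deadline - now;
            thread::spawn(move || {
                thread::sleep(remaining);
                waker.wake(); // guarantees another poll after the deadline
            });
        }
        Poll::Pending
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls only after being woken (or a spurious unpark).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

Under an executor like this `block_on`, a `Sleeping` state that only checks `instant.elapsed()` and returns `Pending` without arranging a wake would never be polled again.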

@gakonst gakonst marked this pull request as ready for review November 8, 2021 21:36
Collaborator

@mattsse mattsse left a comment

I feel like this could even be two different types.
An additional Stream type that processes all transactions.
The EscalatingPending Future that polls the underlying stream and returns the first value?

@prestwich
Collaborator Author

test failure is unrelated to these changes

@gakonst @mattsse do you think that this belongs on the main middleware or on some sort of extension trait?

@mattsse
Collaborator

mattsse commented Nov 9, 2021

@gakonst @mattsse do you think that this belongs on the main middleware or on some sort of extension trait?

What would be nice is if this could be somehow integrated with the current send_transaction -> PendingTx, so that we can do something like p.send_transaction(tx).escalate(||...).await but p.send_transaction(tx).await.await still works.

I believe this would be possible with an additional future type (SendTx or something?) that would have Output = PendingTransaction and a fn escalate. But this would mean, async fn send_transaction would become fn send_transaction() -> SendTx.

this would be a breaking change however.

The easiest way would probably be to add another provider function, as is the case now.

@prestwich
Collaborator Author

the problem with that is that the signing must be done in the top-level middleware. It can't be done by the provider that the pending transaction has access to. So that feature is blocked by an object-safe Middleware trait

@mattsse
Collaborator

mattsse commented Nov 9, 2021

ah I see, makes sense, this would then be limited to only the SignerMiddleware...

@prestwich
Collaborator Author

Not exactly, as the raw provider may sign in many cases (using eth_sign or similar). However, relying on the provider would result in a conflict if any SignerMiddleware were used.

E.g., SignerMiddleware(Provider). The SignerMiddleware may be signing with address 0x1234... while the node controls address 0xabcd.... This would cause errors and other unexpected behavior if we chose to have signing in the Provider held by a PendingTransaction

Owner

@gakonst gakonst left a comment

looking good, I am OK with placing it as an extra function as-is. I also liked @mattsse's idea of adding an extra type on top of PendingTransaction so that we can do send_transaction(tx).await?.escalator(...).await? to escalate and send_transaction(tx).await?.await? for simple usage

}

pub fn broadcast_interval(mut self, duration: u64) -> Self {
self.broadcast_interval = Duration::from_secs(duration);
Owner

Let's use milliseconds here, just for flexibility's sake

Suggested change
- self.broadcast_interval = Duration::from_secs(duration);
+ self.broadcast_interval = Duration::from_millis(duration);

Collaborator Author

going to change it to use Into<Duration> to match the behavior of PendingTransaction
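
A minimal sketch of what setters taking `Into<Duration>` could look like. The `Intervals` struct and its default values are placeholders invented for this example; only the two setter names come from the diff above.

```rust
use std::time::Duration;

/// Hypothetical config holder standing in for the future's interval fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Intervals {
    pub broadcast_interval: Duration,
    pub polling_interval: Duration,
}

impl Default for Intervals {
    fn default() -> Self {
        // Arbitrary placeholder defaults, not the values used in the PR.
        Self {
            broadcast_interval: Duration::from_secs(60),
            polling_interval: Duration::from_secs(5),
        }
    }
}

impl Intervals {
    /// Builder-style setter: the caller picks the unit via `Duration`.
    pub fn broadcast_interval<T: Into<Duration>>(mut self, duration: T) -> Self {
        self.broadcast_interval = duration.into();
        self
    }

    pub fn polling_interval<T: Into<Duration>>(mut self, duration: T) -> Self {
        self.polling_interval = duration.into();
        self
    }
}
```

Since the standard library provides no `From<u64>` for `Duration`, callers have to spell out the unit, for example `Intervals::default().polling_interval(Duration::from_millis(1500))`, which sidesteps the seconds-versus-milliseconds question.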

}

pub fn polling_interval(mut self, duration: u64) -> Self {
self.polling_interval = Duration::from_secs(duration);
Owner

Suggested change
- self.polling_interval = Duration::from_secs(duration);
+ self.polling_interval = Duration::from_millis(duration);

@prestwich
Collaborator Author

latest commit uses FuturesUnordered. I think this is better as @mattsse says. We don't require ordering here, so this functions similarly to select_all, but is easier to manage

@prestwich
Collaborator Author

I have no other outstanding changes so I'll update the changelog and this is ready for consideration

Owner

@gakonst gakonst left a comment

I like it. Thank you for the hard work, we can iterate on it post-merge. @mattsse any further comments?

ethers-providers/src/pending_escalator.rs (outdated, resolved thread)
@mattsse
Collaborator

mattsse commented Nov 11, 2021

I like it. Thank you for the hard work, we can iterate on it post-merge. @mattsse any further comments?

lgtm

@gakonst gakonst merged commit 203b2e8 into master Nov 12, 2021
@gakonst gakonst deleted the prestwich/super-pending branch November 12, 2021 01:23
@gakonst
Owner

gakonst commented Nov 12, 2021

gg. Should we receive the escalator middleware?

meetmangukiya pushed a commit to meetmangukiya/ethers-rs that referenced this pull request Mar 21, 2022