feat(electric): Use two replication slots to allow clients to resume replication after Electric restarts #1043
Conversation
Force-pushed from 79ee783 to f6de6d9
Great work, exactly what I had in mind myself, and additional thanks for cleaning and testing the LSN module. One big question from me
```elixir
# TODO: make sure we're not removing transactions that are about to be requested by a newly
# connected client. See VAX-1552.
#
# TODO(optimization): do not run this after every consumed transaction.
```
So I have two questions here:

- Why are we running this whenever we see a `Commit`? We have `ack_fn` set to `ack`, the previous function. I believe it makes more sense to advance the slot as we do right now, at least once the transaction is stored in memory (although I'll agree that it matters a bit less given that we're advancing a secondary slot here).
- I see the "optimization" note you've left here, but I'm not sure I'm comfortable leaving this as a TODO. Advancing a slot on every transaction is fine-ish; however, we're opening a fully separate new connection to PG for every received transaction. This is going to be extremely slow and costly for us, and PG won't be able to keep up with the number of connections we're opening for one-shot queries. I'd be against even a pooling approach here - I believe we need to just open and hold a stable connection to PG to perform these tasks.
The optimization note wasn't meant to be left in the code for long. I split the code into multiple PRs to help myself draw boundaries between changes to different components and to get to a working version faster to then be able to focus on the edge cases and inefficiencies.
So, point taken. Holding a persistent connection might be the way to go if the current approach of advancing the slot often remains in place. Though I'm hoping to implement more of a batched approach, e.g. advancing every N transactions or in size-on-disk increments, at which point relying on a connection pool may be sufficient. I'll circle back to this discussion when I have more concrete ideas to share but I'm expecting those to be implemented in a follow-up PR in any case.
N.B.: I had a connection pool implemented as part of the early implementation work but had to take that out for now just to keep the scope of changes that are already spread over multiple PRs manageable.
With or without batching, holding a single open connection just for slot advancement is fine, since only one process is ever likely to consume the logical replication stream. I think we should just spawn a sibling GenServer that holds the advancing connection - to split the responsibility a little - and cast to that server on ack. This keeps the whole approach simple and doesn't clog up the logical replication consumer with ack message handling, since the casts are sent to a different process. And if that GenServer goes down and we lose a couple of casts in the middle - no problem; worst case we'll have slightly more WAL stored at a point in time, and it'll resolve on the next cast.

I'm not sure how you want to structure the PR merge sequence, but I would insist on adding this functionality in this PR. It's not much code, but the difference in performance implications is huge.
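A minimal sketch of the shape being proposed, assuming a supervised process owning one dedicated PG connection (module, function, and helper names are illustrative, not from the PR):

```elixir
defmodule Electric.Replication.SlotAdvancer do
  @moduledoc """
  Sketch only: holds one long-lived PG connection and advances the main
  slot on cast, so the replication consumer never blocks on acknowledgement.
  """
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Fire-and-forget: safe to call from the consumer's ack path.
  def advance(lsn), do: GenServer.cast(__MODULE__, {:advance, lsn})

  @impl true
  def init(opts) do
    # Assumption: real code would open a dedicated connection to PG here.
    {:ok, conn} = connect(opts)
    {:ok, %{conn: conn}}
  end

  @impl true
  def handle_cast({:advance, lsn}, %{conn: conn} = state) do
    # If this crashes, the supervisor restarts the server; a lost cast only
    # means slightly more retained WAL until the next ack arrives.
    :ok = advance_slot(conn, lsn)
    {:noreply, state}
  end

  defp connect(_opts), do: {:ok, :placeholder_conn}

  defp advance_slot(_conn, _lsn) do
    # Assumption: real code would advance the slot's starting point here.
    :ok
  end
end
```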
Force-pushed from 9a369aa to a8e903e

Force-pushed from a8e903e to bce37fa
```elixir
end

defimpl List.Chars do
  def to_charlist(lsn), do: ~c'#{Electric.Postgres.Lsn.to_iolist(lsn)}'
```
nitpick: do you need to wrap this with the `~c` sigil?
Mix formatter has been adding those since Elixir v1.15.0:

> [Code] `Code.format_string!/2` now converts `'charlists'` into `~c"charlists"` by default
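For illustration, here is the conversion the formatter performs on Elixir >= 1.15, using the line under discussion (a sketch only):

```elixir
# Input accepted by `mix format`:
def to_charlist(lsn), do: '#{Electric.Postgres.Lsn.to_iolist(lsn)}'

# Output produced by the formatter, with the ~c sigil added:
def to_charlist(lsn), do: ~c"#{Electric.Postgres.Lsn.to_iolist(lsn)}"
```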
Super
```diff
 origin = state.origin

 %Transaction{
   transaction
   | lsn: end_lsn,
     # Make sure not to pass state.field into ack function, as this
     # will create a copy of the whole state in memory when sending a message
-    ack_fn: fn -> ack(conn, origin, end_lsn) end
+    ack_fn: fn -> ack(repl_conn, origin, end_lsn) end
```
I think this `ack` should not operate on `repl_conn`. The `ack_fn` function is going to be called in a different process later down the line - in particular, in the WAL cache consumer stage. This means that if this replication connection is closed for any reason, old transactions will call it with a stale value and probably crash - I don't know how `:epgsql.standby_status_update/3` acts when it's passed a nonexistent pid.

To decouple that a little and make sure that the process calling the `ack_fn` doesn't crash, I'd rather have `ack_fn` send a `GenServer.cast/2` to a process that actually holds the replication connection and does the acknowledgement. If the cast fails because the holding GenServer is temporarily dead - no problem: the main code continues to operate, and worst case we acknowledge on the next magic write anyway. So I'd rather put this "advancing" replication connection into a separate GenServer that can restart and reconnect separately, and just `GenServer.cast` into it - maybe make it a `:gproc` via-tuple using the origin so that we don't even care where the replication process currently lives.
What do you think?
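For reference, a `:gproc` via-tuple registration keyed by the origin might look like this (a sketch; the module name and key shape are made up for illustration):

```elixir
defmodule SlotAckViaGproc do
  use GenServer

  # Register the connection-holding GenServer under the connector origin:
  def start_link(origin) do
    GenServer.start_link(__MODULE__, origin, name: via(origin))
  end

  # Callers cast without knowing where the process currently lives:
  def ack(origin, lsn), do: GenServer.cast(via(origin), {:ack, lsn})

  # {:n, :l, key} is gproc's "local unique name" key class.
  defp via(origin), do: {:via, :gproc, {:n, :l, {__MODULE__, origin}}}

  @impl true
  def init(origin), do: {:ok, %{origin: origin}}

  @impl true
  def handle_cast({:ack, _lsn}, state) do
    # Assumption: real code would acknowledge/advance the slot here.
    {:noreply, state}
  end
end
```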
As we've discussed elsewhere, it is fine to acknowledge transactions inside `LogicalReplicationProducer` and rely on Elixir's supervision tree to restart things if anything fails. Since we now have a replication slot with a persistent starting point, we can replay already-seen transactions when a new replication connection opens.
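A sketch of what the ack inside the producer can look like, assuming `repl_conn` is the epgsql replication connection; the `Lsn.to_integer/1` helper name is an assumption, while `:epgsql.standby_status_update/3` is the real epgsql call:

```elixir
# Assumes `require Logger` in the enclosing module.
defp ack(repl_conn, origin, lsn) do
  Logger.debug("Acknowledging transactions up to #{lsn} for origin #{origin}")

  # standby_status_update/3 reports the flushed and applied LSNs back to
  # Postgres, which lets it discard WAL that is behind the slot.
  decimal_lsn = Electric.Postgres.Lsn.to_integer(lsn)
  :ok = :epgsql.standby_status_update(repl_conn, decimal_lsn, decimal_lsn)
end
```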
Force-pushed from 8141723 to e9fcd3d
Nice design, I like it! A couple of nitpicks, but nothing more. Let's coordinate a release before merging this, though.
```diff
-|> Stream.map(fn %Transaction{lsn: lsn} = tx ->
-  {lsn_to_position(lsn), %{tx | ack_fn: nil}}
-end)
+|> Stream.map(fn %Transaction{} = tx -> {lsn_to_position(tx.lsn), tx} end)
```
nitpick: I don't understand why you don't like the destructuring assignment of `lsn`, given that it's faster than dynamic dispatch of the `.` operator. Doesn't really matter in this case, just caught my eye.
It's not that I don't like it. As a rule of thumb, I use pattern matching in function heads when the same field is used multiple times in the function body. Because we have instances in the code where some fields are pattern-matched while others are accessed via the dot operator, the split usually looks arbitrary.
I believe that BEAM is smart enough to optimize uses of the dot operator if there's a pattern match or guard in the function head. Can't find the source now, but here's at least a partial confirmation of that: https://elixirforum.com/t/way-too-much-detail-about-matching-in-the-head-vs-accessing-maps-in-the-body/49167/2
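For illustration, the two styles under discussion side by side (the function name is made up):

```elixir
# Destructuring in the function head:
defp position_of(%Transaction{lsn: lsn}), do: lsn_to_position(lsn)

# Struct match in the head, dot access in the body. Per the discussion above,
# the %Transaction{} match already establishes the term's shape, so `tx.lsn`
# can avoid the slower dynamic-dispatch path the nitpick refers to:
defp position_of(%Transaction{} = tx), do: lsn_to_position(tx.lsn)
```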
```elixir
defp emit_events(state, events) do
  {:noreply, Enum.reverse(events), state}

state = %{state | queue: queue_remaining, queue_Len: queue_len - demand, demand: 0}
```
Typo that should lead to a runtime error; it possibly means that this code path is not covered by tests (although it is caught by Dialyzer).
```diff
-state = %{state | queue: queue_remaining, queue_Len: queue_len - demand, demand: 0}
+state = %{state | queue: queue_remaining, queue_len: queue_len - demand, demand: 0}
```
Yeah, I had to stop working on this abruptly yesterday and pushed what I had ready by then. This condition requires a specific test setup to cover. I think having Dialyzer complain about it is good enough.
Force-pushed from fba560a to b2cc4d5
Now that we're using two replication slots, a magic write is no longer sufficient for releasing obsolete WAL records. We have to advance the main slot's starting point instead.
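At the SQL level, advancing the slot's starting point can be done with `pg_replication_slot_advance/2`, a Postgres function available since version 11. A sketch using epgsql's simple-query API, with `conn`, the slot name, and the LSN as placeholders:

```elixir
{:ok, _cols, _rows} =
  :epgsql.squery(
    conn,
    "SELECT pg_replication_slot_advance('electric_main', '0/5A1F2D0')"
  )
```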
With a persistent main slot now in use, it is possible that, after a restart, Electric consumes a transaction it has already seen before.
Control flow is simplified and it's easier to see opportunities for batching in the refactored code.
No need to spread the acknowledgement logic across multiple processes. Once a transaction has been emitted to the GenStage consumer, it doesn't need to be replayed by Postgres until Electric restarts.
Force-pushed from b2cc4d5 to 314550e
…ig (#1044) Follow-up to #1043. This PR makes the Elixir process run by `EtsBacked` scoped under the connector origin, similar to how other Postgres connector processes behave. It also updates telemetry metrics to use more precise names for what the `EtsBacked` cache actually stores: instead of "cache count" it's "transaction count".
Continuing on from #1099, this PR introduces the concept of a "persistent replication slot". Now, instead of having an auto-advancing replication slot, Electric can keep around WAL records that contain transactions it has already seen and acknowledged previously. As a result, when Electric restarts, it can repopulate its in-memory cache of transactions and be able to resume clients' replication streams.
New configuration options `ELECTRIC_TXN_CACHE_SIZE` and `ELECTRIC_RESUMABLE_WAL_WINDOW` have been added. The relevant docs are updated in #1050.