Skip to content

Commit

Permalink
Skip already applied migrations in MigrationConsumer
Browse files Browse the repository at this point in the history
With a persistent main slot now in use, it is possible that Electric consumes
the same transaction it has already seen before, after it restarts.
  • Loading branch information
alco committed Apr 9, 2024
1 parent 9f0c7ac commit e9fcd3d
Showing 1 changed file with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,13 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do
change
end

defp process_migrations(transactions, state) do
defp process_migrations(transactions, %{loader: loader} = state) do
{:ok, %{version: schema_version}} = SchemaLoader.load(loader)

{state, num_applied_migrations} =
transactions
|> Enum.flat_map(&transaction_changes_to_migrations(&1, state))
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
|> transactions_to_migrations(state)
|> skip_applied_migrations(schema_version)
|> Enum.reduce({state, 0}, fn migration, {state, num_applied} ->
{perform_migration(migration, state), num_applied + 1}
end)
Expand All @@ -139,6 +141,12 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do
end
end

defp transactions_to_migrations(transactions, state) do
transactions
|> Enum.flat_map(&transaction_changes_to_migrations(&1, state))
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
end

defp transaction_changes_to_migrations(%Transaction{changes: changes}, state) do
for %NewRecord{record: record, relation: relation} <- changes, is_ddl_relation(relation) do
{:ok, version} = SchemaLoader.tx_version(state.loader, record)
Expand All @@ -147,6 +155,10 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do
end
end

defp skip_applied_migrations(migrations, schema_version) do
Enum.drop_while(migrations, fn {version, _stmts} -> version <= schema_version end)
end

defp perform_migration({version, stmts}, state) do
{:ok, loader, schema_version} = apply_migration(version, stmts, state)

Expand Down

0 comments on commit e9fcd3d

Please sign in to comment.