Skip to content

Commit

Permalink
chore(electric): Cleanup satellite- and postgres-connector-related co…
Browse files Browse the repository at this point in the history
…de (#1144)

Following on from #1044,
these are a bunch of refactorings that didn't logically fit in any other
PR.
  • Loading branch information
alco committed Apr 18, 2024
1 parent 99ab7bb commit 44533ce
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
defmodule Electric.Replication.PostgresConnectorSup do
use Supervisor
require Logger

alias Electric.Replication.Connectors
alias Electric.Replication.Postgres
alias Electric.Postgres.Extension.SchemaCache
alias Electric.Postgres.{CachedWal, Proxy}
alias Electric.Replication.SatelliteCollectorProducer

require Logger

@spec start_link(Connectors.config()) :: :ignore | {:error, any} | {:ok, pid}
def start_link(connector_config) do
Supervisor.start_link(__MODULE__, connector_config)
origin = Connectors.origin(connector_config)
Supervisor.start_link(__MODULE__, connector_config, name: name(origin))
end

@spec name(Connectors.origin()) :: Electric.reg_name()
Expand All @@ -21,44 +23,35 @@ defmodule Electric.Replication.PostgresConnectorSup do
@impl Supervisor
def init(connector_config) do
origin = Connectors.origin(connector_config)
name = name(origin)
Electric.reg(name)

postgres_producer = Postgres.LogicalReplicationProducer.name(origin)
postgres_producer_consumer = Postgres.MigrationConsumer.name(origin)
logical_replication_producer = Postgres.LogicalReplicationProducer.name(origin)
migration_consumer = Postgres.MigrationConsumer.name(origin)

write_to_pg_mode = Connectors.write_to_pg_mode(connector_config)

migration_consumer_opts = [
producer: postgres_producer,
producer: logical_replication_producer,
refresh_subscription: write_to_pg_mode == :logical_replication
]

writer_config = [conn_config: connector_config, producer: SatelliteCollectorProducer.name()]
writer_module_opts = [
conn_config: connector_config,
producer: SatelliteCollectorProducer.name(origin)
]

children = [
%{
id: :postgres_schema_cache,
start: {SchemaCache, :start_link, [connector_config]}
},
{SatelliteCollectorProducer,
name: SatelliteCollectorProducer.name(), write_to_pg_mode: write_to_pg_mode},
%{
id: :postgres_producer,
start: {Postgres.LogicalReplicationProducer, :start_link, [connector_config]}
},
%{
id: :postgres_migration_consumer,
start:
{Postgres.MigrationConsumer, :start_link, [connector_config, migration_consumer_opts]}
},
{SchemaCache, connector_config},
{SatelliteCollectorProducer, connector_config},
{Postgres.LogicalReplicationProducer, connector_config},
{Postgres.MigrationConsumer, {connector_config, migration_consumer_opts}},
if write_to_pg_mode == :logical_replication do
{Postgres.SlotServer, writer_config}
{Postgres.SlotServer, writer_module_opts}
else
{Postgres.Writer, writer_config}
{Postgres.Writer, writer_module_opts}
end,
{CachedWal.EtsBacked,
origin: origin,
subscribe_to: [{postgres_producer_consumer, []}],
subscribe_to: [{migration_consumer, []}],
wal_window_size: Connectors.get_wal_window_opts(connector_config).in_memory_size},
{Proxy, connector_config: connector_config}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ defmodule Electric.Replication.SatelliteCollectorProducer do

alias Electric.Postgres.Extension
alias Electric.Replication.Changes.NewRecord
alias Electric.Replication.Connectors

require Logger

def start_link(opts) do
GenStage.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
def start_link(connector_config) do
origin = Connectors.origin(connector_config)
GenStage.start_link(__MODULE__, connector_config, name: name(origin))
end

def name(identifier \\ :default) do
{:via, :gproc, {:n, :l, {__MODULE__, identifier}}}
def name(origin) do
Electric.name(__MODULE__, origin)
end

def store_incoming_transactions(_, []), do: :ok
Expand All @@ -32,16 +34,16 @@ defmodule Electric.Replication.SatelliteCollectorProducer do
# Internal API

@impl GenStage
def init(opts) do
table = ETS.Set.new!(ordered: true, keypos: 2)
def init(connector_config) do
table = :ets.new(nil, [:ordered_set, keypos: 2])

{:producer,
%{
table: table,
next_key: 0,
demand: 0,
starting_from: -1,
write_to_pg_mode: Keyword.get(opts, :write_to_pg_mode, :logical_replication)
write_to_pg_mode: Connectors.write_to_pg_mode(connector_config)
}}
end

Expand All @@ -52,9 +54,9 @@ defmodule Electric.Replication.SatelliteCollectorProducer do
|> maybe_update_acked_client_lsns(state.write_to_pg_mode)
|> Stream.with_index(state.next_key)
|> Enum.to_list()
|> then(&ETS.Set.put(state.table, &1))
|> then(&:ets.insert(state.table, &1))

next_key = ETS.Set.last!(state.table) + 1
next_key = :ets.last(state.table) + 1

{:noreply, events, state} = send_events_from_ets(%{state | next_key: next_key})

Expand Down Expand Up @@ -87,16 +89,16 @@ defmodule Electric.Replication.SatelliteCollectorProducer do

@impl GenStage
def handle_info({:sent_all_up_to, key}, state) do
ETS.Set.select_delete!(state.table, [{{:_, :"$1"}, [{:"=<", :"$1", key}], [true]}])
:ets.select_delete(state.table, [{{:_, :"$1"}, [{:"=<", :"$1", key}], [true]}])

{:noreply, [], state}
end

defp send_events_from_ets(%{demand: 0} = state), do: {:noreply, [], state}

defp send_events_from_ets(%{demand: demand, table: set, starting_from: from} = state) do
defp send_events_from_ets(%{demand: demand, table: table, starting_from: from} = state) do
results =
case ETS.Set.select!(set, [{{:"$1", :"$2"}, [{:>, :"$2", from}], [:"$$"]}], demand) do
case :ets.select(table, [{{:"$1", :"$2"}, [{:>, :"$2", from}], [:"$$"]}], demand) do
:"$end_of_table" -> []
{results, _continuation} -> results
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Electric.Replication.SatelliteConnector do
{SatelliteCollectorConsumer,
name: SatelliteCollectorConsumer.name(name),
subscribe_to: [{producer, cancel: :temporary}],
push_to: SatelliteCollectorProducer.name()},
push_to: SatelliteCollectorProducer.name(origin)},
{Electric.Postgres.CachedWal.Producer,
name: Electric.Postgres.CachedWal.Producer.name(name), origin: origin}
]
Expand Down
20 changes: 6 additions & 14 deletions components/electric/lib/electric/satellite/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ defmodule Electric.Satellite.Protocol do
alias Electric.Postgres.Schema
alias Electric.Postgres.CachedWal
alias Electric.Replication.Changes
alias Electric.Replication.Connectors
alias Electric.Replication.Shapes
alias Electric.Replication.Shapes.ShapeRequest
alias Electric.Satellite.Serialization
Expand Down Expand Up @@ -53,11 +52,9 @@ defmodule Electric.Satellite.Protocol do
# succeed as well
reg_name = Electric.Satellite.WebsocketServer.reg_name(client_id)

origin = Connectors.origin(state.connector_config)

with {:ok, auth} <- Electric.Satellite.Auth.validate_token(token, state.auth_provider),
true <- Electric.safe_reg(reg_name, 1000),
:ok <- ClientManager.register_client(client_id, reg_name, origin) do
:ok <- ClientManager.register_client(client_id, reg_name, state.origin) do
Logger.metadata(user_id: auth.user_id)
Logger.info("Successfully authenticated the client")
Metrics.satellite_connection_event(%{authorized_connection: 1})
Expand Down Expand Up @@ -227,7 +224,7 @@ defmodule Electric.Satellite.Protocol do
}, state}

true ->
case Shapes.validate_requests(requests, Connectors.origin(state.connector_config)) do
case Shapes.validate_requests(requests, state.origin) do
{:ok, requests} ->
query_subscription_data(id, requests, state)

Expand Down Expand Up @@ -400,10 +397,7 @@ defmodule Electric.Satellite.Protocol do
{incomplete, complete} ->
complete = Enum.reverse(complete)

case WriteValidation.validate_transactions!(
complete,
{SchemaCache, Connectors.origin(state.connector_config)}
) do
case WriteValidation.validate_transactions!(complete, {SchemaCache, state.origin}) do
{:ok, accepted} ->
{nil, send_transactions(accepted, incomplete, state)}

Expand Down Expand Up @@ -458,7 +452,7 @@ defmodule Electric.Satellite.Protocol do
including_data: msg.additional_data_source_ids,
including_subscriptions: msg.subscription_ids,
cached_wal_impl: CachedWal.EtsBacked,
origin: Connectors.origin(state.connector_config),
origin: state.origin,
advance_graph_using: {&advance_graph_by_tx/4, [state.auth.user_id]}
)

Expand Down Expand Up @@ -528,9 +522,7 @@ defmodule Electric.Satellite.Protocol do
end

defp handle_start_replication_request(msg, lsn, state) do
origin = Connectors.origin(state.connector_config)

if CachedWal.Api.lsn_in_cached_window?(origin, lsn) do
if CachedWal.Api.lsn_in_cached_window?(state.origin, lsn) do
case restore_client_state(msg.subscription_ids, msg.observed_transaction_data, lsn, state) do
{:ok, state} ->
state =
Expand Down Expand Up @@ -1147,7 +1139,7 @@ defmodule Electric.Satellite.Protocol do
including_data: observed_txn_data,
including_subscriptions: Map.keys(state.subscriptions),
cached_wal_impl: CachedWal.EtsBacked,
origin: Connectors.origin(state.connector_config),
origin: state.origin,
advance_graph_using: {&advance_graph_by_tx/4, [state.auth.user_id]}
)
|> case do
Expand Down
3 changes: 3 additions & 0 deletions components/electric/lib/electric/satellite/protocol/state.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Electric.Satellite.Protocol.State do
alias Electric.Replication.Connectors
alias Electric.Satellite.Protocol.InRep
alias Electric.Satellite.Protocol.OutRep
alias Electric.Satellite.Protocol.Telemetry
Expand All @@ -12,6 +13,7 @@ defmodule Electric.Satellite.Protocol.State do
out_rep: %OutRep{},
auth_provider: nil,
connector_config: [],
origin: "",
subscriptions: %{},
subscription_data_fun: nil,
move_in_data_fun: nil,
Expand All @@ -27,6 +29,7 @@ defmodule Electric.Satellite.Protocol.State do
out_rep: OutRep.t(),
auth_provider: Electric.Satellite.Auth.provider(),
connector_config: Keyword.t(),
origin: Connectors.origin(),
subscriptions: map(),
subscription_data_fun: fun(),
move_in_data_fun: fun(),
Expand Down
10 changes: 6 additions & 4 deletions components/electric/lib/electric/satellite/ws_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Electric.Satellite.WebsocketServer do

alias Electric.Utils
alias Electric.Postgres.CachedWal
alias Electric.Replication.Connectors
alias Electric.Replication.InitialSync
alias Electric.Satellite.Protocol
alias Electric.Satellite.Protocol.State
Expand All @@ -51,11 +52,14 @@ defmodule Electric.Satellite.WebsocketServer do

@impl WebSock
def init(opts) do
connector_config = Keyword.fetch!(opts, :connector_config)

{:ok,
schedule_ping(%State{
last_msg_time: :erlang.timestamp(),
auth_provider: Keyword.fetch!(opts, :auth_provider),
connector_config: Keyword.fetch!(opts, :connector_config),
connector_config: connector_config,
origin: Connectors.origin(connector_config),
subscription_data_fun: Keyword.fetch!(opts, :subscription_data_fun),
move_in_data_fun: Keyword.fetch!(opts, :move_in_data_fun),
out_rep: %OutRep{allowed_unacked_txs: Keyword.get(opts, :allowed_unacked_txs, 30)},
Expand Down Expand Up @@ -199,9 +203,7 @@ defmodule Electric.Satellite.WebsocketServer do
# While processing the SatInStartReplicationReq message, Protocol has determined that a new
# client has connected which needs to perform the initial sync of migrations and the current database state before
# subscribing to the replication stream.
def handle_info({:perform_initial_sync_and_subscribe, msg}, %State{} = state) do
origin = Electric.Replication.Connectors.origin(state.connector_config)

def handle_info({:perform_initial_sync_and_subscribe, msg}, %State{origin: origin} = state) do
# Fetch the latest observed LSN from the cached WAL. We have to do it before fetching migrations.
#
# If we were to do it the other way around, we could miss a migration that is committed right after the call to
Expand Down
1 change: 0 additions & 1 deletion components/electric/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ defmodule Electric.MixProject do
{:telemetry_poller, "~> 1.0"},
{:telemetry_metrics, "~> 0.6"},
{:joken, "~> 2.6"},
{:ets, "~> 0.9.0"},
{:libgraph, "~> 0.16.0"},
{:pathex, "~> 2.5.2"},
{:stream_data, "~> 0.5", only: [:dev, :test]},
Expand Down
1 change: 0 additions & 1 deletion components/electric/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"elixir_make": {:hex, :elixir_make, "0.7.8", "505026f266552ee5aabca0b9f9c229cbb496c689537c9f922f3eb5431157efc7", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "7a71945b913d37ea89b06966e1342c85cfe549b15e6d6d081e8081c493062c07"},
"epgsql": {:hex, :epgsql, "4.7.1", "d4e47cae46c18c8afa88e34d59a9b4bae16368d7ce1eb3da24fa755eb28393eb", [:rebar3], [], "hexpm", "b6d86b7dc42c8555b1d4e20880e5099d6d6d053148000e188e548f98e4e01836"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ets": {:hex, :ets, "0.9.0", "79c6a6c205436780486f72d84230c6cba2f8a9920456750ddd1e47389107d5fd", [:mix], [], "hexpm", "2861fdfb04bcaeff370f1a5904eec864f0a56dcfebe5921ea9aadf2a481c822b"},
"excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"},
"expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"},
"exqlite": {:hex, :exqlite, "0.19.0", "0f3ee29e35bed38552dd0ed59600aa81c78f867f5b5ff0e17d330148e0465483", [:make, :mix], [{:cc_precompiler, "~> 0.1", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "55a8fbb0443f03d4a256e3458bd1203eff5037a6624b76460eaaa9080f462b06"},
Expand Down
Loading

0 comments on commit 44533ce

Please sign in to comment.