Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication supervisor #31

Merged
merged 16 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ walex-*.tar
.elixir_ls/

# VSCode
.vscode/
.vscode/

# .DS_Store files from macOS
.DS_Store
**/.DS_Store
21 changes: 16 additions & 5 deletions lib/walex/replication/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule WalEx.Replication.Publisher do
"""
use GenServer

alias WalEx.{Changes, Destinations, Events, Types}
alias WalEx.{Changes, Config, Destinations, Events, Types}
alias WalEx.Decoder.Messages

defmodule(State,
Expand All @@ -18,16 +18,27 @@ defmodule WalEx.Replication.Publisher do

defstruct [:relations]

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
def start_link(opts) do
name =
opts
|> Keyword.get(:app_name)
|> registry_name

GenServer.start_link(__MODULE__, opts, name: name)
end

def process_message(message, app_name) do
GenServer.cast(__MODULE__, %{message: message, app_name: app_name})
name = registry_name(app_name)

GenServer.cast(name, %{message: message, app_name: app_name})
end

defp registry_name(app_name) do
Config.Registry.set_name(:set_gen_server, __MODULE__, app_name)
end

@impl true
def init(_) do
def init(_opts) do
Process.flag(:message_queue_data, :off_heap)

{:ok, %State{}}
Expand Down
5 changes: 0 additions & 5 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule WalEx.Replication.Server do
@moduledoc """
This module is responsible for setting up the replication connection
"""

use Postgrex.ReplicationConnection

alias WalEx.Config.Registry, as: WalExRegistry
Expand Down Expand Up @@ -35,10 +34,6 @@ defmodule WalEx.Replication.Server do
def init(opts) do
app_name = Keyword.get(opts, :app_name)

if is_nil(Process.whereis(Publisher)) do
{:ok, _pid} = Publisher.start_link([])
end

{:ok, %{step: :disconnected, app_name: app_name}}
end

Expand Down
7 changes: 5 additions & 2 deletions lib/walex/replication/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule WalEx.Replication.Supervisor do

use Supervisor

alias WalEx.Replication.Server
alias WalEx.Replication.{Publisher, Server}

def start_link(opts) do
app_name = Keyword.get(opts, :app_name)
Expand All @@ -19,7 +19,10 @@ defmodule WalEx.Replication.Supervisor do
|> Keyword.get(:configs)
|> Keyword.get(:app_name)

children = [{Server, app_name: app_name}]
children = [
{Publisher, app_name: app_name},
{Server, app_name: app_name}
]

Supervisor.init(children, strategy: :one_for_all)
end
Expand Down
6 changes: 5 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule WalEx.MixProject do
aliases: aliases(),
name: "WalEx",
source_url: "https://github.com/cpursley/walex",
test_coverage: [tool: ExCoveralls]
test_coverage: [tool: ExCoveralls],
elixirc_paths: elixirc_paths(Mix.env())
]
end

Expand Down Expand Up @@ -72,4 +73,7 @@ defmodule WalEx.MixProject do
]
]
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_), do: ["lib"]
end
12 changes: 12 additions & 0 deletions test/support/test_helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule WalEx.Support.TestHelpers do
def find_worker_pid(supervisor_pid, child_module) do
supervisor_pid
|> Supervisor.which_children()
|> find_pid(child_module)
end

defp find_pid(children, module_name) do
{_, pid, _, _} = Enum.find(children, fn {module, _, _, _} -> module == module_name end)
pid
end
end
68 changes: 25 additions & 43 deletions test/walex/config/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,29 @@ defmodule WalEx.ConfigTest do
alias WalEx.Config
alias Config.Registry, as: WalExRegistry

@base_configs [
name: :test_name,
hostname: "hostname",
username: "username",
password: "password",
database: "database",
port: 5432,
subscriptions: ["subscriptions"],
publication: "publication",
modules: [MyApp.CustomModule],
ssl: false,
ssl_opts: [verify: :verify_none]
]

setup_all do
assert {:ok, pid} = WalExRegistry.start_registry()
assert is_pid(pid)
{:ok, _pid} = WalExRegistry.start_registry()
:timer.sleep(1000)
:ok
end

describe "start_link/2" do
test "should start a process" do
assert {:ok, pid} = Config.start_link(configs: get_base_configs())
{:ok, pid} = Config.start_link(configs: @base_configs)
assert is_pid(pid)
end

Expand All @@ -26,7 +39,7 @@ defmodule WalEx.ConfigTest do
modules: ["modules"]
]

assert {:ok, pid} = Config.start_link(configs: configs)
{:ok, pid} = Config.start_link(configs: configs)
assert is_pid(pid)

assert [
Expand All @@ -52,8 +65,7 @@ defmodule WalEx.ConfigTest do

describe "get_configs/" do
setup do
assert {:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -79,8 +91,7 @@ defmodule WalEx.ConfigTest do

describe "get_configs/2" do
setup do
assert {:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -100,7 +111,7 @@ defmodule WalEx.ConfigTest do

test "should filter configs by process name" do
configs =
get_base_configs()
@base_configs
|> Keyword.replace(:name, :other_name)
|> Keyword.replace(:database, "other_database")

Expand All @@ -127,8 +138,7 @@ defmodule WalEx.ConfigTest do

describe "add_config/3" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -152,8 +162,7 @@ defmodule WalEx.ConfigTest do

describe "remove_config/3" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -175,8 +184,7 @@ defmodule WalEx.ConfigTest do

describe "replace_config/3" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -191,8 +199,7 @@ defmodule WalEx.ConfigTest do

describe "build_module_names/3" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand Down Expand Up @@ -220,8 +227,7 @@ defmodule WalEx.ConfigTest do

describe "to_module_name/1" do
setup do
{:ok, pid} = Config.start_link(configs: get_base_configs())
assert is_pid(pid)
{:ok, _pid} = Config.start_link(configs: @base_configs)
:ok
end

Expand All @@ -237,28 +243,4 @@ defmodule WalEx.ConfigTest do
assert "TestName" == Config.to_module_name(:"Elixir.TestName")
end
end

defp get_base_configs(keys \\ []) do
configs = [
name: :test_name,
hostname: "hostname",
username: "username",
password: "password",
database: "database",
port: 5432,
subscriptions: ["subscriptions"],
publication: "publication",
modules: [MyApp.CustomModule],
ssl: false,
ssl_opts: [verify: :verify_none]
]

case keys do
[] ->
configs

_keys ->
Keyword.take(configs, keys)
end
end
end
13 changes: 3 additions & 10 deletions test/walex/config/registry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule WalEx.Config.RegistryTest do

describe "set_name/3" do
setup do
assert {:ok, _pid} = WalExRegistry.start_registry()
{:ok, _pid} = WalExRegistry.start_registry()
:ok
end

Expand All @@ -35,21 +35,14 @@ defmodule WalEx.Config.RegistryTest do

describe "get_state/3" do
setup do
assert {:ok, _pid} = WalExRegistry.start_registry()
{:ok, _pid} = WalExRegistry.start_registry()
:ok
end

test "should set agent state" do
name = WalExRegistry.set_name(:set_agent, __MODULE__, :app_name_test)

configs = [
my_atom: :test_config,
my_number: 99,
my_string: "test config",
my_map: %{john: :doe},
my_list: [1, 9, 9, 4]
]

configs = []
Agent.start_link(fn -> configs end, name: name)

assert configs == WalExRegistry.get_state(:get_agent, __MODULE__, :app_name_test)
Expand Down
35 changes: 16 additions & 19 deletions test/walex/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,34 @@ defmodule WalEx.DatabaseTest do
@password "postgres"
@database "todos_test"

@base_configs [
name: :todos,
hostname: @hostname,
username: @username,
password: @password,
database: @database,
port: 5432,
subscriptions: ["user", "todo"],
publication: "events"
]

describe "logical replication" do
setup do
{:ok, pid} = start_database()
{:ok, database_pid} = start_database()

%{pid: pid}
%{database_pid: database_pid}
end

test "should have logical replication set up", %{pid: pid} do
test "should have logical replication set up", %{database_pid: pid} do
show_wall_level = "SHOW wal_level;"

assert is_pid(pid)
assert [%{"wal_level" => "logical"}] == query(pid, show_wall_level)
end

test "should start replication slot", %{pid: database_pid} do
assert {:ok, replication_pid} = WalExSupervisor.start_link(get_configs())
test "should start replication slot", %{database_pid: database_pid} do
assert {:ok, replication_pid} = WalExSupervisor.start_link(@base_configs)
assert is_pid(replication_pid)
assert is_pid(database_pid)

pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"

Expand All @@ -38,19 +48,6 @@ defmodule WalEx.DatabaseTest do
end
end

def get_configs do
[
name: :todos,
hostname: @hostname,
username: @username,
password: @password,
database: @database,
port: 5432,
subscriptions: ["user", "todo"],
publication: "events"
]
end

def start_database do
Postgrex.start_link(
hostname: @hostname,
Expand Down
1 change: 0 additions & 1 deletion test/walex/decoder/decoder_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ defmodule WalEx.DecoderTest do
]
}

# Adding assertion for "numeric" types, which was missing from the original implementation
assert WalEx.Decoder.decode_message(
<<82, 0, 0, 71, 92, 112, 117, 98, 108, 105, 99, 0, 116, 101, 109, 112, 0, 100, 0, 1,
0, 116, 101, 115, 116, 0, 0, 0, 6, 164, 255, 255, 255, 255>>
Expand Down
Loading
Loading