Skip to content

Commit

Permalink
Merge pull request #46 from cpursley/test-database-connection
Browse files Browse the repository at this point in the history
Test database connection
  • Loading branch information
cpursley committed Feb 15, 2024
2 parents a4ffe64 + 73e9842 commit bc06feb
Show file tree
Hide file tree
Showing 12 changed files with 427 additions and 79 deletions.
3 changes: 2 additions & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
erlang 26.1.2
elixir 1.15.7-otp-26
elixir 1.15.7-otp-26
rust 1.63.0
2 changes: 0 additions & 2 deletions lib/mix/tasks/walex.drop.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ defmodule Mix.Tasks.Walex.Drop do
@moduledoc """
Drops the database
"""

use Mix.Task

alias Mix.Tasks.Walex.Helpers

@test_database "todos_test"
Expand Down
17 changes: 7 additions & 10 deletions lib/mix/tasks/walex.setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ defmodule Mix.Tasks.Walex.Setup do
@moduledoc """
Creates, migrates and seeds the database
"""

use Mix.Task

alias Mix.Tasks.Walex.Helpers

@test_database "todos_test"
@base_configs [
hostname: "localhost",
username: "postgres",
password: "postgres",
database: @test_database
]

@shortdoc "Set up test database and tables"
def run(_) do
Expand All @@ -17,14 +21,7 @@ defmodule Mix.Tasks.Walex.Setup do

defp setup_test_database do
Helpers.create_database(@test_database)

{:ok, pid} =
Postgrex.start_link(
hostname: "localhost",
username: "postgres",
password: "postgres",
database: @test_database
)
{:ok, pid} = Postgrex.start_link(@base_configs)

create_database_logic(pid)
create_database_tables(pid)
Expand Down
1 change: 0 additions & 1 deletion lib/walex/config/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule WalEx.Config do
@moduledoc """
Configuration
"""

use Agent

alias WalEx.Config.Registry, as: WalExRegistry
Expand Down
13 changes: 6 additions & 7 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,21 @@ defmodule WalEx.Replication.Server do

@impl true
def handle_connect(state) do
temp_slot = "walex_temp_slot_" <> Integer.to_string(:rand.uniform(9_999))

query = "CREATE_REPLICATION_SLOT #{temp_slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
query =
"CREATE_REPLICATION_SLOT #{slot_name(state.app_name)} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"

{:query, query, %{state | step: :create_slot}}
end

@impl true
def handle_result([%Postgrex.Result{rows: rows} | _results], state = %{step: :create_slot}) do
slot_name = rows |> hd |> hd

def handle_result([%Postgrex.Result{} | _results], state = %{step: :create_slot}) do
publication =
state.app_name
|> WalEx.Config.get_configs([:publication])
|> Keyword.get(:publication)

query =
"START_REPLICATION SLOT #{slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{publication}')"
"START_REPLICATION SLOT #{slot_name(state.app_name)} LOGICAL 0/0 (proto_version '1', publication_names '#{publication}')"

{:stream, query, [], %{state | step: :streaming}}
end
Expand All @@ -83,4 +80,6 @@ defmodule WalEx.Replication.Server do

@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time, do: System.os_time(:microsecond) - @epoch

defp slot_name(app_name), do: to_string(app_name) <> "_walex"
end
14 changes: 12 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule WalEx.MixProject do
name: "WalEx",
source_url: "https://github.com/cpursley/walex",
test_coverage: [tool: ExCoveralls],
elixirc_paths: elixirc_paths(Mix.env())
elixirc_paths: elixirc_paths(Mix.env()),
compilers: compilers()
]
end

Expand Down Expand Up @@ -43,7 +44,8 @@ defmodule WalEx.MixProject do
{:ex_doc, "~> 0.31.1", only: :dev, runtime: false},
{:sobelow, "~> 0.12", only: [:dev, :test], runtime: false},
{:credo, "~> 1.7.3", only: [:dev, :test], runtime: false},
{:excoveralls, "~> 0.10", only: [:dev, :test], runtime: false}
{:excoveralls, "~> 0.10", only: [:dev, :test], runtime: false},
{:rambo, "~> 0.3.4", only: [:dev, :test], runtime: false}
]
end

Expand Down Expand Up @@ -76,4 +78,12 @@ defmodule WalEx.MixProject do

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_), do: ["lib"]

defp compilers do
unless Mix.env() == :prod do
Mix.compilers() ++ [:rambo]
else
Mix.compilers()
end
end
end
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"},
"postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"rambo": {:hex, :rambo, "0.3.4", "8962ac3bd1a633ee9d0e8b44373c7913e3ce3d875b4151dcd060886092d2dce7", [:mix], [], "hexpm", "0cc54ed089fbbc84b65f4b8a774224ebfe60e5c80186fafc7910b3e379ad58f1"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"req": {:hex, :req, "0.4.8", "2b754a3925ddbf4ad78c56f30208ced6aefe111a7ea07fb56c23dccc13eb87ae", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "7146e51d52593bb7f20d00b5308a5d7d17d663d6e85cd071452b613a8277100c"},
"retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"},
Expand Down
61 changes: 60 additions & 1 deletion test/support/test_helpers.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
defmodule WalEx.Support.TestHelpers do
def find_worker_pid(supervisor_pid, child_module) do
require Logger

def tap_debug(to_tap, label) do
to_tap |> tap(&Logger.debug(label <> inspect(&1)))
end

def find_child_pid(supervisor_pid, child_module) do
supervisor_pid
|> Supervisor.which_children()
|> find_pid(child_module)
Expand All @@ -10,6 +16,59 @@ defmodule WalEx.Support.TestHelpers do
pid
end

def get_database_pid(supervisor_pid) do
find_child_pid(supervisor_pid, DBConnection.ConnectionPool)
end

def start_database(configs) do
case Postgrex.start_link(configs) do
{:ok, conn} ->
Logger.info("Database is running.")

{:ok, conn}

{:error, {:already_started, conn}} ->
Logger.info("Database is already running.")

{:ok, conn}

{:error, reason} ->
Logger.error("Error connecting to the database. Reason: #{inspect(reason)}")

{:error, reason}
end
end

def terminate_database_connection(database_pid, username) do
query =
"SELECT pg_terminate_backend(pg_backend_pid()) FROM pg_stat_activity WHERE usename = $1"

Postgrex.query(database_pid, query, [username])
end

def wait_for_restart do
Logger.debug("waiting")
:timer.sleep(3000)
Logger.debug("done waiting")
end

def query(pid, query) do
pid
|> Postgrex.query!(query, [])
|> map_rows_to_columns()
end

defp map_rows_to_columns(%Postgrex.Result{columns: columns, rows: rows}) do
Enum.map(rows, fn row -> Enum.zip(columns, row) |> Map.new() end)
end

def pg_replication_slots(database_pid) do
pg_replication_slots_query =
"SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"

query(database_pid, pg_replication_slots_query)
end

def update_user(database_pid) do
update_user = """
UPDATE \"user\" SET age = 30 WHERE id = 1
Expand Down

0 comments on commit bc06feb

Please sign in to comment.