From abcde982ae6028b2b1fdc5e38acb5e04009f4e96 Mon Sep 17 00:00:00 2001 From: Scott Hiett Date: Thu, 1 Sep 2022 14:58:23 +0100 Subject: [PATCH] Transaction support! --- config/config.exs | 6 +-- config/prod.exs | 2 +- config/runtime.exs | 6 +-- lib/srh.ex | 2 +- lib/srh/http/base_router.ex | 9 ++++ lib/srh/http/command_handler.ex | 87 +++++++++++++++++++++++++++++--- lib/srh/redis/client.ex | 21 ++++++++ lib/srh/redis/client_registry.ex | 80 ++++++++++++++++++++++------- 8 files changed, 182 insertions(+), 31 deletions(-) diff --git a/config/config.exs b/config/config.exs index a42a746..814e768 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,8 +1,8 @@ import Config config :srh, - mode: "file", - file_path: "srh-config/tokens.json", - port: 8080 + mode: "file", + file_path: "srh-config/tokens.json", + port: 8080 import_config "#{config_env()}.exs" diff --git a/config/prod.exs b/config/prod.exs index 07d15f5..f4ae0ee 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -1,4 +1,4 @@ import Config config :srh, - port: 80 + port: 80 diff --git a/config/runtime.exs b/config/runtime.exs index 5698c02..c2d7a0c 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -1,6 +1,6 @@ import Config config :srh, - mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file", - file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json", - port: Integer.parse(System.get_env("PORT") || "8080") + mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file", + file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json", + port: Integer.parse(System.get_env("PORT") || "8080") diff --git a/lib/srh.ex b/lib/srh.ex index 79072df..ce975ff 100644 --- a/lib/srh.ex +++ b/lib/srh.ex @@ -5,7 +5,7 @@ defmodule Srh do def start(_type, _args) do IO.puts("Using port #{@port}") - + children = [ Srh.Auth.TokenResolver, {GenRegistry, worker_module: Srh.Redis.Client}, diff --git a/lib/srh/http/base_router.ex b/lib/srh/http/base_router.ex index 0d21677..fd60838 100644 --- a/lib/srh/http/base_router.ex +++ b/lib/srh/http/base_router.ex @@ -23,6 +23,12 @@ defmodule Srh.Http.BaseRouter do |> handle_response(conn) end + post "/multi-exec" do + conn + |> handle_extract_auth(&CommandHandler.handle_command_transaction_array(conn, &1)) + |> handle_response(conn) + end + match _ do send_resp(conn, 404, "Endpoint not found") end @@ -51,6 +57,9 @@ defmodule Srh.Http.BaseRouter do {:malformed_data, message} -> %{code: 400, message: message, json: false} + {:redis_error, data} -> + %{code: 400, message: Jason.encode!(data), json: true} + {:not_authorized, message} -> %{code: 401, message: message, json: false} diff --git a/lib/srh/http/command_handler.ex b/lib/srh/http/command_handler.ex index e32c4cf..8c0cccc 100644 --- a/lib/srh/http/command_handler.ex +++ b/lib/srh/http/command_handler.ex @@ -24,6 +24,17 @@ defmodule Srh.Http.CommandHandler do end end + def handle_command_transaction_array(conn, token) do + # Transactions use the same body format as pipelines, so we can use the same validator + case RequestValidator.validate_pipeline_redis_body(conn.body_params) do + {:ok, array_of_command_arrays} -> + do_handle_command_transaction_array(array_of_command_arrays, token) + + {:error, error_message} -> + {:malformed_data, error_message} + end + end + defp do_handle_command(command_array, token) do case TokenResolver.resolve(token) do {:ok, connection_info} -> @@ -44,6 +55,16 @@ defmodule Srh.Http.CommandHandler do end end + defp do_handle_command_transaction_array(array_of_command_arrays, token) do + case TokenResolver.resolve(token) do + {:ok, connection_info} -> + dispatch_command_transaction_array(array_of_command_arrays, connection_info) + + {:error, msg} -> + {:not_authorized, msg} + end + end + defp dispatch_command_array(_arr, _connection_info, responses \\ []) defp dispatch_command_array([current | rest], connection_info, responses) do @@ -52,9 +73,8 @@ defmodule Srh.Http.CommandHandler do {:ok, result_map} -> [result_map | responses] - {:malformed_data, result_json} -> - # TODO: change up the chain to json this at the last moment, so this isn't here - [Jason.decode!(result_json) | responses] + {:redis_error, result} -> + [result | responses] end dispatch_command_array(rest, connection_info, updated_responses) @@ -65,6 +85,61 @@ defmodule Srh.Http.CommandHandler do {:ok, Enum.reverse(responses)} end + defp dispatch_command_transaction_array( + command_array, + %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info, + responses \\ [] + ) do + case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do + {:ok, client_pid} -> + # Borrow a client, then run all of the commands (wrapped in MULTI and EXEC) + worker_pid = Client.borrow_worker(client_pid) + + wrapped_command_array = [["MULTI"] | command_array] + do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses) + + # Now manually run the EXEC - this is what contains the information to form the response, not the above + result = case ClientWorker.redis_command(worker_pid, ["EXEC"]) do + {:ok, res} -> + { + :ok, + res + |> Enum.map(&(%{result: &1})) + } + # TODO: Can there be any inline errors here? Wouldn't they fail the whole tx? + + {:error, error} -> + {:redis_error, %{error: error.message}} + end + + Client.return_worker(client_pid, worker_pid) + + result + {:error, msg} -> + {:server_error, msg} + end + end + + defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses) when is_pid(worker_pid) do + updated_responses = case ClientWorker.redis_command(worker_pid, current) do + {:ok, res} -> + [%{result: res} | responses] + + {:error, error} -> + [ + %{ + error: error.message + } | responses + ] + end + + do_dispatch_command_transaction_array(rest, worker_pid, updated_responses) + end + + defp do_dispatch_command_transaction_array([], worker_pid, responses) when is_pid(worker_pid) do + {:ok, Enum.reverse(responses)} + end + defp dispatch_command( command_array, %{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info @@ -80,10 +155,10 @@ defmodule Srh.Http.CommandHandler do {:error, error} -> { - :malformed_data, - Jason.encode!(%{ + :redis_error, + %{ error: error.message - }) + } } end diff --git a/lib/srh/redis/client.ex b/lib/srh/redis/client.ex index 02ea029..337ccb6 100644 --- a/lib/srh/redis/client.ex +++ b/lib/srh/redis/client.ex @@ -27,6 +27,14 @@ defmodule Srh.Redis.Client do GenServer.call(client, {:find_worker}) end + def borrow_worker(client) do + GenServer.call(client, {:borrow_worker}) + end + + def return_worker(client, pid) do + GenServer.cast(client, {:return_worker, pid}) + end + def handle_call({:find_worker}, _from, %{registry_pid: registry_pid} = state) when is_pid(registry_pid) do {:ok, worker} = ClientRegistry.find_worker(registry_pid) @@ -34,10 +42,23 @@ defmodule Srh.Redis.Client do {:reply, worker, state} end + def handle_call({:borrow_worker}, _from, %{registry_pid: registry_pid} = state) + when is_pid(registry_pid) do + {:ok, worker} = ClientRegistry.borrow_worker(registry_pid) + Process.send(self(), :reset_idle_death, []) + {:reply, worker, state} + end + def handle_call(_msg, _from, state) do {:reply, :ok, state} end + def handle_cast({:return_worker, pid}, %{registry_pid: registry_pid} = state) + when is_pid(pid) and is_pid(registry_pid) do + ClientRegistry.return_worker(registry_pid, pid) + {:noreply, state} + end + def handle_cast(_msg, state) do {:noreply, state} end diff --git a/lib/srh/redis/client_registry.ex b/lib/srh/redis/client_registry.ex index 75ccb6e..2a0a2bc 100644 --- a/lib/srh/redis/client_registry.ex +++ b/lib/srh/redis/client_registry.ex @@ -10,7 +10,8 @@ defmodule Srh.Redis.ClientRegistry do :ok, %{ worker_pids: [], - last_worker_index: 0 + last_worker_index: 0, + currently_borrowed_pids: [] } } end @@ -19,6 +20,14 @@ defmodule Srh.Redis.ClientRegistry do GenServer.call(registry, {:find_worker}) end + def borrow_worker(registry) do + GenServer.call(registry, {:borrow_worker}) + end + + def return_worker(registry, pid) do + GenServer.cast(registry, {:return_worker, pid}) + end + def add_worker(registry, pid) do GenServer.cast(registry, {:add_worker, pid}) end @@ -27,25 +36,31 @@ defmodule Srh.Redis.ClientRegistry do GenServer.cast(registry, {:destroy_workers}) end - def handle_call({:find_worker}, _from, state) do - case length(state.worker_pids) do - 0 -> - {:reply, {:error, :none_available}, state} - - len -> - target = state.last_worker_index + 1 - - corrected_target = - case target >= len do - true -> 0 - false -> target - end - - {:reply, {:ok, Enum.at(state.worker_pids, corrected_target)}, - %{state | last_worker_index: corrected_target}} + def handle_call({:borrow_worker}, _from, state) do + case do_find_worker(state) do + {{:error, msg}, state_update} -> + {:reply, {:error, msg}, state_update} + + {{:ok, pid}, state_update} -> + # We want to put this pid into the borrowed pids state list + { + :reply, + {:ok, pid}, + %{ + state_update + | currently_borrowed_pids: + [pid | state_update.currently_borrowed_pids] + |> Enum.uniq() + } + } end end + def handle_call({:find_worker}, _from, state) do + {res, state_update} = do_find_worker(state) + {:reply, res, state_update} + end + def handle_call(_msg, _from, state) do {:reply, :ok, state} end @@ -72,6 +87,12 @@ defmodule Srh.Redis.ClientRegistry do {:noreply, %{state | worker_pids: [], last_worker_index: 0}} end + def handle_cast({:return_worker, pid}, state) do + # Remove it from the borrowed array + {:noreply, + %{state | currently_borrowed_pids: List.delete(state.currently_borrowed_pids, pid)}} + end + def handle_cast(_msg, state) do {:noreply, state} end @@ -83,4 +104,29 @@ defmodule Srh.Redis.ClientRegistry do def handle_info(_msg, state) do {:noreply, state} end + + defp do_find_worker(state) do + filtered_pids = + state.worker_pids + |> Enum.filter(&(!Enum.member?(state.currently_borrowed_pids, &1))) + + case length(filtered_pids) do + 0 -> + {{:error, :none_available}, state} + + len -> + target = state.last_worker_index + 1 + + corrected_target = + case target >= len do + true -> 0 + false -> target + end + + { + {:ok, Enum.at(state.worker_pids, corrected_target)}, + %{state | last_worker_index: corrected_target} + } + end + end end