Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Config

config :srh,
mode: "file",
file_path: "srh-config/tokens.json",
port: 8080
mode: "file",
file_path: "srh-config/tokens.json",
port: 8080

import_config "#{config_env()}.exs"
2 changes: 1 addition & 1 deletion config/prod.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Config

config :srh,
port: 80
port: 80
6 changes: 3 additions & 3 deletions config/runtime.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Config

config :srh,
mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file",
file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json",
port: Integer.parse(System.get_env("PORT") || "8080")
mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file",
file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json",
port: Integer.parse(System.get_env("PORT") || "8080")
2 changes: 1 addition & 1 deletion lib/srh.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Srh do

def start(_type, _args) do
IO.puts("Using port #{@port}")

children = [
Srh.Auth.TokenResolver,
{GenRegistry, worker_module: Srh.Redis.Client},
Expand Down
9 changes: 9 additions & 0 deletions lib/srh/http/base_router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ defmodule Srh.Http.BaseRouter do
|> handle_response(conn)
end

post "/multi-exec" do
conn
|> handle_extract_auth(&CommandHandler.handle_command_transaction_array(conn, &1))
|> handle_response(conn)
end

match _ do
send_resp(conn, 404, "Endpoint not found")
end
Expand Down Expand Up @@ -51,6 +57,9 @@ defmodule Srh.Http.BaseRouter do
{:malformed_data, message} ->
%{code: 400, message: message, json: false}

{:redis_error, data} ->
%{code: 400, message: Jason.encode!(data), json: true}

{:not_authorized, message} ->
%{code: 401, message: message, json: false}

Expand Down
87 changes: 81 additions & 6 deletions lib/srh/http/command_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ defmodule Srh.Http.CommandHandler do
end
end

def handle_command_transaction_array(conn, token) do
# Transactions use the same body format as pipelines, so we can use the same validator
case RequestValidator.validate_pipeline_redis_body(conn.body_params) do
{:ok, array_of_command_arrays} ->
do_handle_command_transaction_array(array_of_command_arrays, token)

{:error, error_message} ->
{:malformed_data, error_message}
end
end

defp do_handle_command(command_array, token) do
case TokenResolver.resolve(token) do
{:ok, connection_info} ->
Expand All @@ -44,6 +55,16 @@ defmodule Srh.Http.CommandHandler do
end
end

defp do_handle_command_transaction_array(array_of_command_arrays, token) do
case TokenResolver.resolve(token) do
{:ok, connection_info} ->
dispatch_command_transaction_array(array_of_command_arrays, connection_info)

{:error, msg} ->
{:not_authorized, msg}
end
end

defp dispatch_command_array(_arr, _connection_info, responses \\ [])

defp dispatch_command_array([current | rest], connection_info, responses) do
Expand All @@ -52,9 +73,8 @@ defmodule Srh.Http.CommandHandler do
{:ok, result_map} ->
[result_map | responses]

{:malformed_data, result_json} ->
# TODO: change up the chain to json this at the last moment, so this isn't here
[Jason.decode!(result_json) | responses]
{:redis_error, result} ->
[result | responses]
end

dispatch_command_array(rest, connection_info, updated_responses)
Expand All @@ -65,6 +85,61 @@ defmodule Srh.Http.CommandHandler do
{:ok, Enum.reverse(responses)}
end

defp dispatch_command_transaction_array(
command_array,
%{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info,
responses \\ []
) do
case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do
{:ok, client_pid} ->
# Borrow a client, then run all of the commands (wrapped in MULTI and EXEC)
worker_pid = Client.borrow_worker(client_pid)

wrapped_command_array = [["MULTI"] | command_array]
do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses)

# Now manually run the EXEC - this is what contains the information to form the response, not the above
result = case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
{:ok, res} ->
{
:ok,
res
|> Enum.map(&(%{result: &1}))
}
# TODO: Can there be any inline errors here? Wouldn't they fail the whole tx?

{:error, error} ->
{:redis_error, %{error: error.message}}
end

Client.return_worker(client_pid, worker_pid)

result
{:error, msg} ->
{:server_error, msg}
end
end

defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses) when is_pid(worker_pid) do
updated_responses = case ClientWorker.redis_command(worker_pid, current) do
{:ok, res} ->
[%{result: res} | responses]

{:error, error} ->
[
%{
error: error.message
} | responses
]
end

do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
end

defp do_dispatch_command_transaction_array([], worker_pid, responses) when is_pid(worker_pid) do
{:ok, Enum.reverse(responses)}
end

defp dispatch_command(
command_array,
%{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info
Expand All @@ -80,10 +155,10 @@ defmodule Srh.Http.CommandHandler do

{:error, error} ->
{
:malformed_data,
Jason.encode!(%{
:redis_error,
%{
error: error.message
})
}
}
end

Expand Down
21 changes: 21 additions & 0 deletions lib/srh/redis/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,38 @@ defmodule Srh.Redis.Client do
GenServer.call(client, {:find_worker})
end

def borrow_worker(client) do
GenServer.call(client, {:borrow_worker})
end

def return_worker(client, pid) do
GenServer.cast(client, {:return_worker, pid})
end

def handle_call({:find_worker}, _from, %{registry_pid: registry_pid} = state)
when is_pid(registry_pid) do
{:ok, worker} = ClientRegistry.find_worker(registry_pid)
Process.send(self(), :reset_idle_death, [])
{:reply, worker, state}
end

def handle_call({:borrow_worker}, _from, %{registry_pid: registry_pid} = state)
when is_pid(registry_pid) do
{:ok, worker} = ClientRegistry.borrow_worker(registry_pid)
Process.send(self(), :reset_idle_death, [])
{:reply, worker, state}
end

def handle_call(_msg, _from, state) do
{:reply, :ok, state}
end

def handle_cast({:return_worker, pid}, %{registry_pid: registry_pid} = state)
when is_pid(pid) and is_pid(registry_pid) do
ClientRegistry.return_worker(registry_pid, pid)
{:noreply, state}
end

def handle_cast(_msg, state) do
{:noreply, state}
end
Expand Down
80 changes: 63 additions & 17 deletions lib/srh/redis/client_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ defmodule Srh.Redis.ClientRegistry do
:ok,
%{
worker_pids: [],
last_worker_index: 0
last_worker_index: 0,
currently_borrowed_pids: []
}
}
end
Expand All @@ -19,6 +20,14 @@ defmodule Srh.Redis.ClientRegistry do
GenServer.call(registry, {:find_worker})
end

def borrow_worker(registry) do
GenServer.call(registry, {:borrow_worker})
end

def return_worker(registry, pid) do
GenServer.cast(registry, {:return_worker, pid})
end

def add_worker(registry, pid) do
GenServer.cast(registry, {:add_worker, pid})
end
Expand All @@ -27,25 +36,31 @@ defmodule Srh.Redis.ClientRegistry do
GenServer.cast(registry, {:destroy_workers})
end

def handle_call({:find_worker}, _from, state) do
case length(state.worker_pids) do
0 ->
{:reply, {:error, :none_available}, state}

len ->
target = state.last_worker_index + 1

corrected_target =
case target >= len do
true -> 0
false -> target
end

{:reply, {:ok, Enum.at(state.worker_pids, corrected_target)},
%{state | last_worker_index: corrected_target}}
def handle_call({:borrow_worker}, _from, state) do
case do_find_worker(state) do
{{:error, msg}, state_update} ->
{:reply, {:error, msg}, state_update}

{{:ok, pid}, state_update} ->
# We want to put this pid into the borrowed pids state list
{
:reply,
{:ok, pid},
%{
state_update
| currently_borrowed_pids:
[pid | state_update.currently_borrowed_pids]
|> Enum.uniq()
}
}
end
end

def handle_call({:find_worker}, _from, state) do
{res, state_update} = do_find_worker(state)
{:reply, res, state_update}
end

def handle_call(_msg, _from, state) do
{:reply, :ok, state}
end
Expand All @@ -72,6 +87,12 @@ defmodule Srh.Redis.ClientRegistry do
{:noreply, %{state | worker_pids: [], last_worker_index: 0}}
end

def handle_cast({:return_worker, pid}, state) do
# Remove it from the borrowed array
{:noreply,
%{state | currently_borrowed_pids: List.delete(state.currently_borrowed_pids, pid)}}
end

def handle_cast(_msg, state) do
{:noreply, state}
end
Expand All @@ -83,4 +104,29 @@ defmodule Srh.Redis.ClientRegistry do
def handle_info(_msg, state) do
{:noreply, state}
end

defp do_find_worker(state) do
filtered_pids =
state.worker_pids
|> Enum.filter(&(!Enum.member?(state.currently_borrowed_pids, &1)))

case length(filtered_pids) do
0 ->
{{:error, :none_available}, state}

len ->
target = state.last_worker_index + 1

corrected_target =
case target >= len do
true -> 0
false -> target
end

{
{:ok, Enum.at(state.worker_pids, corrected_target)},
%{state | last_worker_index: corrected_target}
}
end
end
end