Skip to content

Commit 4015721

Browse files
committed
Code for step 3
1 parent aba2e1b commit 4015721

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed

lib/rate_limiters/leaky_bucket.ex

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
defmodule PaymentsClient.RateLimiters.LeakyBucket do
2+
use GenServer
3+
4+
require Logger
5+
6+
alias PaymentsClient.RateLimiter
7+
8+
@behaviour RateLimiter
9+
10+
def start_link(opts) do
11+
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
12+
end
13+
14+
@impl true
15+
def init(opts) do
16+
state = %{
17+
request_queue: :queue.new(),
18+
request_queue_size: 0,
19+
request_queue_poll_rate:
20+
RateLimiter.calculate_refresh_rate(opts.timeframe_max_requests, opts.timeframe, opts.timeframe_units),
21+
send_after_ref: nil
22+
}
23+
24+
{:ok, state, {:continue, :initial_timer}}
25+
end
26+
27+
# ---------------- Client facing function ----------------
28+
29+
@impl RateLimiter
30+
def make_request(request_handler, response_handler) do
31+
GenServer.cast(__MODULE__, {:enqueue_request, request_handler, response_handler})
32+
end
33+
34+
# ---------------- Server Callbacks ----------------
35+
36+
@impl true
37+
def handle_continue(:initial_timer, state) do
38+
{:noreply, %{state | send_after_ref: schedule_timer(state.request_queue_poll_rate)}}
39+
end
40+
41+
@impl true
42+
def handle_cast({:enqueue_request, request_handler, response_handler}, state) do
43+
updated_queue = :queue.in({request_handler, response_handler}, state.request_queue)
44+
new_queue_size = state.request_queue_size + 1
45+
46+
{:noreply, %{state | request_queue: updated_queue, request_queue_size: new_queue_size}}
47+
end
48+
49+
@impl true
50+
def handle_info(:pop_from_request_queue, %{request_queue_size: 0} = state) do
51+
# No work to do as the queue size is zero...schedule the next timer
52+
{:noreply, %{state | send_after_ref: schedule_timer(state.request_queue_poll_rate)}}
53+
end
54+
55+
def handle_info(:pop_from_request_queue, state) do
56+
{{:value, {request_handler, response_handler}}, new_request_queue} = :queue.out(state.request_queue)
57+
start_message = "Request started #{NaiveDateTime.utc_now()}"
58+
59+
Task.Supervisor.async_nolink(RateLimiter.TaskSupervisor, fn ->
60+
{req_module, req_function, req_args} = request_handler
61+
{resp_module, resp_function} = response_handler
62+
63+
response = apply(req_module, req_function, req_args)
64+
apply(resp_module, resp_function, [response])
65+
66+
Logger.info("#{start_message}\nRequest completed #{NaiveDateTime.utc_now()}")
67+
end)
68+
69+
{:noreply,
70+
%{
71+
state
72+
| request_queue: new_request_queue,
73+
send_after_ref: schedule_timer(state.request_queue_poll_rate),
74+
request_queue_size: state.request_queue_size - 1
75+
}}
76+
end
77+
78+
def handle_info({ref, _result}, state) do
79+
Process.demonitor(ref, [:flush])
80+
81+
{:noreply, state}
82+
end
83+
84+
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
85+
{:noreply, state}
86+
end
87+
88+
defp schedule_timer(queue_poll_rate) do
89+
Process.send_after(self(), :pop_from_request_queue, queue_poll_rate)
90+
end
91+
end

0 commit comments

Comments
 (0)