Skip to content

Commit

Permalink
chore: format changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dhruvjain99 committed May 13, 2024
1 parent 743dd81 commit 35226e5
Showing 1 changed file with 52 additions and 18 deletions.
70 changes: 52 additions & 18 deletions apps/vmq_server/src/vmq_message_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,54 @@ start() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

write(SubscriberId, Msg) ->
vmq_redis:query(vmq_message_store_redis_client, ["RPUSH", term_to_binary(SubscriberId), term_to_binary(Msg)], ?RPUSH, ?MSG_STORE_WRITE).
vmq_redis:query(
vmq_message_store_redis_client,
["RPUSH", term_to_binary(SubscriberId), term_to_binary(Msg)],
?RPUSH,
?MSG_STORE_WRITE
).

read(_SubscriberId, _MsgRef) ->
{error, not_supported}.

delete(SubscriberId) ->
vmq_redis:query(vmq_message_store_redis_client, ["DEL", term_to_binary(SubscriberId)], ?DEL, ?MSG_STORE_DELETE).
vmq_redis:query(
vmq_message_store_redis_client,
["DEL", term_to_binary(SubscriberId)],
?DEL,
?MSG_STORE_DELETE
).

delete(SubscriberId, _MsgRef) ->
vmq_redis:query(vmq_message_store_redis_client, ["LPOP", term_to_binary(SubscriberId), 1], ?LPOP, ?MSG_STORE_DELETE).
vmq_redis:query(
vmq_message_store_redis_client,
["LPOP", term_to_binary(SubscriberId), 1],
?LPOP,
?MSG_STORE_DELETE
).

find(SubscriberId) ->
case vmq_redis:query(vmq_message_store_redis_client, ["LRANGE", term_to_binary(SubscriberId), "0", "-1"], ?FIND, ?MSG_STORE_FIND) of
case
vmq_redis:query(
vmq_message_store_redis_client,
["LRANGE", term_to_binary(SubscriberId), "0", "-1"],
?FIND,
?MSG_STORE_FIND
)
of
{ok, MsgsInB} ->
DMsgs = lists:foldr(fun(MsgB, Acc) ->
Msg = binary_to_term(MsgB),
D = #deliver{msg = Msg, qos = Msg#vmq_msg.qos},
[D | Acc] end, [], MsgsInB),
DMsgs = lists:foldr(
fun(MsgB, Acc) ->
Msg = binary_to_term(MsgB),
D = #deliver{msg = Msg, qos = Msg#vmq_msg.qos},
[D | Acc]
end,
[],
MsgsInB
),
{ok, DMsgs};
Res -> Res
Res ->
Res
end.

%% ===================================================================
Expand All @@ -53,17 +81,23 @@ find(SubscriberId) ->
{atom(), {atom(), atom(), list()}, permanent, pos_integer(), worker, [atom()]}
]}}.
init([]) ->
StoreCfgs = application:get_env(vmq_server, message_store, [{redis,[{connect_options,"[{sentinel, [{endpoints, [{\"localhost\", 26379}]}]},{database,2}]"}]}]),
StoreCfgs = application:get_env(vmq_server, message_store, [
{redis, [
{connect_options, "[{sentinel, [{endpoints, [{\"localhost\", 26379}]}]},{database,2}]"}
]}
]),
Redis = proplists:get_value(redis, StoreCfgs),

{ok,
{{one_for_one, 5, 10}, [
{eredis, {eredis, start_link, [
[
{username, proplists:get_value(username, Redis)},
{password, proplists:get_value(password, Redis)},
{name, {local, vmq_message_store_redis_client}} |
vmq_schema_util:parse_list(proplists:get_value(connect_options, Redis))
]
]}, permanent, 5000, worker, [eredis]}
{eredis,
{eredis, start_link, [
[
{username, proplists:get_value(username, Redis)},
{password, proplists:get_value(password, Redis)},
{name, {local, vmq_message_store_redis_client}}
| vmq_schema_util:parse_list(proplists:get_value(connect_options, Redis))
]
]},
permanent, 5000, worker, [eredis]}
]}}.

0 comments on commit 35226e5

Please sign in to comment.