Skip to content

Commit

Permalink
feat: isolate resource manager processes
Browse files Browse the repository at this point in the history
  • Loading branch information
mononym committed May 4, 2022
1 parent 8ff552c commit e0e7116
Show file tree
Hide file tree
Showing 25 changed files with 640 additions and 302 deletions.
40 changes: 14 additions & 26 deletions apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
Expand Up @@ -181,34 +181,22 @@ create(
request_timeout => RequestTimeout,
resource_id => ResourceId
},
case
emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_http,
Config#{
base_url => BaseUrl,
pool_type => random
},
#{}
)
of
{ok, already_created} ->
{ok, State};
{ok, _} ->
{ok, State};
{error, Reason} ->
{error, Reason}
end.
_ = emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_http,
Config#{
base_url => BaseUrl,
pool_type => random
},
#{}
),
{ok, State}.

update(Config, State) ->
case create(Config) of
{ok, NewState} ->
ok = destroy(State),
{ok, NewState};
{error, Reason} ->
{error, Reason}
end.
{ok, NewState} = create(Config),
ok = destroy(State),
{ok, NewState}.

authenticate(#{auth_method := _}, _) ->
ignore;
Expand Down
34 changes: 11 additions & 23 deletions apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl
Expand Up @@ -146,31 +146,19 @@ create(#{filter := Filter} = Config) ->
filter_template => FilterTemplate,
resource_id => ResourceId
},
case
emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_mongo,
Config,
#{}
)
of
{ok, already_created} ->
{ok, NState};
{ok, _} ->
{ok, NState};
{error, Reason} ->
{error, Reason}
end.
_ = emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_mongo,
Config,
#{}
),
{ok, NState}.

update(Config, State) ->
case create(Config) of
{ok, NewState} ->
ok = destroy(State),
{ok, NewState};
{error, Reason} ->
{error, Reason}
end.
{ok, NewState} = create(Config),
ok = destroy(State),
{ok, NewState}.

authenticate(#{auth_method := _}, _) ->
ignore;
Expand Down
32 changes: 11 additions & 21 deletions apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl
Expand Up @@ -99,29 +99,19 @@ create(
query_timeout => QueryTimeout,
resource_id => ResourceId
},
case
emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_mysql,
Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}},
#{}
)
of
{ok, _} ->
{ok, State};
{error, Reason} ->
{error, Reason}
end.
_ = emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_mysql,
Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}},
#{}
),
{ok, State}.

update(Config, State) ->
case create(Config) of
{ok, NewState} ->
ok = destroy(State),
{ok, NewState};
{error, Reason} ->
{error, Reason}
end.
{ok, NewState} = create(Config),
ok = destroy(State),
{ok, NewState}.

authenticate(#{auth_method := _}, _) ->
ignore;
Expand Down
34 changes: 11 additions & 23 deletions apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl
Expand Up @@ -96,31 +96,19 @@ create(
password_hash_algorithm => Algorithm,
resource_id => ResourceId
},
case
emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_pgsql,
Config#{prepare_statement => #{ResourceId => Query}},
#{}
)
of
{ok, already_created} ->
{ok, State};
{ok, _} ->
{ok, State};
{error, Reason} ->
{error, Reason}
end.
_ = emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_pgsql,
Config#{prepare_statement => #{ResourceId => Query}},
#{}
),
{ok, State}.

update(Config, State) ->
case create(Config) of
{ok, NewState} ->
ok = destroy(State),
{ok, NewState};
{error, Reason} ->
{error, Reason}
end.
{ok, NewState} = create(Config),
ok = destroy(State),
{ok, NewState}.

authenticate(#{auth_method := _}, _) ->
ignore;
Expand Down
24 changes: 8 additions & 16 deletions apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl
Expand Up @@ -115,22 +115,14 @@ create(
cmd => NCmd,
resource_id => ResourceId
},
case
emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_redis,
Config,
#{}
)
of
{ok, already_created} ->
{ok, NState};
{ok, _} ->
{ok, NState};
{error, Reason} ->
{error, Reason}
end
_ = emqx_resource:create_local(
ResourceId,
?RESOURCE_GROUP,
emqx_connector_redis,
Config,
#{}
),
{ok, NState}
catch
error:{unsupported_cmd, _Cmd} ->
{error, {unsupported_cmd, Cmd}};
Expand Down
6 changes: 2 additions & 4 deletions apps/emqx_authz/src/emqx_authz_http.erl
Expand Up @@ -52,10 +52,8 @@ description() ->

init(Config) ->
NConfig = parse_config(Config),
case emqx_authz_utils:create_resource(emqx_connector_http, NConfig) of
{error, Reason} -> error({load_config_error, Reason});
{ok, Id} -> NConfig#{annotations => #{id => Id}}
end.
{ok, Id} = emqx_authz_utils:create_resource(emqx_connector_http, NConfig),
NConfig#{annotations => #{id => Id}}.

destroy(#{annotations := #{id := Id}}) ->
ok = emqx_resource:remove_local(Id).
Expand Down
20 changes: 8 additions & 12 deletions apps/emqx_authz/src/emqx_authz_mongodb.erl
Expand Up @@ -46,18 +46,14 @@ description() ->
"AuthZ with MongoDB".

init(#{filter := Filter} = Source) ->
case emqx_authz_utils:create_resource(emqx_connector_mongo, Source) of
{error, Reason} ->
error({load_config_error, Reason});
{ok, Id} ->
Source#{
annotations => #{id => Id},
filter_template => emqx_authz_utils:parse_deep(
Filter,
?PLACEHOLDERS
)
}
end.
{ok, Id} = emqx_authz_utils:create_resource(emqx_connector_mongo, Source),
Source#{
annotations => #{id => Id},
filter_template => emqx_authz_utils:parse_deep(
Filter,
?PLACEHOLDERS
)
}.

destroy(#{annotations := #{id := Id}}) ->
ok = emqx_resource:remove_local(Id).
Expand Down
8 changes: 2 additions & 6 deletions apps/emqx_authz/src/emqx_authz_mysql.erl
Expand Up @@ -52,12 +52,8 @@ description() ->
init(#{query := SQL} = Source0) ->
{PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS),
Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}},
case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of
{error, Reason} ->
error({load_config_error, Reason});
{ok, Id} ->
Source#{annotations => #{id => Id, tmpl_oken => TmplToken}}
end.
{ok, Id} = emqx_authz_utils:create_resource(emqx_connector_mysql, Source),
Source#{annotations => #{id => Id, tmpl_oken => TmplToken}}.

destroy(#{annotations := #{id := Id}}) ->
ok = emqx_resource:remove_local(Id).
Expand Down
34 changes: 14 additions & 20 deletions apps/emqx_authz/src/emqx_authz_postgresql.erl
Expand Up @@ -54,26 +54,20 @@ init(#{query := SQL0} = Source) ->
?PLACEHOLDERS
),
ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql),
case
emqx_resource:create_local(
ResourceID,
?RESOURCE_GROUP,
emqx_connector_pgsql,
Source#{prepare_statement => #{ResourceID => SQL}},
#{}
)
of
{ok, _} ->
Source#{
annotations =>
#{
id => ResourceID,
placeholders => PlaceHolders
}
};
{error, Reason} ->
error({load_config_error, Reason})
end.
_ = emqx_resource:create_local(
ResourceID,
?RESOURCE_GROUP,
emqx_connector_pgsql,
Source#{prepare_statement => #{ResourceID => SQL}},
#{}
),
Source#{
annotations =>
#{
id => ResourceID,
placeholders => PlaceHolders
}
}.

destroy(#{annotations := #{id := Id}}) ->
ok = emqx_resource:remove_local(Id).
Expand Down
14 changes: 5 additions & 9 deletions apps/emqx_authz/src/emqx_authz_redis.erl
Expand Up @@ -50,15 +50,11 @@ description() ->
init(#{cmd := CmdStr} = Source) ->
Cmd = tokens(CmdStr),
CmdTemplate = emqx_authz_utils:parse_deep(Cmd, ?PLACEHOLDERS),
case emqx_authz_utils:create_resource(emqx_connector_redis, Source) of
{error, Reason} ->
error({load_config_error, Reason});
{ok, Id} ->
Source#{
annotations => #{id => Id},
cmd_template => CmdTemplate
}
end.
{ok, Id} = emqx_authz_utils:create_resource(emqx_connector_redis, Source),
Source#{
annotations => #{id => Id},
cmd_template => CmdTemplate
}.

destroy(#{annotations := #{id := Id}}) ->
ok = emqx_resource:remove_local(Id).
Expand Down
21 changes: 8 additions & 13 deletions apps/emqx_authz/src/emqx_authz_utils.erl
Expand Up @@ -38,19 +38,14 @@

create_resource(Module, Config) ->
ResourceID = make_resource_id(Module),
case
emqx_resource:create_local(
ResourceID,
?RESOURCE_GROUP,
Module,
Config,
#{}
)
of
{ok, already_created} -> {ok, ResourceID};
{ok, _} -> {ok, ResourceID};
{error, Reason} -> {error, Reason}
end.
_ = emqx_resource:create_local(
ResourceID,
?RESOURCE_GROUP,
Module,
Config,
#{}
),
{ok, ResourceID}.

cleanup_resources() ->
lists:foreach(
Expand Down
21 changes: 8 additions & 13 deletions apps/emqx_bridge/src/emqx_bridge.erl
Expand Up @@ -273,19 +273,14 @@ create(Type, Name, Conf) ->
name => Name,
config => Conf
}),
case
emqx_resource:create_local(
resource_id(Type, Name),
<<"emqx_bridge">>,
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf),
#{}
)
of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, Reason} -> {error, Reason}
end.
_ = emqx_resource:create_local(
resource_id(Type, Name),
<<"emqx_bridge">>,
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf),
#{}
),
maybe_disable_bridge(Type, Name, Conf).

update(BridgeId, {OldConf, Conf}) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge/test/emqx_bridge_SUITE.erl
Expand Up @@ -134,7 +134,7 @@ setup_fake_telemetry_data() ->
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end,
NEvents = 3,
BackInTime = 0,
Timeout = 1_000,
Timeout = 11_000,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
ok = emqx_bridge:load(),
{ok, _} = snabbkaffe_collector:receive_events(Sub),
Expand Down
2 changes: 2 additions & 0 deletions apps/emqx_connector/src/emqx_connector.erl
Expand Up @@ -125,6 +125,8 @@ lookup_raw(Type, Name) ->
end
end.

-spec create_dry_run(module(), binary() | #{binary() => term()} | [#{binary() => term()}]) ->
ok | {error, Reason :: term()}.
create_dry_run(Type, Conf) ->
emqx_bridge:create_dry_run(Type, Conf).

Expand Down

0 comments on commit e0e7116

Please sign in to comment.