Skip to content

Commit

Permalink
Merge d94dbf0 into 002dc85
Browse files Browse the repository at this point in the history
  • Loading branch information
savonarola committed Apr 30, 2024
2 parents 002dc85 + d94dbf0 commit 8baf562
Show file tree
Hide file tree
Showing 35 changed files with 635 additions and 83 deletions.
37 changes: 29 additions & 8 deletions apps/emqx/src/emqx_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) -
Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
{ok, [?REPLY_OUTGOING(Packet), ?REPLY_CLOSE(ReasonName)], Channel};
handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
{ok, {close, ReasonName}, Channel};
{ok, ?REPLY_CLOSE(ReasonName), Channel};
handle_out(auth, {ReasonCode, Properties}, Channel) ->
{ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
handle_out(Type, Data, Channel) ->
Expand Down Expand Up @@ -1406,6 +1406,16 @@ handle_timeout(
{_, Quota2} ->
{ok, clean_timer(TimerName, Channel#channel{quota = Quota2})}
end;
handle_timeout(
_TRef,
connection_expire,
#channel{conn_state = ConnState} = Channel0
) ->
Channel1 = clean_timer(connection_expire, Channel0),
case ConnState of
disconnected -> {ok, Channel1};
_ -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel1)
end;
handle_timeout(TRef, Msg, Channel) ->
case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of
[] ->
Expand Down Expand Up @@ -1801,18 +1811,23 @@ log_auth_failure(Reason) ->
%% Merge authentication result into ClientInfo
%% Authentication result may include:
%% 1. `is_superuser': The superuser flag from various backends
%% 2. `acl': ACL rules from JWT, HTTP auth backend
%% 3. `client_attrs': Extra client attributes from JWT, HTTP auth backend
%% 4. Maybe more non-standard fields used by hook callbacks
%% 2. `expire_at`: Authentication validity deadline, the client will be disconnected after this time
%% 3. `acl': ACL rules from JWT, HTTP auth backend
%% 4. `client_attrs': Extra client attributes from JWT, HTTP auth backend
%% 5. Maybe more non-standard fields used by hook callbacks
merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_map(AuthResult0) ->
IsSuperuser = maps:get(is_superuser, AuthResult0, false),
AuthResult = maps:without([client_attrs], AuthResult0),
ExpireAt = maps:get(expire_at, AuthResult0, undefined),
AuthResult = maps:without([client_attrs, expire_at], AuthResult0),
Attrs0 = maps:get(client_attrs, ClientInfo, #{}),
Attrs1 = maps:get(client_attrs, AuthResult0, #{}),
Attrs = maps:merge(Attrs0, Attrs1),
NewClientInfo = maps:merge(
ClientInfo#{client_attrs => Attrs},
AuthResult#{is_superuser => IsSuperuser}
AuthResult#{
is_superuser => IsSuperuser,
auth_expire_at => ExpireAt
}
),
fix_mountpoint(NewClientInfo).

Expand Down Expand Up @@ -2219,10 +2234,16 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = trim_conninfo(NConnInfo),
conn_state = connected
}.
}).

schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := undefined}}) ->
Channel;
schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := ExpireAt}}) ->
Interval = max(0, ExpireAt - erlang:system_time(millisecond)),
ensure_timer(connection_expire, Interval, Channel).

trim_conninfo(ConnInfo) ->
maps:without(
Expand Down
1 change: 1 addition & 0 deletions apps/emqx/test/emqx_channel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,7 @@ clientinfo(InitProps) ->
clientid => <<"clientid">>,
username => <<"username">>,
is_superuser => false,
auth_expire_at => undefined,
is_bridge => false,
mountpoint => undefined
},
Expand Down
54 changes: 54 additions & 0 deletions apps/emqx/test/emqx_connection_expire_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_connection_expire_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

all() -> emqx_common_test_helpers:all(?MODULE).

%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------

init_per_suite(Config) ->
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].

end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).

t_disonnect_by_auth_info(_) ->
_ = process_flag(trap_exit, true),

_ = meck:new(emqx_access_control, [passthrough, no_history]),
_ = meck:expect(emqx_access_control, authenticate, fun(_) ->
{ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
end),

{ok, C} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(C),

receive
{disconnected, ?RC_NOT_AUTHORIZED, #{}} -> ok
after 5000 ->
ct:fail("Client should be disconnected by timeout")
end.
1 change: 1 addition & 0 deletions apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ end).
-type state() :: #{atom() => term()}.
-type extra() :: #{
is_superuser := boolean(),
expire_at => pos_integer(),
atom() => term()
}.
-type user_info() :: #{
Expand Down
51 changes: 35 additions & 16 deletions apps/emqx_auth_jwt/src/emqx_authn_jwt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ authenticate(
Credential,
#{
verify_claims := VerifyClaims0,
disconnect_after_expire := DisconnectAfterExpire,
jwk := JWK,
acl_claim_name := AclClaimName,
from := From
Expand All @@ -86,11 +87,12 @@ authenticate(
JWT = maps:get(From, Credential),
JWKs = [JWK],
VerifyClaims = render_expected(VerifyClaims0, Credential),
verify(JWT, JWKs, VerifyClaims, AclClaimName);
verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire);
authenticate(
Credential,
#{
verify_claims := VerifyClaims0,
disconnect_after_expire := DisconnectAfterExpire,
jwk_resource := ResourceId,
acl_claim_name := AclClaimName,
from := From
Expand All @@ -106,7 +108,7 @@ authenticate(
{ok, JWKs} ->
JWT = maps:get(From, Credential),
VerifyClaims = render_expected(VerifyClaims0, Credential),
verify(JWT, JWKs, VerifyClaims, AclClaimName)
verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire)
end.

destroy(#{jwk_resource := ResourceId}) ->
Expand All @@ -125,6 +127,7 @@ create2(#{
secret := Secret0,
secret_base64_encoded := Base64Encoded,
verify_claims := VerifyClaims,
disconnect_after_expire := DisconnectAfterExpire,
acl_claim_name := AclClaimName,
from := From
}) ->
Expand All @@ -136,6 +139,7 @@ create2(#{
{ok, #{
jwk => JWK,
verify_claims => VerifyClaims,
disconnect_after_expire => DisconnectAfterExpire,
acl_claim_name => AclClaimName,
from => From
}}
Expand All @@ -145,20 +149,23 @@ create2(#{
algorithm := 'public-key',
public_key := PublicKey,
verify_claims := VerifyClaims,
disconnect_after_expire := DisconnectAfterExpire,
acl_claim_name := AclClaimName,
from := From
}) ->
JWK = create_jwk_from_public_key(PublicKey),
{ok, #{
jwk => JWK,
verify_claims => VerifyClaims,
disconnect_after_expire => DisconnectAfterExpire,
acl_claim_name => AclClaimName,
from => From
}};
create2(
#{
use_jwks := true,
verify_claims := VerifyClaims,
disconnect_after_expire := DisconnectAfterExpire,
acl_claim_name := AclClaimName,
from := From
} = Config
Expand All @@ -173,6 +180,7 @@ create2(
{ok, #{
jwk_resource => ResourceId,
verify_claims => VerifyClaims,
disconnect_after_expire => DisconnectAfterExpire,
acl_claim_name => AclClaimName,
from => From
}}.
Expand Down Expand Up @@ -211,23 +219,12 @@ render_expected([{Name, ExpectedTemplate} | More], Variables) ->
Expected = emqx_authn_utils:render_str(ExpectedTemplate, Variables),
[{Name, Expected} | render_expected(More, Variables)].

verify(undefined, _, _, _) ->
verify(undefined, _, _, _, _) ->
ignore;
verify(JWT, JWKs, VerifyClaims, AclClaimName) ->
verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire) ->
case do_verify(JWT, JWKs, VerifyClaims) of
{ok, Extra} ->
IsSuperuser = emqx_authn_utils:is_superuser(Extra),
Attrs = emqx_authn_utils:client_attrs(Extra),
try
ACL = acl(Extra, AclClaimName),
Result = maps:merge(IsSuperuser, maps:merge(ACL, Attrs)),
{ok, Result}
catch
throw:{bad_acl_rule, Reason} ->
%% it's a invalid token, so ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}),
{error, bad_username_or_password}
end;
extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire);
{error, {missing_claim, Claim}} ->
%% it's a invalid token, so it's ok to log
?TRACE_AUTHN_PROVIDER("missing_jwt_claim", #{jwt => JWT, claim => Claim}),
Expand All @@ -242,6 +239,25 @@ verify(JWT, JWKs, VerifyClaims, AclClaimName) ->
{error, bad_username_or_password}
end.

extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire) ->
IsSuperuser = emqx_authn_utils:is_superuser(Extra),
Attrs = emqx_authn_utils:client_attrs(Extra),
ExpireAt = expire_at(DisconnectAfterExpire, Extra),
try
ACL = acl(Extra, AclClaimName),
Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
{ok, Result}
catch
throw:{bad_acl_rule, Reason} ->
%% it's a invalid token, so ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}),
{error, bad_username_or_password}
end.

expire_at(false, _Extra) -> #{};
expire_at(true, #{<<"exp">> := ExpireTime}) -> #{expire_at => ExpireTime};
expire_at(true, #{}) -> #{}.

acl(Claims, AclClaimName) ->
case Claims of
#{AclClaimName := Rules} ->
Expand Down Expand Up @@ -379,3 +395,6 @@ parse_rule(Rule) ->
{error, Reason} ->
throw({bad_acl_rule, Reason})
end.

merge_maps([]) -> #{};
merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)).
6 changes: 6 additions & 0 deletions apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ common_fields() ->
desc => ?DESC(acl_claim_name)
}},
{verify_claims, fun verify_claims/1},
{disconnect_after_expire, fun disconnect_after_expire/1},
{from, fun from/1}
] ++ emqx_authn_schema:common_fields().

Expand Down Expand Up @@ -188,6 +189,11 @@ verify_claims(required) ->
verify_claims(_) ->
undefined.

disconnect_after_expire(type) -> boolean();
disconnect_after_expire(desc) -> ?DESC(?FUNCTION_NAME);
disconnect_after_expire(default) -> false;
disconnect_after_expire(_) -> undefined.

do_check_verify_claims([]) ->
true;
%% _Expected can't be invalid since tuples may come only from converter
Expand Down
20 changes: 13 additions & 7 deletions apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ t_hmac_based(_) ->
algorithm => 'hmac-based',
secret => Secret,
secret_base64_encoded => false,
verify_claims => [{<<"username">>, <<"${username}">>}]
verify_claims => [{<<"username">>, <<"${username}">>}],
disconnect_after_expire => false
},
{ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),

Expand Down Expand Up @@ -179,7 +180,8 @@ t_public_key(_) ->
use_jwks => false,
algorithm => 'public-key',
public_key => PublicKey,
verify_claims => []
verify_claims => [],
disconnect_after_expire => false
},
{ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),

Expand Down Expand Up @@ -207,7 +209,8 @@ t_jwt_in_username(_) ->
algorithm => 'hmac-based',
secret => Secret,
secret_base64_encoded => false,
verify_claims => []
verify_claims => [],
disconnect_after_expire => false
},
{ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),

Expand All @@ -229,7 +232,8 @@ t_complex_template(_) ->
algorithm => 'hmac-based',
secret => Secret,
secret_base64_encoded => false,
verify_claims => [{<<"id">>, <<"${username}-${clientid}">>}]
verify_claims => [{<<"id">>, <<"${username}-${clientid}">>}],
disconnect_after_expire => false
},
{ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),

Expand Down Expand Up @@ -269,7 +273,7 @@ t_jwks_renewal(_Config) ->
algorithm => 'public-key',
ssl => #{enable => false},
verify_claims => [],

disconnect_after_expire => false,
use_jwks => true,
endpoint => "https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT + 1) ++ ?JWKS_PATH,
refresh_interval => 1000,
Expand Down Expand Up @@ -366,7 +370,8 @@ t_verify_claims(_) ->
algorithm => 'hmac-based',
secret => Secret,
secret_base64_encoded => false,
verify_claims => [{<<"foo">>, <<"bar">>}]
verify_claims => [{<<"foo">>, <<"bar">>}],
disconnect_after_expire => false
},
{ok, State0} = emqx_authn_jwt:create(?AUTHN_ID, Config0),

Expand Down Expand Up @@ -456,7 +461,8 @@ t_verify_claim_clientid(_) ->
algorithm => 'hmac-based',
secret => Secret,
secret_base64_encoded => false,
verify_claims => [{<<"cl">>, <<"${clientid}">>}]
verify_claims => [{<<"cl">>, <<"${clientid}">>}],
disconnect_after_expire => false
},
{ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),

Expand Down

0 comments on commit 8baf562

Please sign in to comment.