Skip to content

Commit

Permalink
[fix][broker] ServerCnx broken after recent cherry-picks (apache#19521)
Browse files Browse the repository at this point in the history
I broke all release branches when I cherry picked 2847dd1 to them. This change takes some of the underlying logic from apache#19409, without taking the async logic.

* Make changes to `ServerCnx` to make tests pass

Tests are currently failing, so passing tests will show that this solution is correct.

- [x] `doc-not-needed`

(cherry picked from commit 8246da2)
(cherry picked from commit 15e4198)
  • Loading branch information
michaeljmarshall committed Feb 15, 2023
1 parent fb5477b commit 6132b46
Showing 1 changed file with 24 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -626,15 +626,6 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {

// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion) {
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
if (!isValidRoleAndOriginalPrincipal()) {
state = State.Failed;
service.getPulsarStats().recordConnectionCreateFail();
final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
return;
}
}
ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
Expand All @@ -651,16 +642,16 @@ private void completeConnect(int clientProtoVersion, String clientVersion) {
}

// According to auth result, send newConnected or newAuthChallenge command.
private State doAuthentication(AuthData clientData,
private void doAuthentication(AuthData clientData,
boolean useOriginalAuthState,
int clientProtocolVersion,
String clientVersion) throws Exception {

// The original auth state can only be set on subsequent auth attempts (and only
// in presence of a proxy and if the proxy is forwarding the credentials).
// In this case, the re-validation needs to be done against the original client
// credentials.
boolean useOriginalAuthState = (originalAuthState != null);
AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
AuthData brokerData = authState.authenticate(clientData);

Expand Down Expand Up @@ -693,6 +684,15 @@ private State doAuthentication(AuthData clientData,

if (state != State.Connected) {
// First time authentication is done
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
if (!isValidRoleAndOriginalPrincipal()) {
state = State.Failed;
service.getPulsarStats().recordConnectionCreateFail();
final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
return;
}
}
completeConnect(clientProtocolVersion, clientVersion);
} else {
// If the connection was already ready, it means we're doing a refresh
Expand All @@ -706,18 +706,16 @@ private State doAuthentication(AuthData clientData,
}
}
}
} else {

return State.Connected;
}

// auth not complete, continue auth with client side.
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
if (log.isDebugEnabled()) {
log.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
// auth not complete, continue auth with client side.
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
if (log.isDebugEnabled()) {
log.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
}
}
return State.Connecting;
}

public void refreshAuthenticationCredentials() {
Expand Down Expand Up @@ -804,6 +802,8 @@ protected void handleConnect(CommandConnect connect) {
return;
}

state = State.Connecting;

try {
byte[] authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray;
AuthData clientData = AuthData.of(authData);
Expand Down Expand Up @@ -851,8 +851,6 @@ protected void handleConnect(CommandConnect connect) {
log.debug("[{}] Authenticate role : {}", remoteAddress, role);
}

state = doAuthentication(clientData, clientProtocolVersion, clientVersion);

// This will fail the check if:
// 1. client is coming through a proxy
// 2. we require to validate the original credentials
Expand Down Expand Up @@ -894,6 +892,7 @@ protected void handleConnect(CommandConnect connect) {
remoteAddress, originalPrincipal);
}
}
doAuthentication(clientData, false, clientProtocolVersion, clientVersion);
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
Expand All @@ -917,7 +916,7 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {

try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData, authResponse.getProtocolVersion(),
doAuthentication(clientData, originalAuthState != null, authResponse.getProtocolVersion(),
authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();
Expand Down

0 comments on commit 6132b46

Please sign in to comment.