Skip to content

Commit

Permalink
KAFKA-8678; Fix leave group protocol bug in throttling and error resp…
Browse files Browse the repository at this point in the history
…onse (#7101)

This is a bug fix PR to resolve errors introduced in #6188. The PR fixes 2 things:

1. throttle time should be set on version >= 1 instead of version >= 2
2. `getErrorResponse` should set throwable exception within LeaveGroupResponseData

The patch also adds more unit tests to guarantee correctness for leave group protocol.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
  • Loading branch information
Boyang Chen authored and hachikuji committed Jul 22, 2019
1 parent 2a133ba commit f98e176
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 4 deletions.
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -65,11 +66,13 @@ public LeaveGroupRequestData data() {

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
LeaveGroupResponseData response = new LeaveGroupResponseData();
if (version() >= 2) {
response.setThrottleTimeMs(throttleTimeMs);
LeaveGroupResponseData responseData = new LeaveGroupResponseData()
.setErrorCode(Errors.forException(e).code());

if (version() >= 1) {
responseData.setThrottleTimeMs(throttleTimeMs);
}
return new LeaveGroupResponse(response);
return new LeaveGroupResponse(responseData);
}

public static LeaveGroupRequest parse(ByteBuffer buffer, short version) {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class LeaveGroupResponse extends AbstractResponse {

Expand Down Expand Up @@ -72,4 +73,15 @@ public static LeaveGroupResponse parse(ByteBuffer buffer, short versionId) {
public boolean shouldClientThrottle(short version) {
return version >= 2;
}

@Override
public boolean equals(Object other) {
return other instanceof LeaveGroupResponse &&
((LeaveGroupResponse) other).data.equals(this.data);
}

@Override
public int hashCode() {
return Objects.hashCode(data);
}
}
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class LeaveGroupRequestTest {

@Test
public void testLeaveConstructor() {
final String groupId = "group_id";
final String memberId = "member_id";
final int throttleTimeMs = 10;

final LeaveGroupRequestData expectedData = new LeaveGroupRequestData()
.setGroupId(groupId)
.setMemberId(memberId);

final LeaveGroupRequest.Builder builder =
new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
.setGroupId(groupId)
.setMemberId(memberId));

for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
LeaveGroupRequest request = builder.build(version);
assertEquals(expectedData, request.data());

int expectedThrottleTime = version >= 1 ? throttleTimeMs
: AbstractResponse.DEFAULT_THROTTLE_TIME;
LeaveGroupResponse expectedResponse = new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(Errors.NOT_CONTROLLER.code())
.setThrottleTimeMs(expectedThrottleTime)
);

assertEquals(expectedResponse, request.getErrorResponse(throttleTimeMs,
Errors.NOT_CONTROLLER.exception()));
}
}
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;

import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class LeaveGroupResponseTest {
private final int throttleTimeMs = 10;

@Test
public void testConstructor() {
Map<Errors, Integer> expectedErrorCounts = Collections.singletonMap(Errors.NOT_COORDINATOR, 1);

LeaveGroupResponseData responseData = new LeaveGroupResponseData()
.setErrorCode(Errors.NOT_COORDINATOR.code())
.setThrottleTimeMs(throttleTimeMs);
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(responseData.toStruct(version), version);

assertEquals(expectedErrorCounts, leaveGroupResponse.errorCounts());

if (version >= 1) {
assertEquals(throttleTimeMs, leaveGroupResponse.throttleTimeMs());
} else {
assertEquals(DEFAULT_THROTTLE_TIME, leaveGroupResponse.throttleTimeMs());
}

assertEquals(Errors.NOT_COORDINATOR, leaveGroupResponse.error());
}
}

@Test
public void testShouldThrottle() {
// A dummy setup is ok.
LeaveGroupResponse response = new LeaveGroupResponse(new LeaveGroupResponseData());
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
if (version >= 2) {
assertTrue(response.shouldClientThrottle(version));
} else {
assertFalse(response.shouldClientThrottle(version));
}
}
}

@Test
public void testEquality() {
LeaveGroupResponseData responseData = new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(throttleTimeMs);
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
LeaveGroupResponse primaryResponse = new LeaveGroupResponse(responseData.toStruct(version), version);

LeaveGroupResponse secondaryResponse = new LeaveGroupResponse(responseData.toStruct(version), version);

assertEquals(primaryResponse, primaryResponse);
assertEquals(primaryResponse, secondaryResponse);
assertEquals(primaryResponse.hashCode(), secondaryResponse.hashCode());
}
}
}

0 comments on commit f98e176

Please sign in to comment.