Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8443 Broker support for fetch from followers #6832

Merged
merged 56 commits into from Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
97edd2f
Copying work over from prototype PR
mumrah May 28, 2019
ab759b7
Don't check replica ID when handling fetch requests
mumrah May 28, 2019
6cc6f2c
Add PartitionInfo to ReplicaSelector interface
mumrah May 30, 2019
837e68f
Rename some classes
mumrah May 30, 2019
edbb322
backing out a file
mumrah May 30, 2019
adf3be5
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah May 30, 2019
17a0cee
Fix scala 2.11 compile error
mumrah May 31, 2019
4476cd3
Update FetchSession logic
mumrah Jun 3, 2019
1ba0c42
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jun 3, 2019
09bd67f
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jun 5, 2019
cb50eea
Expose partition read replica over JMX and add system test
mumrah Jun 7, 2019
8ccd43d
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jun 9, 2019
e68fe10
Feedback from PR on ReplicaSelector interface
mumrah Jun 11, 2019
c369c64
Feedback from PR for ReplicaManager
mumrah Jun 11, 2019
224531d
More PR feedback, fix the tests
mumrah Jun 11, 2019
c6052bc
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jun 11, 2019
8bb5af9
Fix logic error in ReplicaManager
mumrah Jun 11, 2019
7619ff8
More feedback from PR. Moving stuff around
mumrah Jun 12, 2019
61b500c
Fixup
mumrah Jun 12, 2019
55be218
Add missing case for fetch satisfaction
mumrah Jun 13, 2019
c4d1984
More feedback from PR
mumrah Jun 13, 2019
5ea8727
Fix fetch request test
mumrah Jun 14, 2019
80f1cd7
Add a couple unit tests for new DelayedFetch behavior
mumrah Jun 14, 2019
cff7193
Cleanup
mumrah Jun 14, 2019
13465a4
More feedback
mumrah Jun 17, 2019
e8ae4d6
Move "leader" to PartitionView, fix DelayedFetch logic
mumrah Jun 18, 2019
318d234
Fix DelayedFetch to work on non-leader
mumrah Jun 19, 2019
36b51e2
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jun 19, 2019
ccaf11a
Ensure that we get full LogOffsetMetadata when looking at offsets on …
mumrah Jun 19, 2019
80fc0f9
Fixup after merge from upstream
mumrah Jun 19, 2019
2143989
Change last caught up time to time since last caught up
mumrah Jun 19, 2019
a30d8b5
More fixup after merge from trunk
mumrah Jun 20, 2019
a065482
Don't fast return when HW is missing for non-sessionized requests
mumrah Jun 20, 2019
097445d
Fixing a test
mumrah Jun 20, 2019
a91da1a
Always get full offset snapshot
mumrah Jun 21, 2019
0a060ee
Remove unused code
mumrah Jun 21, 2019
c3f2e56
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jun 21, 2019
08fd4ed
Finishing touches
mumrah Jun 25, 2019
ba41b2e
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jun 25, 2019
df8a731
Return response immediately if there are no cached HW for the follower
mumrah Jun 27, 2019
81b6002
Don't return the leader as the preferred read replica
mumrah Jun 27, 2019
5158d03
Some cleanup from PR feedback
mumrah Jun 27, 2019
5114ae1
Let Replica track the follower's highwatermark rather than FetchSession
mumrah Jun 28, 2019
bb31468
Simplify the HW check a bit
mumrah Jul 1, 2019
4eaef8a
Change the way we update the HW in Replica
mumrah Jul 2, 2019
cd3c865
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jul 2, 2019
26385af
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jul 2, 2019
027ebaf
Add some commentary
mumrah Jul 2, 2019
99c8ee0
Increase collection time in test to reduce flakiness
mumrah Jul 2, 2019
345f5bf
More PR feedback
mumrah Jul 3, 2019
54fdd6b
Merge remote-tracking branch 'apache/trunk' into KAFKA-8443
mumrah Jul 3, 2019
250b8a5
Fix locking for HW and LSO in Log
mumrah Jul 3, 2019
2b27158
More cleanup
mumrah Jul 3, 2019
c0f2e7f
Remove LeaderReplicaSelector, make that the default behavior
mumrah Jul 3, 2019
35d3592
Add logging
mumrah Jul 3, 2019
4649c14
Separating out the system test
mumrah Jul 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;

import org.apache.kafka.common.TopicPartition;

import java.util.Optional;

public class LeaderReplicaSelector implements ReplicaSelector {
@Override
public Optional<ReplicaView> select(TopicPartition topicPartition,
ClientMetadata clientMetadata,
PartitionView partitionView) {
return partitionView.leader();
}
}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;

import org.apache.kafka.common.TopicPartition;

import java.util.Comparator;
import java.util.Optional;

public class MostCaughtUpReplicaSelector implements ReplicaSelector {
mumrah marked this conversation as resolved.
Show resolved Hide resolved
@Override
public Optional<ReplicaView> select(TopicPartition topicPartition,
ClientMetadata clientMetadata,
PartitionView partitionView) {
return partitionView.replicas().stream()
.max(Comparator.comparing(ReplicaView::logOffset)
.thenComparing(ReplicaView::lastCaughtUpTimeMs)
.thenComparing(replicaInfo -> replicaInfo.endpoint().id()));
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;

import org.apache.kafka.common.TopicPartition;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class RackAwareReplicaSelector implements ReplicaSelector {

private final MostCaughtUpReplicaSelector tieBreaker = new MostCaughtUpReplicaSelector();

@Override
public Optional<ReplicaView> select(TopicPartition topicPartition,
ClientMetadata clientMetadata,
PartitionView partitionView) {
if (clientMetadata.rackId != null && !clientMetadata.rackId.isEmpty()) {
Set<ReplicaView> sameRackReplicas = partitionView.replicas().stream()
.filter(replicaInfo -> clientMetadata.rackId.equalsIgnoreCase(replicaInfo.endpoint().rack()))
.collect(Collectors.toSet());
if (sameRackReplicas.isEmpty()) {
return partitionView.leader();
} else {
Optional<ReplicaView> leader = partitionView.leader().filter(sameRackReplicas::contains);
if (leader.isPresent()) {
// Use the leader if it's in this rack
return leader;
} else {
// Otherwise, get the most caught-up replica
PartitionView sameRackPartition = new PartitionView() {
@Override
public Set<ReplicaView> replicas() {
return sameRackReplicas;
}

@Override
public Optional<ReplicaView> leader() {
return partitionView.leader();
}
};
return tieBreaker.select(topicPartition, clientMetadata, sameRackPartition);
mumrah marked this conversation as resolved.
Show resolved Hide resolved
}
}
} else {
return partitionView.leader();
}
}
}
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* Pluggable interface to select a preferred read replica given the current set of replicas for a partition
* and information from the client.
*/
public interface ReplicaSelector extends Configurable, Closeable {

/**
* Select the preferred replica a client should use for fetching. If no replica is available, this will return an
* empty optional.
*/
Optional<ReplicaView> select(TopicPartition topicPartition,
ClientMetadata clientMetadata,
PartitionView partitionView);
@Override
default void close() throws IOException {
// No-op by default
}

@Override
default void configure(Map<String, ?> configs) {
// No-op by default
}

/**
* Holder for all the client metadata required to determine a preferred replica.
*/
class ClientMetadata {
mumrah marked this conversation as resolved.
Show resolved Hide resolved
public static final ClientMetadata NO_METADATA = new ClientMetadata("", "", null, null, null);

public final String rackId;
public final String clientId;
public final InetAddress clientAddress;
public final KafkaPrincipal principal;
public final String listenerName;

public ClientMetadata(String rackId, String clientId, InetAddress clientAddress, KafkaPrincipal principal, String listenerName) {
this.rackId = rackId;
this.clientId = clientId;
this.clientAddress = clientAddress;
this.principal = principal;
this.listenerName = listenerName;
}
}

/**
* View of a partition used by {@link ReplicaSelector} to determine a preferred replica.
*/
interface PartitionView {
Set<ReplicaView> replicas();

Optional<ReplicaView> leader();
}

/**
* View of a replica used by {@link ReplicaSelector} to determine a preferred replica.
*/
interface ReplicaView {
boolean isLeader();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get rid of this since we expose the leader directly from PartitionView?


Node endpoint();

long logOffset();
mumrah marked this conversation as resolved.
Show resolved Hide resolved

long lastCaughtUpTimeMs();
mumrah marked this conversation as resolved.
Show resolved Hide resolved
}

}


Expand Up @@ -329,7 +329,7 @@ public int hashCode() {
result = 31 * result + Long.hashCode(highWatermark);
result = 31 * result + Long.hashCode(lastStableOffset);
result = 31 * result + Long.hashCode(logStartOffset);
result = 31 * result + (preferredReadReplica != null ? preferredReadReplica.hashCode() : 0);
result = 31 * result + Objects.hashCode(preferredReadReplica);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In FetchRequest.java, it would be useful to add a comment above the following line to summarize the changes.

private static final Schema FETCH_REQUEST_V11 = new Schema(

result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
result = 31 * result + (records != null ? records.hashCode() : 0);
return result;
Expand Down
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.junit.Test;

import java.net.InetAddress;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.kafka.test.TestUtils.assertOptional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class ReplicaSelectorTest {
@Test
public void testLeaderSelector() {
TopicPartition tp = new TopicPartition("test", 0);

Set<ReplicaSelector.ReplicaView> replicaViewSet = replicaInfoSet();
ReplicaSelector.PartitionView partitionView = partitionInfo(replicaViewSet);

ReplicaSelector selector = new LeaderReplicaSelector();
Optional<ReplicaSelector.ReplicaView> selected;

selected = selector.select(tp, ReplicaSelector.ClientMetadata.NO_METADATA, partitionView);
assertOptional(selected, replicaInfo -> {
assertTrue(replicaInfo.isLeader());
assertEquals(replicaInfo.endpoint().id(), 0);
});

selected = selector.select(tp, ReplicaSelector.ClientMetadata.NO_METADATA, partitionInfo(Collections.emptySet()));
assertFalse(selected.isPresent());
}

@Test
public void testSameRackSelector() {
TopicPartition tp = new TopicPartition("test", 0);

Set<ReplicaSelector.ReplicaView> replicaViewSet = replicaInfoSet();
ReplicaSelector.PartitionView partitionView = partitionInfo(replicaViewSet);

ReplicaSelector selector = new RackAwareReplicaSelector();
Optional<ReplicaSelector.ReplicaView> selected = selector.select(tp, metadata("rack-b"), partitionView);
assertOptional(selected, replicaInfo -> {
assertEquals("Expect replica to be in rack-b", replicaInfo.endpoint().rack(), "rack-b");
assertEquals("Expected replica 3 since it is more caught-up", replicaInfo.endpoint().id(), 3);
});

selected = selector.select(tp, metadata("not-a-rack"), partitionView);
assertOptional(selected, replicaInfo -> {
assertTrue("Expect leader when we can't find any nodes in given rack", replicaInfo.isLeader());
});

selected = selector.select(tp, metadata("rack-a"), partitionView);
assertOptional(selected, replicaInfo -> {
assertEquals("Expect replica to be in rack-a", replicaInfo.endpoint().rack(), "rack-a");
assertTrue("Expect the leader since it's in rack-a", replicaInfo.isLeader());
});


}

static Set<ReplicaSelector.ReplicaView> replicaInfoSet() {
return Stream.of(
replicaInfo(new Node(0, "host0", 1234, "rack-a"), true, 4, 10),
replicaInfo(new Node(1, "host1", 1234, "rack-a"), false, 2, 5),
replicaInfo(new Node(2, "host2", 1234, "rack-b"), false, 3, 7),
replicaInfo(new Node(3, "host3", 1234, "rack-b"), false, 4, 8)

).collect(Collectors.toSet());
}

static ReplicaSelector.ReplicaView replicaInfo(Node node, boolean isLeader, long logOffset, long lastCaughtUpTimeMs) {
return new ReplicaSelector.ReplicaView() {

@Override
public boolean isLeader() {
return isLeader;
}

@Override
public Node endpoint() {
return node;
}

@Override
public long logOffset() {
return logOffset;
}

@Override
public long lastCaughtUpTimeMs() {
return lastCaughtUpTimeMs;
}
};
}

static ReplicaSelector.PartitionView partitionInfo(Set<ReplicaSelector.ReplicaView> replicaViewSet) {
return new ReplicaSelector.PartitionView() {
@Override
public Set<ReplicaSelector.ReplicaView> replicas() {
return replicaViewSet;
}

@Override
public Optional<ReplicaSelector.ReplicaView> leader() {
return replicaViewSet.stream().filter(ReplicaSelector.ReplicaView::isLeader).findFirst();
}
};
}

static ReplicaSelector.ClientMetadata metadata(String rack) {
return new ReplicaSelector.ClientMetadata(rack, "test-client",
InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, "test");

}
}