Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3096; Leader is not set to -1 when it is shutdown if followers are down #765

Closed
wants to merge 10 commits into from

Conversation

ijuma
Copy link
Contributor

@ijuma ijuma commented Jan 13, 2016

Update leader to -1 before throwing NoReplicaOnlineException in OfflinePartitionLeaderSelector as suggested by Guozhang. This fixes the test, which is great, but it seems a bit out of place. Would it be better to change the code somewhere else?

Fix bug in waitUntilLeaderIsElectedOrChanged and simplify result type. The bug was for the following case:

leader.isDefined && oldLeaderOpt.isEmpty && newLeaderOpt.isDefined && newLeaderOpt.get != leader.get

We would consider it a successful election even though we should not. I also changed the result type as we never return None (we throw an exception instead). Fixing this bug is what uncovered the leader issue being solved in this PR.

Also included:

  • Various mechanical clean-ups found while trying to understand the code (as usual, this is in separate commits, so I can submit them separately if desired).
  • Make logging more regular in PartitionLeaderSelector.scala

The bug was for the following case:

`leader.isDefined && oldLeaderOpt.isEmpty && newLeaderOpt.isDefined && newLeaderOpt.get != leader.get`

We would consider it a successful election even though we should not.

I also changed the result type is we never return `None` (we throw an exception instead).
@ijuma
Copy link
Contributor Author

ijuma commented Jan 13, 2016

Review by @guozhangwang

topicAndPartition.partition, currentLeaderAndIsr.copy(leader = -1), controllerContext.epoch,
currentLeaderAndIsr.zkVersion)
zkUpdateSucceeded = updateSucceeded
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where we update the leader before throwing NoReplicaOnlineException.

var zkUpdateSucceeded = false
while (!zkUpdateSucceeded) {
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topicAndPartition.topic,
topicAndPartition.partition, currentLeaderAndIsr.copy(leader = -1), controllerContext.epoch,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use "LeaderAndIsr.NoLeader" instead of -1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will do.

@guozhangwang
Copy link
Contributor

LGTM overall. Personally I feel this additional logic in "selectLeader" does make sense, since in general we may not be able to trigger the last "remove replica" event when the failed replica is the controller itself, and hence from the view of the new controller the old replica leader was never failed but just no longer in the aliveBroker list any more.

So before we abort the leader election process due to either error or configured mechanism (e.g. unclean leader election not allowed), we may need to update the corresponding "source-of-truth" to reflect the abortion, that the new leader was not successfully elected, and hence the partition should be "offline".

An alternative way to do this, is when new controller is elected (in ControllerFailover), let it to browse through the ZK's stored leaderAndISRs and check if any of the brokers are no longer in the aliveBroker list, and if any trigger onBrokerFailured on each one of them. This may also be as efficient since we are already reading all the topics' replica assignment anyways in ControllerFailover.

As suggested by Guozhang.
@ijuma
Copy link
Contributor Author

ijuma commented Jan 13, 2016

Addressed your comment @guozhangwang and thanks for the review. It may be worth getting @junrao's opinion as well on which of the two approaches we should go for.

@ijuma
Copy link
Contributor Author

ijuma commented Jan 13, 2016

Started the system test branch builder:

https://jenkins.confluent.io/job/kafka_system_tests_branch_builder/337/

@ijuma
Copy link
Contributor Author

ijuma commented Jan 14, 2016

The failures in the system tests seem to be happening in the trunk build too. Will try again after the following fix to the system tests is merged:

#770

…ould-be-set-to--1

* apache/trunk: (28 commits)
  MINOR: Vagrant AWS overrideable EC2 instance name prefix
  KAFKA-2146: Fix adding partition did not find the correct startIndex.
  MINOR: Upgrade note on compacted topics behaviour on receiving message without key
  KAFKA-3080: Fix ConsoleConsumerTest by checking version when service is started
  KAFKA-2071: Replace Producer Request/Response with their org.apache.kafka.common.requests equivalents
  KAFKA-3121: Remove aggregatorSupplier and add Reduce functions
  KAFKA-3122; Fix memory leak in `Sender.completeBatch` on TOPIC_AUTHORIZATION_FAILED
  MINOR: Avoid unnecessary `ConcurrentHashMap.get`
  MINOR: Fix javadoc for `PartitionInfo.leader()`
  MINOR: MemoryRecords.sizeInBytes throws NPE when non-writable.
  KAFKA-3091: Broker persists generated ID even when the ID can't be used due to duplicates
  KAFKA-3012: Avoid reserved.broker.max.id collisions on upgrade
  KAFKA-3119: Adding -daemon option to zookeeper-server-start.sh USAGE, similar to kafka-server-start.sh
  MINOR: complete built-in stream aggregate functions
  KAFKA-3104: add windowed aggregation to KStream
  KAFKA-2999: Errors enum should be a 1 to 1 mapping of error codes and…
  KAFKA-3095: Add documentation on format of sasl.kerberos.principal.to.local.rules
  KAFKA-3098: "partition.assignment.strategy" appears twice in documentation
  KAFKA-2695: limited support for nullable byte arrays
  KAFKA-3105: Use `Utils.atomicMoveWithFallback` instead of `File.rename`
  ...
@ijuma
Copy link
Contributor Author

ijuma commented Jan 22, 2016

@guozhangwang
Copy link
Contributor

@junrao do you want to chime in to take a look at the proposed solution?

@junrao
Copy link
Contributor

junrao commented Jan 25, 2016

Thanks for the patch. A few high level comments.

  1. In OfflinePartitionLeaderSelector, if we can't find a new live leader (whether due to unclean leader election being disabled or not), it seems it's better to set the new leader to -1, instead of throwing an exception and leave the old leader in ZK. We need to make sure that the rest of the logic continues to work with a -1 leader. For example, if a follower receives a -1 leader, it probably should stop fetching from the current leader, but not start fetching again. Currently, in ReplicaManager.makeFollowers(), it seems that if the new leader is -1, we don't complete the follower transition and therefore don't stop the fetching from the current leader.
  2. This is related to the issue in KAFKA-3143. Currently, if a follower goes down, the logic to remove this replica from ISR is handled by transitioning the replica to OfflineReplica in ReplicaStateMachine. However, this logic is not called during controller failover. Suppose that broker 1 is a follower for a partition and is also the controller. If we hard kill broker 1 (therefore doesn't trigger the controlled shutdown logic), the new controller won't remove replica 1 from ISR. Instead, it relies on the leader to take replica 1 out of ISR after replica.lag.time.max.ms. Ideally, the new controller should shrink the ISR during initialization since this can be faster than waiting for the leader timeout. It seems that during the controller failover, we should also transition failed replicas to OfflineReplica too. It would be good to check if there are any other state transitions (PartitionState or ReplicaState) that are missing during the controller failover.

"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
var zkUpdateSucceeded = false
while (!zkUpdateSucceeded) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, not sure why we added the while loop here since there is already a while loop in PartitionStateMachine.electLeaderForPartition().

@wushujames
Copy link
Contributor

Hi, what's the user-visible impact of this and https://issues.apache.org/jira/browse/KAFKA-3143? From @junrao's previous comment, I can't tell whether this is a bug, or simply an optimization (reduce fetch requests to someone who isn't the leader, speed up leader failover).

I'm seeing some weirdness with regards to unclean leader election, and am wondering if this is related.

…ould-be-set-to--1

* apache/trunk: (58 commits)
  KAFKA-3207: Fix StateChangeLogger to use the right topic name
  MINOR: log connect reconfiguration error only if there was an error
  HOTFIX: fix partition ordering in assignment
  MINOR: Removed unnecessary Vagrantfile hack
  HOTFIX: fix broken WorkerSourceTask test
  HOTFIX: temp fix for ktable look up
  KAFKA-3198: Ticket Renewal Thread exits prematurely due to inverted c…
  MINOR: Fix restoring for source KTable
  KAFKA-3194: Validate security.inter.broker.protocol against the adver…
  MINOR: Some more Kafka Streams Javadocs
  KAFKA-3092: Replace SinkTask onPartitionsAssigned/onPartitionsRevoked with open/close
  KAFKA-3195; Transient test failure in OffsetCheckpointTest.testReadWrite
  MINOR: Increment ducktape dependency
  KAFKA-3121: Refactor KStream Aggregate to be Lambda-able.
  MINOR: some javadocs for kstream public api
  MINOR: fix the logic of RocksDBWindowStore using RocksDBStore Segments
  KAFKA-3170; Set default fetch_min_bytes in new consumer to 1
  KAFKA-3068: Remove retry with nodesEverSeen
  MINOR: Reorder StreamThread shutdown sequence
  KAFKA-3060: Refactor MeteredStore and RockDBStore Impl
  ...
…ould-be-set-to--1

* apache/trunk: (71 commits)
  KAFKA-3280: KafkaConsumer Javadoc contains misleading description of heartbeat behavior and correct use
  MINOR: Validate inner message compression attribute
  KAFKA-3292; ClientQuotaManager.getOrCreateQuotaSensors() may return a null ClientSensors.throttleTimeSensor
  MINOR: Connect status tracking API followup
  KAFKA-2698: Add paused() method to o.a.k.c.c.Consumer
  MINOR: enhance streams system test
  MINOR: add retry to state dir locking
  KAFKA-3259 KAFKA-3253; KIP-31/KIP-32 Follow-up
  MINOR: add useful debug log messages to KConnect source task execution
  MINOR: Add note about which files need to be edited when updating the version number
  KAFKA-3277; Update trunk version to be 0.10.0.0-SNAPSHOT
  KAFKA-3093: Add Connect status tracking API
  KAFKA-3272: Add debugging options to kafka-run-class.sh so we can easily run remote debugging
  KAFKA-3007: implement max.poll.records (KIP-41)
  HOTFIX: Add missing file for KeyValue unit test
  MINOR: KTable.count() to only take a selector for key
  KAFKA-3046: Add ByteBuffer Serializer and Deserializer
  HOTFIX: fix consumer config for streams
  KAFKA-3245: config for changelog replication factor
  KAFKA-3242: minor rename / logging change to Controller
  ...
…ould-be-set-to--1

* apache/trunk: (110 commits)
  KAFKA-3412: multiple asynchronous commits causes send failures
  MINOR: update new version in additional places
  MINOR: fix documentation version
  MINOR: Add InterfaceStability.Unstable annotations to some Kafka Streams public APIs
  Changing version to 0.10.1.0-SNAPSHOT
  MINOR: Fix FetchRequest.getErrorResponse for version 1
  KAFKA-3378; Follow-up to ensure we `finishConnect` for immediately connected keys
  MINOR: Add vagrant up wrapper for simple parallel bringup on aws
  KAFKA-3328: SimpleAclAuthorizer can lose ACLs with frequent add/remov…
  KAFKA-3424: Add CORS support to Connect REST API
  KAFKA-3427: broker can return incorrect version of fetch response when the broker hits an unknown exception
  KAFKA-3378; Client blocks forever if SocketChannel connects instantly
  KAFKA-3006: standardize KafkaConsumer API to use Collection
  KAFKA-3397: use -1(latest) as time default value for tools.GetOffsetShell
  KAFKA-3394; allow null offset metadata in commit API
  KAFKA-3422: Add overloading functions without serdes in Streams DSL
  HOTFIX: Renamed tests to match expected suffix
  KAFKA-2982; Mark the old Scala producer and related classes as deprecated
  KAFKA-3336: Unify Serializer and Deserializer into Serialization
  KAFKA-3202: System test that changes message version on the fly
  ...
…ould-be-set-to--1

* apache/trunk: (63 commits)
  KAFKA-3521: validate null keys in Streams DSL implementations
  KAFKA-3528: handle wakeups while rebalancing more gracefully
  MINOR: improve logging of consumer system tests
  KAFKA-725: Return OffsetOutOfRange error from ReplicaManager when non-follower attempts reading an offset that's above high watermark.
  KAFKA-3512: Added foreach operator
  KAFKA-3505: Fix punctuate generated record metadata
  MINOR: ensure original use of prop_file in verifiable producer
  KAFKA-3488; Avoid failing of unsent requests in consumer where possible
  KAFKA-3497: Streams ProcessorContext should support forward() based on child name
  KAFKA-3477: extended KStream/KTable API to specify custom partitioner for sinks
  KAFKA-3508: Fix transient SimpleACLAuthorizerTest failures
  KAFKA-3489; Update request metrics if a client closes a connection while the broker response is in flight
  KAFKA-3510; OffsetIndex thread safety
  KAFKA-3384: Conform to POSIX kill usage
  KAFKA-2998: log warnings when client is disconnected from bootstrap brokers
  KAFKA-3464: Add system tests for Connect with Kafka security enabled
  HOTFIX: set timestamp in SinkNode
  KAFKA-3483: Restructure ducktape tests to simplify running subsets of tests
  MINOR: Clean up of SourceTaskOffsetCommiter
  MINOR: Fix small typo in design section
  ...
@guozhangwang
Copy link
Contributor

@ijuma Is this still valid? If yes could you close it?

@asfbot
Copy link

asfbot commented Dec 21, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/343/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Dec 21, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/345/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Dec 21, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/344/
Test FAILed (JDK 8 and Scala 2.12).

@ijuma
Copy link
Contributor Author

ijuma commented May 21, 2019

Addressed by @omkreddy via #5041.

@ijuma ijuma closed this May 21, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants