Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-2247: Merge kafka.utils.Time and kafka.common.utils.Time #2095

Closed

Conversation

ijuma
Copy link
Contributor

@ijuma ijuma commented Nov 3, 2016

Also:

  • Make all implementations of Time thread-safe as they are accessed from multiple threads in some cases.
  • Change default implementation of MockTime to use two separate variables for nanoTime and currentTimeMillis as they have different origins.

@ijuma ijuma force-pushed the kafka-2247-consolidate-time-interfaces branch from 5126636 to 1d73d10 Compare November 3, 2016 11:25
@ijuma ijuma force-pushed the kafka-2247-consolidate-time-interfaces branch from 1d73d10 to 3fbb64a Compare November 3, 2016 12:34
@shikhar
Copy link
Contributor

shikhar commented Nov 3, 2016

Maybe worth addressing https://issues.apache.org/jira/browse/KAFKA-4356 in this PR


private val lock = new Object
private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
private var periodStartNs: Long = time.nanoseconds
private var observedSoFar: Double = 0.0

def maybeThrottle(observed: Double) {
val msPerSec = 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong preference but maybe better here:
TimeUnit.SECONDS.toMillis(1)
TimeUnit.SECONDS.toNanos(1)

those methods do perform more math though, not sure whether this is a hot method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those methods don't work with doubles and they rely on integer division. I used the TimeUnit methods where possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both these vals are Int though?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course they are. But the calculations where they are used involve doubles.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see what you mean, I can make that change.

@@ -21,13 +21,28 @@
*/
public interface Time {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 on Javadoc and the nanoseconds caveat

private long autoTickMs = 0;
private final long autoTickMs;

// Values from `nanoTime` and `currentTimeMillis` are not comparable, so we store them separately to catch bugs
Copy link
Contributor

@shikhar shikhar Nov 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this class make sense but not sure the comment is accurate in terms of catching bugs - don't see any tracking of incorrect usage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that you would assert in the test, this class just provides some plumbing. With the previous code, incorrect usage of System.nanoTime versus System.currentTimeMillis would go unnoticed even if you asserted since they would always have the same "origin". How do you suggest we make the comment clearer?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, maybe
s/"so we store them separately to catch bugs where this is incorrectly assumed to be true"/"so we store them separately to allow for catching bugs where this is incorrectly assumed to be true"/

@shikhar
Copy link
Contributor

shikhar commented Nov 3, 2016

LGTM aside from minor comments.

@ijuma
Copy link
Contributor Author

ijuma commented Nov 3, 2016

Thanks for the review @shikhar. There's a separate PR (#2100) for KAFKA-4356. I addressed the other comments.

@ijuma
Copy link
Contributor Author

ijuma commented Nov 4, 2016

Build passed. @hachikuji, maybe you can take a look when you have time. The result of a bit of time and no internet. :)

Copy link
Contributor

@ewencp ewencp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma This mostly looks good, had a few nits. But I also haven't gotten a clean run of streams tests. For example, this failure seems consistent:

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > queryOnRebalance[0] FAILED
    java.lang.AssertionError: Condition not met within timeout 30000. waiting for metadata, store and value to be non null
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:279)
        at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.verifyAllKVKeys(QueryableStateIntegrationTest.java:263)
        at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.queryOnRebalance(QueryableStateIntegrationTest.java:356)

This is merged with trunk and I'm getting clean runs there, so I think this is somehow related to these changes. (Notably, the integration test surprisingly uses kafka.utils.MockTime.)

/**
* A time implementation that uses the system clock and sleep call
* A time implementation that uses the system clock and sleep call. Use `Time.SYSTEM` instead of creating an instance
* of this class.
*/
public class SystemTime implements Time {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this could just be made package private now. Are you maintaining for compatibility with anyone who was relying on this even though it's not really considered public API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I thought it wasn't worth breaking all the existing users since it's just a minor optimisation to reuse the same instance even though it's an internal class. But I don't feel strongly and it was a close call. So I am happy to change it if you think we should.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happier maintaining compatibility, just wanted to make sure we were doing so knowingly and intentionally.

@@ -63,7 +63,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
val clusterId: String) extends Logging {
val clusterId: String,
time: Time) extends Logging {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this just for completeness in KafkaServer.scala? We don't seem to take advantage of it in any test code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, just a little more consistent.

@@ -48,7 +49,7 @@ class LogManager(val logDirs: Array[File],
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
private val time: Time) extends Logging {
time: Time) extends Logging {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we lost the private val here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private val is not necessary here. A constructor without any qualifier in a plain class (i.e. not a case class) is not available outside the class body (i.e. constructor). And a field won't be created unless it's used outside the constructor (whereas val causes the field to be created, which is a bit odd if it's private, it's only needed in edge cases like if you want the field to be serialised or something like that).

import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import org.apache.kafka.common.utils.{SystemTime, Time}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SystemTime import is no longer used.

import org.apache.kafka.common.requests.{AbstractRequestResponse, AbstractRequest}
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
import org.apache.kafka.common.utils.{SystemTime, Time}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SystemTime no longer used. (And while you're at it given similar changes elsewhere, RequestOrResponse, AbstractRequest, and AbstractRequestResponse aren't used either.)

@@ -30,11 +30,9 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
import org.apache.kafka.common.utils.SystemTime

import org.apache.kafka.common.utils.{SystemTime, Time}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SystemTime import no longer used

…te-time-interfaces

* apache/trunk:
  MINOR: Fix re-raise of python error in system tests
  MINOR: missing fullstop in doc for `max.partition.fetch.bytes`
  KAFKA-4372: Kafka Connect REST API does not handle DELETE of connector with slashes in their names
  MINOR: Add description of how consumer wakeup acts if no threads are awakened
  KAFKA-4024; Override client metadata backoff on topic changes and avoid unnecessary connections
  MINOR: Remove stray `%s` in `TopicMetadata.toString`
@ijuma
Copy link
Contributor Author

ijuma commented Nov 7, 2016

Thanks for the review @ewencp. I addressed the nits and left a question on the constructor visibility for SystemTime.

With regards to QueryableStateIntegrationTest, I was able to reproduce it locally after a few successful runs. Seems like there is an error acquiring a lock (it seems like there is a timestamp in the store name):

Caused by: org.rocksdb.RocksDBException: IO error: lock /var/folders/nv/h6d0v4151m74j_9qlc4cg6bc0000gn/T/qs-test5092372324958375949/queryable-state-3/2_0/windowed-word-count-store-stream-three-3/windowed-word-count-store-stream-three-3-201611070000/LOCK: No locks available
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:184)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:189)

Full output in the following gist:

https://gist.github.com/ijuma/32001cef1a0fe6e8479600135ec69e40

@dguy, any ideas why we'd get the failure above?

@dguy
Copy link
Contributor

dguy commented Nov 7, 2016

@Ismael - Looking at the stack trace it would seem the group has rebalanced, but at least one of the StreamThreads hasn't stopped yet and still owns the lock for that store. Will have to see if i can reproduce.

…te-time-interfaces

* apache/trunk:
  KAFKA-3829: Ensure valid configuration prior to creating connector
  MINOR: Fix export command for additional env vars in connect system tests
  KAFKA-4081; KafkaConsumer should not allow negative offsets to be committed
  KAFKA-4379: Remove caching of dirty and removed keys from StoreChangeLogger
  MINOR: Extend mirror maker test to include interceptors
  HOTFIX: failing to close this iterator causes leaks in rocksdb
  MINOR: improve exception message for incompatible Serdes to actual key/value data types
  revert streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
  MINOR: add upgrade guide for Kafka Streams API
  KAFKA-4364: Remove secrets from DEBUG logging
  MINOR: fix incorrect logging in StreamThread
  MINOR: remove unused fields from KTableImpl
  KAFKA-4311: Multi layer cache eviction causes forwarding to incorrect ProcessorNode
  MINOR: fix typos and incorrect docs
  KAFKA-4360:Controller may deadLock when autoLeaderRebalance encounter zk expired
  MINOR: some trace logging for streams debugging
  MINOR: remove commented out code and System.out.println
  KAFKA-4284: Make Partitioner a Closeable and close it when closing the producer
  MINOR: Fix regex on connector path param in ConnectorsResource
…te-time-interfaces

* apache/trunk:
  KAFKA-2066; Use client-side FetchRequest/FetchResponse on server
  KAFKA-4409; Fix deadlock between topic event handling and shutdown in ZookeeperConsumerConnector
…te-time-interfaces

* apache/trunk:
  HOTFIX: Hotfix streams smoke test
  KAFKA-3703; Graceful close for consumers and producer with acks=0
  KAFKA-4420; Group StopReplicaRequests for partitions on the same broker into one StopReplicaRequest
  KAFKA-4377; remove deprecated scala.collection.JavaConversions calls
  KAFKA-4417: Update build dependencies for 0.10.2 cycle
  KAFKA-3825: Allow users to specify different types of state stores in Streams DSL
  KAFKA-4366: KafkaStreams.close() blocks indefinitely
  KAFKA-4376; Cross compile to Scala 2.12.0
  MINOR: Extract SCALA_BINARY_VERSION from SCALA_VERSION
  KAFKA-4211; Update system test services to use the new consumer by default
  KAFKA-4359: Remove commit interval in integration tests for testing caching effects
  MINOR: add a space to separate two words
  MINOR: Remove unused `ByteBoundedBlockingQueue` class and `zkSessionTimeout` parameter
  MINOR: Clarify how to fix conversion issues when plain JSON data is used with schemas.enable=true
  HOTFIX: Remove failing ConnectDistributedTest.test_bad_connector_class
@ijuma
Copy link
Contributor Author

ijuma commented Nov 19, 2016

With @dguy's help, we worked out the change that was causing the failing test in Streams: we now pass MockTime instead of SystemTime to GroupCoordinator (since we now use a single Time instance in KafkaServer whereas before we used SystemTime for any class that required the clients Time interface).

The root cause seems to be that MockTime is not thread-safe. After fixing that, the test seems to pass. When running it locally in a loop, I still get the occasional failure due to a ZK connect timeout, but that cannot be related to this PR as far as I can see.

…te-time-interfaces

* apache/trunk:
  KAFKA-4395; Break static initialization order dependency between KafkaConfig and Logconfig
  KAFKA-4362; Consumer can fail after reassignment of the offsets topic partition
  KAFKA-4355: Skip topics that have no partitions
  MINOR: Typo in KafkaConsumer javadoc
Copy link
Contributor

@ewencp ewencp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma I don't see anything obviously wrong aside from potential multithreading issues. However, my current concern is that I can very consistently trigger failures in streams:test with this patch whereas master currently passed a bunch of times in a row with no issues.

}

@Override
public void sleep(long ms) {
this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
timeMs.addAndGet(ms);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating these independently seems risky. If two threads interleave updates via sleep the end result will be ok. But even with only 1 updating thread, another thread reading the values via the other methods could observe different values when nanoseconds()/hiResClockMs() calls are mixed with milliseconds() calls. Just doing a quick search, this does happen, e.g. in BufferPool. Is there a reason not to just make these methods all synchronized?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. I did it like that because these time sources are updated independently in the real implementations so code that relies on them being updated atomically is probably buggy. Thanks for the BufferPool reference, I'll check it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked BufferPool and usage seems fine. The other places where we use nanoseconds and hiResTimerMs seem fine too.

val scheduler = new MockScheduler(this)

override def sleep(ms: Long) {
super.sleep(ms)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @volatile in the original suggests this was being used in multi-threaded tests. Related to other question re: synchronization, are we sure this isn't being used in a way that expects the scheduler tasks to run synchronized w/ the entire sleep() call?

@@ -19,31 +19,37 @@
import org.apache.kafka.common.utils.Time;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* A clock that you can manually advance by calling sleep
*/
public class MockTime implements Time {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why don't we use the MockTime from common utils?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I assumed that there was a desire to avoid depending on the test jar for clients in connect tests, so I left this class in. Happy to remove it if you and @ewencp think it should be removed.

@@ -103,20 +103,22 @@ class Log(val dir: File,
else
0
}
val t = time.milliseconds

private[this] val t = time.milliseconds

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not part of this patch, but since we're here, maybe we can choose a better name? It actually doesn't even seem like this needs to be a field since it's only used in a log message on startup. (Feel free to ignore this comment if you think it's too far out of scope)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into changing this from a val to a local before settling for private[this] and my preferred solution required more changes than I would like to include in this PR. Thinking about it some more, there is a simpler change that I think is OK to include so doing that.

@hachikuji
Copy link

@ijuma Thanks for the nice cleanup. Left a couple comments, but LGTM.

@hachikuji
Copy link

On second thought, I'm getting some streams test failures I haven't seen before when building locally. For example:

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > shouldCountClicksPerRegion[1] FAILED
    java.lang.AssertionError: Condition not met within timeout 30000. Did not receive 3 number of records
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:279)
        at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
        at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:180)
        at org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:296)

@ijuma
Copy link
Contributor Author

ijuma commented Nov 28, 2016

OK, I think it may be best to pass Time.SYSTEM from KafkaServer to GroupCoordinator to maintain the existing behaviour and to investigate why this is needed for the streams test to pass consistently in a separate JIRA. I updated the PR with this change and will file the JIRA if you agree.

@ewencp
Copy link
Contributor

ewencp commented Dec 1, 2016

@ijuma This LGTM but there were a few streams changes in the mean time that broke things. There's a trivial merge conflict and a new use of SystemTime. If these get cleaned up, this looks good to merge.

…te-time-interfaces

* apache/trunk:
  KAFKA-4443; Minor comment clean-up
  KAFKA-3637: Added initial states
  KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy
  KAFKA-4271: Fix the server start script for Windows 32-bit OS
  KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat <gharatmayuresh15@gmail.com> and Sumant Tambe <sutambe@yahoo.com>
  KAFKA-4387; KafkaConsumer should properly handle interrupts
  KAFKA-4403; Update KafkaBasedLog to use new endOffsets consumer API
  KAFKA-4397: Refactor Connect backing stores for thread safety
  KAFKA-4415; Reduce time to create and send UpdateMetadataRequest
  MINOR: Make release notes script check resolutions to avoid spurious inclusion of non-fix 'fixes' in release notes.
  KAFKA-4427: Skip topic groups with no tasks
  MINOR: Remove unused code in `LeaderAndIsr`, `ApiUtils` and `TopicMetadataRequest`
  KAFKA-4443; Controller should send UpdateMetadataRequest prior to LeaderAndIsrRequest during failover
  MINOR: doc fix related to monitoring consumer lag
  Revert "KAFKA-4345; Run decktape test for each pull request"
  KAFKA-4384; ReplicaFetcherThread stops after ReplicaFetcherThread receives a corrupted message
  KAFKA-4345; Run decktape test for each pull request
  KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic
@ewencp
Copy link
Contributor

ewencp commented Dec 2, 2016

Sigh, @ijuma one last test is failing for me still:

org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
    java.lang.RuntimeException: Request GROUP_COORDINATOR failed on brokers List(127.0.0.1:49709 (id: -1 rack: null))
        at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:63)
        at kafka.admin.AdminClient.findCoordinator(AdminClient.scala:68)
        at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:134)
        at org.apache.kafka.streams.integration.ResetIntegrationTest$WaitUntilConsumerGroupGotClosed.conditionMet(ResetIntegrationTest.java:425)
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:270)
        at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:157)

So close...

@ijuma
Copy link
Contributor Author

ijuma commented Dec 2, 2016

@ewencp I think that is an existing issue: https://github.com/apache/kafka/pull/2199/files

@ijuma
Copy link
Contributor Author

ijuma commented Dec 2, 2016

Looks like @guozhangwang may have a fix for the issue, so happy to wait until that is sorted to make sure it is indeed that.

@ewencp
Copy link
Contributor

ewencp commented Dec 2, 2016

@ijuma Given that it's happening already on trunk I'm not that worried. This LGTM otherwise, so feel free to commit if you want or I'll wait until we can get a clean test run on the merged version. I just don't want this to linger too much longer since it's so easy to break the patch with new tests.

…te-time-interfaces

* apache/trunk:
  KAFKA-4399; Fix deadlock between cleanupGroupMetadata and offset commit
  KAFKA-4161: KIP-89: Allow sink connectors to decouple flush and offset commit
  KAFKA-3008: Parallel start and stop of connectors and tasks in Connect
@ijuma
Copy link
Contributor Author

ijuma commented Dec 2, 2016

Thanks @ewencp. I looked into it and the changes in this PR uncovered a minor bug in the test, which was easy to fix: disable expiration of connections by the broker. @enothereska and/or @dguy, can either of you take a look at the trivial change in 5d823b4 ?

@enothereska
Copy link
Contributor

LGTM cc @mjsax

@asfgit asfgit closed this in 128d0ff Dec 2, 2016
@ijuma
Copy link
Contributor Author

ijuma commented Dec 2, 2016

Merged to trunk. There are still occasional failures for ResetIntegrationTest, but that's already happening in trunk.

@ijuma
Copy link
Contributor Author

ijuma commented Dec 2, 2016

Filed https://issues.apache.org/jira/browse/KAFKA-4479 about removing hardcoded Time.SYSTEM in GroupCoordinator.

@ijuma ijuma deleted the kafka-2247-consolidate-time-interfaces branch January 3, 2017 00:49
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
Also:
* Make all implementations of `Time` thread-safe as they are accessed from multiple threads in some cases.
* Change default implementation of `MockTime` to use two separate variables for `nanoTime` and `currentTimeMillis` as they have different `origins`.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Shikhar Bhushan <shikhar@confluent.io>, Jason Gustafson <jason@confluent.io>, Eno Thereska <eno.thereska@gmail.com>, Damian Guy <damian.guy@gmail.com>

Closes apache#2095 from ijuma/kafka-2247-consolidate-time-interfaces
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants