Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8791: RocksDBTimestampedStore should open in regular mode by default #7201

Merged
merged 3 commits into from Aug 14, 2019

Conversation

@mjsax
Copy link
Member

commented Aug 13, 2019

No description provided.

@mjsax mjsax added the streams label Aug 13, 2019

@@ -247,6 +308,9 @@ private void verifyOldAndNewColumnFamily() throws Exception {
assertThat(db.get(withTimestampColumnFamily, "key11".getBytes()).length, is(21));
assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()), new IsNull<>());

// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
noTimestampColumnFamily.close();
withTimestampColumnFamily.close();

This comment has been minimized.

Copy link
@mjsax

mjsax Aug 13, 2019

Author Member

Minor fix on the side (same below)

@mjsax

This comment has been minimized.

Copy link
Member Author

commented Aug 13, 2019

@@ -139,70 +212,70 @@ public void shouldMigrateDataFromDefaultToTimestampColumnFamily() throws Excepti

private void iteratorsShouldNotMigrateData() {
// iterating should not migrate any data, but return all key over both CF (plus surrogate timestamps for old CF)
final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all();

This comment has been minimized.

Copy link
@mjsax

mjsax Aug 13, 2019

Author Member

Added some exception handling improvements to close all resources properly in case of failure.

@ableegoldman
Copy link
Contributor

left a comment

LGTM, nice catch

@ableegoldman

This comment has been minimized.

Copy link
Contributor

commented Aug 13, 2019

checkstyleTest failed though

@mjsax

This comment has been minimized.

Copy link
Member Author

commented Aug 13, 2019

Fixed checkstyle

@bbejeck
Copy link
Contributor

left a comment

Just one minor comment, otherwise LGTM

noTimestampColumnFamily.close();
}
noTimestampsIter.close();
setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));
} catch (final RocksDBException e) {
if ("Column family not found: : keyValueWithTimestamp".equals(e.getMessage())) {
try {

This comment has been minimized.

Copy link
@bbejeck

bbejeck Aug 13, 2019

Contributor

Maybe add an INFO or WARN statement here to help with tracing control flow.

This comment has been minimized.

Copy link
@mjsax

mjsax Aug 13, 2019

Author Member

Not sure -- it's an internal implementation detail that we create a second CF -- if not a WARN for sure, because it's expected to happen. We could de a DEBUG if we want.

This comment has been minimized.

Copy link
@bbejeck

bbejeck Aug 13, 2019

Contributor

good point. I'd say just leave it as is.

@ConcurrencyPractitioner
Copy link
Contributor

left a comment

LGTM. Thanks for the change!

@guozhangwang
Copy link
Contributor

left a comment

Overall LGTM, just a minor question about test (trying to avoid flakiness).

@@ -92,14 +79,27 @@ void openRocksDB(final DBOptions dbOptions,
} catch (final RocksDBException fatal) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal);
}
log.info("Opening store {} in upgrade mode", name);
dbAccessor = new DualColumnFamilyAccessor(columnFamilies.get(0), columnFamilies.get(1));
setDbAccessor(columnFamilies.get(0), columnFamilies.get(1));

This comment has been minimized.

Copy link
@guozhangwang

guozhangwang Aug 13, 2019

Contributor

Line 75 above: double colons?

This comment has been minimized.

Copy link
@mjsax

mjsax Aug 13, 2019

Author Member

Yes -- well, it's the error message text we get from RocksDB -- nothing we can do about it guess.

withTimestampColumnFamily = columnFamilies.get(1);

assertThat(db.get(noTimestampColumnFamily, "key".getBytes()), new IsNull<>());
assertThat(db.getLongProperty(noTimestampColumnFamily, "rocksdb.estimate-num-keys"), is(0L));

This comment has been minimized.

Copy link
@guozhangwang

guozhangwang Aug 13, 2019

Contributor

Do we know that rocksdb.estimate-num-keys would always return 0 and 1 in this test (since it is only an approximate estimate, hence no guarantee)?

This comment has been minimized.

Copy link
@mjsax

mjsax Aug 13, 2019

Author Member

It's no exact counting in general, but this this case we know it will be counting correctly However, the "approximation" is at least deterministic and hence, it seems safe to me to write the test that way.

Basically, for each put() RocksDB increments the counter and for each delete() is decrements the counter. Hence, if you do a put() for an existing key, RockDB over counts, and if you do a delete() for a non-exiting key, RocksDB under counts.

In this test, we do a single put() on one CF and not operation on the other CF, hence, the count (ie, approximation) will be correct.

Thoughts?

This comment has been minimized.

Copy link
@guozhangwang

guozhangwang Aug 13, 2019

Contributor

SG.

@mjsax

This comment has been minimized.

Copy link
Member Author

commented Aug 13, 2019

Java8: org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector
Java 11 / 2.12:

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testReconfigConnector
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner[caching enabled = true]
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerOuter[caching enabled = true]
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterInner[caching enabled = true]

Java 11 / 2.13 passed.

@mjsax

This comment has been minimized.

Copy link
Member Author

commented Aug 13, 2019

Retest this please.

@mjsax

This comment has been minimized.

Copy link
Member Author

commented Aug 14, 2019

Java 11 / 2.13: kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
Java 11 / 2.12:

kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic
kafka.api.SaslScramSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

Java8 passed.

Merging this.

@mjsax mjsax merged commit 245e9ca into apache:trunk Aug 14, 2019

1 of 3 checks passed

JDK 11 and Scala 2.12 FAILURE 11606 tests run, 77 skipped, 2 failed.
Details
JDK 11 and Scala 2.13 FAILURE 11811 tests run, 77 skipped, 1 failed.
Details
JDK 8 and Scala 2.11 SUCCESS 11811 tests run, 77 skipped, 0 failed.
Details
mjsax added a commit that referenced this pull request Aug 14, 2019
KAFKA-8791: RocksDBTimestampedStore should open in regular mode by de…
…fault (#7201)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, Guozhang Wang <guozhang@confluent.io>
@mjsax

This comment has been minimized.

Copy link
Member Author

commented Aug 14, 2019

Merged to trunk and cherry-picked to 2.3.

@mjsax mjsax deleted the mjsax:kafka-8791-fix-store-upgrad-mode branch Aug 14, 2019

mjsax added a commit to mjsax/kafka that referenced this pull request Aug 15, 2019
KAFKA-8791: RocksDBTimestampedStore should open in regular mode by de…
…fault (apache#7201)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, Guozhang Wang <guozhang@confluent.io>
ableegoldman added a commit to confluentinc/kafka that referenced this pull request Aug 20, 2019
KAFKA-8791: RocksDBTimestampedStore should open in regular mode by de…
…fault (apache#7201)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, Guozhang Wang <guozhang@confluent.io>
xiowu0 added a commit to linkedin/kafka that referenced this pull request Aug 22, 2019
[LI-CHERRY-PICK] [d5eda1e] KAFKA-8791: RocksDBTimestampedStore should…
… open in regular mode by default (apache#7201)

TICKET = KAFKA-8791
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [d5eda1e]
ORIGINAL_DESCRIPTION =

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, Guozhang Wang <guozhang@confluent.io>
(cherry picked from commit d5eda1e)
guozhangwang added a commit to confluentinc/kafka that referenced this pull request Aug 29, 2019
KAFKA-8791: RocksDBTimestampedStore should open in regular mode by de…
…fault (apache#7201)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, Guozhang Wang <guozhang@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants
You can’t perform that action at this time.