Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-2250: Kafka Spout Refactoring to Increase Modularity and Testability #1832

Merged
merged 1 commit into from
Feb 15, 2017

Conversation

srdo
Copy link
Contributor

@srdo srdo commented Dec 17, 2016

Making these changes based on a discussion here #1826 and #1825 about the spout getting a bit large.

This PR makes the following changes:

  • OffsetEntry was moved into a new class and renamed OffsetManager
  • OffsetManager.commit now returns numCommittedOffsets, so it doesn't have to refer to a KafkaSpout internal variable.
  • KafkaSpout.commitOffsetsForAckedTuples no longer iterates over acked twice. The second iteration was used to commit offsets to OffsetEntries, but we may as well use nextCommitOffsets for that iteration, since those were the offsets committed to Kafka. This also saves us a null check in OffsetManager.commit.
  • OffsetManager.commit no longer checks if the parameter is null. It should no longer be possible that it is null, unless there's a bug, in which case we want to throw an NPE so we can fix it.
  • Timer supports Storm's time simulation. This also means that it only supports time units down to milliseconds. If we need nanosecond precision, we need to update Storm's Utils.Time.
  • Cleaned up a few redundant version declarations in the pom. Also switched out hamcrest-all for hamcrest-core + hamcrest-library, since tests were throwing NoSuchMethodError when an assertThat failed.
  • Updated the tests to use time simulation, and to check if the spout calls KafkaConsumer.commitSync instead of reading the spout's internal state

I think it would be nice if RetryService also supported time simulation, let me know what you think @hmcl

@srdo
Copy link
Contributor Author

srdo commented Dec 17, 2016

I wound up keeping KafkaUnit for the tests anyway. It doesn't hurt to have some integration testing, and mocking the consumer is a bit of a hassle for no real gain.

@harshach
Copy link
Contributor

@srdo can you please add details on any testing you've done with these changes.

@srdo
Copy link
Contributor Author

srdo commented Dec 21, 2016

@harshach Nothing beyond running the unit tests. Most of the changes are just moving code to new classes verbatim. The OffsetManager commit code changes should be getting hit by the test code.

@hmcl
Copy link
Contributor

hmcl commented Jan 24, 2017

@srdo What about calling the JIRA/PR Summary/Title: Kafka Spout Refactoring to Increase Modularity and Testability

@srdo srdo changed the title STORM-2250: Refactor new Kafka spout so it is easier to read, and easier to test without breaking encapsulation. STORM-2250: Kafka Spout Refactoring to Increase Modularity and Testability Jan 24, 2017
@srdo
Copy link
Contributor Author

srdo commented Jan 24, 2017

@hmcl Renamed. I'll wait until #1808 has been merged to fix conflicts here, since it's likely there's some overlap

@srdo
Copy link
Contributor Author

srdo commented Feb 1, 2017

@hmcl Updated. This should be ready for review.

@@ -52,7 +52,6 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
Copy link
Contributor

@HeartSaVioR HeartSaVioR Feb 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's behind of the change? We're maintaining both storm.kafka.version and storm.kafka.client.version, and this line is for picking correct version.

EDIT: I just saw that root pom.xml has it, so that's OK, but IMHO I would like to leave this explicitly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I put it back.

@srdo
Copy link
Contributor Author

srdo commented Feb 4, 2017

I'm a little unsure about whether the time unit change to Timer is a good idea. It might be better to either implement nanotime support in Storm Time, or to make the constructor parameters just be in milliseconds (I'm not sure how useful nano/microsecond support is for anyone). Anyone have any opinions on this?

KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
//This setter is here for testing
final KafkaSpout setKafkaConsumerFactory(KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather leave this with the constructor. We shouldn't be making this class mutable unless we have a very good reason to do so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change it back.

@@ -58,27 +56,26 @@

public class KafkaSpout<K, V> extends BaseRichSpout {
private static final long serialVersionUID = 4151921085047987154L;
public static final long INITIAL_TIMER_DELAY_MS = 500;
Copy link
Contributor

@hmcl hmcl Feb 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DEFAULT_TIMER_DELAY_MS - For cohesiveness this constant should likely be in the Timer class itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't a default, but a hardcoded value. It's mostly here so it can be referred to from tests. I don't think it should be in Timer, since it's not a property inherent to Timer, but to how KafkaSpout uses Timer. I could rename it to INITIAL_COMMIT_AND_SUBSCRIPTION_REFRESH_DELAY_MS to make it clear what it's for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's just make it TIMER_DELAY_MS. Sweet and to the point.

long oldCommittedOffset = offsetManager.getCommittedOffset();
long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
numUncommittedOffsets -= numCommittedOffsets;
LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For cohesiveness, I believe that this log message should be present in the OffsetManager class. If another (client) class creates an instance of OffsetManager, that class will be required to implement this same log message. Furthermore, this log message dives deep in the internal details of OffsetManager, which will make it very hard for client classes to print such message without know the internal details.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I only moved it here because the log makes use of the numUncommittedOffsets spout field, which we can't pass into offsetManager.commit because it changes based on the return value of commit. I can split the log message into a part that logs how much was committed for which topic partition (inside OffsetManager), and then another log message that logs numUncommittedOffsets (inside KafkaSpout). What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can split this into two log messages.

The first one LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}] goes into OffsetManager

The second LOG.debug("[{}] uncommitted offsets across all topic partitions stays here.

import java.util.Map;
import java.util.stream.IntStream;

import static org.mockito.Mockito.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please remove import wildcards.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

@hmcl
Copy link
Contributor

hmcl commented Feb 7, 2017

@srdo everything looks OK for me. I just would rather leave the Timer in ns than in ms, and change the SimulatedTime class to support nanos. I think that moving forward that will be useful for other developers as well.

@srdo
Copy link
Contributor Author

srdo commented Feb 8, 2017

@hmcl I gave updating Time to support nanoseconds a shot. Not really sure I'm happy with it, since it requires that we also switch the current millisecond support over to using System.nanoTime (System.nanoTime and System.currentTimeMillis are not on the same timeline, and are incomparable).

If we're going to keep this change, Time.currentTimeMillis should be changed to a different name, so it doesn't imply that it's the same as System.currentTimeMillis.

@revans2 Do you have an opinion on nanosecond support in o.a.s.u.Time? (asking because you have a number of commits on the Time file)

@revans2
Copy link
Contributor

revans2 commented Feb 8, 2017

@srdo Sorry I am kind of coming into this half way through.

History: System.nanoTime and System.currentTimeMillis have different uses and are not on the same time line. nanoTime is guaranteed to be monotonically increasing so long as the box is up. currentTimeMillis is not, because it is kept in sync with the system time. currentTimeMillis on most OSes is a very cheap. It is reading from shared memory, no system call needed. It can be different if read from different threads even. nanoTime, at least on x86 boxes end up reading a special register in the processor. If there is a very small core count this is cheap. However if you are on a system with many different cores or physical chips they all have to be synced up to guarantee that it is monotonically increasing so it ends up being about as expensive as a memory barrier. Not super expensive but also not dead cheap.

As far as Time.java is concerned. Simulated time can be whatever we want. I don't care if we store nanos internally or millis internally. It is all simulated anyways. If we are not simulating I don't think we should mix the two or have one backed by the other. They are separate for a reason and people most of the time will pick one or the other because of those differences. If we want nano support then lest have Time support nano, but the milli APIs stay backed by System.currentTimeMillis, and the nano APIs are backed by System.nanoTime.

@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented Feb 9, 2017

Let's back to actual usage of Timer.

if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
    commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
}
refreshSubscriptionTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);

Here we are taking period as milliseconds consistently, and dropping support nanoseconds on Timer means that up to 1 ms can be added to the period. Unless we're expecting users to set these config values to small value like 1 or so, I think it doesn't matter. Indeed we're having 500 ms as default value, which doesn't matter if it will be 501 ms.

@srdo
Copy link
Contributor Author

srdo commented Feb 9, 2017

@revans2 I agree, it is best to keep nanos and millis separate. I've updated Time to pass through to System.currrentTimeMillis for millisecond methods, and System.nanoTime for nanosecond methods. When simulating everything gets converted to nanos.

@HeartSaVioR I agree, I'm not really seeing a use case for submillisecond support in storm-kafka-client, because the things we're currently doing with timers is to set retry backoffs or commit periods. I don't think anyone would want either to be less than 1 millisecond.

@hmcl Is there a reason nano/microsecond support is needed?

I've updated Time and Timer to support nanoseconds (hopefully without introducing bugs).

@revans2
Copy link
Contributor

revans2 commented Feb 10, 2017

@srdo for Time and Timer for a pure correctness perspective the changes look good to me. I don't see any issues.

I took a quick look at the rest of the patch and it too looks good, but I don't dig in to it so I don't feel comfortable giving a +1 at this point. I would love to really take a look at it, but I am sadly swamped with other things right now.

@hmcl
Copy link
Contributor

hmcl commented Feb 10, 2017

@srdo I was also caught up in a couple of things. I will finish reviewing this tonight.

@srdo
Copy link
Contributor Author

srdo commented Feb 10, 2017

Thanks, both of you :)

@@ -42,10 +46,10 @@ public SimulatedTime() {
public SimulatedTime(Number ms) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

advanceTimeMs

advanceTimeNanos(millisToNanos(ms));
}

public static void advanceTimeNanos(long nanos) {
if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please enclose the if statement in braces.

if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
long newTime = simulatedCurrTimeMs.addAndGet(ms);
if (nanos < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please enclose the if statement in braces.

public static long currentTimeMillis() {
if(simulating.get()) {
return simulatedCurrTimeMs.get();
return nanosToMillis(simulatedCurrTimeNanos.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is not an atomic operation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it need to be? It's not updating simulatedCurrTimeNanos, and it's just dividing by a constant.

@HeartSaVioR
Copy link
Contributor

@srdo Could you rebase this?

@srdo
Copy link
Contributor Author

srdo commented Feb 14, 2017

@HeartSaVioR Fixed conflict

@hmcl
Copy link
Contributor

hmcl commented Feb 14, 2017

+1 for all the KafkaSpout related code
+1 also for the SimulatedTime changes in the sense that the refactoring of this class does not change the semantics of the pre-existing code.

@srdo can you please squash all the commits and create a PR for 1.x-branch, such that we can backport these changes. Thanks.

@srdo At last but not least, thanks for the great work. This refactoring makes the code much more modular and easy to extend..

…ility. Also support nanoseconds in Storm time simulation
@srdo
Copy link
Contributor Author

srdo commented Feb 14, 2017

Squashed this and pushed 1.x version at #1941. Thanks all for good feedback :)

@ptgoetz
Copy link
Member

ptgoetz commented Feb 15, 2017

+1

@asfgit asfgit merged commit 6e75016 into apache:master Feb 15, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants