-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5111: code cleanup follow up #2917
Conversation
mjsax
commented
Apr 26, 2017
- mainly moving methods
- also improved logging
Call for review @enothereska @dguy @guozhangwang This is a follow-up for #2895 with the code cleanup commit that I did remove (moving around methods). I also put a second commit on top to improving logging. This PR also addresses as remaining nits from #2895 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couple minor comments, otherwise LGTM
@@ -122,14 +105,14 @@ public void commit() { | |||
*/ | |||
@Override | |||
public void suspend() { | |||
log.info("{} Suspending", logPrefix); | |||
log.debug("{} Suspending", logPrefix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason why we are changing these from info to debug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We log suspended tasks at info
level in StreamThread
already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we are not actually?
for (final AbstractTask task : activeAndStandbytasks()) {
+ try {
+ task.suspend();
+ } catch (final RuntimeException e) {
+ firstException.compareAndSet(null, e);
+ }
+ }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do. It's little further up:
log.debug("{} suspendTasksAndState: suspending all active tasks {} and standby tasks {}",
logPrefix, activeTasks.keySet(), standbyTasks.keySet());
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final AbstractTask task : activeAndStandbytasks()) {
try {
task.suspend();
} catch (final RuntimeException e) {
firstException.compareAndSet(null, e);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax Just realized we already have an info log entry in onPartitionXX callbacks, so this should be fine. NVM.
final Sensor taskCommitTimeSensor; | ||
|
||
|
||
public TaskMetrics(final StreamsMetrics metrics) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can be package private
Refer to this link for build results (access rights to CI server needed): |
Updated this. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
/** | ||
* Flush all state stores owned by this task | ||
*/ | ||
void flushState() { | ||
log.trace("{} Flushing state store", logPrefix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel it is a bit misleading since it is not flushing a store
, but the manager
.
BTW, maybe it's better just to move
log.debug("{} Flushing all stores registered in the state manager", logPrefix);
out of the if condition inside manager.flush()
and avoid adding this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it's that the same "thing" the manager is just an implementation details?
The idea is, to log that task.flushState()
was called -- this log is independent of "how" the flushing is done. Does this make sense?
And I thinks it better to keep it within the if -> this los is not about a method got called, but what does actually happen -- and thus only log if an actual flush is done.
* @param writeCheckpoint boolean indicating if a checkpoint file should be written | ||
*/ | ||
void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException { | ||
log.trace("{} Closing state stores", logPrefix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto to my other comment, maybe we can just move
log.debug("{} Closing its state manager and all the registered state stores", logPrefix);
out of the if condition inside the manager code.
} | ||
|
||
protected void updateOffsetLimits() { | ||
log.trace("{} Updating store offset limits", logPrefix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change trace
to debug
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll change this to debug
and do the inner as trace
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just trying to be consistent with other task-level log entries. We can also just remove this as you mentioned below.
initTopology(); | ||
@Override | ||
protected void flushState() { | ||
log.trace("{} Flushing state and producer topology", logPrefix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
producer topology
-> producer
?
@Override | ||
protected void flushState() { | ||
log.trace("{} Flushing state and producer topology", logPrefix); | ||
super.flushState(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will incur two trace logging one from stream-task and one from abstract-task if trace
is enabled.
Instead could we just add a trace log after line 262 for "Flushing the producer after all state store has been flushed and corresponding changelog records sent".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same reply as above -- all those log indicate that a method got called -- thus it's not "double logging" imho -- but it's of course verbose, but for trace level should be fine
for (final TopicPartition partition : partitions) { | ||
try { | ||
final OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API? | ||
stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a trace entry recording the updated offset limit inside the manager class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack.
streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId, | ||
Collections.singletonMap("client-id", threadClientId)); | ||
if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { | ||
log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadClientId); | ||
log.warn("{} Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", logPrefix, threadClientId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second param seems redundant.
@@ -682,7 +781,7 @@ private void maybeUpdateStandbyTasks() { | |||
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<>(); | |||
|
|||
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : | |||
standbyRecords.entrySet()) { | |||
standbyRecords.entrySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This newline is not needed.
@@ -122,14 +105,14 @@ public void commit() { | |||
*/ | |||
@Override | |||
public void suspend() { | |||
log.info("{} Suspending", logPrefix); | |||
log.debug("{} Suspending", logPrefix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we are not actually?
for (final AbstractTask task : activeAndStandbytasks()) {
+ try {
+ task.suspend();
+ } catch (final RuntimeException e) {
+ firstException.compareAndSet(null, e);
+ }
+ }
@@ -939,6 +1209,7 @@ private void addStreamTasks(final Collection<TopicPartition> assignment, final l | |||
final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>(); | |||
|
|||
// collect newly assigned tasks and reopen re-assigned tasks | |||
log.info("{} Resuming suspended re-assigned tasks", logPrefix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems a bit misleading, and less informative. Maybe just "Adding active tasks {partitionAssignor.activeTasks()}"?
Ditto below for standby tasks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? Cf Line 1221 -> task.resume()
This loop figures out what resume task got reassigned.
retest this please |
Refer to this link for build results (access rights to CI server needed): |
@mjsax is Jenkins failure relevant? |
Refer to this link for build results (access rights to CI server needed): |
It says: I'll try to rebase. |
cd6cbea
to
3f545d7
Compare
Updated and rebased. \cc @guozhangwang |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Merged to trunk. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |