Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5900: Add task metrics common to both sink and source tasks #3911

Closed
wants to merge 6 commits into from

Conversation

rhauch
Copy link
Contributor

@rhauch rhauch commented Sep 19, 2017

Added metrics that are common to both sink and source tasks.

Marked as "WIP" since this PR is built upon #3864, and will need to be rebased once that has been merged into trunk. However, I would still appreciate initial reviews since this PR is largely additive.

@rhauch rhauch changed the title Kafka 5900 KAFKA-5900 Add task metrics common to both sink and source tasks Sep 19, 2017
@rhauch rhauch changed the title KAFKA-5900 Add task metrics common to both sink and source tasks [WIP] KAFKA-5900: Add task metrics common to both sink and source tasks Sep 19, 2017
@rhauch
Copy link
Contributor Author

rhauch commented Sep 19, 2017

@kkonstantine can you review this after you've given #3864 a once-over?

Copy link
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave it a first pass, skipping the parts that I reviewed already in KAFKA-5899.

It'll be much easier to do another pass after the above is merged. Focused on code and just glanced over tests.

My main question is around the correctness and the efficiency of StateTracker. Also any mention to thread-safety should be included to its javadoc.

* @return the Frequencies instance; never null
* @throws IllegalArgumentException if both {@code falseMetricName} and {@code trueMetricName} are null
*/
public static Frequencies forBooleanValues(MetricName falseMetricName, MetricName trueMetricName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you agree with public static methods following member field definition and constructor definition?

Copy link
Contributor Author

@rhauch rhauch Sep 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think public static methods should appear before all (non-static) members. In this case, it is a factory method, so it is as important as constructors. IMO, static members should not really be interspersed with non-static members.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big deal, but I don't mind factory methods following private constructors.
And then you have non-factory public static methods which feels odd to be before constructors and member fields (again we are talking about java specifically). Anyway, up to you.

P.s. again this is old but still feels relevant: http://www.oracle.com/technetwork/java/codeconventions-141855.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep it where it is so it's not buried.

* @param centerValue the value identifying the {@link Frequencies} bucket to be reported
*/
public Frequency(MetricName name, double centerValue) {
super();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class doesn't extend another class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack. Copied from Percentile, but removed now.


/**
* Create an instance with the given name and center point value.
* @param name the name of the frequency metric; may not be null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing blank line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

this.centerValue = centerValue;
}

public MetricName name() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing javadoc to public method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

return this.name;
}

public double centerValue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing javadoc to public method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

return count / (double) totalCount;
}

public double totalCount(MetricConfig config, long now) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both arguments are unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

assertEquals("Check boundary of bucket 3", 3, scheme.toBin(3.9999));
assertEquals("Check boundary of bucket 4", 4, scheme.toBin(4.0000));
assertEquals("Check boundary of bucket 4", 4, scheme.toBin(4.9999));
assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.000));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why precision is reduced in those two cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

* Utility class that tracks the current state and the duration of time spent in each state.
*/
public class StateTracker {
private volatile State currentState;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of questions before I dive deeper into this if I have to.

These are a lot of volatile variables. Are we confident that the overall access pattern will be lighter than using a mutex (or combination of 1-2 volatiles with a mutex)?
Are the operations on the shared state composable? (see comment below). Are they somehow monotonically additive? (I wouldn't think so by glancing over durationRatio below)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You raise a great point, and the code you reviewed definitely is not valid. I've completely redesigned the whole StateTracker class to be properly and efficiently threadsafe, and it should address your concerns.

durationDesired += destroyedTotalTimeMs;
break;
}
long total = durationCurrent + unassignedTotalTimeMs + runningTotalTimeMs + pausedTotalTimeMs +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either this is not atomic or I'm missing something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack.

}
}

void recordCommit(long duration, boolean success, Throwable error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error is unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I anticipate adding more metrics in the future that may want to know the error (e.g., last error reported by the task). I'd rather leave this as is.

@rhauch
Copy link
Contributor Author

rhauch commented Sep 20, 2017

Rebased on the latest #3864.

Expanded upon the unit tests, and corrected several issues found during the expanded testing of these classes that don’t appear to have been used yet.
Copy link
Contributor Author

@rhauch rhauch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kkonstantine ok, #3864 has been merged and this PR is now rebased on trunk, so this is ready for a "real" review.

A number of changes were moved from this PR into that PR, resulting in kind of a mess with the previous commits. So, I squashed several of them and now have just 2 commits: one to fix the Percentiles and Histogram classes and tests (which were unused up till now), and the second larger one is to add tasks metrics common to both sources and sinks. Below are a number of line comments with commentary about the changes.
Thanks for reviewing!

if (quantile > 1.00d)
return Float.POSITIVE_INFINITY;
if (quantile < 0.00d)
return Float.NEGATIVE_INFINITY;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Histogram and Percentiles classes were not yet used in the codebase, but I want to use both in the Connect metrics. Neither was really working properly, so I expanded the incomplete HistorgramTest and discovered it was not behaving correctly nor consistently with how it was being used in Percentiles.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. They're both pretty useful for metrics. Glad you discovered what was off with these constructs. And thanks for the explanation, now the changes in the formulas make more sense.

@@ -92,29 +96,29 @@ public ConstantBinScheme(int bins, double min, double max) {
this.min = min;
this.max = max;
this.bins = bins;
this.bucketWidth = (max - min) / (bins - 2);
this.bucketWidth = (max - min) / bins;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LinearBinScheme and ConstantBinScheme were not consistent with each other or with how Histogram uses them. And, the logic simply didn't work, as the previous code did not even produce the correct number of bins.

@@ -149,10 +156,8 @@ public int toBin(double x) {
} else if (x > this.max) {
return this.bins - 1;
} else {
double scaled = x / this.scale;
return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
return (int) (-0.5 + 0.5 * Math.sqrt(1.0 + 8.0 * x / this.scale));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really sure where the previous equation came from, and couldn't find any similar algorithm in papers or code. The new equation is simply a simplification of the quadratic formula that solves the second-order equation on line 149.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you mentioned, is there a single doc you used as a guide for Histogram's impl? (A wikipedia article maybe?) If there's something concrete, maybe would be useful to add it to the javadoc, or an inline non-javadoc comment for this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what served as the original inspiration, but the algorithm is really just returning bins whose widths are the sum of all prior bin widths. IOW, the equation on line 149 in fromBin method is really just n (n+1) / 2, aka Gauss' Summation Trick, scaled such that the final bin has a range that ends with max.

}

@Test
public void testConstantBinScheme() {
ConstantBinScheme scheme = new ConstantBinScheme(5, -5, 5);
assertEquals("A value below the lower bound should map to the first bin", 0, scheme.toBin(-5.01));
assertEquals("A value above the upper bound should map to the last bin", 4, scheme.toBin(5.01));
assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-5));
assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5));
assertEquals("Check boundary of bucket 3", 3, scheme.toBin(4.9999));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bin numbers are 0-based, so this was wrong to start with.

assertEquals(-1.0, scheme.fromBin(2), 0.001d);
assertEquals(1.0, scheme.fromBin(3), 0.001d);
assertEquals(3.0, scheme.fromBin(4), 0.001d);
checkBinningConsistency(scheme);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, it is possible to simplify these tests to use a loop, passing the output of fromBin() into toBin() and just checking they are consistent. That is what checkBinningConsistency does, and it like the name suggests it only checks that the two methods are consistent, not that the bins are computed correctly. IMO it is far clearer and more complete to use literal values and to check the range of the bins. I am intentionally using literals with different precision.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

*/
public class StateTracker {

private final AtomicReference<StateChange> lastState = new AtomicReference<>(new StateChange());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kkonstantine, on a previous review you caught how this class was not threadsafe. This new version is threadsafe and only requires on synchronized method only when the state (infrequently) changes.

*/
public synchronized void changeState(State newState, long now) {
// JDK8: remove synchronization by using lastState.getAndUpdate(oldState->oldState.newState(newState, now));
lastState.set(lastState.get().newState(newState, now));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we switch to Java 8 as the minimum compilation level, we can/should remove the synchronization and replace this method to use the atomic AtomicReference.getAndUpdate(UnaryOperator<T> updateFunction) method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that's cool.

If you want immediately what JDK8 will bring with getAndUpdate, you may write this as:

StateChange current;
do {
    current = lastState.get();
} while (!lastState.compareAndSet(current, current.newState(newState, now)));

but a one-liner with synchronized here is fine too.

*/
protected void recordCommitFailure(long duration, Throwable error) {
taskMetricsGroup.recordCommit(duration, false, error);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually had a single recordCommit(long duration, boolean success, Throwable error) method instead of the above 2 methods, but these two methods are IMO more readable where they're used and are therefore preferable.

workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, pluginLoader, time);
taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, transformationChain, pluginLoader, time);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately it was necessary to not create multiple WorkerTask objects with the same name, since that results in extra metrics that pose problems in the test with the mocks and expectations. That's an unrealistic situation, so this change, while unappealing, is the easiest way to prevent that.

Copy link
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rhauch!

Looks good to me now. Left a few comments as replies, but I don't see anything additional that needs to be addressed here.

LGTM!

if (quantile > 1.00d)
return Float.POSITIVE_INFINITY;
if (quantile < 0.00d)
return Float.NEGATIVE_INFINITY;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. They're both pretty useful for metrics. Glad you discovered what was off with these constructs. And thanks for the explanation, now the changes in the formulas make more sense.

@@ -149,10 +156,8 @@ public int toBin(double x) {
} else if (x > this.max) {
return this.bins - 1;
} else {
double scaled = x / this.scale;
return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
return (int) (-0.5 + 0.5 * Math.sqrt(1.0 + 8.0 * x / this.scale));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you mentioned, is there a single doc you used as a guide for Histogram's impl? (A wikipedia article maybe?) If there's something concrete, maybe would be useful to add it to the javadoc, or an inline non-javadoc comment for this class.

MetricName name3 = name("3");
MetricName name4 = name("4");
Frequencies frequencies = new Frequencies(4, 1.0, 4.0,
new Frequency(name1, 1.0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is alignment correct here?

Copy link
Contributor Author

@rhauch rhauch Sep 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. One sec.

assertEquals(-1.0, scheme.fromBin(2), 0.001d);
assertEquals(1.0, scheme.fromBin(3), 0.001d);
assertEquals(3.0, scheme.fromBin(4), 0.001d);
checkBinningConsistency(scheme);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

*/
public synchronized void changeState(State newState, long now) {
// JDK8: remove synchronization by using lastState.getAndUpdate(oldState->oldState.newState(newState, now));
lastState.set(lastState.get().newState(newState, now));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that's cool.

If you want immediately what JDK8 will bring with getAndUpdate, you may write this as:

StateChange current;
do {
    current = lastState.get();
} while (!lastState.compareAndSet(current, current.newState(newState, now)));

but a one-liner with synchronized here is fine too.

@rhauch rhauch changed the title [WIP] KAFKA-5900: Add task metrics common to both sink and source tasks KAFKA-5900: Add task metrics common to both sink and source tasks Sep 25, 2017
@rhauch
Copy link
Contributor Author

rhauch commented Sep 25, 2017

@kkonstantine, I just added some JavaDoc to the Histogram class and fixed the indentation you mentioned in the FrequenciesTest class.

@ewencp, I'll ping you for this since you're the committer that did the review on #3864.

Copy link
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

* object is a {@link CompoundStat}, and so it can be {@link org.apache.kafka.common.metrics.Sensor#add(CompoundStat)
* added directly to a Sensor} so the metrics are created automatically.
*/
public class Frequencies extends SampledStat implements CompoundStat {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new because the existing -ratio metrics are Rates and have actual time units/measurements that are actual times rather than just samples?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they're durations over total time. This class captures a normal histogram of values, but unlike Percentiles does not aggregate the counts below the requested bin.

* {@link Frequency#centerValue() center value} within the specified range
*/
public Frequencies(int buckets, double min, double max, Frequency... frequencies) {
super(0.0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this the right initial value? What if 0.0 isn't even in the range specified by min/max?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The base class uses it to create a new Sample, and this class doesn't use that initial value in the samples. So, that value is unused. I can add a comment.

throw new IllegalArgumentException("Must be at least 2 buckets");
}
if (buckets < frequencies.length) {
throw new IllegalArgumentException("More frequencies than buckets");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the thing that matters whether there is more than one frequency per bucket?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's important that all supplied Frequency objects have a bin. I don't think having more than one Frequency per bin is horrible; yes, it's a coding error, so I can add a check.

}
}
double halfBucketWidth = (max - min) / (buckets - 1) / 2.0;
this.binScheme = new ConstantBinScheme(buckets, min - halfBucketWidth, max + halfBucketWidth);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually got quite complex for just counting the frequency of 2 fixed values. Is all of this really needed? Are we going to use the more complicated functionality in a later patch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do plan on using it for more complex cases as well. And, I started writing something that applied to boolean samples, and it wasn't that much more efficient but would have resulted in quite a bit of duplicate code. Plus, SampledStat already comes with a fair amount of baggage (e.g., the record method takes only double values, Sample has several values, samples are List<Sample>, etc.), so using it with boolean wouldn't be simpler. Given all of this, I thought using this for boolean values was an acceptable compromise.

return bins - 1;
else
return (int) ((x - min) / bucketWidth) + 1;
return (int) ((x - min) / bucketWidth);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there ever any issues with floating point precision here and the bucketWidth? Should this clamp to 0 to bins -1 just to be safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably wouldn't hurt. I'll add that.

private final Sensor commitAttempts;

public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskStatus.Listener statusListener) {
delegateListener = statusListener;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we keep ending up with this pattern, it might be clearer to create a Listener implementation that delegates to a list of listeners instead of chaining them manually this way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but for just 2 listeners I didn't think it was worth it (yet).

addTaskStateMetric(State.DESTROYED, "status-destroyed",
"Signals whether the connector task is in the destroyed state.");

addRatioMetric(State.RUNNING, "running-ratio",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this state is missing from the KIP, it should be added

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good catch. Fixed.

"Signals whether the connector task is in the destroyed state.");

addRatioMetric(State.RUNNING, "running-ratio",
"The fraction of time this task has spent in the paused state.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paused -> running

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

"The average time in milliseconds taken by this task to commit offsets"),
new Avg());

// int buckets = 100;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this all commented out intentionally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will want them at some point, but I wasn't sure about the correctness of Percentiles and these are not high priority metrics. Turns out I think at least some of the problem was in Histogram, which I've since fixed. This is in a commit now, so if you prefer I can remove them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we just generally try to avoid the mess if the code isn't ready to be committed yet. it can always be held onto in a branch, wip pr, etc


@Override
public void onShutdown(ConnectorTaskId id) {
taskStateTimer.changeState(State.UNASSIGNED, time.milliseconds());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see sensors being removed, but for tasks we'd definitely want to remove them when the task gets reassigned to another worker. I haven't thought it through, but this might be the right place to be doing that rather than marking it unassigned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relatedly, I think there might be some sort of checks in unit tests in maybe the producer or consumer that validate metrics are unregistered, might be able to use a similar approach here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point about removing. I completely missed that, so I'll address that now.

… tasks are stopped/removed

The ability to easily remove the metrics required a few more changes to the `ConnectMetrics` class, including the addition of methods to add `Sensor` objects.
The sensor names used within a group must be unique, so now the group creates sensors using a prefix that includes the group’s name and tags, which must be unique to the group.
@rhauch
Copy link
Contributor Author

rhauch commented Sep 26, 2017

@ewencp, I added two more commit. The first addresses all of the concerns you mentioned above. Unfortunately, removing the sensors was not trivial, so it required a few more methods on the MetricGroup to hold on to the sensor names so they can be removed. All metrics that are part of the group are all automatically removed.

The second commit ensures that MetricGroup are unique and keyed by the group name and the tags. This is essential if each task is to have its own metric groups.

Copy link
Contributor

@ewencp ewencp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rhauch Getting close, one real issue w/ ordering of tags but otherwise just a few minor cleanups.

assert groupName != null;
assert tags != null;
this.groupName = groupName;
this.tags = Collections.unmodifiableMap(new HashMap<>(tags));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should ensure it preserves the order of the tags.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

if (previous != null) group = previous;
}
return group;
}

protected MetricGroupId groupId(String groupName, boolean includeWorkerId, String... tagKeyValues) {
checkNameIsValid(groupName);
Map<String, String> tags = tags(includeWorkerId ? workerId : null, tagKeyValues);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We missed it in the first round probably because those metrics only had 1 tag, but the tags method should use something like a LinkedHashMap. Without this, the tags can get jumbled. I noticed this because jconsole's hierarchy was listing the task ID above the connector name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup.

@@ -124,6 +125,10 @@ protected void close() {
}

@Override
protected void releaseResources() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just drop this since neither implementation contains any code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this will be used in the future PRs to close the subclass-specific metrics. I debated about doing this inside close, but the problem is that we'd miss any failures that happen when the tasks are being closed.

"The average time in milliseconds taken by this task to commit offsets"),
new Avg());

// int buckets = 100;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we just generally try to avoid the mess if the code isn't ready to be committed yet. it can always be held onto in a branch, wip pr, etc

*
* @param sensors The sensors to be removed
*/
public synchronized void removeSensors(Sensor... sensors) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't currently used. Are we expecting to have a use case for it or should we just remove it? Generally you'd only remove the entire group together and only when it is being destroyed, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not being used. I can remove it.

@rhauch
Copy link
Contributor Author

rhauch commented Sep 27, 2017

@ewencp another commit to address your comments/suggestions.

Copy link
Contributor

@ewencp ewencp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

asfgit pushed a commit that referenced this pull request Sep 28, 2017
Added Connect metrics specific to source tasks, and builds upon #3864 and #3911 that have already been merged into `trunk`.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: tedyu <yuzhihong@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3959 from rhauch/kafka-5901
asfgit pushed a commit that referenced this pull request Oct 3, 2017
Added Connect metrics specific to source tasks, and builds upon #3864 and #3911 that have already been merged into `trunk`, and #3959 that has yet to be merged.

I'll rebase this PR when the latter is merged.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3975 from rhauch/kafka-5902
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants