Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-6161] Introduce PCollectionConsumerRegistry and add ElementCoun… #7272

Merged
merged 1 commit into from
Jan 24, 2019

Conversation

ajamato
Copy link

@ajamato ajamato commented Dec 13, 2018

…t counters to the java SDK

Please add a meaningful description for your change here


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status Build Status Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
Build Status --- --- ---

@ajamato ajamato force-pushed the element_count_dec14 branch 3 times, most recently from 1069654 to de52347 Compare December 18, 2018 02:47
@ajamato ajamato changed the title DO NOT SUBMIT [BEAM-6161] Introduce PCollectionConsumerRegistry and add ElementCoun… [BEAM-6161] Introduce PCollectionConsumerRegistry and add ElementCoun… Dec 18, 2018
@ajamato
Copy link
Author

ajamato commented Dec 18, 2018

@swegner @Ardagan @robertwb @echauchot
This PR is long, most of the changes are refactoring the bundle processing startup code's setup of Pcollection consumers to go through a common class, where wrapper code could be inserted to add an element count counter.

Copy link
Contributor

@Ardagan Ardagan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some class names that should be changed, so don't sign off yet.
Looks good to me in general, but might be good for someone else with domain knowledge to take a look at this PR.

@@ -146,8 +148,19 @@ public MetricUpdates getUpdates() {

for (MetricUpdate<Long> mu : mus.counterUpdates()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mu, as well as mus are hard to read. Use monitoringInfo and monitoringInfos.
e -> entry. (can be mixed with error, or other abbreviations)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -584,21 +584,28 @@ public void onProgress(ProcessBundleProgressResponse progress) {}
@Override
public void onCompleted(ProcessBundleResponse response) {
// Assert the timestamps are non empty then 0 them out before comparing.
List<MonitoringInfo> actualMIs = new ArrayList<>();
List<MonitoringInfo> actual = new ArrayList<MonitoringInfo>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to result.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN);
builder.setPCollectionLabel("impulse.out");
builder.setInt64Value(2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use unique values. Right now you have 3 places that use same values: element_count counter, userMetric counter and total amount of counters.
This might lead to misunderstanding.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
package org.apache.beam.sdk.metrics;

public class LabelledMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LabeledMetrics

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double checked this. It is difference between British and American English. I suggest we stick to American version here, since it is common for most of code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using LabeledMetrics

@@ -110,7 +110,7 @@
}
Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers =
(Collection)
pCollectionIdsToConsumers.get(getOnlyElement(pTransform.getOutputsMap().values()));
pCollectionConsumerRegistry.get(getOnlyElement(pTransform.getOutputsMap().values()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why our outputs map contains only one element?


/**
* A wrapping FnDataReceiverWindowedValue<T> which counts the number of elements consumed by the
* original nDataReceiverWindowedValue<T>.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing F in FnDataReceiver...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/** @return The Get the only FnDataReceiver for the pcollection. */
public FnDataReceiver<WindowedValue<?>> getOnlyElement(String pCollectionId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not used, please, remove. If I miss the usage, please rename to getReceiver, or getFnDataReceiver

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return pCollectionIdsToConsumers.keySet();
}

/** @return The Get the only FnDataReceiver for the pcollection. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document IllegalArgumentException when multiple elements are returned for pCollectionId


public MonitoringInfoMetricName(String urn, HashMap<String, String> labels) {
checkArgument(!Strings.isNullOrEmpty(urn), "MonitoringInfoMetricName urn must be non-empty");
checkArgument(labels != null, "MonitoringInfoMetricName name must be non-null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear comment.
Should state that labels parameter must not be null. If there are any required labels that should be present, it would be good to name those as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment, doing mroe would require adding a dep on SimpleMonitoringInfoBuilder, which would require adding a dep on runner-core-java-main. which is a backwards/looping dep. I'll add a TODO.

@Ardagan
Copy link
Contributor

Ardagan commented Dec 19, 2018

Please, add unittests for newly added classes.

Collection<FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>>> consumers =
(Collection)
pCollectionConsumerRegistry.get(
//Collection<FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>>> consumers =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -56,6 +54,7 @@
static class Factory<InputT> implements PTransformRunnerFactory<FlattenRunner<InputT>> {
@Override
public FlattenRunner<InputT> createRunnerForPTransform(
//public FlattenRunner<InputT> createRunnerForPTransform(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented code

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -72,17 +71,16 @@
throws IOException {

// Give each input a MultiplexingFnDataReceiver to all outputs of the flatten.
ImmutableSet.Builder<FnDataReceiver<WindowedValue<InputT>>> consumersBuilder =
new ImmutableSet.Builder<>();
// TODO ajamato: Somehow the consumersBuilder handles the casting for us before this change.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo(ajamato)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm'd

ElementCountFnDataReceiver wrappedConsumer =
pCollectionIdsToWrappedConsumer.getOrDefault(pCollectionId, null);
if (wrappedConsumer != null) {
throw new RuntimeException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might split this to builder and implementation. This will remove the option for user to invoke calls in wrong order.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting idea, but the builder would basically need to be the same as this. Having an exception thrown if built twice, or mroe things are added after, etc. Let's leave it as is for now, unless there is much more advantage

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

new ElementCountFnDataReceiver<T>(consumer, pCollectionId);
pCollectionIdsToConsumers.put(pCollectionId, (FnDataReceiver) wrappedReceiver);
return wrappedReceiver;
public <T> void register(String pCollectionId, FnDataReceiver<WindowedValue<T>> consumer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation for method.
Note the order in which this method should be called regards getMultiplexingConsumer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -0,0 +1,64 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add unittests

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this metrics package does not have tests for every file inside, so I don't want to add an unnecessary test here. This is just a refactoring of existing code. Its being tested

I don't see this test as adding much value because this code is already tested very well with the existing tests since this is a class used to implement the others in the package. Specifically MetricContainerImpl

@@ -0,0 +1,56 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add unittests

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@ajamato
Copy link
Author

ajamato commented Dec 20, 2018

@boyuanzz @robertwb

Copy link
Contributor

@ryan-williams ryan-williams left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left a couple minor comments

// Represents a specific MonitoringInfo for a specific URN.
builder.setUrn(monitoringInfoName.getUrn());
for (Entry<String, String> e : monitoringInfoName.getLabels().entrySet()) {
builder.setLabel(e.getKey(), e.getValue());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would addLabel be a better name for this function?

@ProcessElement
public void process(ProcessContext ctxt) {
Metrics.counter(RemoteExecutionTest.class, counterMetricName).inc();
// Due to a bug impulse is producing two elements instead of one.
// So add this check to only emit these three elemenets.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// So add this check to only emit these three elemenets.
// So add this check to only emit these three elements.

is there a JIRA link handy about this bug, that could be included here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wlll add this, thank you

@ajamato
Copy link
Author

ajamato commented Jan 7, 2019

Seems to be an issue starting up the VMs, due to not getting the built dataflow container.

E Handler for GET /v1.27/images/us.gcr.io/apache-beam-testing/java-postcommit-it/java:20190105011654/json returned error: No such image: us.gcr.io/apache-beam-testing/java-postcommit-it/java:20190105011654
undefined
E Error syncing pod ab3a90f13f7e8f68b0508d2fbee851d2 ("dataflow-testpipeline-jenkins-0105-01041719-00vb-harness-18hs_default(ab3a90f13f7e8f68b0508d2fbee851d2)"), skipping: failed to "StartContainer" for "sdk" with ImagePullBackOff: "Back-off pulling image "us.gcr.io/apache-beam-testing/java-postcommit-it/java:20190105011654""
undefined

@ajamato
Copy link
Author

ajamato commented Jan 7, 2019

retest this please


for (MetricUpdate<Long> mu : mus.counterUpdates()) {
for (MetricUpdate<Long> metricUpdate : metricUpdates.counterUpdates()) {
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract MonitoringInfo creation logic into a separate method.

@@ -142,13 +143,25 @@ public MetricUpdates getUpdates() {
public Iterable<MonitoringInfo> getMonitoringInfos() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add unit test for this method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/** Tests for {@link MonitoringInfoMetricName}. */
public class MonitoringInfoMetricNameTest implements Serializable {

@Rule public final transient ExpectedException thrown = ExpectedException.none();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to throws. Then throws.expect() will sound more natural.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thrown is used in most tests because "throw" and "throws" are both java keywords and cannot be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thank you for clarification.

@Rule public final transient ExpectedException thrown = ExpectedException.none();

@Test
public void testElementCountConstruction() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split this test into metric equality test and hashCode equality tests.
Same for user counter version.
Non-equality test missing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, added tests for not equals.

String urn = SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN;
MonitoringInfoMetricName name = MonitoringInfoMetricName.named(urn, labels);

// Should not fail even though there is no metrics container.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comment

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@Rule public ExpectedException expectedException = ExpectedException.none();

@Before
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed. You do not have any mocks defined as class members.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

@Test
public void multipleConsumersSamePCollection() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testCounterIncrementsOnlyOnceWhenMultipleConsumersOfSamePCollectionReadSameElement

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is too long, I'll just add a comment for this one instead

}

@Test
public void throwsRuntimeExceptionIfRegisteredLate() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throwsOnRegisteringPCollectionAfterMultiplexingConsumerWasInitialized

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

consumers.register(pCollectionA, consumerA1);
consumers.getMultiplexingConsumer(pCollectionA);

// Should throw an exception now if we try to register another after calling
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove comment.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

// Check that the underlying consumers are each invoked per element
// and the multiplexing one is invoked only once.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which of the checks validates that multiplexing consumer is invoked only once? I'm misunderstanding the code I guess.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rm'd that comment

@@ -68,31 +68,31 @@ func init() {
runtime.RegisterType(reflect.TypeOf((*meanAccum)(nil)).Elem())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this file should not be present in change.
It seem to be formatting change only though.

@ajamato
Copy link
Author

ajamato commented Jan 17, 2019

I will resolve these conflicts after PR/7482 is merged

@ajamato ajamato force-pushed the element_count_dec14 branch 5 times, most recently from 8f0ca0b to 86f796c Compare January 22, 2019 22:02
@ajamato
Copy link
Author

ajamato commented Jan 22, 2019

Run Python PreCommit

1 similar comment
@ajamato
Copy link
Author

ajamato commented Jan 23, 2019

Run Python PreCommit

@ajamato
Copy link
Author

ajamato commented Jan 23, 2019

Run Java PreCommit

1 similar comment
@ajamato
Copy link
Author

ajamato commented Jan 23, 2019

Run Java PreCommit

@ajamato
Copy link
Author

ajamato commented Jan 23, 2019

Run Python PreCommit")

@ajamato
Copy link
Author

ajamato commented Jan 23, 2019

Run Java PreCommit

@ajamato
Copy link
Author

ajamato commented Jan 24, 2019

Run Java PreCommit

3 similar comments
@ajamato
Copy link
Author

ajamato commented Jan 24, 2019

Run Java PreCommit

@ajamato
Copy link
Author

ajamato commented Jan 24, 2019

Run Java PreCommit

@ajamato
Copy link
Author

ajamato commented Jan 24, 2019

Run Java PreCommit

@swegner swegner merged commit fb13cb1 into apache:master Jan 24, 2019
@@ -68,6 +68,7 @@ public SpecMonitoringInfoValidator() {
monitoringInfo.getUrn(), spec.getTypeUrn(), monitoringInfo.getType()));
}

// TODO(ajamato): Tighten this restriction to use set equality, to catch unused
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting: I recently had a local change to keep this a bit relaxed, because I was testing with user metrics that were getting a PTRANSFORM label. I'd weakened the condition below to:

!requiredLabels.ieEmpty && !monitoringInfo.getLabelsMap().keySet().equals(requiredLabels)

where previously it required strict equality in all cases.

Seemingly this was relaxed to the containsAll below before I got anywhere with my change, and it's possible I was in some other invalid state to have had user metrics with a PTRANSFORM label that were failing the strict test (where requiredLabels was empty).

Just wanted to leave a breadcrumb here about that since we'll probably come back to it soon..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants