Skip to content

Commit

Permalink
Normalize and size-limit transform Ids
Browse files Browse the repository at this point in the history
  • Loading branch information
chamikaramj committed Apr 10, 2021
1 parent a48abeb commit bee1a14
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 18 deletions.
4 changes: 4 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Expand Up @@ -58,6 +58,10 @@ message BeamConstants {
// on any proto message that may contain references needing resolution.
message Components {
// (Required) A map from pipeline-scoped id to PTransform.
//
// Keys of the transforms map may be used by runners to identify pipeline
// steps. Hence it's recommended to use strings that are not too long and
// that contain only alphanumeric characters, '_', and '-'.
map<string, PTransform> transforms = 1;

// (Required) A map from pipeline-scoped id to PCollection.
Expand Down
Expand Up @@ -21,6 +21,8 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.text.Normalizer;
import java.text.Normalizer.Form;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -191,6 +193,10 @@ private String getApplicationName(AppliedPTransform<?, ?, ?> appliedPTransform)
if (name.isEmpty()) {
name = "unnamed-ptransform";
}
// Normalize, replace characters outside [A-Za-z0-9-_] with '-', truncate to maxNameLength, and uniqify.
int maxNameLength = 100;
name = Normalizer.normalize(name, Form.NFC).replaceAll("[^A-Za-z0-9-_]", "-");
name = (name.length() > maxNameLength) ? name.substring(0, maxNameLength) : name;
name = uniqify(name, transformIds.values());
transformIds.put(appliedPTransform, name);
return name;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.not;

import java.io.IOException;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void registerCoder() throws IOException {
public void registerTransformNoChildren() throws IOException {
Create.Values<Integer> create = Create.of(1, 2, 3);
PCollection<Integer> pt = pipeline.apply(create);
String userName = "my_transform/my_nesting";
String userName = "my_transform-my_nesting";
AppliedPTransform<?, ?, ?> transform =
AppliedPTransform.of(
userName,
Expand All @@ -96,6 +97,23 @@ public void registerTransformNoChildren() throws IOException {
assertThat(components.getExistingPTransformId(transform), equalTo(componentName));
}

/**
 * Registers a transform whose user-supplied name contains characters outside {@code
 * [A-Za-z0-9-_]} (a slash, a space, and parentheses) and checks that the id handed back by
 * {@code registerPTransform} matches that character set, and that {@code
 * getExistingPTransformId} returns the same id for the same transform.
 */
@Test
public void registerTransformIdFormat() throws IOException {
  // Deliberately contains '/', ' ', '(' and ')', none of which are in the allowed character set.
  final String nameWithDisallowedChars = "my/tRAnsform 1(nesting)";

  Create.Values<Integer> source = Create.of(1, 2, 3);
  PCollection<Integer> output = pipeline.apply(source);
  AppliedPTransform<?, ?, ?> applied =
      AppliedPTransform.of(
          nameWithDisallowedChars,
          PValues.expandInput(pipeline.begin()),
          PValues.expandOutput(output),
          source,
          pipeline);

  String registeredId = components.registerPTransform(applied, Collections.emptyList());

  // The generated id must consist only of the allowed characters.
  assertThat(registeredId, matchesPattern("^[A-Za-z0-9-_]+"));
  // Looking the transform up again must yield the id assigned at registration.
  assertThat(components.getExistingPTransformId(applied), equalTo(registeredId));
}

@Test
public void registerTransformAfterChildren() throws IOException {
Create.Values<Long> create = Create.of(1L, 2L, 3L);
Expand Down
Expand Up @@ -317,13 +317,13 @@ public void getProducer() {
String impulseOutputName =
getOnlyElement(
PipelineNode.pTransform(
"BoundedRead/Impulse", components.getTransformsOrThrow("BoundedRead/Impulse"))
"BoundedRead-Impulse", components.getTransformsOrThrow("BoundedRead-Impulse"))
.getTransform()
.getOutputsMap()
.values());
PTransformNode impulseProducer =
PipelineNode.pTransform(
"BoundedRead/Impulse", components.getTransformsOrThrow("BoundedRead/Impulse"));
"BoundedRead-Impulse", components.getTransformsOrThrow("BoundedRead-Impulse"));
PCollectionNode impulseOutput =
PipelineNode.pCollection(
impulseOutputName, components.getPcollectionsOrThrow(impulseOutputName));
Expand Down Expand Up @@ -355,7 +355,7 @@ public void getEnvironmentWithEnvironment() {

PTransformNode environmentalTransform =
PipelineNode.pTransform(
"ParDo/ParMultiDo(Test)", components.getTransformsOrThrow("ParDo/ParMultiDo(Test)"));
"ParDo-ParMultiDo-Test-", components.getTransformsOrThrow("ParDo-ParMultiDo-Test-"));
PTransformNode nonEnvironmentalTransform =
PipelineNode.pTransform("groupByKey", components.getTransformsOrThrow("groupByKey"));

Expand Down
Expand Up @@ -639,7 +639,7 @@ public void process(ProcessContext ctxt) {
(Coder<WindowedValue<?>>) remoteOutputCoder.getValue(), outputContents::add));
}

final String testPTransformId = "create/ParMultiDo(Metrics)";
final String testPTransformId = "create-ParMultiDo-Metrics-";
BundleProgressHandler progressHandler =
new BundleProgressHandler() {
@Override
Expand Down Expand Up @@ -801,7 +801,7 @@ public void onCompleted(ProcessBundleResponse response) {
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
builder.setLabel(
MonitoringInfoConstants.Labels.PCOLLECTION, testPTransformId + ".output");
MonitoringInfoConstants.Labels.PCOLLECTION, "create/ParMultiDo(Metrics).output");
builder.setInt64SumValue(3);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));

Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.hamcrest.text.MatchesPattern;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -93,8 +94,11 @@ public void testConstructPubsubRead() throws Exception {
RunnerApi.PTransform transform = result.getTransform();
assertThat(
transform.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/PubsubUnboundedSource", "test_namespacetest/MapElements"));
Matchers.hasItem(MatchesPattern.matchesPattern(".*PubsubUnboundedSource.*")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*MapElements.*")));

assertThat(transform.getInputsCount(), Matchers.is(0));
assertThat(transform.getOutputsCount(), Matchers.is(1));
}
Expand Down Expand Up @@ -150,8 +154,10 @@ public void testConstructPubsubWrite() throws Exception {
RunnerApi.PTransform transform = result.getTransform();
assertThat(
transform.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/MapElements", "test_namespacetest/PubsubUnboundedSink"));
Matchers.hasItem(MatchesPattern.matchesPattern(".*MapElements.*")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*PubsubUnboundedSink.*")));
assertThat(transform.getInputsCount(), Matchers.is(1));
assertThat(transform.getOutputsCount(), Matchers.is(0));

Expand Down
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.hamcrest.Matchers;
import org.hamcrest.text.MatchesPattern;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -120,19 +121,24 @@ public void testConstructKafkaRead() throws Exception {
RunnerApi.PTransform transform = result.getTransform();
assertThat(
transform.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/KafkaIO.Read", "test_namespacetest/Remove Kafka Metadata"));
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*Remove-Kafka-Metadata.*")));
assertThat(transform.getInputsCount(), Matchers.is(0));
assertThat(transform.getOutputsCount(), Matchers.is(1));

RunnerApi.PTransform kafkaComposite =
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
assertThat(
kafkaComposite.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/KafkaIO.Read/Impulse",
"test_namespacetest/KafkaIO.Read/ParDo(GenerateKafkaSourceDescriptor)",
"test_namespacetest/KafkaIO.Read/KafkaIO.ReadSourceDescriptors"));
Matchers.hasItem(MatchesPattern.matchesPattern(".*Impulse.*")));
assertThat(
kafkaComposite.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*GenerateKafkaSourceDescriptor.*")));
assertThat(
kafkaComposite.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*ReadSourceDescriptors.*")));
RunnerApi.PTransform kafkaSdfParDo =
result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(2));
RunnerApi.ParDoPayload parDoPayload =
Expand Down Expand Up @@ -197,8 +203,10 @@ public void testConstructKafkaWrite() throws Exception {
RunnerApi.PTransform transform = result.getTransform();
assertThat(
transform.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/Kafka ProducerRecord", "test_namespacetest/KafkaIO.WriteRecords"));
Matchers.hasItem(MatchesPattern.matchesPattern(".*Kafka-ProducerRecord.*")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-WriteRecords.*")));
assertThat(transform.getInputsCount(), Matchers.is(1));
assertThat(transform.getOutputsCount(), Matchers.is(0));

Expand Down

0 comments on commit bee1a14

Please sign in to comment.