From c64313eb8b89551ab1bbd4267c70e406ee1b5bea Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Tue, 25 Oct 2016 14:39:55 -0400 Subject: [PATCH 1/3] STORM-2174: Initial commit for beam runner. --- external/storm-beam/pom.xml | 99 ++++++++++ .../storm/beam/RandomSentenceSource.java | 109 ++++++++++ .../storm/beam/StormPipelineOptions.java | 12 ++ .../org/apache/storm/beam/StormRegistrar.java | 31 +++ .../org/apache/storm/beam/StormRunner.java | 170 ++++++++++++++++ .../org/apache/storm/beam/StormWordCount.java | 118 +++++++++++ .../translation/GroupByKeyTranslator.java | 58 ++++++ .../translation/ParDoBoundTranslator.java | 51 +++++ .../translation/StormPipelineTranslator.java | 51 +++++ .../beam/translation/TransformTranslator.java | 11 ++ .../beam/translation/TranslationContext.java | 186 ++++++++++++++++++ .../beam/translation/TranslatorRegistry.java | 29 +++ .../UnboundedSourceTranslator.java | 21 ++ .../translation/WindowBoundTranslator.java | 47 +++++ .../beam/translation/runtime/DoFnBolt.java | 102 ++++++++++ .../runtime/GroupByKeyCompleteBolt.java | 40 ++++ .../runtime/GroupByKeyInitBolt.java | 47 +++++ .../translation/runtime/StormDoFnRunner.java | 27 +++ .../runtime/UnboundedSourceSpout.java | 109 ++++++++++ .../beam/translation/runtime/WindowBolt.java | 48 +++++ .../util/DefaultSideInputReader.java | 29 +++ .../translation/util/DefaultStepContext.java | 51 +++++ .../beam/util/SerializedPipelineOptions.java | 64 ++++++ pom.xml | 1 + 24 files changed, 1511 insertions(+) create mode 100644 external/storm-beam/pom.xml create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java create mode 100644 
external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java create mode 100644 external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java create mode 100644 
external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java diff --git a/external/storm-beam/pom.xml b/external/storm-beam/pom.xml new file mode 100644 index 00000000000..465400558a3 --- /dev/null +++ b/external/storm-beam/pom.xml @@ -0,0 +1,99 @@ + + + + 4.0.0 + + + storm + org.apache.storm + 1.1.0-SNAPSHOT + ../../pom.xml + + + storm-beam + jar + + + Apache Storm Beam Runner + + 0.2.0-incubating-SNAPSHOT + + + + + org.apache.storm + storm-core + ${project.version} + + + org.apache.beam + beam-sdks-java-core + ${beam.version} + + + org.apache.beam + beam-runners-core-java + ${beam.version} + + + org.apache.beam + beam-runners-direct-java + ${beam.version} + runtime + true + + + com.google.auto.service + auto-service + 1.0-rc2 + true + + + + + + src/main/resources + true + + + + + org.apache.maven.plugins + maven-shade-plugin + + true + + + + package + + shade + + + + + + + + + + + + + + diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java b/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java new file mode 100644 index 00000000000..1c87e5472b6 --- /dev/null +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java @@ -0,0 +1,109 @@ +package org.apache.storm.beam; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Instant; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Created by tgoetz on 7/28/16. 
+ */
+// Unbounded Beam source used to feed the sample word-count pipeline.
+// Despite the name, nothing here is random: the reader cycles through a fixed
+// array of sentences in order.
+// NOTE(review): generic type arguments appear to be missing from several
+// declarations below (e.g. the return type of generateInitialSplits) -- confirm
+// against the original commit before compiling.
+public class RandomSentenceSource extends UnboundedSource {
+
+    // Coder handed back from getDefaultOutputCoder().
+    private final Coder coder;
+
+    public RandomSentenceSource(Coder coder){
+        this.coder = coder;
+    }
+
+    // Never splits: always returns a single-element list containing this source.
+    // Both arguments are ignored.
+    @Override
+    public List> generateInitialSplits(int i, PipelineOptions pipelineOptions) throws Exception {
+        return Collections.singletonList(this);
+    }
+
+    // Creates a fresh reader over this source; the options and checkpoint mark are ignored.
+    @Override
+    public UnboundedReader createReader(PipelineOptions pipelineOptions, @Nullable CheckpointMark checkpointMark) throws IOException {
+        return new RandomSentenceReader(this);
+    }
+
+    // No checkpoint coder is provided; this always returns null.
+    @Nullable
+    @Override
+    public Coder getCheckpointMarkCoder() {
+        return null;
+    }
+
+    // Intentionally a no-op.
+    @Override
+    public void validate() {
+
+    }
+
+    // Returns the coder supplied to the constructor.
+    @Override
+    public Coder getDefaultOutputCoder() {
+        return this.coder;
+    }
+
+
+
+    // Reader that loops forever over a fixed set of sentences.
+    public static class RandomSentenceReader extends UnboundedReader {
+
+        private String[] values = {"blah blah blah", "foo bar", "my dog has fleas"};
+        // Position of the current sentence in values.
+        private int index = 0;
+        private final UnboundedSource source;
+
+        public RandomSentenceReader(UnboundedSource source){
+            this.source = source;
+        }
+
+
+        // Rewinds to the first sentence; always reports that an element is available.
+        @Override
+        public boolean start() throws IOException {
+            index = 0;
+            return true;
+        }
+
+        // Moves to the next sentence, wrapping to the first after the last one,
+        // so this always returns true.
+        @Override
+        public boolean advance() throws IOException {
+            index++;
+            if(index == values.length){
+                index = 0;
+            }
+            return true;
+        }
+
+        // The watermark is simply the current wall-clock time.
+        @Override
+        public Instant getWatermark() {
+            return Instant.now();
+        }
+
+        // No checkpointing: always returns null.
+        @Override
+        public CheckpointMark getCheckpointMark() {
+            return null;
+        }
+
+        @Override
+        public UnboundedSource getCurrentSource() {
+            return this.source;
+        }
+
+        // Returns the sentence at the current position.
+        @Override
+        public String getCurrent() throws NoSuchElementException {
+            return values[index];
+        }
+
+        // Elements are stamped with the time at which this method is called.
+        @Override
+        public Instant getCurrentTimestamp() throws NoSuchElementException {
+            return Instant.now();
+        }
+
+        // Nothing to release.
+        @Override
+        public void close() throws IOException {
+
+        }
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java
new file mode 100644
index 00000000000..8c710ab84ce --- /dev/null +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java @@ -0,0 +1,12 @@ +package org.apache.storm.beam; + +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Created by tgoetz on 7/27/16. + */ +public interface StormPipelineOptions extends PipelineOptions, ApplicationNameOptions { + + +} diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java new file mode 100644 index 00000000000..0e102715bc2 --- /dev/null +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java @@ -0,0 +1,31 @@ +package org.apache.storm.beam; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +public class StormRegistrar { + private StormRegistrar(){} + + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable>> getPipelineRunners() { + return ImmutableList.>>of( + StormRunner.class); + } + } + + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable> getPipelineOptions() { + return ImmutableList.>of( + StormPipelineOptions.class); + } + } + +} diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java new file mode 100644 index 00000000000..cc923494963 --- /dev/null +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java @@ -0,0 +1,170 @@ +package 
org.apache.storm.beam; + + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.AggregatorRetrievalException; +import org.apache.beam.sdk.runners.AggregatorValues; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.beam.translation.StormPipelineTranslator; +import org.apache.storm.beam.translation.TranslationContext; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.*; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Main entry point into the Storm Runner. + * + * After reading the user defined pipeline, Beam will invoke the run() method with a representation + * of the pipeline. + * + * TODO: Only supports storm local mode for now. 
+ */ +public class StormRunner extends PipelineRunner { + private static final Logger LOG = LoggerFactory.getLogger(StormRunner.class); + + private StormPipelineOptions options; + + public StormRunner(StormPipelineOptions options){ + this.options = options; + } + + public static StormRunner fromOptions(PipelineOptions options){ + StormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(StormPipelineOptions.class, options); + return new StormRunner(pipelineOptions); + + } + + @Override + public StormPipelineResult run(Pipeline pipeline) { + LOG.info("Running pipeline..."); + TranslationContext context = new TranslationContext(this.options); + StormPipelineTranslator transformer = new StormPipelineTranslator(context); + transformer.translate(pipeline); + + for(TranslationContext.Stream stream : context.getStreams()){ + LOG.info(stream.getFrom() + " --> " + stream.getTo()); + } + + runTopologyLocal(getTopology(context)); + return null; + } + + private void runTopologyLocal(StormTopology topology){ + Config conf = new Config(); + conf.setMaxSpoutPending(1000); +// conf.setDebug(true); + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("word-count", conf, topology); + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + +// cluster.shutdown(); + } + + public static class StormPipelineResult implements PipelineResult { + private State state; + + public State getState() { + return this.state; + } + + public AggregatorValues getAggregatorValues(Aggregator aggregator) throws AggregatorRetrievalException { + return null; + } + } + + private StormTopology getTopology(TranslationContext context){ + TopologyBuilder builder = new TopologyBuilder(); + Map spouts = context.getSpouts(); + for(String id : spouts.keySet()){ + builder.setSpout(id, spouts.get(id)); + } + + HashMap declarers = new HashMap(); + for (TranslationContext.Stream stream : context.getStreams()) { + Object boltObj = 
context.getBolt(stream.getTo()); + BoltDeclarer declarer = declarers.get(stream.getTo()); + if (boltObj instanceof IRichBolt) { + if(declarer == null) { + declarer = builder.setBolt(stream.getTo(), + (IRichBolt) boltObj); + declarers.put(stream.getTo(), declarer); + } + } else if (boltObj instanceof IBasicBolt) { + if(declarer == null) { + declarer = builder.setBolt( + stream.getTo(), + (IBasicBolt) boltObj); + declarers.put(stream.getTo(), declarer); + } + } else if (boltObj instanceof IWindowedBolt) { + if(declarer == null) { + declarer = builder.setBolt( + stream.getTo(), + (IWindowedBolt) boltObj); + declarers.put(stream.getTo(), declarer); + } + } else if (boltObj instanceof IStatefulBolt) { + if(declarer == null) { + declarer = builder.setBolt( + stream.getTo(), + (IStatefulBolt) boltObj); + declarers.put(stream.getTo(), declarer); + } + } else { + throw new IllegalArgumentException("Class does not appear to be a bolt: " + + boltObj.getClass().getName()); + } + + TranslationContext.Grouping grouping = stream.getGrouping(); + // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream + String streamId = (grouping.getStreamId() == null ? 
Utils.DEFAULT_STREAM_ID : grouping.getStreamId()); + + + switch (grouping.getType()) { + case SHUFFLE: + declarer.shuffleGrouping(stream.getFrom(), streamId); + break; + case FIELDS: + //TODO check for null grouping args + declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs())); + break; + case ALL: + declarer.allGrouping(stream.getFrom(), streamId); + break; + case DIRECT: + declarer.directGrouping(stream.getFrom(), streamId); + break; + case GLOBAL: + declarer.globalGrouping(stream.getFrom(), streamId); + break; + case LOCAL_OR_SHUFFLE: + declarer.localOrShuffleGrouping(stream.getFrom(), streamId); + break; + case NONE: + declarer.noneGrouping(stream.getFrom(), streamId); + break; + default: + throw new UnsupportedOperationException("unsupported grouping type: " + grouping); + } + } + + return builder.createTopology(); + } +} diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java new file mode 100644 index 00000000000..b98c1aa4644 --- /dev/null +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * A minimal word count pipeline using the Beam API, running on top of Storm.
+ *
+ * When the Storm Runner is reasonably complete, running this pipeline in Storm
+ * should yield the same output as running it on the Beam DirectRunner.
+ *
+ * NOTE(review): generic type arguments appear to be missing from several declarations
+ * in this file -- confirm against the original commit.
+ */
+public class StormWordCount {
+
+    /**
+     * Splits each input line into words and emits every non-empty word.
+     * Lines that are empty after trimming are counted in the "emptyLines" aggregator.
+     */
+    static class ExtractWordsFn extends DoFn {
+        private final Aggregator emptyLines =
+                createAggregator("emptyLines", new Sum.SumLongFn());
+
+        @Override
+        public void processElement(ProcessContext c) {
+            if (c.element().trim().isEmpty()) {
+                emptyLines.addValue(1L);
+            }
+
+            // Split the line into words; any run of characters other than letters
+            // and apostrophes is treated as a separator.
+            String[] words = c.element().split("[^a-zA-Z']+");
+
+            // Output each word encountered into the output PCollection.
+            for (String word : words) {
+                if (!word.isEmpty()) {
+                    // Debug trace of every emitted word.
+                    System.out.println(word);
+                    c.output(word);
+                }
+            }
+        }
+    }
+
+    /**
+     * A SimpleFunction that converts a Word and Count into a printable string
+     * of the form "word: count", printing it to stdout as a side effect.
+     */
+    public static class FormatAsTextFn extends SimpleFunction, String> {
+        @Override
+        public String apply(KV input) {
+            String retval = input.getKey() + ": " + input.getValue();
+            System.out.println(retval);
+            return retval;
+        }
+    }
+
+    /**
+     * A PTransform that converts a PCollection containing lines of text into a PCollection of
+     * per-word counts. The counts are returned as-is; no formatting is applied here.
+     */
+    public static class CountWords extends PTransform,
+            PCollection>> {
+        @Override
+        public PCollection> apply(PCollection lines) {
+
+            // Convert lines of text into individual words.
+            PCollection words = lines.apply(
+                    ParDo.of(new ExtractWordsFn()));
+
+            // Count the number of times each word occurs.
+            PCollection> wordCounts =
+                    words.apply(Count.perElement());
+
+            return wordCounts;
+        }
+    }
+
+    /**
+     * Options supported by {@link StormWordCount}.
+     *
+     * <p>Inherits standard configuration options.
+     */
+    public interface WordCountOptions extends PipelineOptions {
+
+    }
+
+    /**
+     * Builds and runs the pipeline: sentence source, 2-second fixed windows,
+     * word extraction, then {@link CountWords}.
+     * NOTE(review): CountWords applies ExtractWordsFn again, so words are extracted
+     * twice -- confirm whether the explicit "ExtractWords" step is intended.
+     */
+    public static void main(String[] args) {
+        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+                .as(WordCountOptions.class);
+        Pipeline p = Pipeline.create(options);
+        p.apply("Spout", Read.from(new RandomSentenceSource(StringUtf8Coder.of())))
+                .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(2))))
+                .apply("ExtractWords", ParDo.of(new ExtractWordsFn()))
+                .apply(new CountWords());
+
+        p.run();
+
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java
new file mode 100644
index 00000000000..455d8a2b54f
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java
@@ -0,0 +1,58 @@
+package org.apache.storm.beam.translation;
+
+import avro.shaded.com.google.common.collect.Lists;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.storm.beam.translation.runtime.GroupByKeyCompleteBolt;
+import org.apache.storm.beam.translation.runtime.GroupByKeyInitBolt;
+
+import java.util.List;
+
+/**
+ * Translates a Beam GroupByKey operation into a pair of Storm Bolts with a fields grouping.
+ * + * TODO: From a Beam perspective this is likely the wrong approach to doing GBK + */ +public class GroupByKeyTranslator implements + TransformTranslator> { + @Override + public void translateNode(GroupByKey transform, TranslationContext context) { + PValue pvFrom = (PValue)context.getCurrentTransform().getInput(); + + PValue pvTo = (PValue)context.getCurrentTransform().getEnclosingNode().getOutput(); + + String from = baseName(pvFrom.getName()); + String to = baseName(pvTo.getName()); + context.activateGBK(to); + String initBolt = from + "_GBK_init"; // first GBK bolt + String completeBolt = from + "_GBK_complete"; + + GroupByKeyInitBolt gbkInit = new GroupByKeyInitBolt(); + GroupByKeyCompleteBolt gbkComplete = new GroupByKeyCompleteBolt(); + + + // from --> initBolt + TranslationContext.Stream stream = new TranslationContext.Stream(from, initBolt, new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE)); + context.addStream(stream); + context.addBolt(initBolt, gbkInit); + + // initBolt --> completeBolt + TranslationContext.Grouping fieldsGrouping = new TranslationContext.Grouping(TranslationContext.Grouping.Type.FIELDS); + List fields = Lists.newArrayList(); + fields.add("keyValue"); + fieldsGrouping.setArgs(fields); + context.addBolt(completeBolt, gbkComplete); + stream = new TranslationContext.Stream(initBolt, completeBolt, fieldsGrouping); + context.addStream(stream); + + // completeBolt --> to + stream = new TranslationContext.Stream(completeBolt, to, new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE)); + context.addStream(stream); + } + + + private static String baseName(String str){ + return str.substring(0, str.lastIndexOf(".")); + } +} diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java new file mode 100644 index 00000000000..671b84dad9b --- /dev/null +++ 
b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java @@ -0,0 +1,51 @@ +package org.apache.storm.beam.translation; + +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.apache.storm.beam.translation.runtime.DoFnBolt; +import org.apache.storm.beam.translation.util.DefaultSideInputReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Translates a ParDo.Bound to a Storm DoFnBolt + */ +public class ParDoBoundTranslator implements + TransformTranslator> { + + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class); + + @Override + public void translateNode(ParDo.Bound transform, TranslationContext context) { + DoFn doFn = transform.getFn(); + PCollection output = context.getOutput(); + WindowingStrategy windowingStrategy = output.getWindowingStrategy(); + + DoFnBolt bolt = new DoFnBolt<>(context.getOptions(), doFn, + windowingStrategy, new DefaultSideInputReader()); + + PValue pvFrom = (PValue)context.getCurrentTransform().getInput(); + String from = baseName(pvFrom.getName()); + if(context.isGBKActive()){ + from = context.completeGBK(); + } + LOG.info(baseName(pvFrom.getName())); + + PValue pvTo = (PValue)context.getCurrentTransform().getOutput(); + LOG.info(baseName(pvTo.getName())); + String to = baseName(pvTo.getName()); + + TranslationContext.Stream stream = new TranslationContext.Stream(from, to, new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE)); + + context.addStream(stream); + context.addBolt(baseName(pvTo.getName()), bolt); + } + + private static String baseName(String str){ + return str.substring(0, str.lastIndexOf(".")); + } +} diff --git 
a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java
new file mode 100644
index 00000000000..d3dd478d891
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java
@@ -0,0 +1,51 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline visitor that walks a Beam pipeline and hands each primitive transform to the
+ * translator registered for its class, accumulating the result in a TranslationContext.
+ */
+public class StormPipelineTranslator implements Pipeline.PipelineVisitor{
+    private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
+    private TranslationContext context;
+
+    public StormPipelineTranslator(TranslationContext context){
+        this.context = context;
+    }
+
+
+    /** Traverses the pipeline in topological order with this object as the visitor. */
+    public void translate(Pipeline pipeline) {
+        pipeline.traverseTopologically(this);
+    }
+
+    // Composite transforms are only logged; the traversal always descends into them.
+    public CompositeBehavior enterCompositeTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("entering composite translation {}", transformTreeNode.getTransform());
+        return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    public void leaveCompositeTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("leaving composite translation {}", transformTreeNode.getTransform());
+    }
+
+    // Looks up a translator by the transform's exact class. When one exists, the node is
+    // made current on the context and translated; otherwise a warning is logged and the
+    // transform is skipped.
+    public void visitPrimitiveTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("visiting transform {}", transformTreeNode.getTransform());
+        PTransform transform = transformTreeNode.getTransform();
+        LOG.info("class: {}", transform.getClass());
+        TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+        if(translator != null) {
+            context.setCurrentTransform(transformTreeNode);
+            translator.translateNode(transformTreeNode.getTransform(), context);
+        } else {
+            LOG.warn("No translator found for {}", transform.getClass());
+        }
+    }
+
+    // Values are only logged.
+    public void visitValue(PValue value, TransformTreeNode transformTreeNode) {
+        LOG.info("visiting value {}", value);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java
new file mode 100644
index 00000000000..cb310601b0b
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java
@@ -0,0 +1,11 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
+ * NOTE(review): the declaration of the type parameter "Type" used below appears to be
+ * missing from this copy of the file -- confirm against the original commit.
+ */
+public interface TransformTranslator {
+
+    // Translates the given transform, recording the resulting components on the context.
+    void translateNode(Type transform, TranslationContext context);
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java
new file mode 100644
index 00000000000..b3920c0ca3c
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java
@@ -0,0 +1,186 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.topology.IRichSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maintains the state necessary during Pipeline translation to build a Storm topology.
+ */
+// NOTE(review): generic type arguments appear to be missing from the collection fields and
+// from getInput()/getOutput() in this copy of the file -- confirm against the original commit.
+public class TranslationContext {
+    private StormPipelineOptions options;
+
+    // Transform node currently being translated; set by the pipeline visitor.
+    private TransformTreeNode currentTransform;
+
+    // Spouts keyed by component id.
+    private Map spoutMap = new HashMap();
+
+    // Bolts keyed by component id; stored as Object because several bolt interfaces are accepted.
+    private Map boltMap = new HashMap();
+
+    // Connections between components, in the order they were added.
+    private List streams = new ArrayList();
+
+    public TranslationContext(StormPipelineOptions options){
+        this.options = options;
+
+    }
+
+    // Output name recorded by a pending GroupByKey translation; null when none is pending.
+    private String gbkTo = null;
+
+    public StormPipelineOptions getOptions(){
+        return this.options;
+    }
+
+    // Registers a spout; an existing entry with the same id is replaced.
+    public void addSpout(String id, IRichSpout spout){
+        this.spoutMap.put(id, spout);
+    }
+
+    // Returns the internal spout map itself, not a copy.
+    public Map getSpouts(){
+        return this.spoutMap;
+    }
+
+    // Registers a bolt; an existing entry with the same id is replaced.
+    public void addBolt(String id, Object bolt){
+        this.boltMap.put(id, bolt);
+    }
+
+    // Returns the bolt registered under the id, or null if there is none.
+    public Object getBolt(String id){
+        return this.boltMap.get(id);
+    }
+
+    public void addStream(Stream stream){
+        this.streams.add(stream);
+    }
+    // Returns the internal stream list itself, not a copy.
+    public List getStreams(){
+        return this.streams;
+    }
+
+    public void setCurrentTransform(TransformTreeNode transform){
+        this.currentTransform = transform;
+    }
+
+    public TransformTreeNode getCurrentTransform(){
+        return this.currentTransform;
+    }
+
+    // Input of the current transform, cast to the caller's expected type (unchecked).
+    public InputT getInput() {
+        return (InputT) getCurrentTransform().getInput();
+    }
+
+    // Output of the current transform, cast to the caller's expected type (unchecked).
+    public OutputT getOutput() {
+        return (OutputT) getCurrentTransform().getOutput();
+    }
+
+    // Marks a GroupByKey as pending and remembers its output name.
+    public void activateGBK(String gbkTo){
+        this.gbkTo = gbkTo;
+    }
+
+    // Returns the pending GroupByKey output name and clears the pending state.
+    public String completeGBK(){
+        String gbkTo = this.gbkTo;
+        this.gbkTo = null;
+        return gbkTo;
+    }
+
+    public boolean isGBKActive(){
+        return this.gbkTo != null;
+    }
+
+
+
+    // A directed connection between two components (by id) together with its grouping.
+    public static class Stream {
+
+        private String from;
+        private String to;
+        private Grouping grouping;
+
+        public Stream(String from, String to, Grouping grouping){
+            this.from = from;
+            this.to = to;
+            this.grouping = grouping;
+        }
+
+        public String getTo() {
+            return to;
+        }
+
+        public void setTo(String to) {
+            this.to = to;
+        }
+
+        public String getFrom() {
+            return from;
+        }
+
+        public void setFrom(String from) {
+            this.from = from;
+        }
+
+        public Grouping getGrouping() {
+            return grouping;
+        }
+
+        public void setGrouping(Grouping grouping) {
+            this.grouping = grouping;
+        }
+    }
+
+    // Describes how tuples of a stream are distributed to the consuming bolt.
+    public static class Grouping {
+
+        /**
+         * Types of stream groupings Storm allows
+         */
+        public static enum Type {
+            ALL,
+            CUSTOM,
+            DIRECT,
+            SHUFFLE,
+            LOCAL_OR_SHUFFLE,
+            FIELDS,
+            GLOBAL,
+            NONE
+        }
+
+        private Type type;
+        private String streamId; // for named streams, other than DEFAULT
+        private List args; // arguments for fields grouping
+
+
+        // Creates a grouping of the given type with no stream id and no arguments.
+        public Grouping(Type type){
+            this.type = type;
+        }
+
+        // Convenience constructor: creates a FIELDS grouping over the given field names.
+        public Grouping(List args){
+            this.type = Type.FIELDS;
+            this.args = args;
+        }
+        public List getArgs() {
+            return args;
+        }
+
+        public void setArgs(List args) {
+            this.args = args;
+        }
+
+        public Type getType() {
+            return type;
+        }
+
+        public void setType(Type type) {
+            this.type = type;
+        }
+
+        // Null unless a stream id was set explicitly.
+        public String getStreamId() {
+            return streamId;
+        }
+
+        public void setStreamId(String streamId) {
+            this.streamId = streamId;
+        }
+
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java
new file mode 100644
index 00000000000..bdb42f86e8b
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java
@@ -0,0 +1,29 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Lookup table mapping PTransform types to associated TransformTranslator implementations.
+ */ +public class TranslatorRegistry { + private static final Map, TransformTranslator> TRANSLATORS = new HashMap(); + + static { + TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator()); + TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>()); + TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslator<>()); + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator<>()); + } + + static TransformTranslator getTranslator( + PTransform transform) { + return TRANSLATORS.get(transform.getClass()); + } +} diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java new file mode 100644 index 00000000000..c177a89027f --- /dev/null +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java @@ -0,0 +1,21 @@ +package org.apache.storm.beam.translation; + +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.storm.beam.StormPipelineOptions; +import org.apache.storm.beam.translation.runtime.UnboundedSourceSpout; + +/** + * Translates a Read.Unbounded into a Storm spout. 
+ * @param <T> element type of the translated {@code Read.Unbounded}
+ */
+public class UnboundedSourceTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+    /**
+     * Wraps the transform's source in an {@link UnboundedSourceSpout} and registers the
+     * spout under the full name of the transform currently being translated.
+     */
+    @Override
+    public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+        UnboundedSource<T, ?> source = transform.getSource();
+        StormPipelineOptions options = context.getOptions();
+        UnboundedSourceSpout spout = new UnboundedSourceSpout(source, options);
+
+        String name = context.getCurrentTransform().getFullName();
+        context.addSpout(name, spout);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java
new file mode 100644
index 00000000000..be6d1f9092b
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java
@@ -0,0 +1,47 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.storm.beam.translation.runtime.WindowBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a Window.Bound node into a Storm WindowedBolt
+ *
+ * <p>Only {@link FixedWindows} are handled; they become a tumbling window of the same size.
+ *
+ * @param <T> element type of the translated {@code Window.Bound}
+ */
+public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
+
+    /**
+     * Adds a {@link WindowBolt} named after the transform's output and a shuffle-grouped
+     * stream from the transform's input to that bolt.
+     *
+     * @throws UnsupportedOperationException if the window function is not {@link FixedWindows}
+     */
+    @Override
+    public void translateNode(Window.Bound<T> transform, TranslationContext context) {
+        if (!(transform.getWindowFn() instanceof FixedWindows)) {
+            throw new UnsupportedOperationException("Currently only fixed windows are supported.");
+        }
+        Duration size = ((FixedWindows) transform.getWindowFn()).getSize();
+
+        WindowBolt bolt = new WindowBolt();
+        // The window length is passed in whole seconds as an int, so any fractional
+        // second of the Beam window size is truncated here.
+        bolt.withTumblingWindow(WindowBolt.Duration.seconds((int) size.getStandardSeconds()));
+
+        PValue from = (PValue) context.getCurrentTransform().getInput();
+        String fromName = baseName(from.getName());
+        LOG.info(fromName);
+
+        PValue to = (PValue) context.getCurrentTransform().getOutput();
+        String toName = baseName(to.getName());
+        LOG.info(toName);
+
+        TranslationContext.Stream stream = new TranslationContext.Stream(
+                fromName,
+                toName,
+                new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE));
+
+        context.addStream(stream);
+        context.addBolt(toName, bolt);
+    }
+
+
+    /**
+     * Strips the last dot-separated segment from a name.
+     *
+     * @param str the name to shorten
+     * @return everything before the last {@code '.'}, or {@code str} unchanged when it
+     *         contains no dot (previously this case threw StringIndexOutOfBoundsException)
+     */
+    private static String baseName(String str) {
+        int lastDot = str.lastIndexOf('.');
+        return lastDot < 0 ? str : str.substring(0, lastDot);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java
new file mode 100644
index 00000000000..3a166e74112
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java
@@ -0,0 +1,102 @@
+package org.apache.storm.beam.translation.runtime;
+
+import com.google.api.client.util.Lists;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.*;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.beam.translation.util.DefaultStepContext;
+import org.apache.storm.beam.util.SerializedPipelineOptions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.*;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by tgoetz on 8/2/16.
+ *
+ * <p>Bolt that runs a Beam {@link DoFn} over each incoming tuple. The tuple's first value is
+ * either a single {@link WindowedValue} or a {@link List} of them; every tuple is processed
+ * as one bundle, and all outputs collected during that bundle are emitted together as a
+ * single list in the {@code "value"} field.
+ *
+ * @param <InputT> element type consumed by the DoFn
+ * @param <OutputT> element type produced by the DoFn
+ */
+public class DoFnBolt<InputT, OutputT> extends BaseRichBolt implements DoFnRunners.OutputManager {
+
+    // Fully qualified because this file does not import slf4j.
+    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(DoFnBolt.class);
+
+    private transient DoFnRunner<InputT, OutputT> runner = null;
+
+    /** Tag identifying the main output; the only tag {@link #output} accepts. */
+    private final TupleTag<OutputT> tupleTag = new TupleTag<OutputT>() {};
+
+    private transient OutputCollector collector;
+
+    /** Outputs collected for the tuple currently being executed; replaced on every execute(). */
+    private List<WindowedValue<OutputT>> output = Lists.newArrayList();
+
+    private SerializedPipelineOptions serializedOptions;
+    private transient StormPipelineOptions pipelineOptions;
+
+    private DoFn<InputT, OutputT> doFn;
+    private WindowingStrategy<?, ?> windowingStrategy;
+    private SideInputReader sideInputReader;
+
+    /**
+     * @param pipelineOptions options to ship with the bolt; stored in serialized form
+     * @param doFn the function to run for each element
+     * @param windowingStrategy windowing strategy handed to the DoFn runner
+     * @param sideInputReader side input reader handed to the DoFn runner
+     */
+    public DoFnBolt(
+            StormPipelineOptions pipelineOptions,
+            DoFn<InputT, OutputT> doFn,
+            WindowingStrategy<?, ?> windowingStrategy,
+            SideInputReader sideInputReader) {
+        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+        this.doFn = doFn;
+        this.windowingStrategy = windowingStrategy;
+        this.sideInputReader = sideInputReader;
+    }
+
+    /**
+     * Restores the pipeline options from their serialized form and builds the DoFn runner,
+     * with this bolt acting as the runner's output manager.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // StormDoFnRunner is constructed as a raw type
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+
+        // NOTE(review): "foo" looks like a placeholder counter name - confirm before relying on it.
+        Counter counter = Counter.ints("foo", Counter.AggregationKind.SUM);
+        CounterSet counters = new CounterSet(counter);
+
+        this.runner = new StormDoFnRunner(
+                this.pipelineOptions,
+                this.doFn,
+                this.sideInputReader,
+                this,
+                this.tupleTag,
+                TupleTagList.empty().getAll(),
+                new DefaultStepContext(),
+                counters.getAddCounterMutator(),
+                this.windowingStrategy);
+    }
+
+    /**
+     * Processes the tuple's first value as one bundle and emits the collected outputs.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // tuple values are untyped; upstream is trusted to send WindowedValues
+    public void execute(Tuple input) {
+        Object value = input.getValue(0);
+        LOG.debug("Type: {}", value.getClass());
+        this.output = Lists.newArrayList();
+        this.runner.startBundle();
+        if (value instanceof List) {
+            for (Object element : (List<?>) value) {
+                this.runner.processElement((WindowedValue<InputT>) element);
+            }
+        } else {
+            this.runner.processElement((WindowedValue<InputT>) value);
+        }
+        this.runner.finishBundle();
+
+        // Anchor the emitted tuple to its input so a downstream failure is reported against
+        // the originating tuple tree instead of being lost once the input is acked.
+        this.collector.emit(input, new Values(this.output));
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value"));
+    }
+
+
+    /**
+     * Collects one DoFn output for the tuple currently being executed.
+     *
+     * @throws RuntimeException if {@code tupleTag} is not this bolt's main output tag
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the tag equality check is the only evidence that T is OutputT
+    public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
+        if (!this.tupleTag.equals(tupleTag)) {
+            throw new RuntimeException("Wrong tag");
+        }
+        this.output.add((WindowedValue<OutputT>) windowedValue);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java
new file mode 100644
index 00000000000..d541709fe72
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java
@@ -0,0 +1,40 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Bolt that re-emits the {@code "windowedValue"} field of each incoming tuple as a
+ * single-field tuple named {@code "value"}, anchored to the input, and then acks the input.
+ */
+public class GroupByKeyCompleteBolt extends BaseRichBolt {
+    private OutputCollector collector;
+
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    public GroupByKeyCompleteBolt() {
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        this.collector.emit(input, new Values(input.getValueByField("windowedValue")));
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value"));
+    }
+}
diff --git
a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java
new file mode 100644
index 00000000000..aaba57e5548
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java
@@ -0,0 +1,47 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bolt that fans a list of windowed key/value pairs out into one tuple per pair.
+ *
+ * <p>Each emitted tuple carries the pair's key in {@code "keyValue"} and the whole
+ * windowed value in {@code "windowedValue"}; presumably this lets a downstream fields
+ * grouping route by key (verify against the GroupByKey translator).
+ */
+public class GroupByKeyInitBolt extends BaseRichBolt {
+    private OutputCollector collector;
+
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    public GroupByKeyInitBolt() {
+    }
+
+    /**
+     * Emits one anchored tuple per element of the input list, then acks the input once.
+     *
+     * @param input tuple whose first value is a list of {@code WindowedValue<KV>}
+     */
+    @Override
+    @SuppressWarnings("unchecked") // tuple values are untyped; upstream is trusted to send this shape
+    public void execute(Tuple input) {
+        List<WindowedValue<KV>> values = (List<WindowedValue<KV>>) input.getValue(0);
+        for (WindowedValue<KV> value : values) {
+            KV kv = value.getValue();
+            Object key = kv.getKey();
+            this.collector.emit(input, new Values(key, value));
+        }
+        // Ack exactly once, after every element has been emitted. Acking inside the loop
+        // acked the same tuple once per element and never acked an empty list.
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("keyValue", "windowedValue"));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java
new file mode 100644
index 00000000000..632fdbf466e
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java
@@ -0,0 +1,27 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.*;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * {@link SimpleDoFnRunner} subclass whose only member is a public constructor that
+ * forwards every argument, unchanged and in the same order, to the superclass.
+ * Presumably it exists to make the superclass constructor reachable from this
+ * package - TODO confirm against the Beam SDK version in use.
+ */
+public class StormDoFnRunner extends SimpleDoFnRunner implements Serializable {
+    public StormDoFnRunner(PipelineOptions options,
+                           DoFn fn,
+                           SideInputReader sideInputReader,
+                           DoFnRunners.OutputManager outputManager,
+                           TupleTag mainOutputTag, List sideOutputTags,
+                           ExecutionContext.StepContext stepContext,
+                           CounterSet.AddCounterMutator addCounterMutator,
+                           WindowingStrategy windowingStrategy) {
+        super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext,
+                addCounterMutator, windowingStrategy);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java
new file mode 100644
index 00000000000..349576efacd
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java
@@ -0,0 +1,109 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.beam.util.SerializedPipelineOptions;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import
org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Spout implementation that wraps a Beam UnboundedSource
+ *
+ * <p>Every record read from the source is emitted as a single-field tuple ({@code "value"})
+ * holding a {@link WindowedValue} in the global window, stamped with the reader's
+ * current timestamp.
+ */
+public class UnboundedSourceSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+    private UnboundedSource source;
+    private transient UnboundedSource.UnboundedReader reader;
+    private SerializedPipelineOptions serializedOptions;
+    private transient StormPipelineOptions pipelineOptions;
+
+    private transient SpoutOutputCollector collector;
+
+    /** True while the record made current by {@code reader.start()} has not been emitted yet. */
+    private transient boolean firstRecordPending;
+
+    /**
+     * @param source the Beam source to read from
+     * @param options pipeline options to ship with the spout; stored in serialized form
+     */
+    public UnboundedSourceSpout(UnboundedSource source, StormPipelineOptions options) {
+        this.source = source;
+        this.serializedOptions = new SerializedPipelineOptions(options);
+    }
+
+    /**
+     * Closes the underlying reader, if one was created. A failure to close is logged
+     * and otherwise ignored.
+     */
+    @Override
+    public void close() {
+        super.close();
+        if (this.reader == null) {
+            // open() never ran or failed before the reader was created.
+            return;
+        }
+        try {
+            this.reader.close();
+        } catch (IOException e) {
+            LOG.warn("Unable to close unbounded reader.", e);
+        }
+    }
+
+    /** Delegates to the base implementation; emitted message ids are not tracked by this spout. */
+    @Override
+    public void ack(Object msgId) {
+        super.ack(msgId);
+    }
+
+    /** Delegates to the base implementation; this spout keeps no state from which to replay. */
+    @Override
+    public void fail(Object msgId) {
+        super.fail(msgId);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value"));
+    }
+
+    /**
+     * Restores the pipeline options, creates a reader without a checkpoint mark and starts it.
+     *
+     * @throws RuntimeException if the reader cannot be created or started
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        try {
+            this.collector = collector;
+            this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+            this.reader = this.source.createReader(this.pipelineOptions, null);
+            // start() already positions the reader on the first record when it returns true;
+            // remember that so nextTuple() emits it instead of advancing past it.
+            this.firstRecordPending = this.reader.start();
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to create unbounded reader.", e);
+        }
+    }
+
+    /**
+     * Emits the reader's next available record, if any, under a random message id.
+     *
+     * @throws RuntimeException if reading from the source fails
+     */
+    @Override
+    public void nextTuple() {
+        try {
+            boolean available;
+            if (this.firstRecordPending) {
+                this.firstRecordPending = false;
+                available = true;
+            } else {
+                available = this.reader.advance();
+            }
+            if (available) {
+                Object value = reader.getCurrent();
+                Instant timestamp = reader.getCurrentTimestamp();
+
+                WindowedValue<Object> wv =
+                        WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+                collector.emit(new Values(wv), UUID.randomUUID());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Exception reading values from source.", e);
+        }
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java
new file mode 100644
index 00000000000..4e8a7fb6f1d
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java
@@ -0,0 +1,48 @@
+package org.apache.storm.beam.translation.runtime;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Windowed bolt that gathers the first value of every tuple in a window into one list
+ * and emits that list as a single tuple (field {@code "window"}), anchored to all tuples
+ * of the window.
+ */
+public class WindowBolt extends BaseWindowedBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowBolt.class);
+
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        super.prepare(stormConf, context, collector);
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+        List<Tuple> tuples = inputWindow.get();
+        LOG.info("*******************Executing window with size {}", tuples.size());
+        LOG.info("Type: {}", tuples.getClass());
+        List<Object> values = Lists.newArrayList();
+        for (Tuple t : tuples) {
+            values.add(t.getValue(0));
+        }
+        collector.emit(tuples, new Values(values));
+
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("window"));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java
new file mode 100644
index 00000000000..bea9e8dd805
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java
@@ -0,0 +1,29 @@
+package org.apache.storm.beam.translation.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * No-op SideInputReader implementation.
+ *
+ * <p>It reports itself as empty, contains no view, and returns {@code null} for every lookup.
+ */
+public class DefaultSideInputReader implements SideInputReader, Serializable {
+    /** Always returns {@code null}: this reader holds no side inputs. */
+    @Nullable
+    @Override
+    public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
+        return null;
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public <T> boolean contains(PCollectionView<T> pCollectionView) {
+        return false;
+    }
+
+    /** Always returns {@code true}. */
+    @Override
+    public boolean isEmpty() {
+        return true;
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java
new file mode 100644
index 00000000000..ffdd19bcadd
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java
@@ -0,0 +1,51 @@
+package org.apache.storm.beam.translation.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.IOException;
+
+/**
+ * No-op StepContext implementation.
+ *
+ * <p>Every query method returns {@code null} and every notification method does nothing,
+ * so code that needs a step name, state or timers cannot be served by this context.
+ */
+public class DefaultStepContext implements ExecutionContext.StepContext {
+    /** Always returns {@code null}. */
+    @Override
+    public String getStepName() {
+        return null;
+    }
+
+    /** Always returns {@code null}. */
+    @Override
+    public String getTransformName() {
+        return null;
+    }
+
+    /** Ignores the output. */
+    @Override
+    public void noteOutput(WindowedValue<?> windowedValue) {
+
+    }
+
+    /** Ignores the side output. */
+    @Override
+    public void noteSideOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+
+    }
+
+    /** Discards the view data; nothing is written. */
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(
+            TupleTag<?> tupleTag,
+            Iterable<WindowedValue<T>> iterable,
+            Coder<Iterable<WindowedValue<T>>> coder,
+            W w,
+            Coder<W> coder1) throws IOException {
+
+    }
+
+    /** Always returns {@code null}: no state is available. */
+    @Override
+    public StateInternals<?> stateInternals() {
+        return null;
+    }
+
+    /** Always returns {@code null}: no timers are available. */
+    @Override
+    public TimerInternals timerInternals() {
+        return null;
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java b/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java
new file mode 100644
index 00000000000..8b14fedfbca
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.beam.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ *
+ * <p>The options are converted to JSON bytes when this object is constructed and are
+ * converted back on the first call to {@link #getPipelineOptions()}.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+    /** Shared mapper, so one is not built for every serialize and deserialize call. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final byte[] serializedOptions;
+
+    /** Lazily initialized copy of deserialized options */
+    private transient PipelineOptions pipelineOptions;
+
+    /**
+     * Serializes the given options.
+     *
+     * @param options the options to capture; must not be {@code null}
+     * @throws NullPointerException if {@code options} is {@code null}
+     * @throws RuntimeException if the options cannot be serialized
+     */
+    public SerializedPipelineOptions(PipelineOptions options) {
+        checkNotNull(options, "PipelineOptions must not be null.");
+
+        try {
+            this.serializedOptions = MAPPER.writeValueAsBytes(options);
+        } catch (IOException e) {
+            throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+        }
+
+    }
+
+    /**
+     * Returns the deserialized options, deserializing them on first use.
+     *
+     * @return the options; the same instance on every later call
+     * @throws RuntimeException if the stored bytes cannot be deserialized
+     */
+    public PipelineOptions getPipelineOptions() {
+        if (pipelineOptions == null) {
+            try {
+                pipelineOptions = MAPPER.readValue(serializedOptions, PipelineOptions.class);
+            } catch (IOException e) {
+                throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+            }
+        }
+
+        return pipelineOptions;
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index a2aec4d630f..82423196e86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -305,6 +305,7 @@
         <module>external/storm-opentsdb</module>
         <module>external/storm-kafka-monitor</module>
         <module>external/storm-jms</module>
+        <module>external/storm-beam</module>
         <module>examples/storm-starter</module>
From 7de6eedb455e3916aebaf561b180dbc9247ee4b6 Mon Sep 17 00:00:00 2001
From: "P.
Taylor Goetz" Date: Tue, 25 Oct 2016 14:45:43 -0400 Subject: [PATCH 2/3] STORM-2174: Apply ALv2 license headers --- .../storm/beam/RandomSentenceSource.java | 20 +++++++++++++++--- .../storm/beam/StormPipelineOptions.java | 21 +++++++++++++++---- .../org/apache/storm/beam/StormRegistrar.java | 17 +++++++++++++++ .../org/apache/storm/beam/StormRunner.java | 17 +++++++++++++++ .../translation/GroupByKeyTranslator.java | 17 +++++++++++++++ .../translation/ParDoBoundTranslator.java | 18 +++++++++++++++- .../translation/StormPipelineTranslator.java | 17 +++++++++++++++ .../beam/translation/TransformTranslator.java | 17 +++++++++++++++ .../beam/translation/TranslationContext.java | 17 +++++++++++++++ .../beam/translation/TranslatorRegistry.java | 17 +++++++++++++++ .../UnboundedSourceTranslator.java | 17 +++++++++++++++ .../translation/WindowBoundTranslator.java | 17 +++++++++++++++ .../beam/translation/runtime/DoFnBolt.java | 19 ++++++++++++++++- .../runtime/GroupByKeyCompleteBolt.java | 17 +++++++++++++++ .../runtime/GroupByKeyInitBolt.java | 17 +++++++++++++++ .../translation/runtime/StormDoFnRunner.java | 17 +++++++++++++++ .../runtime/UnboundedSourceSpout.java | 17 +++++++++++++++ .../beam/translation/runtime/WindowBolt.java | 21 +++++++++++++++---- .../util/DefaultSideInputReader.java | 17 +++++++++++++++ .../translation/util/DefaultStepContext.java | 17 +++++++++++++++ 20 files changed, 341 insertions(+), 13 deletions(-) diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java b/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java index 1c87e5472b6..d15d88aa5c1 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.beam; import org.apache.beam.sdk.coders.Coder; @@ -11,9 +28,6 @@ import java.util.List; import java.util.NoSuchElementException; -/** - * Created by tgoetz on 7/28/16. - */ public class RandomSentenceSource extends UnboundedSource { private final Coder coder; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java index 8c710ab84ce..c5b7caad16e 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java @@ -1,12 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.beam; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.PipelineOptions; -/** - * Created by tgoetz on 7/27/16. - */ public interface StormPipelineOptions extends PipelineOptions, ApplicationNameOptions { - } diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java index 0e102715bc2..d014018dd03 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam; import com.google.auto.service.AutoService; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java index cc923494963..f7638c42203 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.beam; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java index 455d8a2b54f..2f860015f9d 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.beam.translation; import avro.shaded.com.google.common.collect.Lists; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java index 671b84dad9b..9f14dfac44a 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java @@ -1,6 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowingStrategy; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java index d3dd478d891..af33de04ebc 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation; import org.apache.beam.sdk.Pipeline; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java index cb310601b0b..55587257a54 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation; import org.apache.beam.sdk.transforms.PTransform; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java index b3920c0ca3c..2d209c78ee4 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation; import org.apache.beam.sdk.runners.TransformTreeNode; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java index bdb42f86e8b..4a23a50cb35 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation; import org.apache.beam.sdk.io.Read; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java index c177a89027f..912b43cb6c1 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation; import org.apache.beam.sdk.io.Read; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java index be6d1f9092b..391f0b50e51 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation; import org.apache.beam.sdk.transforms.windowing.FixedWindows; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java index 3a166e74112..94d6449af3d 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.beam.translation.runtime; import com.google.api.client.util.Lists; @@ -21,7 +38,7 @@ import java.util.Map; /** - * Created by tgoetz on 8/2/16. 
+ * */ public class DoFnBolt extends BaseRichBolt implements DoFnRunners.OutputManager{ diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java index d541709fe72..2ff17877cf5 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation.runtime; import org.apache.beam.sdk.util.WindowedValue; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java index aaba57e5548..8a1ab5ddf8f 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation.runtime; import org.apache.beam.sdk.util.WindowedValue; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java index 632fdbf466e..8cf06cf4935 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation.runtime; import org.apache.beam.sdk.options.PipelineOptions; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java index 349576efacd..fa85af8e4ef 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation.runtime; import org.apache.beam.sdk.io.UnboundedSource; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java index 4e8a7fb6f1d..f3df4b9cc3c 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation.runtime; import com.google.common.collect.Lists; @@ -15,9 +32,6 @@ import java.util.List; import java.util.Map; -/** - * - */ public class WindowBolt extends BaseWindowedBolt { private static final Logger LOG = LoggerFactory.getLogger(WindowBolt.class); @@ -32,7 +46,6 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @Override public void execute(TupleWindow inputWindow) { LOG.info("*******************Executing window with size {}", inputWindow.get().size()); - LOG.info("Type: {}", inputWindow.get().getClass()); List values = Lists.newArrayList(); for(Tuple t :inputWindow.get()){ values.add(t.getValue(0)); diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java index bea9e8dd805..12dd6e9ed1e 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.beam.translation.util; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java index ffdd19bcadd..bbdb1d0ec97 100644 --- a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java +++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.beam.translation.util; import org.apache.beam.sdk.coders.Coder; From 6516defc0e83c12a4aaf147b3c81bc74ec364f72 Mon Sep 17 00:00:00 2001 From: "P. 
Taylor Goetz" Date: Tue, 25 Oct 2016 14:58:37 -0400 Subject: [PATCH 3/3] STORM-2174: update beam dependency to released version 0.2.0-incubating --- external/storm-beam/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-beam/pom.xml b/external/storm-beam/pom.xml index 465400558a3..f0ae23fdf29 100644 --- a/external/storm-beam/pom.xml +++ b/external/storm-beam/pom.xml @@ -31,7 +31,7 @@ Apache Storm Beam Runner - 0.2.0-incubating-SNAPSHOT + 0.2.0-incubating