# scalding-dataflow

A Scalding runner for the Google Dataflow SDK. This project is a work in progress; use it at your own risk.

## Usage

You can use it in your own sbt or Maven projects:

**build.sbt**

```scala
resolvers += Resolver.sonatypeRepo("snapshots")

// For the latest version, check the most recent run of the build pipeline
libraryDependencies += "in.ashwanthkumar" %% "scalding-dataflow" % "1.0.23-SNAPSHOT"
```

**pom.xml**

```xml
<dependency>
  <groupId>in.ashwanthkumar</groupId>
  <artifactId>scalding-dataflow_2.10</artifactId>
  <!-- For the latest version, check the most recent run of the build pipeline -->
  <version>1.0.23-SNAPSHOT</version>
</dependency>

<!-- ... -->

<repositories>
  <repository>
    <id>oss.sonatype.org-snapshot</id>
    <url>http://oss.sonatype.org/content/repositories/snapshots</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
```

Pass the following options to the program (e.g. the WordCount example below) when running it:

```
--runner=ScaldingPipelineRunner --name=Main-Test --mode=local
```

```java
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;

PipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .create();
Pipeline pipeline = Pipeline.create(options);

// Count each distinct line of kinglear.txt and write "word<TAB>count" lines out
pipeline.apply(TextIO.Read.from("kinglear.txt").named("Source"))
    .apply(Count.<String>perElement())
    .apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        KV<String, Long> kv = c.element();
        c.output(String.format("%s\t%d", kv.getKey(), kv.getValue()));
      }
    }))
    .apply(TextIO.Write.to("out.txt").named("Sink"));

pipeline.run();
```

If you want to run it on HDFS (experimental), change `--mode=local` to `--mode=hdfs`:
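For example, the full set of flags from above becomes:

```
--runner=ScaldingPipelineRunner --name=Main-Test --mode=hdfs
```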

## Todos

### Translators

- ParDo.Bound
- Filter
- Keys (see the sketch after this list)
- Values
- KvSwap
- ParDo.Bound with sideInputs
- Combine
- Flatten
- ParDo.BoundMulti
- Combine.GroupedValues
- Combine.PerKey
- View.AsSingleton
- View.AsIterable
- Window.Bound
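To make the checklist concrete, here is a minimal sketch of a pipeline that exercises a few of the transforms listed above (Count.perElement, KvSwap, Keys). It is written against the stock Dataflow SDK API and says nothing about this runner's internals; whether each transform is translated yet is exactly what the checklist tracks.

```java
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Keys;
import com.google.cloud.dataflow.sdk.transforms.KvSwap;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class TransformsSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    // Count occurrences of each distinct line: String -> KV<String, Long>
    PCollection<KV<String, Long>> counts = p
        .apply(TextIO.Read.from("kinglear.txt"))
        .apply(Count.<String>perElement());

    // KvSwap: KV<String, Long> -> KV<Long, String>
    PCollection<KV<Long, String>> swapped = counts
        .apply(KvSwap.<String, Long>create());

    // Keys: KV<Long, String> -> Long (just the counts)
    PCollection<Long> justCounts = swapped.apply(Keys.<Long>create());

    p.run();
  }
}
```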

### IO

- Text
- Custom Cascading Scheme
- Iterable of Items
- Google SDK's Coder for SerDe (see the sketch after this list)
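On the last item: in the Dataflow SDK, serialization is driven by the `Coder` attached to each `PCollection`, which is what the runner would need to honor. A minimal sketch of attaching one explicitly, using only standard SDK API and shown for illustration:

```java
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class CoderSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    // Attach an explicit coder so the runner knows how to SerDe KV<String, Long>
    PCollection<KV<String, Long>> counts = p
        .apply(TextIO.Read.from("kinglear.txt"))
        .apply(Count.<String>perElement())
        .setCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));

    p.run();
  }
}
```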

### Scalding

- Move to TypedPipes
- Test it in Hadoop mode