Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial public release

  • Loading branch information...
commit 667e2ded5d456441a949144256ba9f2f8d558111 0 parents
@jwills jwills authored
Showing with 43,487 additions and 0 deletions.
  1. +4 −0 .gitignore
  2. +72 −0 LICENSE.txt
  3. +4 −0 NOTICE.txt
  4. +293 −0 README.md
  5. +55 −0 examples/pom.xml
  6. +55 −0 examples/src/main/java/com/cloudera/crunch/examples/WordCount.java
  7. +146 −0 pom.xml
  8. +217 −0 src/main/java/com/cloudera/crunch/CombineFn.java
  9. +25 −0 src/main/java/com/cloudera/crunch/DoCollection.java
  10. +123 −0 src/main/java/com/cloudera/crunch/DoFn.java
  11. +35 −0 src/main/java/com/cloudera/crunch/Emitter.java
  12. +129 −0 src/main/java/com/cloudera/crunch/FilterFn.java
  13. +129 −0 src/main/java/com/cloudera/crunch/GroupingOptions.java
  14. +39 −0 src/main/java/com/cloudera/crunch/MapFn.java
  15. +110 −0 src/main/java/com/cloudera/crunch/PCollection.java
  16. +37 −0 src/main/java/com/cloudera/crunch/PGroupedTable.java
  17. +66 −0 src/main/java/com/cloudera/crunch/PTable.java
  18. +57 −0 src/main/java/com/cloudera/crunch/Pair.java
  19. +78 −0 src/main/java/com/cloudera/crunch/Pipeline.java
  20. +48 −0 src/main/java/com/cloudera/crunch/Source.java
  21. +24 −0 src/main/java/com/cloudera/crunch/SourceTarget.java
  22. +31 −0 src/main/java/com/cloudera/crunch/TableSource.java
  23. +26 −0 src/main/java/com/cloudera/crunch/Target.java
  24. +47 −0 src/main/java/com/cloudera/crunch/Tuple.java
  25. +61 −0 src/main/java/com/cloudera/crunch/Tuple3.java
  26. +69 −0 src/main/java/com/cloudera/crunch/Tuple4.java
  27. +37 −0 src/main/java/com/cloudera/crunch/TupleN.java
  28. +36 −0 src/main/java/com/cloudera/crunch/fn/IdentityFn.java
  29. +28 −0 src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
  30. +28 −0 src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
  31. +38 −0 src/main/java/com/cloudera/crunch/fn/PairMapFn.java
  32. +170 −0 src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
  33. +68 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImpl.java
  34. +82 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/DoTableImpl.java
  35. +64 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/InputCollection.java
  36. +67 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/InputTable.java
  37. +128 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/PCollectionImpl.java
  38. +119 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/PGroupedTableImpl.java
  39. +61 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/PTableBase.java
  40. +78 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/UnionCollection.java
  41. +91 −0 src/main/java/com/cloudera/crunch/impl/mr/collect/UnionTable.java
  42. +64 −0 src/main/java/com/cloudera/crunch/impl/mr/emit/CombineFnEmitter.java
  43. +46 −0 src/main/java/com/cloudera/crunch/impl/mr/emit/IntermediateEmitter.java
  44. +55 −0 src/main/java/com/cloudera/crunch/impl/mr/emit/MultipleOutputEmitter.java
  45. +50 −0 src/main/java/com/cloudera/crunch/impl/mr/emit/OutputEmitter.java
  46. +99 −0 src/main/java/com/cloudera/crunch/impl/mr/exec/CrunchJob.java
  47. +61 −0 src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java
  48. +146 −0 src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java
  49. +192 −0 src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
  50. +68 −0 src/main/java/com/cloudera/crunch/impl/mr/plan/MSCROutputHandler.java
  51. +338 −0 src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java
  52. +103 −0 src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java
  53. +22 −0 src/main/java/com/cloudera/crunch/impl/mr/plan/PlanningParameters.java
  54. +80 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java
  55. +119 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java
  56. +82 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java
  57. +61 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchMapper.java
  58. +75 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRecordReader.java
  59. +58 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchReducer.java
  60. +22 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
  61. +66 −0 src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java
  62. +30 −0 src/main/java/com/cloudera/crunch/impl/mr/run/NodeContext.java
  63. +120 −0 src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java
  64. +70 −0 src/main/java/com/cloudera/crunch/impl/mr/run/RTNodeSerializer.java
  65. +29 −0 src/main/java/com/cloudera/crunch/impl/mr/run/RuntimeParameters.java
  66. +25 −0 src/main/java/com/cloudera/crunch/io/MapReduceTarget.java
  67. +22 −0 src/main/java/com/cloudera/crunch/io/OutputHandler.java
  68. +23 −0 src/main/java/com/cloudera/crunch/io/PathTarget.java
  69. +69 −0 src/main/java/com/cloudera/crunch/io/SourceTargetHelper.java
  70. +89 −0 src/main/java/com/cloudera/crunch/io/avro/AvroFileSourceTarget.java
  71. +86 −0 src/main/java/com/cloudera/crunch/io/avro/AvroFileTarget.java
  72. +103 −0 src/main/java/com/cloudera/crunch/io/seq/SeqFileSourceTarget.java
  73. +107 −0 src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSourceTarget.java
  74. +79 −0 src/main/java/com/cloudera/crunch/io/text/TextFileSourceTarget.java
  75. +71 −0 src/main/java/com/cloudera/crunch/io/text/TextFileTarget.java
  76. +46 −0 src/main/java/com/cloudera/crunch/lib/Aggregate.java
  77. +92 −0 src/main/java/com/cloudera/crunch/lib/Cogroup.java
  78. +104 −0 src/main/java/com/cloudera/crunch/lib/Join.java
  79. +106 −0 src/main/java/com/cloudera/crunch/lib/JoinUtils.java
  80. +30 −0 src/main/java/com/cloudera/crunch/type/Converter.java
  81. +65 −0 src/main/java/com/cloudera/crunch/type/DataBridge.java
  82. +122 −0 src/main/java/com/cloudera/crunch/type/PGroupedTableType.java
  83. +41 −0 src/main/java/com/cloudera/crunch/type/PTableType.java
  84. +61 −0 src/main/java/com/cloudera/crunch/type/PType.java
  85. +70 −0 src/main/java/com/cloudera/crunch/type/PTypeFamily.java
  86. +63 −0 src/main/java/com/cloudera/crunch/type/PTypeUtils.java
  87. +31 −0 src/main/java/com/cloudera/crunch/type/PairConverter.java
  88. +129 −0 src/main/java/com/cloudera/crunch/type/avro/AvroGroupedTableType.java
  89. +58 −0 src/main/java/com/cloudera/crunch/type/avro/AvroInputFormat.java
  90. +41 −0 src/main/java/com/cloudera/crunch/type/avro/AvroKeyConverter.java
  91. +66 −0 src/main/java/com/cloudera/crunch/type/avro/AvroOutputFormat.java
  92. +104 −0 src/main/java/com/cloudera/crunch/type/avro/AvroRecordReader.java
  93. +113 −0 src/main/java/com/cloudera/crunch/type/avro/AvroTableType.java
  94. +161 −0 src/main/java/com/cloudera/crunch/type/avro/AvroType.java
  95. +150 −0 src/main/java/com/cloudera/crunch/type/avro/AvroTypeFamily.java
  96. +265 −0 src/main/java/com/cloudera/crunch/type/avro/Avros.java
  97. +226 −0 src/main/java/com/cloudera/crunch/type/writable/TupleWritable.java
  98. +57 −0 src/main/java/com/cloudera/crunch/type/writable/WritableGroupedTableType.java
  99. +91 −0 src/main/java/com/cloudera/crunch/type/writable/WritableTableType.java
  100. +79 −0 src/main/java/com/cloudera/crunch/type/writable/WritableType.java
  101. +142 −0 src/main/java/com/cloudera/crunch/type/writable/WritableTypeFamily.java
  102. +37 −0 src/main/java/com/cloudera/crunch/type/writable/WritableValueConverter.java
  103. +408 −0 src/main/java/com/cloudera/crunch/type/writable/Writables.java
  104. +339 −0 src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
  105. +295 −0 src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
  106. +459 −0 src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
  107. +128 −0 src/test/java/com/cloudera/crunch/CogroupTest.java
  108. +68 −0 src/test/java/com/cloudera/crunch/CombineFnTest.java
  109. +59 −0 src/test/java/com/cloudera/crunch/FilterFnTest.java
  110. +120 −0 src/test/java/com/cloudera/crunch/JoinTest.java
  111. +147 −0 src/test/java/com/cloudera/crunch/TFIDFTest.java
  112. +83 −0 src/test/java/com/cloudera/crunch/TextPairTest.java
  113. +65 −0 src/test/java/com/cloudera/crunch/TupleTest.java
  114. +89 −0 src/test/java/com/cloudera/crunch/WordCountTest.java
  115. +87 −0 src/test/java/com/cloudera/crunch/type/PTypeUtilsTest.java
  116. +6 −0 src/test/resources/docs.txt
  117. +29,112 −0 src/test/resources/maugham.txt
  118. +3,667 −0 src/test/resources/shakes.txt
4 .gitignore
@@ -0,0 +1,4 @@
+.classpath
+.project
+.settings
+target
72 LICENSE.txt
@@ -0,0 +1,72 @@
+Apache License
+
+Version 2.0, January 2004
+
+http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
+
+"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
+
+"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
+
+"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
+
+"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
+
+"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
+
+"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
+
+"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
+
+"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
+
+"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
+
+You must give any other recipients of the Work or Derivative Works a copy of this License; and
+
+You must cause any modified files to carry prominent notices stating that You changed the files; and
+
+You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
+
+If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work
+To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
4 NOTICE.txt
@@ -0,0 +1,4 @@
+Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
293 README.md
@@ -0,0 +1,293 @@
+# Crunch - Simple and Efficient Java Library for MapReduce Pipelines
+
+## Introduction
+
+Crunch is a Java library for writing, testing, and running MapReduce pipelines, based on
+Google's FlumeJava. Its goal is to make pipelines that are composed of many user-defined
+functions simple to write, easy to test, and efficient to run.
+
+## Build and Installation
+
+Crunch uses Maven for dependency management. The code in the examples/ subdirectory relies
+on the top-level crunch libraries. In order to execute the included WordCount application, run:
+
+ mvn install
+ cd examples/
+ mvn package
+ hadoop jar target/crunch-examples-0.1.0-jar-with-dependencies.jar com.cloudera.crunch.examples.WordCount <input> <output>
+
+## High Level Concepts
+
+### Data Model and Operators
+
+Crunch is centered around three interfaces that represent distributed datasets: `PCollection<T>`, `PTable<K, V>`, and `PGroupedTable<K, V>`.
+
+A `PCollection<T>` represents a distributed, unordered collection of elements of type T. For example, we represent a text file in Crunch as a
+`PCollection<String>` object. PCollection provides a method, `parallelDo`, that applies a function to each element in a PCollection in parallel,
+and returns a new PCollection as its result.
+
+A `PTable<K, V>` is a sub-interface of PCollection that represents a distributed, unordered multimap of its key type K to its value type V.
+In addition to the parallelDo operation, PTable provides a `groupByKey` operation that aggregates all of the values in the PTable that
+have the same key into a single record. It is the groupByKey operation that triggers the sort phase of a MapReduce job.
+
+The result of a groupByKey operation is a `PGroupedTable<K, V>` object, which is a distributed, sorted map of keys of type K to an Iterable
+collection of values of type V. In addition to parallelDo, the PGroupedTable provides a `combineValues` operation, which allows for
+a commutative and associative aggregation operator to be applied to the values of the PGroupedTable instance on both the map side and the
+reduce side of a MapReduce job.
+
+Finally, PCollection, PTable, and PGroupedTable all support a `union` operation, which takes a series of distinct PCollections and treats
+them as a single, virtual PCollection. The union operator is required for operations that combine multiple inputs, such as cogroups and
+joins.
+
+### Pipeline Building and Execution
+
+Every Crunch pipeline starts with a `Pipeline` object that is used to coordinate building the pipeline and executing the underlying MapReduce
+jobs. For efficiency, Crunch uses lazy evaluation, so it will only construct MapReduce jobs from the different stages of the pipelines when
+the Pipeline object's `run` or `done` methods are called.
+
+## A Detailed Example
+
+Here is the classic WordCount application using Crunch:
+
+ import com.cloudera.crunch.DoFn;
+ import com.cloudera.crunch.Emitter;
+ import com.cloudera.crunch.PCollection;
+ import com.cloudera.crunch.PTable;
+ import com.cloudera.crunch.Pipeline;
+ import com.cloudera.crunch.impl.mr.MRPipeline;
+ import com.cloudera.crunch.lib.Aggregate;
+ import com.cloudera.crunch.type.writable.Writables;
+
+ public class WordCount {
+ public static void main(String[] args) throws Exception {
+ Pipeline pipeline = new MRPipeline(WordCount.class);
+ PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+ PCollection<String> words = lines.parallelDo("my splitter", new DoFn<String, String>() {
+ public void process(String line, Emitter<String> emitter) {
+ for (String word : line.split("\\s+")) {
+ emitter.emit(word);
+ }
+ }
+ }, Writables.strings());
+
+ PTable<String, Long> counts = Aggregate.count(words);
+
+ pipeline.writeTextFile(counts, args[1]);
+ pipeline.run();
+ }
+ }
+
+Let's walk through the example line by line.
+
+### Step 1: Creating a Pipeline and referencing a text file
+
+The `MRPipeline` implementation of the Pipeline interface compiles the individual stages of a
+pipeline into a series of MapReduce jobs. The MRPipeline constructor takes a class argument
+that is used to tell Hadoop where to find the code that is used in the pipeline execution.
+
+We now need to tell the Pipeline about the inputs it will be consuming. The Pipeline interface
+defines a `readTextFile` method that takes in a String and returns a PCollection of Strings.
+In addition to text files, Crunch supports reading data from SequenceFiles and Avro container files,
+via the `SequenceFileSource` and `AvroFileSource` classes defined in the com.cloudera.crunch.io package.
+
+Note that each PCollection is a _reference_ to a source of data- no data is actually loaded into a
+PCollection on the client machine.
+
+### Step 2: Splitting the lines of text into words
+
+Crunch defines a small set of primitive operations that can be composed in order to build complex data
+pipelines. The first of these primitives is the `parallelDo` function, which applies a function (defined
+by a subclass of `DoFn`) to every record in a PCollection, and returns a new PCollection that contains
+the results.
+
+The first argument to parallelDo is a string that is used to identify this step in the pipeline. When
+a pipeline is composed into a series of MapReduce jobs, it is often the case that multiple stages will
+run within the same Mapper or Reducer. Having a string that identifies each processing step is useful
+for debugging errors that occur in a running pipeline.
+
+The second argument to parallelDo is an anonymous subclass of DoFn. Each DoFn subclass must override
+the `process` method, which takes in a record from the input PCollection and an `Emitter` object that
+may have any number of output values written to it. In this case, our DoFn splits each lines up into
+words, using a blank space as a separator, and emits the words from the split to the output PCollection.
+
+The last argument to parallelDo is an instance of the `PType` interface, which specifies how the data
+in the output PCollection is serialized. While Crunch takes advantage of Java Generics to provide
+compile-time type safety, the generic type information is not available at runtime. Crunch needs to know
+how to map the records stored in each PCollection into a Hadoop-supported serialization format in order
+to read and write data to disk. Two serialization implementations are supported in crunch via the
+`PTypeFamily` interface: a Writable-based system that is defined in the com.cloudera.crunch.type.writable
+package, and an Avro-based system that is defined in the com.cloudera.crunch.type.avro package. Each
+implementation provides convenience methods for working with the common PTypes (Strings, longs, bytes, etc.)
+as well as utility methods for creating PTypes from existing Writable classes or Avro schemas.
+
+### Step 3: Counting the words
+
+Out of Crunch's simple primitive operations, we can build arbitrarily complex chains of operations in order
+to perform higher-level operations, like aggregations and joins, that can work on any type of input data.
+Let's look at the implementation of the `Aggregate.count` function:
+
+ package com.cloudera.crunch.lib;
+
+ import com.cloudera.crunch.CombineFn;
+ import com.cloudera.crunch.MapFn;
+ import com.cloudera.crunch.PCollection;
+ import com.cloudera.crunch.PTable;
+ import com.cloudera.crunch.Pair;
+ import com.cloudera.crunch.type.PTypeFamily;
+
+ public class Aggregate {
+
+ private static class Counter<S> extends MapFn<S, Pair<S, Long>> {
+ public Pair<S, Long> map(S input) {
+ return Pair.of(input, 1L);
+ }
+ }
+
+ public static <S> PTable<S, Long> count(PCollection<S> collect) {
+ PTypeFamily tf = collect.getTypeFamily();
+
+ // Create a PTable from the PCollection by mapping each element
+ // to a key of the PTable with the value equal to 1L
+ PTable<S, Long> withCounts = collect.parallelDo("count:" + collect.getName(),
+ new Counter<S>(), tf.tableOf(collect.getPType(), tf.longs()));
+
+ // Group the records of the PTable based on their key.
+ PGroupedTable<S, Long> grouped = withCounts.groupByKey();
+
+ // Sum the 1L values associated with the keys to get the
+ // count of each element in this PCollection, and return it
+ // as a PTable so that it may be processed further or written
+ // out for storage.
+ return grouped.combineValues(CombineFn.<S>SUM_LONGS());
+ }
+ }
+
+First, we get the PTypeFamily that is associated with the PType for the collection. The
+call to parallelDo converts each record in this PCollection into a Pair of the input record
+and the number one by extending the `MapFn` convenience subclass of DoFn, and uses the
+`tableOf` method of the PTypeFamily to specify that the returned PCollection should be a
+PTable instance, with the key being the PType of the PCollection and the value being the Long
+implementation for this PTypeFamily.
+
+The next line features the second of Crunch's four operations, `groupByKey`. The groupByKey
+operation may only be applied to a PTable, and returns an instance of the `PGroupedTable`
+interface, which references the grouping of all of the values in the PTable that have the same key.
+The groupByKey operation is what triggers the reduce phase of a MapReduce within Crunch.
+
+The last line in the function returns the output of the third of Crunch's four operations,
+`combineValues`. The combineValues operator takes a `CombineFn` as an argument, which is a
+specialized subclass of DoFn that operates on an implementation of Java's Iterable interface. The
+use of combineValues (as opposed to parallelDo) signals to Crunch that the CombineFn may be used to
+aggregate values for the same key on the map side of a MapReduce job as well as the reduce side.
+
+### Step 4: Writing the output and running the pipeline
+
+The Pipeline object also provides a `writeTextFile` convenience method for indicating that a
+PCollection should be written to a text file. There are also output targets for SequenceFiles and
+Avro container files, available in the com.cloudera.crunch.io package.
+
+After you are finished constructing a pipeline and specifying the output destinations, call the
+pipeline's blocking `run` method in order to compile the pipeline into one or more MapReduce
+jobs and execute them.
+
+## Writing Your Own Pipelines
+
+This section discusses the different steps of creating your own Crunch pipelines in more detail.
+
+### Writing a DoFn
+
+The DoFn class is designed to keep the complexity of the MapReduce APIs out of your way when you
+don't need them while still keeping them accessible when you do.
+
+#### Serialization
+
+First, all DoFn instances are required to be `java.io.Serializable`. This is a key aspect of Crunch's design:
+once a particular DoFn is assigned to the Map or Reduce stage of a MapReduce job, all of the state
+of that DoFn is serialized so that it may be distributed to all of the nodes in the Hadoop cluster that
+will be running that task. There are two important implications of this for developers:
+
+1. All member values of a DoFn must be either serializable or marked as `transient`.
+2. All anonymous DoFn instances must be defined in a static method or in a class that is itself serializable.
+
+Because sometimes you will need to work with non-serializable objects inside of a DoFn, every DoFn provides an
+`initialize` method that is called before the `process` method is ever called so that any initialization tasks,
+such as creating a non-serializable member variable, can be performed before processing begins. Similarly, all
+DoFn instances have a `cleanup` method that may be called after processing has finished to perform any required
+cleanup tasks.
+
+#### Scale Factor
+
+The DoFn class defines a `scaleFactor` method that can be used to signal to the MapReduce compiler that a particular
+DoFn implementation will yield an output PCollection that is larger (scaleFactor > 1) or smaller (0 < scaleFactor < 1)
+than the input PCollection it is applied to. The compiler may use this information to determine how to optimally
+split processing tasks between the Map and Reduce phases of dependent MapReduce jobs.
+
+#### Other Utilities
+
+The DoFn base class provides convenience methods for accessing the `Configuration` and `Counter` objects that
+are associated with a MapReduce stage, so that they may be accessed during initialization, processing, and cleanup.
+
+### Performing Cogroups and Joins
+
+In Crunch, cogroups and joins are performed on PTable instances that have the same key type. This section walks through
+the basic flow of a cogroup operation, explaining how this higher-level operation is composed of Crunch's four primitives.
+In general, these common operations are provided as part of the core Crunch library or in extensions, you do not need
+to write them yourself. But it can be useful to understand how they work under the covers.
+
+Assume we have a `PTable<K, U>` named "a" and a different `PTable<K, V>` named "b" that we would like to combine into a
+single `PTable<K, Pair<Collection<U>, Collection<V>>>`. First, we need to apply parallelDo operations to a and b that
+convert them into the same Crunch type, `PTable<K, Pair<U, V>>`:
+
+ // Perform the "tagging" operation as a parallelDo on PTable a
+ PTable<K, Pair<U, V>> aPrime = a.parallelDo("taga", new MapFn<Pair<K, U>, Pair<K, Pair<U, V>>>() {
+ public Pair<K, Pair<U, V>> map(Pair<K, U> input) {
+ return Pair.of(input.first(), Pair.of(input.second(), null));
+ }
+ }, tableOf(a.getKeyType(), pair(a.getValueType(), b.getValueType())));
+
+ // Perform the "tagging" operation as a parallelDo on PTable b
+ PTable<K, Pair<U, V>> bPrime = b.parallelDo("tagb", new MapFn<Pair<K, V>, Pair<K, Pair<U, V>>>() {
+ public Pair<K, Pair<U, V>> map(Pair<K, V> input) {
+ return Pair.of(input.first(), Pair.of(null, input.second()));
+ }
+ }, tableOf(a.getKeyType(), pair(a.getValueType(), b.getValueType())));
+
+Once the input PTables are tagged into a single type, we can apply the union operation to create a single PTable
+reference that includes both of the tagged PTables and then group the unioned PTable by the common key:
+
+ PTable<K, Pair<U, V>> both = aPrime.union(bPrime);
+ PGroupedTable<K, Pair<U, V>> grouped = both.groupByKey();
+
+The grouping operation will create an `Iterable<Pair<U, V>>` which we can then convert to a `Pair<Collection<U>, Collection<V>>`:
+
+ grouped.parallelDo("cogroup", new MapFn<Pair<K, Iterable<Pair<U, V>>>, Pair<K, Pair<Collection<U>, Collection<V>>>>() {
+ public Pair<K, Pair<Collection<U>, Collection<V>>> map(Pair<K, Iterable<Pair<U, V>>> input) {
+ Collection<U> uValues = new ArrayList<U>();
+ Collection<V> vValues = new ArrayList<V>();
+ for (Pair<U, V> pair : input.second()) {
+ if (pair.first() != null) {
+ uValues.add(pair.first());
+ } else {
+ vValues.add(pair.second());
+ }
+ }
+ return Pair.of(input.first(), Pair.of(uValues, vValues));
+ },
+ }, tableOf(grouped.getKeyType(), pair(collections(a.getValueType()), collections(b.getValueType()))));
+
+## Current Limitations and Future Work
+
+This section contains an almost certainly incomplete list of known limitations of Crunch and plans for future work.
+
+* The Avro-based type system doesn't have robust support for multiple inputs/outputs that use different Avro schemas, so
+joins based on Avro files with different data schemas may not work.
+* We're currently missing support for MapReduce jobs over HBase. We would also like to have easy support for reading and
+writing data from/to HCatalog.
+* The decision of how to split up processing tasks between dependent MapReduce jobs is very naiive right now- we simply
+delegate all of the work to the reduce stage of the predecessor job. We should take advantage of information about the
+expected size of different PCollections to optimize this processing.
+* The Crunch optimizer does not yet merge different groupByKey operations that run over the same input data into a single
+MapReduce job. Implementing this optimization will provide a major performance benefit for a number of problems.
+* We would like to be able to "materialize" a PCollection into an in-memory Collection on the client. This can be done now,
+but it requires work by the client.
55 examples/pom.xml
@@ -0,0 +1,55 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.cloudera.crunch</groupId>
+ <artifactId>crunch-examples</artifactId>
+ <packaging>jar</packaging>
+ <version>0.1.0</version>
+ <name>crunch-examples</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>r09</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.cloudera.crunch</groupId>
+ <artifactId>crunch</artifactId>
+ <version>0.1.0</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2.1</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
55 examples/src/main/java/com/cloudera/crunch/examples/WordCount.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch.examples;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.lib.Aggregate;
+import com.cloudera.crunch.type.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.Serializable;
+
+public class WordCount extends Configured implements Tool, Serializable {
+ public int run(String[] args) throws Exception {
+ Pipeline pipeline = new MRPipeline(WordCount.class);
+ PCollection<String> words = pipeline.readTextFile(args[0]);
+
+ PTable<String, Long> counts = Aggregate.count(words.parallelDo("split",
+ new DoFn<String, String>() {
+ @Override
+ public void process(String line, Emitter<String> emitter) {
+ for (String word : line.split("\\s+")) {
+ emitter.emit(word);
+ }
+ }
+ }, Writables.strings()));
+
+ pipeline.writeTextFile(counts, args[1]);
+ pipeline.run();
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new WordCount(), args);
+ }
+}
146 pom.xml
@@ -0,0 +1,146 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.cloudera.crunch</groupId>
+ <artifactId>crunch</artifactId>
+ <packaging>jar</packaging>
+ <version>0.1.0</version>
+ <name>crunch</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>r09</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2-cdh3u1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.5.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>1.5.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>1.7.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.7.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.15</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>maven-hadoop</id>
+ <name>Hadoop Releases</name>
+ <url>https://repository.cloudera.com/content/repositories/releases/</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.10</version>
+ <configuration>
+ <argLine>-Xmx512m</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.1.2</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>cobertura-maven-plugin</artifactId>
+ <version>2.5.1</version>
+ </plugin>
+ </plugins>
+ </reporting>
+
+</project>
217 src/main/java/com/cloudera/crunch/CombineFn.java
@@ -0,0 +1,217 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * A special {@link DoFn} implementation that converts an {@link Iterable}
+ * of values into a single value. If a {@code CombineFn} instance is used
+ * on a {@link PGroupedTable}, the function will be applied to the output
+ * of the map stage before the data is passed to the reducer, which can
+ * improve the runtime of certain classes of jobs.
+ *
+ */
+public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
+
+ /**
+ * The associative and communative combiner function.
+ *
+ * @param values the values to combine
+ * @return the combined result of the inputs
+ */
+ public abstract T combine(Iterable<T> values);
+
+ @Override
+ public void process(Pair<S, Iterable<T>> input, Emitter<Pair<S, T>> emitter) {
+ emitter.emit(Pair.of(input.first(), combine(input.second())));
+ }
+
+ @Override
+ public float scaleFactor() {
+ return 0.9f;
+ }
+
+ public static final <K> CombineFn<K, Long> SUM_LONGS() {
+ return new CombineFn<K, Long>() {
+ @Override
+ public Long combine(Iterable<Long> values) {
+ long sum = 0L;
+ for (Long v : values) {
+ sum += v;
+ }
+ return sum;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Integer> SUM_INTS() {
+ return new CombineFn<K, Integer>() {
+ @Override
+ public Integer combine(Iterable<Integer> values) {
+ int sum = 0;
+ for (Integer v : values) {
+ sum += v;
+ }
+ return sum;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Float> SUM_FLOATS() {
+ return new CombineFn<K, Float>() {
+ @Override
+ public Float combine(Iterable<Float> values) {
+ float sum = 0f;
+ for (Float v : values) {
+ sum += v;
+ }
+ return sum;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Double> SUM_DOUBLES() {
+ return new CombineFn<K, Double>() {
+ @Override
+ public Double combine(Iterable<Double> values) {
+ double sum = 0;
+ for (Double v : values) {
+ sum += v;
+ }
+ return sum;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Long> MAX_LONGS() {
+ return new CombineFn<K, Long>() {
+ @Override
+ public Long combine(Iterable<Long> values) {
+ Long max = null;
+ for (Long value : values) {
+ if (max == null || value > max) {
+ max = value;
+ }
+ }
+ return max;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Integer> MAX_INTS() {
+ return new CombineFn<K, Integer>() {
+ @Override
+ public Integer combine(Iterable<Integer> values) {
+ Integer max = null;
+ for (Integer value : values) {
+ if (max == null || value > max) {
+ max = value;
+ }
+ }
+ return max;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Float> MAX_FLOATS() {
+ return new CombineFn<K, Float>() {
+ @Override
+ public Float combine(Iterable<Float> values) {
+ Float max = null;
+ for (Float value : values) {
+ if (max == null || value > max) {
+ max = value;
+ }
+ }
+ return max;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Double> MAX_DOUBLES() {
+ return new CombineFn<K, Double>() {
+ @Override
+ public Double combine(Iterable<Double> values) {
+ Double max = null;
+ for (Double value : values) {
+ if (max == null || value > max) {
+ max = value;
+ }
+ }
+ return max;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Long> MIN_LONGS() {
+ return new CombineFn<K, Long>() {
+ @Override
+ public Long combine(Iterable<Long> values) {
+ Long min = null;
+ for (Long value : values) {
+ if (min == null || value < min) {
+ min = value;
+ }
+ }
+ return min;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Integer> MIN_INTS() {
+ return new CombineFn<K, Integer>() {
+ @Override
+ public Integer combine(Iterable<Integer> values) {
+ Integer min = null;
+ for (Integer value : values) {
+ if (min == null || value < min) {
+ min = value;
+ }
+ }
+ return min;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Float> MIN_FLOATS() {
+ return new CombineFn<K, Float>() {
+ @Override
+ public Float combine(Iterable<Float> values) {
+ Float min = null;
+ for (Float value : values) {
+ if (min == null || value < min) {
+ min = value;
+ }
+ }
+ return min;
+ }
+ };
+ }
+
+ public static final <K> CombineFn<K, Double> MIN_DOUBLES() {
+ return new CombineFn<K, Double>() {
+ @Override
+ public Double combine(Iterable<Double> values) {
+ Double min = null;
+ for (Double value : values) {
+ if (min == null || value < min) {
+ min = value;
+ }
+ }
+ return min;
+ }
+ };
+ }
+}
25 src/main/java/com/cloudera/crunch/DoCollection.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * An interface for subclasses of PCollection that have a backing
+ * {@link DoFn}.
+ *
+ */
+public interface DoCollection<T> extends PCollection<T> {
+ DoFn<Object, T> getDoFn();
+}
123 src/main/java/com/cloudera/crunch/DoFn.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Base class for all data processing functions in Crunch.
+ *
+ * <p>Note that all {@code DoFn} instances implement {@link Serializable},
+ * and thus all of their non-transient member variables must implement
+ * {@code Serializable} as well. If your DoFn depends on non-serializable
+ * classes for data processing, they may be declared as {@code transient}
+ * and initialized in the DoFn's {@code initialize} method.
+ *
+ */
+public abstract class DoFn<S, T> implements Serializable {
+
+ private TaskInputOutputContext<?, ?, ?, ?> context;
+
+ /**
+ * Called during the job planning phase. Subclasses may override
+ * this method in order to modify the configuration of the Job
+ * that this DoFn instance belongs to.
+ *
+ * @param conf The Configuration instance for the Job.
+ */
+ public void configure(Configuration conf) {
+ }
+
+ /**
+ * Processes the records from a {@link PCollection}.
+ *
+ * @param input The input record
+ * @param emitter The emitter to send the output to
+ */
+ public abstract void process(S input, Emitter<T> emitter);
+
+ /**
+ * Called during the setup of the MapReduce job this {@code DoFn}
+ * is associated with. Subclasses may override this method to
+ * do appropriate initialization.
+ */
+ public void initialize() {
+ }
+
+ /**
+ * Called during the cleanup of the MapReduce job this {@code DoFn}
+ * is associated with. Subclasses may override this method to do
+ * appropriate cleanup.
+ */
+ public void cleanup() {
+ }
+
+ /**
+ * Called during setup to pass the {@link TaskInputOutputContext} to
+ * this {@code DoFn} instance.
+ */
+ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ this.context = context;
+ initialize();
+ }
+
+ /**
+ * Returns an estimate of how applying this function to a {@link PCollection}
+ * will cause it to change in side. The optimizer uses these estimates to
+ * decide where to break up dependent MR jobs into separate Map and Reduce
+ * phases in order to minimize I/O.
+ *
+ * <p>
+ * Subclasses of {@code DoFn} that will substantially alter the size of the
+ * resulting {@code PCollection} should override this method.
+ */
+ public float scaleFactor() {
+ return 1.2f;
+ }
+
+ protected Configuration getConfiguration() {
+ return context.getConfiguration();
+ }
+
+ protected Counter getCounter(Enum<?> counterName) {
+ return context.getCounter(counterName);
+ }
+
+ protected Counter getCounter(String groupName, String counterName) {
+ return context.getCounter(groupName, counterName);
+ }
+
+ protected void progress() {
+ context.progress();
+ }
+
+ protected TaskAttemptID getTaskAttemptID() {
+ return context.getTaskAttemptID();
+ }
+
+ protected void setStatus(String status) {
+ context.setStatus(status);
+ }
+
+ protected String getStatus() {
+ return context.getStatus();
+ }
+}
35 src/main/java/com/cloudera/crunch/Emitter.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * Interface for writing outputs from a {@link DoFn}.
+ *
+ */
+public interface Emitter<T> {
+ /**
+ * Write the emitted value to the next stage of the pipeline.
+ *
+ * @param emitted The value to write
+ */
+ void emit(T emitted);
+
+ /**
+ * Flushes any values cached by this emitter. Called during the
+ * cleanup stage.
+ */
+ void flush();
+}
129 src/main/java/com/cloudera/crunch/FilterFn.java
@@ -0,0 +1,129 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link DoFn} for the common case of filtering the members of
+ * a {@link PCollection} based on a boolean condition.
+ *
+ */
+public abstract class FilterFn<T> extends DoFn<T, T> {
+
+ /**
+ * If true, emit the given record.
+ */
+ public abstract boolean accept(T input);
+
+ @Override
+ public void process(T input, Emitter<T> emitter) {
+ if (accept(input)) {
+ emitter.emit(input);
+ }
+ }
+
+ @Override
+ public float scaleFactor() {
+ return 0.5f;
+ }
+
+ public static <S> FilterFn<S> and(FilterFn<S>...fns) {
+ return new AndFn<S>(fns);
+ }
+
+ public static class AndFn<S> extends FilterFn<S> {
+ private final List<FilterFn<S>> fns;
+
+ public AndFn(FilterFn<S>... fns) {
+ this.fns = ImmutableList.<FilterFn<S>>copyOf(fns);
+ }
+
+ @Override
+ public boolean accept(S input) {
+ for (FilterFn<S> fn : fns) {
+ if (!fn.accept(input)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public float scaleFactor() {
+ float scaleFactor = 1.0f;
+ for (FilterFn<S> fn : fns) {
+ scaleFactor *= fn.scaleFactor();
+ }
+ return scaleFactor;
+ }
+ }
+
+ public static <S> FilterFn<S> or(FilterFn<S>...fns) {
+ return new OrFn<S>(fns);
+ }
+
+ public static class OrFn<S> extends FilterFn<S> {
+ private final List<FilterFn<S>> fns;
+
+ public OrFn(FilterFn<S>... fns) {
+ this.fns = ImmutableList.<FilterFn<S>>copyOf(fns);
+ }
+
+ @Override
+ public boolean accept(S input) {
+ for (FilterFn<S> fn : fns) {
+ if (fn.accept(input)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public float scaleFactor() {
+ float scaleFactor = 0.0f;
+ for (FilterFn<S> fn : fns) {
+ scaleFactor += fn.scaleFactor();
+ }
+ return Math.min(1.0f, scaleFactor);
+ }
+ }
+
+ public static <S> FilterFn<S> not(FilterFn<S> fn) {
+ return new NotFn<S>(fn);
+ }
+
+ public static class NotFn<S> extends FilterFn<S> {
+ private final FilterFn<S> base;
+
+ public NotFn(FilterFn<S> base) {
+ this.base = base;
+ }
+
+ @Override
+ public boolean accept(S input) {
+ return !base.accept(input);
+ }
+
+ @Override
+ public float scaleFactor() {
+ return 1.0f - base.scaleFactor();
+ }
+ }
+}
129 src/main/java/com/cloudera/crunch/GroupingOptions.java
@@ -0,0 +1,129 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Options that can be passed to a {@code groupByKey} operation in order to exercise
+ * finer control over how the partitioning, grouping, and sorting of keys is
+ * performed.
+ *
+ */
+public class GroupingOptions {
+
+ private static final Log LOG = LogFactory.getLog(GroupingOptions.class);
+
+ private final Class<? extends Partitioner> partitionerClass;
+ private final Class<? extends RawComparator> groupingComparatorClass;
+ private final Class<? extends RawComparator> sortComparatorClass;
+ private final int numReducers;
+
+ private GroupingOptions(Class<? extends Partitioner> partitionerClass,
+ Class<? extends RawComparator> groupingComparatorClass,
+ Class<? extends RawComparator> sortComparatorClass, int numReducers) {
+ this.partitionerClass = partitionerClass;
+ this.groupingComparatorClass = groupingComparatorClass;
+ this.sortComparatorClass = sortComparatorClass;
+ this.numReducers = numReducers;
+ }
+
+ public int getNumReducers() {
+ return numReducers;
+ }
+
+ public void configure(Job job) {
+ if (partitionerClass != null) {
+ job.setPartitionerClass(partitionerClass);
+ }
+ if (groupingComparatorClass != null) {
+ job.setGroupingComparatorClass(groupingComparatorClass);
+ }
+ if (sortComparatorClass != null) {
+ job.setSortComparatorClass(sortComparatorClass);
+ }
+ if (numReducers > 0) {
+ job.setNumReduceTasks(numReducers);
+ LOG.info(String.format("Using %d reduce tasks", numReducers));
+ }
+ }
+
+ public boolean isCompatibleWith(GroupingOptions other) {
+ if (partitionerClass != other.partitionerClass) {
+ return false;
+ }
+ if (groupingComparatorClass != other.groupingComparatorClass) {
+ return false;
+ }
+ if (sortComparatorClass != other.sortComparatorClass) {
+ return false;
+ }
+ return true;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder class for creating {@code GroupingOptions} instances.
+ *
+ */
+ public static class Builder {
+ private Class<? extends Partitioner> partitionerClass;
+ private Class<? extends RawComparator> groupingComparatorClass;
+ private Class<? extends RawComparator> sortComparatorClass;
+ private int numReducers;
+
+ public Builder() {
+ }
+
+ public Builder partitionerClass(
+ Class<? extends Partitioner> partitionerClass) {
+ this.partitionerClass = partitionerClass;
+ return this;
+ }
+
+ public Builder groupingComparatorClass(
+ Class<? extends RawComparator> groupingComparatorClass) {
+ this.groupingComparatorClass = groupingComparatorClass;
+ return this;
+ }
+
+ public Builder sortComparatorClass(
+ Class<? extends RawComparator> sortComparatorClass) {
+ this.sortComparatorClass = sortComparatorClass;
+ return this;
+ }
+
+ public Builder numReducers(int numReducers) {
+ if (numReducers <= 0) {
+ throw new IllegalArgumentException("Invalid number of reducers: " + numReducers);
+ }
+ this.numReducers = numReducers;
+ return this;
+ }
+
+ public GroupingOptions build() {
+ return new GroupingOptions(partitionerClass, groupingComparatorClass,
+ sortComparatorClass, numReducers);
+ }
+ }
+}
39 src/main/java/com/cloudera/crunch/MapFn.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * A {@link DoFn} for the common case of emitting exactly one value
+ * for each input record.
+ *
+ */
+public abstract class MapFn<S, T> extends DoFn<S, T> {
+
+ /**
+ * Maps the given input into an instance of the output type.
+ */
+ public abstract T map(S input);
+
+ @Override
+ public void process(S input, Emitter<T> emitter) {
+ emitter.emit(map(input));
+ }
+
+ @Override
+ public float scaleFactor() {
+ return 1.0f;
+ }
+}
110 src/main/java/com/cloudera/crunch/PCollection.java
@@ -0,0 +1,110 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+import com.cloudera.crunch.type.PTableType;
+import com.cloudera.crunch.type.PType;
+import com.cloudera.crunch.type.PTypeFamily;
+
+/**
+ * A representation of an immutable, distributed collection of elements
+ * that is the fundamental target of computations in Crunch.
+ *
+ */
+public interface PCollection<S> {
+ /**
+ * Returns the {@code Pipeline} associated with this PCollection.
+ */
+ Pipeline getPipeline();
+
+ /**
+ * Returns a {@code PCollection} instance that acts as the union
+ * of this {@code PCollection} and the input {@code PCollection}s.
+ */
+ PCollection<S> union(PCollection<S>... collections);
+
+ /**
+ * Applies the given doFn to the elements of this {@code PCollection} and
+ * returns a new {@code PCollection} that is the output of this processing.
+ *
+ * @param doFn The {@code DoFn} to apply
+ * @param type The {@link PType} of the resulting {@code PCollection}
+ * @return a new {@code PCollection}
+ */
+ <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);
+
+ /**
+ * Applies the given doFn to the elements of this {@code PCollection} and
+ * returns a new {@code PCollection} that is the output of this processing.
+ *
+ * @param name An identifier for this processing step, useful for debugging
+ * @param doFn The {@code DoFn} to apply
+ * @param type The {@link PType} of the resulting {@code PCollection}
+ * @return a new {@code PCollection}
+ */
+ <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type);
+
+ /**
+ * Similar to the other {@code parallelDo} instance, but returns a
+ * {@code PTable} instance instead of a {@code PCollection}.
+ *
+ * @param doFn The {@code DoFn} to apply
+ * @param type The {@link PTableType} of the resulting {@code PTable}
+ * @return a new {@code PTable}
+ */
+ <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
+
+ /**
+ * Similar to the other {@code parallelDo} instance, but returns a
+ * {@code PTable} instance instead of a {@code PCollection}.
+ *
+ * @param name An identifier for this processing step
+ * @param doFn The {@code DoFn} to apply
+ * @param type The {@link PTableType} of the resulting {@code PTable}
+ * @return a new {@code PTable}
+ */
+ <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn,
+ PTableType<K, V> type);
+
+ /**
+ * Write the contents of this {@code PCollection} to the given {@code Target},
+ * using the storage format specified by the target.
+ *
+ * @param target The target to write to
+ */
+ void writeTo(Target target);
+
+ /**
+ * Returns the {@code PType} of this {@code PCollection}.
+ */
+ PType<S> getPType();
+
+ /**
+ * Returns the {@code PTypeFamily} of this {@code PCollection}.
+ */
+ PTypeFamily getTypeFamily();
+
+ /**
+ * Returns the size of the data represented by this {@code PCollection} in bytes.
+ */
+ long getSize();
+
+ /**
+ * Returns a shorthand name for this PCollection.
+ * @return
+ */
+ String getName();
+}
37 src/main/java/com/cloudera/crunch/PGroupedTable.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * The Crunch representation of a grouped {@link PTable}.
+ *
+ */
+public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
+ /**
+ * Combines the values of this grouping using the given {@code CombineFn}.
+ *
+ * @param combineFn The combiner function
+ * @return A {@code PTable} where each key has a single value
+ */
+ PTable<K, V> combineValues(CombineFn<K, V> combineFn);
+
+ /**
+ * Convert this grouping back into a multimap.
+ *
+ * @return an ungrouped version of the data in this {@code PGroupedTable}.
+ */
+ PTable<K, V> ungroup();
+}
66 src/main/java/com/cloudera/crunch/PTable.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+import com.cloudera.crunch.type.PTableType;
+import com.cloudera.crunch.type.PType;
+
+/**
+ * A sub-interface of {@code PCollection} that represents an immutable,
+ * distributed multi-map of keys and values.
+ *
+ */
+public interface PTable<K, V> extends PCollection<Pair<K, V>> {
+ /**
+ * Returns a {@code PTable} instance that acts as the union
+ * of this {@code PTable} and the input {@code PTable}s.
+ */
+ PTable<K, V> union(PTable<K, V>... others);
+
+ /**
+ * Performs a grouping operation on the keys of this table.
+ * @return a {@code PGroupedTable} instance that represents the grouping
+ */
+ PGroupedTable<K, V> groupByKey();
+
+ /**
+ * Performs a grouping operation on the keys of this table, using the given
+ * number of partitions.
+ *
+ * @param numPartitions The number of partitions for the data.
+ * @return a {@code PGroupedTable} instance that represents this grouping
+ */
+ PGroupedTable<K, V> groupByKey(int numPartitions);
+
+ /**
+ * Performs a grouping operation on the keys of this table, using the
+ * additional {@code GroupingOptions} to control how the grouping is
+ * executed.
+ *
+ * @param options The grouping options to use
+ * @return a {@code PGroupedTable} instance that represents the grouping
+ */
+ PGroupedTable<K, V> groupByKey(GroupingOptions options);
+
+ /**
+ * Returns the {@code PTableType} of this {@code PTable}.
+ */
+ PTableType<K, V> getPTableType();
+
+ PType<K> getKeyType();
+
+ PType<V> getValueType();
+}
57 src/main/java/com/cloudera/crunch/Pair.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * A convenience class for two-element {@link Tuple}s.
+ */
+public class Pair<K, V> extends Tuple {
+
+ private final K first;
+ private final V second;
+
+ public static <T, U> Pair<T, U> of(T first, U second) {
+ return new Pair<T, U>(first, second);
+ }
+
+ public Pair(K first, V second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ public K first() {
+ return first;
+ }
+
+ public V second() {
+ return second;
+ }
+
+ public Object get(int index) {
+ switch (index) {
+ case 0:
+ return first;
+ case 1:
+ return second;
+ default:
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ }
+
+ public int size() {
+ return 2;
+ }
+}
78 src/main/java/com/cloudera/crunch/Pipeline.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Manages the state of a pipeline execution.
+ *
+ */
+public interface Pipeline {
+
+ /**
+ * Returns the {@code Configuration} instance associated with this pipeline.
+ */
+ Configuration getConfiguration();
+
+ /**
+ * Converts the given {@code Source} into a {@code PCollection} that is
+ * available to jobs run using this {@code Pipeline} instance.
+ *
+ * @param source The source of data
+ * @return A PCollection that references the given source
+ */
+ <T> PCollection<T> read(Source<T> source);
+
+ /**
+ * A version of the read method for {@code TableSource} instances that
+ * map to {@code PTable}s.
+ * @param tableSource The source of the data
+ * @return A PTable that references the given source
+ */
+ <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
+
+ /**
+ * Write the given collection to the given target on the next
+ * pipeline run.
+ *
+ * @param collection The collection
+ * @param target The output target
+ */
+ void write(PCollection<?> collection, Target target);
+
+ /**
+ * Constructs and executes a series of MapReduce jobs in order
+ * to write data to the output targets.
+ */
+ void run();
+
+ /**
+ * Run any remaining jobs required to generate outputs and then
+ * clean up any intermediate data files that were created in
+ * this run or previous calls to {@code run}.
+ */
+ void done();
+
+ /**
+ * A convenience method for reading a text file.
+ */
+ PCollection<String> readTextFile(String pathName);
+
+ /**
+ * A convenience method for writing a text file.
+ */
+ <T> void writeTextFile(PCollection<T> collection, String pathName);
+}
48 src/main/java/com/cloudera/crunch/Source.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.cloudera.crunch.type.PType;
+
+/**
+ * A {@code Source} represents an input data set that is an input to one
+ * or more MapReduce jobs.
+ *
+ */
+public interface Source<T> {
+ /**
+ * Returns the {@code PType} for this source.
+ */
+ PType<T> getType();
+
+ /**
+ * Configure the given job to use this source as an input.
+ *
+ * @param job The job to configure
+ * @param inputId For a multi-input job, an identifier for this input to the job
+ * @throws IOException
+ */
+ void configureSource(Job job, int inputId) throws IOException;
+
+ /**
+ * Returns the number of bytes in this {@code Source}.
+ */
+ long getSize(Configuration configuration);
+}
24 src/main/java/com/cloudera/crunch/SourceTarget.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch;
+
+
+/**
+ * An interface for classes that implement both the {@code Source} and
+ * the {@code Target} interfaces.
+ *
+ */
+public interface SourceTarget<T> extends Source<T>, Target {
+}
31 src/main/java/com/cloudera/crunch/TableSource.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch;
+
+import com.cloudera.crunch.type.PTableType;
+import com.cloudera.crunch.type.PType;
+
+/**
+ * The base class for {@code Source} implementations that return a {@link PTable}.
+ *
+ */
+public abstract class TableSource<K, V> implements Source<Pair<K, V>> {
+ @Override
+ public PType<Pair<K, V>> getType() {
+ return getTableType();
+ }
+
+ public abstract PTableType<K, V> getTableType();
+}
26 src/main/java/com/cloudera/crunch/Target.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch;
+
+import com.cloudera.crunch.io.OutputHandler;
+import com.cloudera.crunch.type.PType;
+
+/**
+ * A {@code Target} represents the output destination of a Crunch job.
+ *
+ */
+public interface Target {
+ boolean accept(OutputHandler handler, PType<?> ptype);
+}
47 src/main/java/com/cloudera/crunch/Tuple.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * A fixed-size collection of Objects, used in Crunch for representing
+ * joins between {@code PCollection}s.
+ *
+ */
+public abstract class Tuple {
+
+ public static Tuple tuplify(Object... values) {
+ switch (values.length) {
+ case 2:
+ return Pair.of(values[0], values[1]);
+ case 3:
+ return new Tuple3(values[0], values[1], values[2]);
+ case 4:
+ return new Tuple4(values[0], values[1], values[2], values[3]);
+ default:
+ return new TupleN(values);
+ }
+ }
+
+ /**
+ * Returns the Object at the given index.
+ */
+ public abstract Object get(int index);
+
+ /**
+ * Returns the number of elements in this Tuple.
+ */
+ public abstract int size();
+}
61 src/main/java/com/cloudera/crunch/Tuple3.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * A convenience class for three-element {@link Tuple}s.
+ */
+public class Tuple3<V1, V2, V3> extends Tuple {
+
+ private final V1 first;
+ private final V2 second;
+ private final V3 third;
+
+ public Tuple3(V1 first, V2 second, V3 third) {
+ this.first = first;
+ this.second = second;
+ this.third = third;
+ }
+
+ public V1 first() {
+ return first;
+ }
+
+ public V2 second() {
+ return second;
+ }
+
+ public V3 third() {
+ return third;
+ }
+
+ public Object get(int index) {
+ switch (index) {
+ case 0:
+ return first;
+ case 1:
+ return second;
+ case 2:
+ return third;
+ default:
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ }
+
+ public int size() {
+ return 3;
+ }
+}
69 src/main/java/com/cloudera/crunch/Tuple4.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * A convenience class for four-element {@link Tuple}s.
+ */
+public class Tuple4<V1, V2, V3, V4> extends Tuple {
+
+ private final V1 first;
+ private final V2 second;
+ private final V3 third;
+ private final V4 fourth;
+
+ public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
+ this.first = first;
+ this.second = second;
+ this.third = third;
+ this.fourth = fourth;
+ }
+
+ public V1 first() {
+ return first;
+ }
+
+ public V2 second() {
+ return second;
+ }
+
+ public V3 third() {
+ return third;
+ }
+
+ public V4 fourth() {
+ return fourth;
+ }
+
+ public Object get(int index) {
+ switch (index) {
+ case 0:
+ return first;
+ case 1:
+ return second;
+ case 2:
+ return third;
+ case 3:
+ return fourth;
+ default:
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ }
+
+ public int size() {
+ return 4;
+ }
+}
37 src/main/java/com/cloudera/crunch/TupleN.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.crunch;
+
+/**
+ * A {@link Tuple} instance for an arbitrary number of values.
+ */
+public class TupleN extends Tuple {
+
+ private final Object values[];
+
+ public TupleN(Object... values) {
+ this.values = new Object[values.length];
+ System.arraycopy(values, 0, this.values, 0, values.length);
+ }
+
+ public Object get(int index) {
+ return values[index];
+ }
+
+ public int size() {
+ return values.length;
+ }
+}
36 src/main/java/com/cloudera/crunch/fn/IdentityFn.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch.fn;
+
+import com.cloudera.crunch.MapFn;
+
+public class IdentityFn<T> extends MapFn<T, T> {
+
+ private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
+
+ @SuppressWarnings("unchecked")
+ public static <T> IdentityFn<T> getInstance() {
+ return (IdentityFn<T>) INSTANCE;
+ }
+
+ // Non-instantiable
+ private IdentityFn() {
+ }
+
+ @Override
+ public T map(T input) {
+ return input;
+ }
+}
28 src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch.fn;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.Pair;
+
+public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
+ @Override
+ public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
+ emitter.emit(Pair.of(map(input.first()), input.second()));
+ }
+
+ public abstract K2 map(K1 k1);
+}
28 src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch.fn;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.Pair;
+
+public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
+ @Override
+ public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
+ emitter.emit(Pair.of(input.first(), map(input.second())));
+ }
+
+ public abstract V2 map(V1 v);
+}
38 src/main/java/com/cloudera/crunch/fn/PairMapFn.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch.fn;
+
+import com.cloudera.crunch.MapFn;
+import com.cloudera.crunch.Pair;
+
+public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
+ private MapFn<K, S> keys;
+ private MapFn<V, T> values;
+
+ public PairMapFn(MapFn<K, S> keys, MapFn<V, T> values) {
+ this.keys = keys;
+ this.values = values;
+ }
+
+ public void initialize() {
+ keys.initialize();
+ values.initialize();
+ }
+
+ @Override
+ public Pair<S, T> map(Pair<K, V> input) {
+ return Pair.of(keys.map(input.first()), values.map(input.second()));
+ }
+}
170 src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
@@ -0,0 +1,170 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ *
+ */
+package com.cloudera.crunch.impl.mr;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;