Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add support for top-k operations on PTables in scrunch

  • Loading branch information...
commit cf01463cabecedeeb35c40f32a35a5a3c25704ff 1 parent 817cfd0
@jwills jwills authored
View
8 scrunch/src/main/scala/com/cloudera/scrunch/PTable.scala
@@ -16,9 +16,9 @@ package com.cloudera.scrunch
import com.cloudera.crunch.{DoFn, Emitter, FilterFn, MapFn, Target}
import com.cloudera.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
-import com.cloudera.crunch.lib.{Cogroup, Join, PTables}
+import com.cloudera.crunch.lib.{Aggregate, Cogroup, Join, PTables}
import com.cloudera.scrunch.Conversions._
-import java.lang.{Iterable => JIterable}
+import java.lang.{Comparable, Iterable => JIterable}
import java.util.{Collection => JCollect}
import scala.collection.JavaConversions._
@@ -73,6 +73,10 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
}, ptf.tableOf(keyType, ptf.tuple2(valueType, other.valueType)))
}
+ def top(limit: Int, maximize: Boolean) = {
+ wrap(Aggregate.top(this.native, limit, maximize))
+ }
+
def groupByKey = new PGroupedTable(native.groupByKey())
def groupByKey(partitions: Int) = new PGroupedTable(native.groupByKey(partitions))
View
33 scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala
@@ -0,0 +1,33 @@
+/**
+ * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.scrunch
+
+import com.cloudera.crunch.io.{From => from, To => to}
+import com.cloudera.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+class TopTest extends JUnitSuite {
+ @Test def top2 {
+ val pipeline = new Pipeline[TopTest]
+ val input = FileHelper.createTempCopyOf("shakes.txt")
+
+ val wc = pipeline.read(from.textFile(input))
+ .flatMap(_.toLowerCase.split("\\s+"))
+ .filter(!_.isEmpty()).count
+ assert(wc.top(10, true).materialize.exists(_ == ("is", 205)))
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.