Skip to content

Commit

Permalink
[EXAMPLES] Updated TPCH Query01 implementation and test.
Browse files Browse the repository at this point in the history
  • Loading branch information
aalexandrov committed Feb 26, 2015
1 parent 8199bf7 commit 5f12540
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package eu.stratosphere.emma.examples.tpch.query01

case class GrpKey(
returnFlag: String,
lineStatus: String) {}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package eu.stratosphere.emma.examples.tpch
package eu.stratosphere.emma.examples.tpch.query01

import eu.stratosphere.emma.api._
import eu.stratosphere.emma.examples.Algorithm
import eu.stratosphere.emma.examples.tpch.Query01.Schema._
import eu.stratosphere.emma.examples.tpch.Lineitem
import eu.stratosphere.emma.examples.tpch.query03.Query03
import eu.stratosphere.emma.runtime.Engine
import net.sourceforge.argparse4j.inf.{Namespace, Subparser}

Expand Down Expand Up @@ -45,30 +46,6 @@ object Query01 {
}
}

// --------------------------------------------------------------------------
// Schema
// --------------------------------------------------------------------------

object Schema {

case class GrpKey(
returnFlag: String,
lineStatus: String) {}

case class Result(
returnFlag: String,
lineStatus: String,
sumQty: Int,
sumBasePrice: Double,
sumDiscPrice: Double,
sumCharge: Double,
avgQty: Double,
avgPrice: Double,
avgDisc: Double,
countOrder: Long) {}

}

}


Expand Down Expand Up @@ -103,7 +80,7 @@ object Query01 {
* @param outPath Output path
* @param delta Query parameter `DELTA`
*/
class Query01(inPath: String, outPath: String, delta: Int, rt: Engine) extends Algorithm(rt) {
class Query01(inPath: String, outPath: String, delta: Int, rt: Engine, truncate: Boolean = false) extends Algorithm(rt) {

def this(ns: Namespace, rt: Engine) = this(
ns.get[String](Query01.Command.KEY_INPUT),
Expand All @@ -115,6 +92,9 @@ class Query01(inPath: String, outPath: String, delta: Int, rt: Engine) extends A

val alg = emma.parallelize {

// cannot directly reference the parameter
val _truncate = truncate

// compute join part of the query
val l = for (
l <- read(s"$inPath/lineitem.tbl", new CSVInputFormat[Lineitem]('|'));
Expand All @@ -123,25 +103,28 @@ class Query01(inPath: String, outPath: String, delta: Int, rt: Engine) extends A

// aggregate and compute the final result
val r = for (
g <- l.groupBy(l => GrpKey(l.returnFlag, l.lineStatus)))
g <- l.groupBy(l => new GrpKey(l.returnFlag, l.lineStatus)))
yield {

def tr(v: Double) = if (_truncate) BigDecimal(v).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble else v

// compute base aggregates
val sumQty = g.values.map(_.quantity).sum();
val sumBasePrice = g.values.map(_.extendedPrice).sum()
val sumDiscPrice = g.values.map(l => l.extendedPrice * (1 - l.discount)).sum()
val sumCharge = g.values.map(l => l.extendedPrice * (1 - l.discount) * (1 + l.tax)).sum()
val countOrder = g.values.count()
// compute result
Result(
new Result(
g.key.returnFlag,
g.key.lineStatus,
sumQty,
sumBasePrice,
sumDiscPrice,
sumCharge,
tr(sumBasePrice),
tr(sumDiscPrice),
tr(sumCharge),
avgQty = sumQty / countOrder,
avgPrice = sumBasePrice / countOrder,
avgDisc = sumDiscPrice / countOrder,
avgPrice = tr(sumBasePrice / countOrder),
avgDisc = tr(sumDiscPrice / countOrder),
countOrder)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package eu.stratosphere.emma.examples.tpch.query01

case class Result(
returnFlag: String,
lineStatus: String,
sumQty: Int,
sumBasePrice: Double,
sumDiscPrice: Double,
sumCharge: Double,
avgQty: Double,
avgPrice: Double,
avgDisc: Double,
countOrder: Long) {}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eu.stratosphere.emma.examples.tpch

import java.io.File

import eu.stratosphere.emma.examples.tpch.query01.Query01
import eu.stratosphere.emma.runtime
import eu.stratosphere.emma.runtime.Engine
import eu.stratosphere.emma.testutil._
Expand Down Expand Up @@ -33,21 +34,15 @@ class TPCHTest {
new File(outBase).mkdirs()
}

@After def teardown(): Unit = {
// close the runtime session
rt.closeSession()
}

@Ignore
@Test def testQuery01(): Unit = {

// execute with native and with tested environment
new Query01(inBase, outputPath("q1.tbl.native"), 30, runtime.Native).run()
new Query01(inBase, outputPath("q1.tbl.flink"), 30, rt).run()
new Query01(inBase, outputPath("q1.tbl.native"), 30, runtime.Native, true).run()
new Query01(inBase, outputPath("q1.tbl.flink"), 30, rt, true).run()

// compare the results
val exp = scala.io.Source.fromFile(outputPath("q1.tbl.flink")).getLines().toStream
val res = scala.io.Source.fromFile(outputPath("q1.tbl.flink")).getLines().toStream
val exp = scala.io.Source.fromFile(outputPath("q1.tbl.native")).getLines().toStream.toList.sorted
val res = (1 to rt.defaultDOP flatMap(i => scala.io.Source.fromFile(outputPath(s"q1.tbl.flink/$i")).getLines().toStream)).toList.sorted

// assert that the result contains the expected values
compareBags(exp, res)
Expand Down

0 comments on commit 5f12540

Please sign in to comment.