Permalink
Browse files

Checkpoint: working MapValuesMap

  • Loading branch information...
1 parent d8fb459 commit d9f2c78081a7fbe51f09630ec181a46e513a0a2a @aboisvert committed Jun 5, 2012
View
@@ -7,13 +7,13 @@ repositories.remote << 'http://repo1.maven.org/'
HADOOP_HOME = "/opt/boisvert/hadoop-0.20.2"
-CASCADING_DISTRO = "org.cascading:cascading:tgz:wip-2.0"
-download(artifact(CASCADING_DISTRO) => 'http://files.concurrentinc.com/cascading/2.0/cascading-2.0.0-wip-200-hadoop-0.20.2%2B.tgz')
+CASCADING_DISTRO = "org.cascading:cascading:tgz:wip-2.0-298"
+download(artifact(CASCADING_DISTRO) => 'http://files.concurrentinc.com/cascading/2.0/cascading-2.0.0-wip-298.tgz')
-CASCADING_TMP = File.join(ENV['TMP'] || '/tmp', 'cascading-wip-2.0')
+CASCADING_TMP = File.join(ENV['TMP'] || '/tmp', 'cascading-wip-2.0-298')
unzip(CASCADING_TMP => artifact(CASCADING_DISTRO))
file(CASCADING_TMP).invoke
-FileList[CASCADING_TMP + '/cascading-2.0.0-wip-200-hadoop-0.20.2+/*'].each do |f|
+FileList[CASCADING_TMP + '/cascading-2.0.0-wip-298/*'].each do |f|
FileUtils.mv(f, CASCADING_TMP)
end
@@ -25,7 +25,8 @@ CASCADING_CORE = \
HADOOP = \
FileList[HADOOP_HOME + "/hadoop-*-core.jar"] + \
- FileList[HADOOP_HOME + "/lib/*.jar"]
+ FileList[HADOOP_HOME + "/lib/*.jar"] \
+ .reject { |f| f =~ /slf4j/ }
desc 'Scala-based query language for Hadoop'
define "revolute" do
@@ -26,21 +26,25 @@ object EvaluationChain {
def outputNames(c: ColumnBase[_]): Seq[String] = {
var unique = 0
- def name(c: ColumnBase[_]): Option[String] = c match {
- case named: NamedColumn[_] => Some(named.qualifiedColumnName)
- case as: ColumnOps.AsColumnOf[_, _] => name(as.left)
- case _ => None
-
+ def resolveNames(c: ColumnBase[_]): Seq[String] = c match {
+ case p: Projectable[_] => p.projection.columns flatMap resolveNames
+ case as: ColumnOps.AsColumn[_] => resolveNames(as.right)
+ case named: NamedColumn[_] => Seq(named.qualifiedColumnName)
+ case as: ColumnOps.AsColumnOf[_, _] => resolveNames(as.left)
+ case _ => Seq.empty
+ }
+ new EvaluationChain(c).leaves flatMap { c =>
+ val names = resolveNames(c)
+ if (names.nonEmpty) names else {
+ unique += 1
+ Seq(c.nameHint + "#" + unique)
+ }
}
- new EvaluationChain(c).leaves map { c => name(c).getOrElse {
- unique += 1
- c.nameHint + "#" + unique
- }}
}
def outputFields(c: ColumnBase[_]): Fields = new Fields(outputNames(c): _*)
- def tables(c: ColumnBase[_]) = new EvaluationChain(c).namedRoots map (_.table) toSet
+ def tables(c: ColumnBase[_]): Set[AbstractTable[_ <: Product]] = new EvaluationChain(c).namedRoots map (_.table) toSet
def queries(c: ColumnBase[_]): Set[Query[_ <: ColumnBase[_]]] = new EvaluationChain(c).queryRoots map (_.query) toSet
@@ -109,7 +113,7 @@ class EvaluationChain(val c: ColumnBase[_]) {
else {
tuple.tupleEntry = tupleEntry
tupleEntry = null
- Console.println("root context next tuple: " + tuple)
+ // Console.println("root context next tuple: " + tuple)
tuple
}
}
@@ -192,7 +196,7 @@ class SharedEvaluationContext(val shared: SharedEvaluationContext.SharedState) e
shared.listeners += this
override def nextTuple(): Tuple = {
- Console.println("SharedEvaluationContext.nextTuple() consumed=" + consumed)
+ // Console.println("SharedEvaluationContext.nextTuple() consumed=" + consumed)
if (consumed || shared.tuple == null) {
shared.nextTuple()
}
@@ -215,6 +219,8 @@ class OneToZeroOrOneEvaluationContext(
private[this] val positions = orderedDependencies.zipWithIndex.toMap
+ //Console.println("subContexts " + (subContexts.toList))
+ //Console.println("orderedDependencies " + (orderedDependencies.toList))
private[this] val subPositions = subContexts zip orderedDependencies map { case (c, d) => c.position(d) }
private[this] val tuple = new Tuple {
@@ -241,12 +247,13 @@ class OneToZeroOrOneEvaluationContext(
while (i < subContexts.length) {
val context = subContexts(i)
val next = context.nextTuple()
- Console.println("next: " + context + " next: " + next)
+ // Console.println("next context: " + context)
+ // Console.println("next tuple: " + next)
if (next == null) return null
tuple.values(i) = next
i += 1
}
- Console.println("EvaluationChainContext nextTuple(): " + tuple)
+ // Console.println("EvaluationChainContext nextTuple(): " + tuple)
tuple
}
@@ -286,7 +293,8 @@ class OneToZeroOrManyEvaluationContext(
subValues(i).clear()
val context = subContexts(i)
val next = context.nextTuple()
- Console.println("next: " + context + " next: " + next)
+ // Console.println("next context: " + context)
+ // Console.println("next tuple: " + next)
if (next == null) return null
types(i) match {
@@ -314,7 +322,7 @@ class OneToZeroOrManyEvaluationContext(
}
iterator = Combinations.combinations(subValues)
current = iterator.next()
- Console.println("EvaluationChainContext nextTuple(): " + tuple)
+ // Console.println("EvaluationChainContext nextTuple(): " + tuple)
tuple
} else if (iterator.hasNext) {
current = iterator.next()
@@ -4,7 +4,7 @@ import cascading.flow.FlowProcess
import cascading.operation.{BaseOperation, Function, FunctionCall, OperationCall}
import cascading.tuple.{Fields, Tuple}
-import revolute.query.{ColumnBase, Column, OutputType, Projection}
+import revolute.query._
import revolute.query.OperationType._
import revolute.util.{Combinations, NamingContext}
@@ -21,14 +21,23 @@ class FlatMapOperation(val projection: Projection[_])
override def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[Any]) {
chain.tupleEntry = functionCall.getArguments
var tuple = chain.context.nextTuple()
- Console.println("tuple: " + tuple)
+ // Console.println("tuple: " + tuple)
while (tuple != null) {
try {
val result = new cascading.tuple.Tuple()
var i = 0
while (i < chain.arity) {
val value = tuple.get(i)
- result.add(value)
+ projection.columns(i) match {
+ case p: Projectable[_] =>
+ val map = value.asInstanceOf[Map[ColumnBase[_], Any]]
+ p.projection.columns foreach { c =>
+ val v = map(c)
+ result.add(v)
+ }
+ case _ =>
+ result.add(value)
+ }
i += 1
}
Console.println("add: " + result)
@@ -2,13 +2,12 @@ package revolute.flow
import cascading.flow.{Flow, FlowConnector, FlowProcess}
import cascading.pipe.Pipe
-
-import revolute.query.{ColumnBase, Query, QueryBuilder, Table, TableBase}
+import revolute.query._
import revolute.util.NamingContext
import revolute.util.Compat._
-
import scala.collection._
import scala.collection.mutable.ArrayBuffer
+import revolute.cascading.EvaluationChain
object FlowContext {
def local(f: FlowContext => Unit): FlowContext = {
@@ -24,16 +23,19 @@ object FlowContext {
class FlowContext(val namingContext: NamingContext, val flowConnector: FlowConnector) {
val sources = mutable.Map[TableBase[_], () => Tap]()
- val sinks = mutable.Map[TableBase[_], Tap]()
+ val sinks = mutable.Map[TableBase[_], () => Tap]()
}
object FlowBuilder {
def flow(context: FlowContext)(f: FlowBuilder => Unit): Flow[_] = {
val builder = new FlowBuilder(context)
f(builder)
val flow = builder.createFlow()
+ Console.println("flow start()")
flow.start()
+ Console.println("flow complete()")
flow.complete()
+ Console.println("flow completed")
flow
}
@@ -55,8 +57,8 @@ class FlowBuilder(val context: FlowContext) {
def insert[Q <: Query[_ <: ColumnBase[_]]](q: Q) = {
new {
- def into[T <: Table[_]](t: T): Insert[Q, T] = {
- val i = new Insert(q, t, context)
+ def into[T <: Table](target: T): Insert[Q, T] = {
+ val i = new Insert(q, target, context)
statements += i
i
}
@@ -66,27 +68,75 @@ class FlowBuilder(val context: FlowContext) {
def createFlow(): Flow[_] = {
if (statements.size != 1) sys.error("FlowBuilder only supports 1 statement right now: " + statements.size)
- val pipe = statements.head.pipe
- val sink = statements.head.sink
- val pipeNames = statements map (_.pipe.getName)
- sys.error("TODO")
- // val flow = context.flowConnector.connect(sources(pipeNames, context), sink, pipe)
- // flow
+ val statement = statements.head
+
+ val pipes = mutable.Map[ColumnBase[_], Pipe]()
+ val pipe = statement.pipe
+ //Console.println()
+ //Console.println("pipe: " + pipe)
+ val sources = statement.sources
+ Console.println("Sources: " + sources)
+ val sink = statement.sink
+ Console.println("Sink: " + sink)
+ // val pipeNames = statements.head.pipe.getHeads.toSeq map (_.getName)
+ // val srcs: java.util.Map[String, Tap] = sources(pipeNames, context)
+ val flow = context.flowConnector.connect(JavaConversions.mapAsJavaMap(sources), sink, pipe)
+ flow
}
}
sealed trait Statement {
def pipe: Pipe
def sink: Tap
+ def sources: Map[String, Tap]
}
-class Insert[Q <: Query[_ <: ColumnBase[_]], T <: Table[_]](val query: Q, val table: T, context: FlowContext) extends Statement {
- def pipe = {
+class Insert[Q <: Query[_ <: ColumnBase[_]], T <: Table](val query: Q, val table: T, context: FlowContext) extends Statement {
+ override def pipe = {
val qb = new QueryBuilder(query.asInstanceOf[Query[query.QT]], NamingContext())
qb.pipe
}
- def sink = {
- context.sinks.getOrElse(table, sys.error("Missing sink binding for table %s" format table.tableName))
+ override def sink = {
+ val result = context.sinks.getOrElse(table, sys.error("Missing sink binding for table %s" format table.tableName)).apply()
+ //Console.println("sink: " + result)
+ result
+ }
+
+ override def sources = {
+ val srcs = mutable.Map[AbstractTable[_], () => Tap]()
+
+ def visit(c: ColumnBase[_]) {
+ //Console.println("sources visit %s" format c)
+ c match {
+ case q: Query[_] =>
+ q.subquery foreach visit
+ visit(q.value)
+
+ case _ =>
+ val tables = EvaluationChain.tables(c)
+ //Console.println
+ // Console.println("sources -> queries: " + queries)
+ // Console.println
+ // Console.println("sources -> tables: " + tables)
+ //Console.println
+ tables foreach { t =>
+ if (!srcs.contains(t) && context.sources.get(t).isDefined) {
+ srcs += (t -> context.sources.get(t).get)
+ }
+ }
+ }
+ }
+
+ visit(query)
+
+
+ val result: Map[String, Tap] = srcs map { case (table, lazyTap) => table.tableName -> lazyTap.apply() } toMap;
+
+ Console.println
+ Console.println("sources:\n" + (result mkString "\n"))
+ Console.println
+
+ result
}
}
@@ -36,7 +36,7 @@ object OutputType {
val values = List(OneToZeroOrOne, OneToMany)
}
-trait SyntheticColumn[T] extends ColumnBase[T] {
+trait SyntheticColumn[+T] extends ColumnBase[T] {
override val nameHint = getClass.getSimpleName
override val operationType = OperationType.PureMapper
override def dependencies = sys.error("Should not be called; use case-by-case for synthetic column dependencies: " + this)
@@ -110,7 +110,7 @@ abstract class OperatorColumn[T : TypeMapper] extends Column[T] with ColumnOps[T
}
/** A column which is part of a Table. */
-class NamedColumn[T: TypeMapper](val table: AbstractTable[_], val columnName: String, val options: ColumnOption[_]*)
+class NamedColumn[T: TypeMapper](val table: AbstractTable[_ <: Product], val columnName: String, val options: ColumnOption[_]*)
extends OperatorColumn[T] with ColumnOps[T, T]
{
val qualifiedColumnName = table.tableName + "." + columnName
Oops, something went wrong.

0 comments on commit d9f2c78

Please sign in to comment.