Skip to content

Commit

Permalink
Added some laziness to the graph operators
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerk committed Apr 25, 2016
1 parent 9658be5 commit 23ceee7
Showing 1 changed file with 44 additions and 17 deletions.
61 changes: 44 additions & 17 deletions src/main/scala/workflow/Node.scala
Expand Up @@ -76,16 +76,22 @@ sealed trait Operator {
}

// NOTE(review): this text appears to be a rendered diff with removed and added lines
// interleaved -- presumably the three `case class` lines below are the pre-change
// definitions and the by-name `class` definitions after them are their replacements.
// Confirm against the real Node.scala before compiling.
/** Common supertype of the values returned by `Operator.execute`. */
sealed trait OperatorOutput
case class DatasetOutput(rdd: RDD[_]) extends OperatorOutput
case class DatumOutput(data: Any) extends OperatorOutput
case class TransformerOperatorOutput(op: TransformerOperator) extends OperatorOutput
/**
 * Output holding an RDD. `getRDD` is a by-name parameter, so the expression passed
 * to the constructor is not evaluated when the output is created.
 */
class DatasetOutput(getRDD: => RDD[_]) extends OperatorOutput {
// Evaluates `getRDD` on first access and caches the result for later accesses.
lazy val get: RDD[_] = getRDD
}
/**
 * Output holding a single datum. `getDatum` is a by-name parameter, so the expression
 * passed to the constructor is not evaluated when the output is created.
 */
class DatumOutput(getDatum: => Any) extends OperatorOutput {
// Evaluates `getDatum` on first access and caches the result for later accesses.
lazy val get: Any = getDatum
}
/**
 * Output holding a transformer operator. `getTransformer` is a by-name parameter, so
 * the expression passed to the constructor is not evaluated when the output is created.
 */
class TransformerOperatorOutput(getTransformer: => TransformerOperator) extends OperatorOutput {
// Evaluates `getTransformer` on first access and caches the result for later accesses.
lazy val get: TransformerOperator = getTransformer
}

/** Operator that wraps an existing RDD and returns it when executed. */
private[workflow] case class DatasetOperator(rdd: RDD[_]) extends Operator {
// Label is "<name> [<id>]" when the RDD has a name, otherwise just "[<id>]";
// Option(...) guards against a null `rdd.name`.
override def label: String = "%s[%d]".format(
Option(rdd.name).map(_ + " ").getOrElse(""), rdd.id)

// `deps` is not read; the result always wraps the RDD this operator was built with.
override def execute(deps: Seq[OperatorOutput]): DatasetOutput = {
// NOTE(review): the next two lines look like the removed / added sides of a diff;
// presumably only the `new DatasetOutput(rdd)` form belongs in the current file.
DatasetOutput(rdd)
new DatasetOutput(rdd)
}
}

Expand All @@ -97,55 +103,76 @@ private[workflow] case class DatumOperator(datum: Any) extends Operator {
}

override def execute(deps: Seq[OperatorOutput]): DatumOutput = {
DatumOutput(datum)
new DatumOutput(datum)
}
}

/**
 * Operator that transforms either single data items or whole datasets.
 *
 * NOTE(review): this listing interleaves the removed and added lines of a diff --
 * presumably the `Seq[_]` / `bulktransform(Seq[RDD[_]])` signatures and the
 * extractor-style `case DatasetOutput(data)` / `case DatumOutput(datum)` patterns are
 * the pre-change versions. Confirm against the real file.
 */
private[workflow] abstract class TransformerOperator extends Operator with Serializable {
private[workflow] def singleTransform(dataDependencies: Seq[_]): Any
private[workflow] def bulktransform(dataDependencies: Seq[RDD[_]]): RDD[_]
// Transform applied when the dependencies are single data items.
private[workflow] def singleTransform(dataDependencies: Seq[DatumOutput]): Any
// Transform applied when the dependencies are datasets.
private[workflow] def bulkTransform(dataDependencies: Seq[DatasetOutput]): RDD[_]

// Dispatches to bulkTransform when at least one dependency is a DatasetOutput, and to
// singleTransform otherwise (including when `deps` is empty). Fails the `require`
// below if the dependencies mix datasets with single data items.
override def execute(deps: Seq[OperatorOutput]): OperatorOutput = {
require(deps.forall(_.isInstanceOf[DatasetOutput]) || deps.forall(_.isInstanceOf[DatumOutput]),
"Transformer dependencies must be either all RDDs or all single data items")

// Dataset dependencies, in their original order.
val depsAsRDD = deps.collect {
case DatasetOutput(data) => data
case data: DatasetOutput => data
}

// Single-datum dependencies, in their original order.
val depsAsDatum = deps.collect {
case DatumOutput(datum) => datum
case datum: DatumOutput => datum
}

if (depsAsRDD.nonEmpty) {
DatasetOutput(bulktransform(depsAsRDD))
// DatasetOutput takes its constructor argument by name,
// meaning bulkTransform will only be called when the output is used
new DatasetOutput(bulkTransform(depsAsRDD))
} else {
DatumOutput(singleTransform(depsAsDatum))
// DatumOutput takes its constructor argument by name,
// meaning singleTransform will only be called when the output is used
new DatumOutput(singleTransform(depsAsDatum))
}
}
}

/**
 * Operator that fits a TransformerOperator from dataset dependencies.
 *
 * NOTE(review): this listing interleaves the removed and added lines of a diff --
 * presumably the `Seq[RDD[_]]` signature, the `case DatasetOutput(rdd)` pattern and the
 * eager `val fittedTransformer` lines are the pre-change versions. Confirm against the
 * real file.
 */
private[workflow] abstract class EstimatorOperator extends Operator with Serializable {
def fitRDDs(in: Seq[RDD[_]]): TransformerOperator
// Fits and returns a transformer from the given dataset outputs.
def fitRDDs(in: Seq[DatasetOutput]): TransformerOperator

override def execute(deps: Seq[OperatorOutput]): TransformerOperatorOutput = {
val rdds = deps.collect {
case DatasetOutput(rdd) => rdd
case data: DatasetOutput => data
}
// Every dependency must have been a DatasetOutput; otherwise `collect` dropped some.
require(rdds.size == deps.size)

val fittedTransformer = fitRDDs(rdds)
TransformerOperatorOutput(fittedTransformer)
// TransformerOperatorOutput takes its constructor argument by name,
// meaning fitRDDs will only be called when the transformer is used
new TransformerOperatorOutput(fitRDDs(rdds))
}
}

/**
 * Operator that takes a transformer from its dependencies and applies it to the
 * remaining dependencies.
 *
 * NOTE(review): this listing interleaves the removed and added lines of a diff --
 * presumably `case TransformerOperatorOutput(t) => t` and
 * `transformer.execute(deps.tail)` are the pre-change versions. Confirm against the
 * real file.
 */
private[workflow] class DelegatingOperator extends Operator with Serializable {
override def execute(deps: Seq[OperatorOutput]): OperatorOutput = {
// `.get` throws NoSuchElementException when no dependency is a
// TransformerOperatorOutput.
val transformer = deps.collectFirst {
case TransformerOperatorOutput(t) => t
case t: TransformerOperatorOutput => t
}.get

transformer.execute(deps.tail)
// NOTE(review): `deps.tail` drops the first dependency regardless of where the
// transformer was found -- presumably the transformer is always expected at
// `deps.head`; verify against callers.
val depsAsRDD = deps.tail.collect {
case data: DatasetOutput => data
}

val depsAsDatum = deps.tail.collect {
case datum: DatumOutput => datum
}

// Unlike TransformerOperator.execute, there is no check here that the remaining
// dependencies are all datasets or all single data items; any DatasetOutput selects
// the bulk path.
if (depsAsRDD.nonEmpty) {
// DatasetOutput takes its constructor argument by name,
// meaning bulkTransform will only be called when the output is used
new DatasetOutput(transformer.get.bulkTransform(depsAsRDD))
} else {
// DatumOutput takes its constructor argument by name,
// meaning singleTransform will only be called when the output is used
new DatumOutput(transformer.get.singleTransform(depsAsDatum))
}
}
}

Expand Down

0 comments on commit 23ceee7

Please sign in to comment.