diff --git a/Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala b/Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala
index 381c51f2..9f9eb562 100644
--- a/Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala
+++ b/Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala
@@ -422,12 +422,12 @@ trait Tensors extends OpenCL {
     }

     def apply[A](elements: A, padding: Float = 0.0f)(
-        implicit tensorBuilder: TensorBuilder.Aux[A, Float]): BufferedTensor = {
+        implicit tensorBuilder: TensorBuilder.Aux[A, Float]): NonInlineTensor = {
       val padding0 = padding
       new {
         val shape: Array[Int] = tensorBuilder.shape(elements).toArray
         val padding: Float = padding0
-      } with BufferedTensor {
+      } with NonInlineTensor {
         private[compute] val doBuffer = {
           Do(TryT(ResourceT(UnitContinuation.delay {
             val data = tensorBuilder.flatten(elements).toArray
@@ -454,13 +454,13 @@ trait Tensors extends OpenCL {
       } with InlineTensor
     }

-    def random(shape: Array[Int], seed: Int = Random.nextInt(), padding: Float = 0.0f): BufferedTensor = {
+    def random(shape: Array[Int], seed: Int = Random.nextInt(), padding: Float = 0.0f): NonInlineTensor = {
       val shape0 = shape
       val padding0 = padding
       new {
         val padding = padding0
         val shape = shape0
-      } with BufferedTensor {
+      } with NonInlineTensor {
         private[compute] val doBuffer: Do[PendingBuffer[Float]] = {
           val size = shape.product
           allocateBuffer[Float](size).flatMap { buffer =>
@@ -475,13 +475,13 @@ trait Tensors extends OpenCL {
     }

     /** Generate random numbers in normal distribution. */
-    def randomNormal(shape: Array[Int], seed: Int = Random.nextInt(), padding: Float = 0.0f): BufferedTensor = {
+    def randomNormal(shape: Array[Int], seed: Int = Random.nextInt(), padding: Float = 0.0f): NonInlineTensor = {
       val shape0 = shape
       val padding0 = padding
       new {
         val padding = padding0
         val shape = shape0
-      } with BufferedTensor {
+      } with NonInlineTensor {
         private[compute] val doBuffer: Do[PendingBuffer[Float]] = {
           val size = shape.product
           val paddingSize = if (size % 2 == 1) {
@@ -541,7 +541,7 @@ trait Tensors extends OpenCL {
       }
     }

-    def join(tensors0: Seq[Tensor]): BufferedTensor = {
+    def join(tensors0: Seq[Tensor]): NonInlineTensor = {
       def force[A](seq: Seq[A]) = {
         seq match {
           case seqView: SeqView[A, _] @unchecked =>
@@ -556,7 +556,7 @@ trait Tensors extends OpenCL {
       new {
         val shape = headTensor.shape :+ tensors.length
         val padding: Float = headTensor.padding
-      } with BufferedTensor {
+      } with NonInlineTensor {
         private[compute] val doBuffer = {
           val elements = tensors.map(_.closure)
           enqueueClosure(trees.tuple.join(elements: _*), headTensor.shape).asInstanceOf[Do[PendingBuffer[Float]]]
@@ -591,9 +591,9 @@ trait Tensors extends OpenCL {
     /**
       * @group delayed
       */
-    def notInline: BufferedTensor
+    def nonInline: NonInlineTensor

-    private def reduce(programs: MonoidPrograms): BufferedTensor = {
+    private def reduce(programs: MonoidPrograms): NonInlineTensor = {
       new {
         val padding: Float = thisTensor.padding

@@ -683,7 +683,7 @@ trait Tensors extends OpenCL {
             }
           }
         }.shared
-      } with BufferedTensor {
+      } with NonInlineTensor {
         def shape: Array[Int] = Tensors.ScalarShape
       }
     }
@@ -771,7 +771,7 @@ trait Tensors extends OpenCL {
     /**
       * @group delayed
       */
-    def reshape(newShape: Array[Int]): BufferedTensor = {
+    def reshape(newShape: Array[Int]): NonInlineTensor = {
       if (newShape.product != shape.product) {
         throw new IllegalArgumentException
       }
@@ -779,7 +779,7 @@ trait Tensors extends OpenCL {
       val padding: Float = thisTensor.padding
       val shape: Array[Int] = newShape
       private[compute] val doBuffer: Do[PendingBuffer[Float]] = thisTensor.doBuffer
-    } with BufferedTensor
+    } with NonInlineTensor
   }

   /**
@@ -993,82 +993,6 @@ trait Tensors extends OpenCL {

     private[compute] def doBuffer: Do[PendingBuffer[closure.JvmValue]]

-    /** Allocates device-side cache that are managed by the [[https://github.com/ThoughtWorksInc/RAII.scala RAII.scala]] library.
-      *
-      * @note This method is similar to [[cache]],
-      *       except the life cycle of the cache can be automatically managed.
-      *
-      * @group slow
-      */
-    def doCache: Do[this.type] = doBuffer.map(Function.const(this))
-
-    /** Allocates device-side cache for this [[Tensor]], and returns a [[java.lang.AutoCloseable]] to release the cache.
-      *
-      * @note This method can be called multiple times on one [[Tensor]],
-      *       only one copy of cache will be allocated,
-      *       which will be finally released until all [[java.lang.AutoCloseable]] returned by [[cache]] method are closed.
-      *
-      * @group slow
-      */
-    def cache: AutoCloseable = {
-      sealed trait State
-      case object Openning extends State
-      case object EarlyClosed extends State
-      case object Closed extends State
-      final case class Open(release: UnitContinuation[Unit]) extends State
-
-      val state = new AtomicReference[State](Openning) with AutoCloseable {
-        @tailrec
-        final def close(): Unit = {
-          get match {
-            case Openning =>
-              if (compareAndSet(Openning, EarlyClosed)) {
-                // Success
-              } else {
-                close()
-              }
-            case oldState @ Open(release) =>
-              if (compareAndSet(oldState, Closed)) {
-                release.safeOnComplete { _: Unit =>
-                  Trampoline.done(())
-                }.run
-              } else {
-                close()
-              }
-            case EarlyClosed | Closed =>
-              throw new IllegalStateException("The resources associated to this tensor has been released.")
-          }
-        }
-      }
-
-      doBuffer.safeOnComplete { resource =>
-        @tailrec
-        def retry(): Trampoline[Unit] = {
-          state.get() match {
-            case EarlyClosed =>
-              if (state.compareAndSet(EarlyClosed, Closed)) {
-                resource.release.safeOnComplete { _: Unit =>
-                  Trampoline.done(())
-                }
-              } else {
-                retry()
-              }
-            case Openning =>
-              if (state.compareAndSet(Openning, Open(resource.release))) {
-                Trampoline.done(())
-              } else {
-                retry()
-              }
-            case _: Open | Closed =>
-              throw new IllegalStateException()
-          }
-        }
-        retry()
-      }.run
-
-      state
-    }
-
     /**
       * @group slow
       */
@@ -1225,12 +1149,12 @@ trait Tensors extends OpenCL {
         enqueueClosure(closure, shape)
       }.shared

-    def notInline: BufferedTensor =
+    def nonInline: NonInlineTensor =
       new {
         val padding: Float = thisInlineTensor.padding
         private[compute] val doBuffer: Do[PendingBuffer[Float]] = thisInlineTensor.doBuffer
         val shape: Array[Int] = thisInlineTensor.shape
-      } with BufferedTensor
+      } with NonInlineTensor
   }

   trait TransformedTensor extends InlineTensor {
@@ -1250,14 +1174,91 @@ trait Tensors extends OpenCL {

   }

-  trait BufferedTensor extends Tensor {
+  trait NonInlineTensor extends Tensor {

-    def notInline: BufferedTensor = this
+    def nonInline: this.type = this

     @transient
     protected lazy val closure = {
       arrayTerm.extract
     }
+
+    /** Allocates a device-side cache that is managed by the [[https://github.com/ThoughtWorksInc/RAII.scala RAII.scala]] library.
+      *
+      * @note This method is similar to [[cache]],
+      *       except that the life cycle of the cache is managed automatically.
+      *
+      * @group slow
+      */
+    def doCache: Do[this.type] = doBuffer.map(Function.const(this))
+
+    /** Allocates a device-side cache for this [[Tensor]] and returns a [[java.lang.AutoCloseable]] that releases the cache.
+      *
+      * @note This method can be called multiple times on one [[Tensor]].
+      *       Only one copy of the cache will be allocated,
+      *       and it will not be released until every [[java.lang.AutoCloseable]] returned by [[cache]] has been closed.
+      *
+      * @group slow
+      */
+    def cache: AutoCloseable = {
+      sealed trait State
+      case object Openning extends State
+      case object EarlyClosed extends State
+      case object Closed extends State
+      final case class Open(release: UnitContinuation[Unit]) extends State
+
+      val state = new AtomicReference[State](Openning) with AutoCloseable {
+        @tailrec
+        final def close(): Unit = {
+          get match {
+            case Openning =>
+              if (compareAndSet(Openning, EarlyClosed)) {
+                // Success
+              } else {
+                close()
+              }
+            case oldState @ Open(release) =>
+              if (compareAndSet(oldState, Closed)) {
+                release.safeOnComplete { _: Unit =>
+                  Trampoline.done(())
+                }.run
+              } else {
+                close()
+              }
+            case EarlyClosed | Closed =>
+              throw new IllegalStateException("The resources associated with this tensor have been released.")
+          }
+        }
+      }
+
+      doBuffer.safeOnComplete { resource =>
+        @tailrec
+        def retry(): Trampoline[Unit] = {
+          state.get() match {
+            case EarlyClosed =>
+              if (state.compareAndSet(EarlyClosed, Closed)) {
+                resource.release.safeOnComplete { _: Unit =>
+                  Trampoline.done(())
+                }
+              } else {
+                retry()
+              }
+            case Openning =>
+              if (state.compareAndSet(Openning, Open(resource.release))) {
+                Trampoline.done(())
+              } else {
+                retry()
+              }
+            case _: Open | Closed =>
+              throw new IllegalStateException()
+          }
+        }
+        retry()
+      }.run
+
+      state
+    }
+
   }
 }
diff --git a/benchmarks/src/jmh/scala/com/thoughtworks/compute/benchmarks.scala b/benchmarks/src/jmh/scala/com/thoughtworks/compute/benchmarks.scala
index bb1aa5ca..137af16f 100644
--- a/benchmarks/src/jmh/scala/com/thoughtworks/compute/benchmarks.scala
+++ b/benchmarks/src/jmh/scala/com/thoughtworks/compute/benchmarks.scala
@@ -105,9 +105,9 @@ object benchmarks {
       }

       def doBenchmark(): Do[() => Array[Float]] = {
-        val weight: BufferedTensor = Tensor.randomNormal(Array(inputDepth, outputDepth))
+        val weight: NonInlineTensor = Tensor.randomNormal(Array(inputDepth, outputDepth))

-        val input: BufferedTensor = Tensor.randomNormal(Array(batchSize, inputDepth))
+        val input: NonInlineTensor = Tensor.randomNormal(Array(batchSize, inputDepth))

         weight.doCache.flatMap { weight =>
           input.doCache.map { input =>
@@ -233,7 +233,7 @@ object benchmarks {
     trait Benchmarks extends BenchmarkTensors {

       def doBenchmark(): Do[() => Float] = {
-        val input: BufferedTensor = Tensor.randomNormal(Array.fill(numberOfDimensions)(size))
+        val input: NonInlineTensor = Tensor.randomNormal(Array.fill(numberOfDimensions)(size))

         input.doCache.map { input =>
           { () =>
@@ -365,7 +365,7 @@ object benchmarks {

     trait Benchmarks extends BenchmarkTensors {

-      final case class ConvolutionalLayer(weight: BufferedTensor, bias: BufferedTensor) {
+      final case class ConvolutionalLayer(weight: NonInlineTensor, bias: NonInlineTensor) {
         def forward(input: Tensor): Tensor = {
           convolute(input, weight, bias)
         }
@@ -467,7 +467,7 @@ object benchmarks {
       }

       def doBenchmark(): Do[() => Array[Float]] = {
-        val input: BufferedTensor = Tensor.randomNormal(Array(batchSize, imageHeight, imageWidth, depth))
+        val input: NonInlineTensor = Tensor.randomNormal(Array(batchSize, imageHeight, imageWidth, depth))
         val layers = (for (i <- (0 until numberOfLayers).view) yield {
           ConvolutionalLayer(weight = Tensor.randomNormal(Array(kernelHeight, kernelWidth, depth, depth)),
                              bias = Tensor.randomNormal(Array(depth)))
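For reference, a minimal usage sketch of the renamed API follows. It is not part of the patch: `Tensor.randomNormal`, `NonInlineTensor`, `nonInline`, `doCache` and `cache` come from the diff above, while the enclosing trait name, the shapes and the helper methods are illustrative assumptions, and it relies on the same `Do` map/flatMap syntax the benchmarks already use.

// A usage sketch only; assumes it is mixed into a concrete Tensors instance,
// as BenchmarkTensors is in the benchmarks above.
import com.thoughtworks.raii.asynchronous.Do

trait CacheUsageSketch extends Tensors {

  // RAII-managed caching: doCache keeps the device-side buffer alive while the
  // resulting Do value is in use, and RAII.scala releases it afterwards.
  def cachedPair(): Do[(Tensor, Tensor)] = {
    val weight: NonInlineTensor = Tensor.randomNormal(Array(32, 64))
    val input: NonInlineTensor = Tensor.randomNormal(Array(128, 32))
    weight.doCache.flatMap { cachedWeight =>
      input.doCache.map { cachedInput =>
        (cachedWeight, cachedInput)
      }
    }
  }

  // Manually managed caching: cache pins the buffer until close() is called;
  // nonInline turns any Tensor into a NonInlineTensor that can be cached.
  def withPinned(tensor: Tensor)(body: Tensor => Unit): Unit = {
    val pinned = tensor.nonInline
    val handle: AutoCloseable = pinned.cache
    try body(pinned)
    finally handle.close()
  }
}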