187 changes: 94 additions & 93 deletions Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala
@@ -422,12 +422,12 @@ trait Tensors extends OpenCL {
}

def apply[A](elements: A, padding: Float = 0.0f)(
-implicit tensorBuilder: TensorBuilder.Aux[A, Float]): BufferedTensor = {
+implicit tensorBuilder: TensorBuilder.Aux[A, Float]): NonInlineTensor = {
val padding0 = padding
new {
val shape: Array[Int] = tensorBuilder.shape(elements).toArray
val padding: Float = padding0
-} with BufferedTensor {
+} with NonInlineTensor {
private[compute] val doBuffer = {
Do(TryT(ResourceT(UnitContinuation.delay {
val data = tensorBuilder.flatten(elements).toArray
@@ -454,13 +454,13 @@ trait Tensors extends OpenCL {
} with InlineTensor
}

-def random(shape: Array[Int], seed: Int = Random.nextInt(), padding: Float = 0.0f): BufferedTensor = {
+def random(shape: Array[Int], seed: Int = Random.nextInt(), padding: Float = 0.0f): NonInlineTensor = {
val shape0 = shape
val padding0 = padding
new {
val padding = padding0
val shape = shape0
-} with BufferedTensor {
+} with NonInlineTensor {
private[compute] val doBuffer: Do[PendingBuffer[Float]] = {
val size = shape.product
allocateBuffer[Float](size).flatMap { buffer =>
@@ -475,13 +475,13 @@
}

/** Generates random numbers from a normal distribution. */
-def randomNormal(shape: Array[Int], seed: Int = Random.nextInt(), padding: Float = 0.0f): BufferedTensor = {
+def randomNormal(shape: Array[Int], seed: Int = Random.nextInt(), padding: Float = 0.0f): NonInlineTensor = {
val shape0 = shape
val padding0 = padding
new {
val padding = padding0
val shape = shape0
-} with BufferedTensor {
+} with NonInlineTensor {
private[compute] val doBuffer: Do[PendingBuffer[Float]] = {
val size = shape.product
val paddingSize = if (size % 2 == 1) {
@@ -541,7 +541,7 @@ trait Tensors extends OpenCL {
}
}

-def join(tensors0: Seq[Tensor]): BufferedTensor = {
+def join(tensors0: Seq[Tensor]): NonInlineTensor = {
def force[A](seq: Seq[A]) = {
seq match {
case seqView: SeqView[A, _] @unchecked =>
@@ -556,7 +556,7 @@
new {
val shape = headTensor.shape :+ tensors.length
val padding: Float = headTensor.padding
-} with BufferedTensor {
+} with NonInlineTensor {
private[compute] val doBuffer = {
val elements = tensors.map(_.closure)
enqueueClosure(trees.tuple.join(elements: _*), headTensor.shape).asInstanceOf[Do[PendingBuffer[Float]]]
@@ -591,9 +591,9 @@
/**
* @group delayed
*/
-def notInline: BufferedTensor
+def nonInline: NonInlineTensor

-private def reduce(programs: MonoidPrograms): BufferedTensor = {
+private def reduce(programs: MonoidPrograms): NonInlineTensor = {
new {
val padding: Float = thisTensor.padding

@@ -683,7 +683,7 @@
}
}
}.shared
-} with BufferedTensor {
+} with NonInlineTensor {
def shape: Array[Int] = Tensors.ScalarShape
}
}
@@ -771,15 +771,15 @@
/**
* @group delayed
*/
-def reshape(newShape: Array[Int]): BufferedTensor = {
+def reshape(newShape: Array[Int]): NonInlineTensor = {
if (newShape.product != shape.product) {
throw new IllegalArgumentException
}
new {
val padding: Float = thisTensor.padding
val shape: Array[Int] = newShape
private[compute] val doBuffer: Do[PendingBuffer[Float]] = thisTensor.doBuffer
-} with BufferedTensor
+} with NonInlineTensor
}

/**
@@ -993,82 +993,6 @@ trait Tensors extends OpenCL {

private[compute] def doBuffer: Do[PendingBuffer[closure.JvmValue]]

-/** Allocates a device-side cache that is managed by the [[https://github.com/ThoughtWorksInc/RAII.scala RAII.scala]] library.
-*
-* @note This method is similar to [[cache]],
-* except that the life cycle of the cache is managed automatically.
-*
-* @group slow
-*/
-def doCache: Do[this.type] = doBuffer.map(Function.const(this))
-
-/** Allocates a device-side cache for this [[Tensor]] and returns a [[java.lang.AutoCloseable]] that releases the cache.
-*
-* @note This method can be called multiple times on one [[Tensor]].
-* Only one copy of the cache will be allocated,
-* and it will not be released until every [[java.lang.AutoCloseable]] returned by [[cache]] has been closed.
-*
-* @group slow
-*/
-def cache: AutoCloseable = {
-sealed trait State
-case object Opening extends State
-case object EarlyClosed extends State
-case object Closed extends State
-final case class Open(release: UnitContinuation[Unit]) extends State
-
-val state = new AtomicReference[State](Opening) with AutoCloseable {
-@tailrec
-final def close(): Unit = {
-get match {
-case Opening =>
-if (compareAndSet(Opening, EarlyClosed)) {
-// Success
-} else {
-close()
-}
-case oldState @ Open(release) =>
-if (compareAndSet(oldState, Closed)) {
-release.safeOnComplete { _: Unit =>
-Trampoline.done(())
-}.run
-} else {
-close()
-}
-case EarlyClosed | Closed =>
-throw new IllegalStateException("The resources associated with this tensor have been released.")
-}
-}
-}
-
-doBuffer.safeOnComplete { resource =>
-@tailrec
-def retry(): Trampoline[Unit] = {
-state.get() match {
-case EarlyClosed =>
-if (state.compareAndSet(EarlyClosed, Closed)) {
-resource.release.safeOnComplete { _: Unit =>
-Trampoline.done(())
-}
-} else {
-retry()
-}
-case Opening =>
-if (state.compareAndSet(Opening, Open(resource.release))) {
-Trampoline.done(())
-} else {
-retry()
-}
-case _: Open | Closed =>
-throw new IllegalStateException()
-}
-}
-retry()
-}.run
-
-state
-}
-
/**
* @group slow
*/
@@ -1225,12 +1149,12 @@
enqueueClosure(closure, shape)
}.shared

-def notInline: BufferedTensor =
+def nonInline: NonInlineTensor =
new {
val padding: Float = thisInlineTensor.padding
private[compute] val doBuffer: Do[PendingBuffer[Float]] = thisInlineTensor.doBuffer
val shape: Array[Int] = thisInlineTensor.shape
-} with BufferedTensor
+} with NonInlineTensor
}

trait TransformedTensor extends InlineTensor {
@@ -1250,14 +1174,91 @@

}

-trait BufferedTensor extends Tensor {
+trait NonInlineTensor extends Tensor {

-def notInline: BufferedTensor = this
+def nonInline: this.type = this

@transient
protected lazy val closure = {
arrayTerm.extract
}

+/** Allocates a device-side cache that is managed by the [[https://github.com/ThoughtWorksInc/RAII.scala RAII.scala]] library.
+*
+* @note This method is similar to [[cache]],
+* except that the life cycle of the cache is managed automatically.
+*
+* @group slow
+*/
+def doCache: Do[this.type] = doBuffer.map(Function.const(this))
+
+/** Allocates a device-side cache for this [[Tensor]] and returns a [[java.lang.AutoCloseable]] that releases the cache.
+*
+* @note This method can be called multiple times on one [[Tensor]].
+* Only one copy of the cache will be allocated,
+* and it will not be released until every [[java.lang.AutoCloseable]] returned by [[cache]] has been closed.
+*
+* @group slow
+*/
+def cache: AutoCloseable = {
+sealed trait State
+case object Opening extends State
+case object EarlyClosed extends State
+case object Closed extends State
+final case class Open(release: UnitContinuation[Unit]) extends State
+
+val state = new AtomicReference[State](Opening) with AutoCloseable {
+@tailrec
+final def close(): Unit = {
+get match {
+case Opening =>
+if (compareAndSet(Opening, EarlyClosed)) {
+// Success
+} else {
+close()
+}
+case oldState @ Open(release) =>
+if (compareAndSet(oldState, Closed)) {
+release.safeOnComplete { _: Unit =>
+Trampoline.done(())
+}.run
+} else {
+close()
+}
+case EarlyClosed | Closed =>
+throw new IllegalStateException("The resources associated with this tensor have been released.")
+}
+}
+}
+
+doBuffer.safeOnComplete { resource =>
+@tailrec
+def retry(): Trampoline[Unit] = {
+state.get() match {
+case EarlyClosed =>
+if (state.compareAndSet(EarlyClosed, Closed)) {
+resource.release.safeOnComplete { _: Unit =>
+Trampoline.done(())
+}
+} else {
+retry()
+}
+case Opening =>
+if (state.compareAndSet(Opening, Open(resource.release))) {
+Trampoline.done(())
+} else {
+retry()
+}
+case _: Open | Closed =>
+throw new IllegalStateException()
+}
+}
+retry()
+}.run
+
+state
+}
+
+}

}
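
For context (an illustrative sketch, not part of this pull request): the renamed API is intended to be used roughly as below. This assumes code running inside a concrete Tensors instance, so that the path-dependent Tensor factory and the NonInlineTensor type are in scope, as in the benchmarks that follow; the shape Array(3, 3) and the surrounding comments are assumptions, not code from this change.

// `randomNormal` (like `apply`, `random`, `join` and `reshape` above) now
// returns a NonInlineTensor, the only tensor flavor carrying the caching API.
val weight: NonInlineTensor = Tensor.randomNormal(Array(3, 3))

// Manual life cycle: `cache` allocates the device-side buffer at most once
// and returns an AutoCloseable handle; the buffer is released only after
// every handle obtained from `cache` has been closed.
val handle: AutoCloseable = weight.cache
try {
  // ... enqueue computations that read `weight` ...
} finally {
  handle.close()
}

// RAII-managed life cycle: `doCache` keeps the buffer alive for the extent
// of the resulting `Do` computation and releases it automatically.
val cached: Do[weight.type] = weight.doCache

// An InlineTensor no longer exposes `cache`/`doCache` directly; convert it
// first with `nonInline` (formerly `notInline`):
//   someInlineTensor.nonInline.cache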
@@ -105,9 +105,9 @@ object benchmarks {
}

def doBenchmark(): Do[() => Array[Float]] = {
-val weight: BufferedTensor = Tensor.randomNormal(Array(inputDepth, outputDepth))
+val weight: NonInlineTensor = Tensor.randomNormal(Array(inputDepth, outputDepth))

-val input: BufferedTensor = Tensor.randomNormal(Array(batchSize, inputDepth))
+val input: NonInlineTensor = Tensor.randomNormal(Array(batchSize, inputDepth))

weight.doCache.flatMap { weight =>
input.doCache.map { input =>
@@ -233,7 +233,7 @@
trait Benchmarks extends BenchmarkTensors {

def doBenchmark(): Do[() => Float] = {
-val input: BufferedTensor = Tensor.randomNormal(Array.fill(numberOfDimensions)(size))
+val input: NonInlineTensor = Tensor.randomNormal(Array.fill(numberOfDimensions)(size))

input.doCache.map { input =>
{ () =>
@@ -365,7 +365,7 @@

trait Benchmarks extends BenchmarkTensors {

-final case class ConvolutionalLayer(weight: BufferedTensor, bias: BufferedTensor) {
+final case class ConvolutionalLayer(weight: NonInlineTensor, bias: NonInlineTensor) {
def forward(input: Tensor): Tensor = {
convolute(input, weight, bias)
}
Expand Down Expand Up @@ -467,7 +467,7 @@ object benchmarks {
}

def doBenchmark(): Do[() => Array[Float]] = {
-val input: BufferedTensor = Tensor.randomNormal(Array(batchSize, imageHeight, imageWidth, depth))
+val input: NonInlineTensor = Tensor.randomNormal(Array(batchSize, imageHeight, imageWidth, depth))
val layers = (for (i <- (0 until numberOfLayers).view) yield {
ConvolutionalLayer(weight = Tensor.randomNormal(Array(kernelHeight, kernelWidth, depth, depth)),
bias = Tensor.randomNormal(Array(depth)))
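
The `cache` implementation above is a small lock-free state machine: an AtomicReference advanced by tail-recursive compareAndSet retry loops, so racing `close()` calls are handled safely and the release continuation runs exactly once. Below is a self-contained sketch of the same pattern, reduced to a plain once-only handle; all names in it are illustrative, not from this pull request.

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

final class OnceCloseable(release: () => Unit) extends AutoCloseable {
  private sealed trait State
  private case object Open extends State
  private case object Closed extends State

  private val state = new AtomicReference[State](Open)

  @tailrec
  final def close(): Unit = state.get match {
    case Open =>
      // A failed compareAndSet means another thread won the race;
      // re-read the state and retry so the release action runs exactly once.
      if (state.compareAndSet(Open, Closed)) release()
      else close()
    case Closed =>
      throw new IllegalStateException("Already released.")
  }
}

// Usage: the first close() releases; a second close() throws.
// new OnceCloseable(() => println("buffer released")).close()

The version in Tensors adds the intermediate Opening and EarlyClosed states, so that a close() arriving while doBuffer is still allocating is remembered, and the buffer is released as soon as the allocation completes.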