From dbad14c69831d7b39d2d0a9b0f527d167f643929 Mon Sep 17 00:00:00 2001 From: Yang Bo Date: Mon, 26 Mar 2018 13:48:42 +0800 Subject: [PATCH] Add a plug-in for synchronized creating kernels This plug-in is a workaround for https://software.intel.com/en-us/forums/opencl/topic/760981 --- .../com/thoughtworks/compute/OpenCL.scala | 82 +++++++++++++------ .../com/thoughtworks/compute/Tensors.scala | 12 +-- .../com/thoughtworks/compute/benchmarks.scala | 1 + 3 files changed, 63 insertions(+), 32 deletions(-) diff --git a/OpenCL/src/main/scala/com/thoughtworks/compute/OpenCL.scala b/OpenCL/src/main/scala/com/thoughtworks/compute/OpenCL.scala index b283eeef..82ff59a4 100644 --- a/OpenCL/src/main/scala/com/thoughtworks/compute/OpenCL.scala +++ b/OpenCL/src/main/scala/com/thoughtworks/compute/OpenCL.scala @@ -784,12 +784,6 @@ object OpenCL { extends AnyVal with MonadicCloseable[UnitContinuation] { - private def numberOfKernels: Int = { - val result = Array.ofDim[Int](1) - checkErrorCode(clCreateKernelsInProgram(handle, null, result)) - result(0) - } - def deviceIds: Seq[DeviceId[Owner]] = { val stack = stackPush() try { @@ -797,7 +791,7 @@ object OpenCL { checkErrorCode(clGetProgramInfo(this.handle, CL_PROGRAM_DEVICES, null: PointerBuffer, sizeBuffer)) val numberOfDeviceIds = sizeBuffer.get(0).toInt / POINTER_SIZE val programDevicesBuffer = stack.mallocPointer(numberOfDeviceIds) - checkErrorCode(clGetProgramInfo(this.handle, CL_PROGRAM_DEVICES, programDevicesBuffer, sizeBuffer)) + checkErrorCode(clGetProgramInfo(this.handle, CL_PROGRAM_DEVICES, programDevicesBuffer, null: PointerBuffer)) (0 until numberOfDeviceIds).map { i => DeviceId[Owner](programDevicesBuffer.get(i)) } @@ -806,27 +800,16 @@ object OpenCL { } } - def createKernels(): Seq[Kernel[Owner]] = { - (0 until createKernelBuffer().capacity).map { i => - Kernel[Owner](createKernelBuffer().get(i)) - } - } - - private def createKernelBuffer(): PointerBuffer = { - val kernelBuffer = 
BufferUtils.createPointerBuffer(numberOfKernels) - checkErrorCode(clCreateKernelsInProgram(handle, kernelBuffer, null: IntBuffer)) - kernelBuffer + def createKernels()(implicit witness: Witness.Aux[Owner]): Seq[Kernel[Owner]] = { + witness.value.createKernels(this.asInstanceOf[witness.value.Program]).asInstanceOf[Seq[Kernel[Owner]]] } - def createFirstKernel(): Kernel[Owner] = { - val stack = stackPush() - try { - val kernelBuffer = stack.mallocPointer(1) - checkErrorCode(clCreateKernelsInProgram(handle, kernelBuffer, null: IntBuffer)) - Kernel(kernelBuffer.get(0)) - } finally { - stack.close() - } + /** Creates a single kernel from this [[Program]]. + * + * @throws InvalidValue if this [[Program]] has more than one kernel. + */ + def createKernel()(implicit witness: Witness.Aux[Owner]): Kernel[Owner] = { + witness.value.createKernel(this.asInstanceOf[witness.value.Program]).asInstanceOf[Kernel[Owner]] } private def buildLogs(deviceIds: Seq[DeviceId[Owner]]): Map[DeviceId[Owner], String] = { @@ -1044,11 +1027,58 @@ object OpenCL { } } + /** Make the calls to [[createKernels]] and [[createKernel]] synchronized. 
+ * + * @note If you are using the Intel OpenCL SDK, you will need this plug-in as a workaround + * @see [[https://software.intel.com/en-us/forums/opencl/topic/760981 + * Bug report: clCreateKernelsInProgram is not thread-safe]] + */ + trait SynchronizedCreatingKernel extends OpenCL { + override protected def createKernels(program: Program): Seq[Kernel] = synchronized { + super.createKernels(program) + } + + override protected def createKernel(program: Program): Kernel = synchronized { + super.createKernel(program) + } + } + } trait OpenCL extends MonadicCloseable[UnitContinuation] with ImplicitsSingleton with DefaultCloseable { import OpenCL._ + protected def createKernels(program: Program): Seq[Kernel] = { + val stack = stackPush() + try { + val numberOfKernelsBuffer = stack.mallocInt(1) + checkErrorCode(clCreateKernelsInProgram(program.handle, null, numberOfKernelsBuffer)) + val numberOfKernels = numberOfKernelsBuffer.get(0) + val kernelBuffer = stack.mallocPointer(numberOfKernels) + checkErrorCode(clCreateKernelsInProgram(program.handle, kernelBuffer, null: IntBuffer)) + (0 until kernelBuffer.capacity).map { i => + new Kernel(kernelBuffer.get(i)) + } + } finally { + stack.close() + } + } + + /** Creates a single kernel from this [[Program]]. + * + * @throws InvalidValue if this [[Program]] has more than one kernel. 
+ */ + protected def createKernel(program: Program): Kernel = { + val stack = stackPush() + try { + val kernelBuffer = stack.mallocPointer(1) + checkErrorCode(clCreateKernelsInProgram(program.handle, kernelBuffer, null: IntBuffer)) + new Kernel(kernelBuffer.get(0)) + } finally { + stack.close() + } + } + protected val logger: Logger type Program = OpenCL.Program[this.type] diff --git a/Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala b/Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala index d8bb2265..fb6e10f0 100644 --- a/Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala +++ b/Tensors/src/main/scala/com/thoughtworks/compute/Tensors.scala @@ -461,7 +461,7 @@ trait Tensors extends OpenCL { val doBuffer: Do[PendingBuffer[Float]] = { val size = shape.product allocateBuffer[Float](size).flatMap { buffer => - Do.monadicCloseable(randomProgram.createFirstKernel()).intransitiveFlatMap { kernel => + Do.monadicCloseable(randomProgram.createKernel()).intransitiveFlatMap { kernel => kernel(0) = buffer kernel(1) = seed dispatch(kernel.enqueue(_, globalWorkSize = Array(size.toLong))).map(EventBuffer[Float](buffer, _)) @@ -487,7 +487,7 @@ trait Tensors extends OpenCL { size } allocateBuffer[Float](paddingSize).flatMap { buffer => - Do.monadicCloseable(randomNormalProgram.createFirstKernel()).intransitiveFlatMap { kernel => + Do.monadicCloseable(randomNormalProgram.createKernel()).intransitiveFlatMap { kernel => kernel(0) = buffer kernel(1) = seed val globalWorkSize = Array((paddingSize / 2).toLong) @@ -578,7 +578,7 @@ trait Tensors extends OpenCL { dispatch { commandQueue => commandQueue.deviceId.deviceType match { case CL_DEVICE_TYPE_CPU => - Do.monadicCloseable(programs.sequentialReductionProgram.createFirstKernel()).intransitiveFlatMap { + Do.monadicCloseable(programs.sequentialReductionProgram.createKernel()).intransitiveFlatMap { kernel1: Kernel => kernel1(0) = inputPendingBuffer.buffer kernel1(1) = length @@ -592,7 +592,7 @@ trait 
Tensors extends OpenCL { ) } case _ => - Do.monadicCloseable(programs.parallelReductionProgram.createFirstKernel()).intransitiveFlatMap { + Do.monadicCloseable(programs.parallelReductionProgram.createKernel()).intransitiveFlatMap { kernel1: Kernel => val stage1LocalWorkSize: Long = math.min(length, kernel1.workGroupSize(commandQueue.deviceId)) val maxNumberOfReductionGroups = commandQueue.deviceId.maxComputeUnits @@ -627,7 +627,7 @@ trait Tensors extends OpenCL { waitingEvents = inputPendingBuffer.eventOption.map(_.handle).toSeq ) .intransitiveFlatMap { scratchEvent: Event => - Do.monadicCloseable(programs.parallelReductionProgram.createFirstKernel()) + Do.monadicCloseable(programs.parallelReductionProgram.createKernel()) .intransitiveFlatMap { kernel2: Kernel => // FIXME: An exception thrown here will not be handled. Need further investigation. @@ -1005,7 +1005,7 @@ trait Tensors extends OpenCL { } .unwrap .intransitiveFlatMap { arguments: List[PendingBuffer[_]] => - Do.monadicCloseable(program.createFirstKernel()).intransitiveFlatMap { kernel: Kernel => + Do.monadicCloseable(program.createKernel()).intransitiveFlatMap { kernel: Kernel => val valueType = convertedTerm.valueType.asInstanceOf[ValueType] val memory = valueType.memory.asInstanceOf[Memory[convertedTerm.JvmValue]] allocateBuffer[convertedTerm.JvmValue](shape.product)(memory).flatMap { outputBuffer => diff --git a/benchmarks/src/jmh/scala/com/thoughtworks/compute/benchmarks.scala b/benchmarks/src/jmh/scala/com/thoughtworks/compute/benchmarks.scala index e2ce6694..fbd9fe0b 100644 --- a/benchmarks/src/jmh/scala/com/thoughtworks/compute/benchmarks.scala +++ b/benchmarks/src/jmh/scala/com/thoughtworks/compute/benchmarks.scala @@ -36,6 +36,7 @@ object benchmarks { with OpenCL.GlobalExecutionContext with OpenCL.CommandQueuePool with OpenCL.DontReleaseEventTooEarly + with OpenCL.SynchronizedCreatingKernel with Tensors.WangHashingRandomNumberGenerator { @transient protected lazy val (platformId: PlatformId, 
deviceIds: Seq[DeviceId]) = {