Skip to content

Commit

Permalink
Add FutureUtil tests (#3126)
Browse files Browse the repository at this point in the history
* Add FutureUtilTest for both FutureUtil.batchAndParallelExecute and FutureUtil.batchAndSyncExecute

* Make Vector same size as available processors as CI machines don't have as many processors as I have locally

* Try to fix test case

* Add processors to the log

* Replace java scheduler with a scalajs scheduler called monix

* Make asyncUtilsTest compatible with Scalajs, add them to the CI matrix

* Add more missing runs of asyncUtilsTestJS/test
  • Loading branch information
Christewart committed Jun 28, 2021
1 parent d7b753a commit a9292fc
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ jobs:
~/.bitcoin-s/binaries
key: ${{ runner.os }}-cache
- name: run tests
run: sbt ++2.12.14 downloadBitcoind coverage keyManagerTest/test keyManager/coverageReport keyManager/coverageAggregate keyManager/coveralls feeProviderTest/test walletTest/test dlcWalletTest/test wallet/coverageReport wallet/coverageAggregate wallet/coveralls dlcOracleTest/test asyncUtilsTestJVM/test oracleExplorerClient/test dlcOracle/coverageReport dlcOracle/coverageAggregate dlcOracle/coveralls
run: sbt ++2.12.14 downloadBitcoind coverage keyManagerTest/test keyManager/coverageReport keyManager/coverageAggregate keyManager/coveralls feeProviderTest/test walletTest/test dlcWalletTest/test wallet/coverageReport wallet/coverageAggregate wallet/coveralls dlcOracleTest/test asyncUtilsTestJVM/test asyncUtilsTestJS/test oracleExplorerClient/test dlcOracle/coverageReport dlcOracle/coverageAggregate dlcOracle/coveralls
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ jobs:
~/.bitcoin-s/binaries
key: ${{ runner.os }}-cache
- name: run tests
run: sbt ++2.13.6 downloadBitcoind coverage keyManagerTest/test keyManager/coverageReport keyManager/coverageAggregate keyManager/coveralls feeProviderTest/test walletTest/test dlcWalletTest/test wallet/coverageReport wallet/coverageAggregate wallet/coveralls dlcOracleTest/test asyncUtilsTestJVM/test oracleExplorerClient/test dlcOracle/coverageReport dlcOracle/coverageAggregate dlcOracle/coveralls
run: sbt ++2.13.6 downloadBitcoind coverage keyManagerTest/test keyManager/coverageReport keyManager/coverageAggregate keyManager/coveralls feeProviderTest/test walletTest/test dlcWalletTest/test wallet/coverageReport wallet/coverageAggregate wallet/coveralls dlcOracleTest/test asyncUtilsTestJVM/test asyncUtilsTestJS/test oracleExplorerClient/test dlcOracle/coverageReport dlcOracle/coverageAggregate dlcOracle/coveralls
2 changes: 1 addition & 1 deletion .github/workflows/Linux_2.13_ScalaJS_Tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ jobs:
~/.bitcoin-s/binaries
key: ${{ runner.os }}-cache
- name: run tests
run: sbt ++2.13.6 cryptoTestJS/test coreJS/test 'set scalaJSStage in Global := FullOptStage' cryptoTestJS/test coreJS/test
run: sbt ++2.13.6 cryptoTestJS/test coreJS/test 'set scalaJSStage in Global := FullOptStage' cryptoTestJS/test coreJS/test asyncUtilsTestJS/test
2 changes: 1 addition & 1 deletion .github/workflows/Windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ jobs:
~/.bitcoin-s/binaries
key: ${{ runner.os }}-cache
- name: Windows Crypto, Core, and Database tests
run: sbt ++2.13.6 cryptoTestJVM/test coreTestJVM/test dlcTest/test dbCommonsTest/test asyncUtilsTestJVM/test
run: sbt ++2.13.6 cryptoTestJVM/test coreTestJVM/test dlcTest/test dbCommonsTest/test asyncUtilsTestJVM/test asyncUtilsTestJS/test
shell: bash
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,19 @@ class AsyncUtilTest extends BitcoinSJvmTest {
}

it must "handle blocking tasks ok" in {
//schedule a blocking task first
val start = TimeUtil.currentEpochMs
val sleepMs = 10000
val stop = start + sleepMs
def blockingTask(): Boolean = {
Thread.sleep(sleepMs)
while (stop < System.currentTimeMillis()) {
//do nothing, block until 10 seconds has passed
//can't do the dumb thing and use Thread.sleep()
//as that isn't available on scalajs
}
true
}

//schedule a blocking task first
val start = TimeUtil.currentEpochMs
val _ =
AsyncUtil.awaitCondition(blockingTask)

Expand All @@ -148,10 +153,10 @@ class AsyncUtilTest extends BitcoinSJvmTest {
}

it must "handle async blocking tasks ok" in {
val sleepMs = 10000
def blockingTask(): Future[Boolean] = Future {
Thread.sleep(sleepMs)
true
def blockingTask(): Future[Boolean] = {
AsyncUtil
.nonBlockingSleep(10.seconds)
.map(_ => true)
}

//schedule a blocking task first
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.bitcoins.asyncutil

import org.bitcoins.asyncutil.AsyncUtil.scheduler
import org.bitcoins.core.api.asyncutil.AsyncUtilApi

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.{ThreadFactory, TimeUnit}
import scala.concurrent._
import scala.concurrent.duration.{DurationInt, FiniteDuration}

Expand Down Expand Up @@ -95,8 +94,9 @@ abstract class AsyncUtil extends AsyncUtilApi {
val p = Promise[Boolean]()
val runnable = retryRunnable(condition, p)

AsyncUtil.scheduler
.schedule(runnable, interval.toMillis, TimeUnit.MILLISECONDS)
AsyncUtil.scheduler.scheduleOnce(interval.toMillis,
TimeUnit.MILLISECONDS,
runnable)

p.future.flatMap {
case true => Future.unit
Expand Down Expand Up @@ -147,17 +147,16 @@ abstract class AsyncUtil extends AsyncUtilApi {
override def nonBlockingSleep(duration: FiniteDuration): Future[Unit] = {
val p = Promise[Unit]()
val r: Runnable = () => p.success(())
scheduler.schedule(r, duration.toMillis, TimeUnit.MILLISECONDS)
AsyncUtil.scheduler.scheduleOnce(duration.toMillis,
TimeUnit.MILLISECONDS,
r)
p.future
}
}

object AsyncUtil extends AsyncUtil {

private[this] val threadFactory = getNewThreadFactory("bitcoin-s-async-util")

private[bitcoins] val scheduler =
Executors.newScheduledThreadPool(2, threadFactory)
private[bitcoins] val scheduler = monix.execution.Scheduler.global

/** The default interval between async attempts
*/
Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ lazy val asyncUtils = crossProject(JVMPlatform, JSPlatform)
.crossType(CrossType.Pure)
.in(file("async-utils"))
.settings(CommonSettings.prodSettings: _*)
.settings(name := "bitcoin-s-async-utils")
.settings(name := "bitcoin-s-async-utils",
libraryDependencies ++= Deps.asyncUtils.value)
.jvmSettings(CommonSettings.jvmSettings: _*)
.jsSettings(commonJsSettings: _*)
.dependsOn(core)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class CallbackTest extends BitcoinSJvmTest {
new RuntimeException("2nd callback did not start before timeout"))
}
}
AsyncUtil.scheduler.schedule(runnable,
testTimeout.toMillis,
TimeUnit.MILLISECONDS)
AsyncUtil.scheduler.scheduleOnce(testTimeout.toMillis,
TimeUnit.MILLISECONDS,
runnable)
promise.future.map(_ => ())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.bitcoins.core.util

import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.testkitcore.util.BitcoinSJvmTest
import org.scalatest.compatible.Assertion

import java.time.temporal.ChronoUnit
import scala.concurrent._
import scala.concurrent.duration.DurationInt

class FutureUtilTest extends BitcoinSJvmTest {
it must "execute futures sequentially in the correct order" in {
Expand Down Expand Up @@ -44,4 +47,66 @@ class FutureUtilTest extends BitcoinSJvmTest {

futs.map(xs => assert(List(1, 2) == xs)).flatMap(_ => assertionF)
}

it must "execute futures in synchronous fashion with batchAndSyncExecute" in {
val vec = 0.until(Runtime.getRuntime.availableProcessors()).toVector

val f: Vector[Int] => Future[Vector[Int]] = { vec =>
AsyncUtil
.nonBlockingSleep(1.second)
.map(_ => vec)
}
val start = TimeUtil.now
val doneF =
FutureUtil.batchAndSyncExecute(elements = vec, f = f, batchSize = 1)

//here is how this test case works:
//the vector above has the same number of elements as available processors in it, and the batchSize is 1
//each function sleeps for 1000ms (1 second). If things are
//not run in parallel, the total time should be 5 seconds (5 elements * 1 second sleep)
for {
_ <- doneF
stop = TimeUtil.now
} yield {
val difference = ChronoUnit.MILLIS.between(start, stop)
if (difference >= Runtime.getRuntime.availableProcessors() * 1000) {
succeed
} else {
fail(
s"Batch did not execute in parallel! difference=${difference} seconds")
}
}
}

it must "execute futures in parallel with batchAndParallelExecute" in {
val vec = 0.until(Runtime.getRuntime.availableProcessors()).toVector

val f: Vector[Int] => Future[Vector[Int]] = { vec =>
AsyncUtil
.nonBlockingSleep(1.second)
.map(_ => vec)
}
val start = TimeUtil.now
val doneF =
FutureUtil.batchAndParallelExecute(elements = vec, f = f, batchSize = 1)

//here is how this test case works:
//the vector above has the same number of elements as available processors, and the batchSize is 1
//each function sleeps for 1000ms (1 second).
//if things are run in parallel, it should take ~1 second to run all these in parallel
for {
_ <- doneF
stop = TimeUtil.now
} yield {
val difference = ChronoUnit.MILLIS.between(start, stop)
if (difference < 2000) {
succeed
} else {
fail(
s"Batch did not execute in parallel! difference=${difference} seconds processors=${Runtime.getRuntime
.availableProcessors()}")
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ object FutureUtil {
}

/** Batches the elements by batchSize, executes f, and then aggregates all of the results
* into a vector and returns it. This is is the synchronous version of [[batchAndParallelExecute()]]
* into a vector and returns it. This is is the synchronous version of batchAndParallelExecute
*/
def batchAndSyncExecute[T, U](
elements: Vector[T],
Expand Down
9 changes: 9 additions & 0 deletions project/Deps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object Deps {
val scalaJsTimeV = "2.3.0"
val zxingV = "3.4.1"

val monixV = "3.4.0"
}

object Compile {
Expand Down Expand Up @@ -224,6 +225,10 @@ object Deps {

val zxingJ2SE =
"com.google.zxing" % "javase" % V.zxingV withSources () withJavadoc ()

val monixExecution =
Def.setting(
"io.monix" %%% "monix-execution" % V.monixV withSources () withJavadoc ())
}

object Test {
Expand Down Expand Up @@ -266,6 +271,10 @@ object Deps {
"com.opentable.components" % "otj-pg-embedded" % V.pgEmbeddedV % "test" withSources () withJavadoc ()
}

def asyncUtils = Def.setting {
Vector(Compile.monixExecution.value)
}

val chain = List(
Compile.logback
)
Expand Down

0 comments on commit a9292fc

Please sign in to comment.