Skip to content

foldright/cffu

Repository files navigation

🦝 CompletableFuture Fu (CF-Fu)

Github Workflow Build Status Github Workflow Build Status Codecov Java support Kotlin License Javadocs dokka Maven Central GitHub Releases GitHub Stars GitHub Forks GitHub Issues GitHub Contributors GitHub repo size gitpod: Ready to Code

👉 cffuCompletableFuture Fu 🦝)是一个小小的CompletableFuture(CF)辅助增强库,提升CF使用体验并减少误用,期望在业务中更方便高效安全地使用CF

欢迎 👏 💖

shifu



🔧 功能

提供的功能有:

  • ☘️ 补全业务使用中缺失的功能
    • 更方便的功能,如
      • allResultsOf方法:返回多个CF的结果,而不是无返回结果VoidCompletableFuture#allOf()
      • allTupleOf方法:返回多个CF不同类型的结果,而不是同一类型(allResultsOf
    • 更高效灵活的并发执行策略,如
      • allOfFastFail方法:有CF失败时快速返回,而不再等待所有CF运行完成(allOf
      • anyOfSuccess方法:返回首个成功的CF结果,而不是首个完成(但可能失败)的CFanyOf
    • 更安全的使用方式,如
      • 支持设置缺省的业务线程池,CffuFactory#builder(executor)方法
      • 支持超时的join的方法,join(timeout, unit)方法
      • 支持禁止强制篡改,CffuFactoryBuilder#forbidObtrudeMethods方法
      • 在类方法附加完善的代码质量注解(如@NonNull@Nullable@CheckReturnValue@Contract等),在编码时IDE能尽早提示出问题
  • 💪 已有功能的增强,如
    • anyOf方法:返回类型是T(类型安全),而不是返回ObjectCompletableFuture#anyOf()
  • Backport支持Java 8Java 9+高版本的所有CF新功能在Java 8等低Java版本直接可用,如
    • 超时控制:orTimeout/completeOnTimeout方法
    • 延迟执行:delayedExecutor方法
    • 工厂方法:failedFuture/completedStage/failedStage
    • 处理操作:completeAsync/exceptionallyAsync/exceptionallyCompose/copy
  • 🍩 一等公民支持Kotlin

更多cffu的使用方式与功能说明详见 User Guide

关于CompletableFuture

如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。

并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单

其中CompletableFuture(CF)有其优点:

  • Java标准库内置
    • 无需额外依赖,几乎总是可用
    • 相信有极高的实现质量
  • 广为人知广泛使用,有一流的群众基础
    • CompletableFuture在2014年发布的Java 8提供,有~10年了
    • CompletableFuture的父接口Future早在2004年发布的Java 5中提供,有~20年了
    • 虽然Future接口不支持 执行结果的异步获取与并发执行逻辑的编排,但也让广大Java开发者熟悉了Future这个典型的概念与工具
  • 功能强大、但不会非常庞大复杂
    • 足以应对日常的业务需求开发
    • 其它的大型并发框架(比如AkkaRxJava)在使用上需要理解的内容要多很多
    • 当然基本的并发关注方面及其复杂性,与具体使用哪个工具无关,都是要理解与注意的
  • 高层抽象

和其它并发工具、框架一样,CompletableFuture用于

  • 并发执行业务逻辑,或说编排并发的处理流程/处理任务
  • 利用多核并行处理
  • 提升业务响应性

值得更深入了解和应用。 💕

👥 User Guide

1. cffu的三种使用方式

cffu支持三种使用方式:

  • 🦝 1) 使用Cffu
    • 项目使用Java语言时,推荐这种使用方式
    • 直接使用CompletableFuture类的代码可以比较简单的迁移到Cffu类,包含2步修改:
    • 依赖io.foldright:cffu
  • 🛠️️ 2) 使用CompletableFutureUtils工具类
    • 如果你不想在项目中引入新类(Cffu类)、觉得这样增加了复杂性的话,
      • 完全可以把cffu库作为一个工具类来用
      • 优化CompletableFuture使用的工具方法在业务项目中很常见,CompletableFutureUtils提供了一系列实用可靠的工具方法
    • 这种使用方式有些cffu功能没有提供(也没有想到实现方案) 😔
      如支持设置缺省的业务线程池、禁止强制篡改
    • 依赖io.foldright:cffu
  • 🍩 3) 使用Kotlin扩展方法
    • 项目使用Kotlin语言时,推荐这种使用方式
    • 要依赖io.foldright:cffu-kotlin

在介绍功能点之前,可以先看看cffu不同使用方式的示例。 🎪

1) Cffu

public class CffuDemo {
  private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();
  // Create a CffuFactory with configuration of the customized thread pool
  private static final CffuFactory cffuFactory = CffuFactory.builder(myBizThreadPool).build();

  public static void main(String[] args) throws Exception {
    final Cffu<Integer> cf42 = cffuFactory
        .supplyAsync(() -> 21)  // Run in myBizThreadPool
        .thenApply(n -> n * 2);

    // Below tasks all run in myBizThreadPool
    final Cffu<Integer> longTaskA = cf42.thenApplyAsync(n -> {
      sleep(1001);
      return n / 2;
    });
    final Cffu<Integer> longTaskB = cf42.thenApplyAsync(n -> {
      sleep(1002);
      return n / 2;
    });
    final Cffu<Integer> longTaskC = cf42.thenApplyAsync(n -> {
      sleep(100);
      return n * 2;
    });
    final Cffu<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
      sleep(1000);
      throw new RuntimeException("Bang!");
    });

    final Cffu<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum)
        .orTimeout(1500, TimeUnit.MILLISECONDS);
    System.out.println("combined result: " + combined.get());

    final Cffu<Integer> anyOfSuccess = cffuFactory.anyOfSuccess(longTaskC, longFailedTask);
    System.out.println("anyOfSuccess result: " + anyOfSuccess.get());
  }
}

# 完整可运行的Demo代码参见CffuDemo.java

2) CompletableFutureUtils工具类

public class CompletableFutureUtilsDemo {
  private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();

  public static void main(String[] args) throws Exception {
    final CompletableFuture<Integer> cf42 = CompletableFuture
        .supplyAsync(() -> 21, myBizThreadPool)  // Run in myBizThreadPool
        .thenApply(n -> n * 2);

    final CompletableFuture<Integer> longTaskA = cf42.thenApplyAsync(n -> {
      sleep(1001);
      return n / 2;
    }, myBizThreadPool);
    final CompletableFuture<Integer> longTaskB = cf42.thenApplyAsync(n -> {
      sleep(1002);
      return n / 2;
    }, myBizThreadPool);
    final CompletableFuture<Integer> longTaskC = cf42.thenApplyAsync(n -> {
      sleep(100);
      return n * 2;
    }, myBizThreadPool);
    final CompletableFuture<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
      sleep(1000);
      throw new RuntimeException("Bang!");
    }, myBizThreadPool);

    final CompletableFuture<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum);
    final CompletableFuture<Integer> combinedWithTimeout =
        CompletableFutureUtils.orTimeout(combined, 1500, TimeUnit.MILLISECONDS);
    System.out.println("combined result: " + combinedWithTimeout.get());

    final CompletableFuture<Integer> anyOfSuccess = CompletableFutureUtils.anyOfSuccess(longTaskC, longFailedTask);
    System.out.println("anyOfSuccess result: " + anyOfSuccess.get());
  }
}

# 完整可运行的Demo代码参见CompletableFutureUtilsDemo.java

3) Kotlin扩展方法

private val myBizThreadPool: ExecutorService = Executors.newCachedThreadPool()

// Create a CffuFactory with configuration of the customized thread pool
private val cffuFactory: CffuFactory = CffuFactory.builder(myBizThreadPool).build()

fun main() {
  val cf42 = cffuFactory
    .supplyAsync { 21 }   // Run in myBizThreadPool
    .thenApply { it * 2 }

  // Below tasks all run in myBizThreadPool
  val longTaskA = cf42.thenApplyAsync { n: Int ->
    sleep(1001)
    n / 2
  }
  val longTaskB = cf42.thenApplyAsync { n: Int ->
    sleep(1002)
    n / 2
  }
  val longTaskC = cf42.thenApplyAsync { n: Int ->
    sleep(100)
    n * 2
  }
  val longFailedTask = cf42.thenApplyAsync<Int> { _ ->
    sleep(1000)
    throw RuntimeException("Bang!")
  }

  val combined = longTaskA.thenCombine(longTaskB, Integer::sum)
    .orTimeout(1500, TimeUnit.MILLISECONDS)
  println("combined result: ${combined.get()}")

  val anyOfSuccess: Cffu<Int> = listOf(longTaskC, longFailedTask).anyOfSuccessCffu()
  println("anyOfSuccess result: ${anyOfSuccess.get()}")
}

# 完整可运行的Demo代码参见CffuDemo.kt

2. cffu功能介绍

2.1 返回多个运行CF的结果

CompletableFutureallOf方法没有返回结果,只是返回Void,不方便获得所运行的多个CF结果。
# 要再通过入参CFget方法来获取结果。

cffuallResultsOf方法提供了返回多个CF结果的功能。

示例代码如下:

public class AllResultsOfDemo {
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();
  public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) throws Exception {
    //////////////////////////////////////////////////
    // CffuFactory#allResultsOf
    //////////////////////////////////////////////////
    Cffu<Integer> cffu1 = cffuFactory.completedFuture(21);
    Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);

    Cffu<Void> allOf = cffuFactory.allOf(cffu1, cffu2);
    // Result type is Void!
    //
    // the result can be got by input argument `cf1.get()`, but it's cumbersome.
    // so we can see a lot the util methods to enhance allOf with result in our project.

    Cffu<List<Integer>> allResults = cffuFactory.allResultsOf(cffu1, cffu2);
    System.out.println(allResults.get());

    //////////////////////////////////////////////////
    // or CompletableFutureUtils#allResultsOf
    //////////////////////////////////////////////////
    CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(21);
    CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);

    CompletableFuture<Void> allOf2 = CompletableFuture.allOf(cf1, cf2);
    // Result type is Void!

    CompletableFuture<List<Integer>> allResults2 = CompletableFutureUtils.allResultsOf(cf1, cf2);
    System.out.println(allResults2.get());
  }
}

# 完整可运行的Demo代码参见AllResultsOfDemo.java

上面多个相同结果类型的CFcffu还提供了返回多个不同类型CF结果的方法,allTupleOf方法。

示例代码如下:

public class AllTupleOfDemo {
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();
  public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) throws Exception {
    //////////////////////////////////////////////////
    // allTupleOf
    //////////////////////////////////////////////////
    Cffu<String> cffu1 = cffuFactory.completedFuture("21");
    Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);

    Cffu<Tuple2<String, Integer>> allTuple = cffu1.allTupleOf(cffu2);
    // or: cffuFactory.allTupleOf(cffu1, cffu2);
    System.out.println(allTuple.get());

    //////////////////////////////////////////////////
    // or CompletableFutureUtils.allTupleOf
    //////////////////////////////////////////////////
    CompletableFuture<String> cf1 = CompletableFuture.completedFuture("21");
    CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);

    CompletableFuture<Tuple2<String, Integer>> allTuple2 = CompletableFutureUtils.allTupleOf(cf1, cf2);
    System.out.println(allTuple2.get());
  }
}

# 完整可运行的Demo代码参见AllTupleOfDemo.java

2.2 支持设置缺省的业务线程池

  • CompletableFuture执行执行(即CompletableFuture*Async方法),使用的缺省线程池是ForkJoinPool.commonPool()
  • 这个线程池差不多是CPU个线程,合适执行CPU密集的任务;对于业务逻辑,往往有很多等待操作(如网络IO、阻塞等待),并不是CPU密集的。
  • 业务使用这个缺省线程池ForkJoinPool.commonPool()是很危险的❗

结果就是,业务调用CompletableFuture*Async方法时,几乎每次都要反复传入业务线程池;这让CompletableFuture的使用很繁琐易错 🤯

示例代码如下:

public class NoDefaultExecutorSettingForCompletableFuture {
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();

  public static void main(String[] args) {
    CompletableFuture<Void> cf1 = CompletableFuture.runAsync(
        () -> System.out.println("doing a long time work!"),
        myBizExecutor);

    CompletableFuture<Void> cf2 = CompletableFuture
        .supplyAsync(
            () -> {
              System.out.println("doing another long time work!");
              return 42;
            },
            myBizExecutor)
        .thenAcceptAsync(
            i -> System.out.println("doing third long time work!"),
            myBizExecutor);

    CompletableFuture.allOf(cf1, cf2).join();
  }
}

# 完整可运行的Demo代码参见NoDefaultExecutorSettingForCompletableFuture.java

Cffu支持设置缺省的业务线程池,规避上面的繁琐与危险。示例代码如下:

public class DefaultExecutorSettingForCffu {
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();
  public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) {
    Cffu<Void> cf1 = cffuFactory.runAsync(() -> System.out.println("doing a long time work!"));

    Cffu<Void> cf2 = cffuFactory.supplyAsync(() -> {
      System.out.println("doing another long time work!");
      return 42;
    }).thenAcceptAsync(i -> System.out.println("doing third long time work!"));

    cffuFactory.allOf(cf1, cf2).join();
  }
}

# 完整可运行的Demo代码参见DefaultExecutorSettingForCffu.java

2.3 高效灵活的并发执行策略(allOfFastFail/anyOfSuccess

  • CompletableFutureallOf方法会等待所有输入CF运行完成;即使有CF失败了也要等待后续CF运行完成,再返回一个失败的CF
    • 对于业务逻辑来说,这样失败且继续等待策略,减慢了业务响应性;会希望如果有输入CF失败了,则快速失败不再做于事无补的等待
    • cffu提供了相应的allOfFastFail/allResultsOfFastFail方法
    • allOf/allOfFastFail两者都是,只有当所有的输入CF都成功时,才返回成功结果
  • CompletableFutureanyOf方法返回首个完成的CF(不会等待后续没有完成的CF,赛马模式);即使首个完成的CF是失败的,也会返回这个失败的CF结果。
    • 对于业务逻辑来说,会希望赛马模式返回首个成功的CF结果,而不是首个完成但失败的CF
    • cffu提供了相应的anyOfSuccess方法
    • anyOfSuccess只有当所有的输入CF都失败时,才返回失败结果

📔 关于多个CF的并发执行策略,可以看看JavaScript规范Promise Concurrency;在JavaScript中,Promise即对应CompletableFuture

JavaScript Promise提供了4个并发执行方法:

  • Promise.all():等待所有Promise运行成功,只要有一个失败就立即返回失败(对应cffuallOfFastFail方法)
  • Promise.allSettled():等待所有Promise运行完成,不管成功失败(对应cffuallOf方法)
  • Promise.any():赛马模式,立即返回首个成功的Promise(对应cffuanyOfSuccess方法)
  • Promise.race():赛马模式,立即返回首个完成的Promise(对应cffuanyOf方法)

PS:JavaScript Promise的方法命名真考究~ 👍

cffu新加2个方法后,对齐了JavaScript Promise规范的并发方法~ 👏

示例代码如下:

public class ConcurrencyStrategyDemo {
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();
  public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) throws Exception {
    ////////////////////////////////////////////////////////////////////////
    // CffuFactory#allOfFastFail / allResultsOfFastFail
    // CffuFactory#anyOfSuccess
    ////////////////////////////////////////////////////////////////////////
    final Cffu<Integer> successAfterLongTime = cffuFactory.supplyAsync(() -> {
      sleep(3000); // sleep LONG time
      return 42;
    });
    final Cffu<Integer> failed = cffuFactory.failedFuture(new RuntimeException("Bang!"));

    // Result type is Void!
    Cffu<Void> cffuAll = cffuFactory.allOfFastFail(successAfterLongTime, failed);

    Cffu<List<Integer>> fastFailed = cffuFactory.allResultsOfFastFail(successAfterLongTime, failed);
    // fast failed without waiting successAfterLongTime
    System.out.println(fastFailed.exceptionNow());

    Cffu<Integer> anyOfSuccess = cffuFactory.anyOfSuccess(successAfterLongTime, failed);
    System.out.println(anyOfSuccess.get());

    ////////////////////////////////////////////////////////////////////////
    // or CompletableFutureUtils#allOfFastFail / allResultsOfFastFail
    //    CompletableFutureUtils#anyOfSuccess
    ////////////////////////////////////////////////////////////////////////
    final CompletableFuture<Integer> successAfterLongTimeCf = CompletableFuture.supplyAsync(() -> {
      sleep(3000); // sleep LONG time
      return 42;
    });
    final CompletableFuture<Integer> failedCf = CompletableFutureUtils.failedFuture(new RuntimeException("Bang!"));

    // Result type is Void!
    CompletableFuture<Void> cfAll = CompletableFutureUtils.allOfFastFail(successAfterLongTimeCf, failedCf);

    CompletableFuture<List<Integer>> fastFailedCf = CompletableFutureUtils.allResultsOfFastFail(successAfterLongTimeCf, failedCf);
    // fast failed without waiting successAfterLongTime
    System.out.println(CompletableFutureUtils.exceptionNow(fastFailedCf));

    CompletableFuture<Integer> cfSuccess = CompletableFutureUtils.anyOfSuccess(successAfterLongTimeCf, failedCf);
    System.out.println(cfSuccess.get());
  }
}

# 完整可运行的Demo代码参见ConcurrencyStrategyDemo.java

2.4 支持超时的join的方法

cf.join()会「不超时永远等待」,在业务中很危险❗️当意外出现长时间等待时,会导致:

  • 主业务逻辑阻塞,没有机会做相应的处理,以及时响应用户
  • 会费掉一个线程,线程是很有限的资源(一般几百个),耗尽线程意味着服务瘫痪故障

join(timeout, unit)方法即支持超时的join的方法;就像cf.get(timeout, unit) 之于 cf.get()

这个新方法使用简单类似,不附代码示例。

2.5 Backport支持Java 8

Java 9+高版本的所有CF新功能在Java 8等低Java版本直接可用。

其中重要的Backport功能有:

  • 超时控制:orTimeout/completeOnTimeout方法
  • 延迟执行:delayedExecutor方法
  • 工厂方法:failedFuture/completedStage/failedStage
  • 处理操作:completeAsync/exceptionallyAsync/exceptionallyCompose/copy

这些backport的方法是CompletableFuture的已有功能,不附代码示例。

2.6 返回具体类型的anyOf方法

CompletableFuture.anyOf方法返回类型是Object,丢失具体类型,不够类型安全,使用时需要转型也不方便。

cffu提供了anyOf/anyOf方法,返回类型是T(类型安全),而不是返回ObjectCompletableFuture#anyOf())。

这个新方法使用简单类似,不附代码示例。

更多功能说明

可以参见:

3. 如何从直接使用CompletableFuture类迁移到Cffu

为了使用cffu增强功能,可以迁移已有直接使用CompletableFuture的代码到Cffu。包含2步修改:

  • 在类型声明地方,CompletableFuture改成Cffu
  • CompletableFuture静态方法调用的地方,类名CompletableFuture改成cffuFactory实例

之所以可以这样迁移,是因为:

  • CompletableFuture类的所有实例方法都在Cffu类,且有相同的方法签名与功能
  • CompletableFuture类的所有静态方法都在CffuFactory类,且有相同的方法签名与功能

🔌 API Docs

代码示例:

🍪依赖

可以在 central.sonatype.com 查看最新版本与可用版本列表。

  • cffu库(包含Java CompletableFuture的增强CompletableFutureUtils):
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu</artifactId>
        <version>1.0.0-Alpha5</version>
      </dependency>
    • For Gradle projects:

      // Gradle Kotlin DSL
      implementation("io.foldright:cffu:1.0.0-Alpha5")
      // Gradle Groovy DSL
      implementation 'io.foldright:cffu:1.0.0-Alpha5'
  • cffu Kotlin支持库:
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu-kotlin</artifactId>
        <version>1.0.0-Alpha5</version>
      </dependency>
    • For Gradle projects:

      // Gradle Kotlin DSL
      implementation("io.foldright:cffu-kotlin:1.0.0-Alpha5")
      // Gradle Groovy DSL
      implementation 'io.foldright:cffu-kotlin:1.0.0-Alpha5'
  • cffu bom:
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu-bom</artifactId>
        <version>1.0.0-Alpha5</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    • For Gradle projects:

      // Gradle Kotlin DSL
      implementation(platform("io.foldright:cffu-bom:1.0.0-Alpha5"))
      // Gradle Groovy DSL
      implementation platform('io.foldright:cffu-bom:1.0.0-Alpha5')
  • 📌 TransmittableThreadLocal(TTL)cffu executor wrapper SPI实现
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu-ttl-executor-wrapper</artifactId>
        <version>1.0.0-Alpha5</version>
        <scope>runtime</scope>
      </dependency>
    • For Gradle projects:

      // Gradle Kotlin DSL
      runtimeOnly("io.foldright:cffu-ttl-executor-wrapper:1.0.0-Alpha5")
      // Gradle Groovy DSL
      runtimeOnly 'io.foldright:cffu-ttl-executor-wrapper:1.0.0-Alpha5'

📚 更多资料

👋 关于库名

cffuCompletableFuture-Fu的缩写;读作C Fu,谐音Shifu/师傅

嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝

shifu