Skip to content

bosens-China/async-queue

Repository files navigation

async-queue

管理异步队列的工具,支持任务重试、以及并发和 stream 模式。

# 两种模式下完成任务的时间
tasks = [1s,2s,4s,3s];
max = 2
# 并发
tasks => [1s,2s,4s,3s] => [1s,2s],[4s,3s] => 6s # 取两项之间的最大值
# 流模式
tasks => [1s,2s,4s,3s] => [1s,2s],[4s,3s] => [4s,1s],[3s] => [3s, 3s] => 5s

快速上手

安装

yarn add @boses/async-queue

使用方式

import { asyncQueue } from '@boses/async-queue';
// commonjs 引用方式
// const { asyncQueue } = require('@boses/async-queue');

const App = async () => {
  // 异步请求,隐藏具体过程
  const getRquire = [
    async () => {
      // xxx
    },
    async () => {
      // xxx
    },
  ];
  const result = await asyncQueue(getRquire);
  console.log(result);
};

App();

Api

async-queue 暴露五个 api

asyncQueue

(tasks: Array<Function>, options?: Options) => AsyncQueueValue

创建异步队列任务,具体说明如下

tasks

等待执行的任务列表,传递值必须为 Array<Function>

你可能会疑惑,为什么要求必须为 () => xxx 的形式?

这是因为大多数的情况下任务都是通过函数形式调用,例如给定一个 tasks 通常情况下它会完成一次网络请求或执行一次操作,如果不通过函数形式调用,它的值始终为固定形式。

为了方便使用,你可以结合lodash之类的函数库使用,下面是一个例子

import { asyncQueue } from '@boses/async-queue';
import _ from 'lodash';
const fn = () => {};
// 在这里传递参数
asyncQueue([_.partial(fn, 'test')]);

options

名称 类型 默认值 描述
max number 2 最大请求数
waitTime number or ((index: number, retry: number) => number) 0 每次请求前等待时间,index 为任务索引 retry 为重试次数(在并发模式下,max > index 的索引都不会执行此选项,而在流模式,第一个索引也不会执行)
waitTaskTime number or (() => number) 0 每批任务结束后等待时间,注意flowMode 模式下不起作用(在最后一批任务不受此影响)
throwError boolean true 是否抛出错误,如果为 false 会把错误值当结果记录下来
retryCount number 0 任务重试次数,你可以理解成额外运行次数,例如重试为 3,则这个程序最多运行 4 次
flowMode boolean true 是否为流模式,并发和流的区别可以查看文档开头开头例子

defaults

上述 options 的初始值都包含在 defaults 对象上,因此你可以直接修改这个对象,做到全局修改。

注意 asyncQueueSingle 函数的默认值也同样受此影响

import { asyncQueue } from '@boses/async-queue';
asyncQueue.defaults.max = 1;

AsyncQueueValue

asyncQueue 的返回值,具体返回值如下。

名称 描述
addListener 添加监听器
removeListener 删除监听器,如果不提供 fn 参数则删除全部

注意:addListener 添加的监听器会在任务完成自动 removeListener,所以不是中途取消监听,无需手动调用。

上面的方法返回的都是 this,因此可以很容易链式调用。

asyncQueueSingle

(tasks: Function, options?: SingleOptions) => Promise<any>

asyncQueue 使用基本一致,为了方便使用而封装的单次使用方法,具体 Api 如下

SingleOptions

名称 类型 默认值 描述
throwError boolean true 是否抛出错误
retryCount number 0 任务重试次数

create

create(options) => ({asyncQueue, asyncQueueSingle})

每次创建任务都需要填写相同的配置项,你会觉得繁琐这也是为什么有 create 这个方法的原因,它接收一个 options

返回值为 asyncQueue 以及 asyncQueueSingle

sleep

const sleep = (time: number):Promise<void>

在一定时间内,让程序等待执行

await sleep(3000);
// 3s后执行这条语句
console.log(`被执行`);

random

const random = (min: number, max: number, integer = true):number

返回传递 min 和 max 区间的数,integer 决定是否返回整数

// 返回1-10之间的整数
const num = random(1, 10);

例子

链式调用

import { asyncQueue, wait } from '@boses/async-queue';
const result = await asyncQueue([
  async () => {
    await wait(100);
    return 1;
  },
]).addListener(() => {
  // 监听变化
});

console.log(result);
// 1

爬虫

const axios = require('axios');
const { asyncQueue } = require('@boses/async-queue');

const getPage = async (page = 1) => {
  const { data } = await axios.get(`xxx?p=${page}`);
  return data;
};

const App = async () => {
  const tasks = Array.from({ length: 4 })
    .fill()
    .map((_, index) => () => getPage(index + 1));
  const data = await asyncQueue(tasks, {
    max: 2,
    waitTime: 1000,
    waitTaskTime: 1000,
    retryCount: 1,
  }).addListener((value) => {
    console.log(`当前完成进度为: ${value.progress.toFixed(2)}, 状态为: ${value.status}`);
  });
  console.log(data);
};

App();

批量插入 element

import { asyncQueue } from '@boses/async-queue';

const app = document.body.querySelector('#app');

const ul = document.createElement('ul');

const task = (_, index) => {
  return () => {
    const li = document.createElement('li');
    li.textContent = `当前元素为:${index + 1}项`;
    ul.appendChild(li);
  };
};

// 为了防止页面卡顿,分批插入
const tasks = Array.from({ length: 1000 }).fill('').map(task);

asyncQueue(tasks, { max: 100, waitTaskTime: 10 }).then((res) => {
  app.appendChild(ul);
});

重试

对于爬虫之类的任务,你可能想着可以失败后重试,在 asyncQueue 中可以轻松实现,只需要指定 retryCount 属性即可。

下面是一个示例,在计次小于 3 的时候一直失败

import { asyncQueue } from '@boses/async-queue';

let i = 0;
const fn = async () => {
  if (++i < 3) {
    throw new Error(`error`);
  }
  return i;
};

const result = await asyncQueueSingle(fn, { retryCount: 2 });
// result to 3

兼容性

支持现代浏览器,因为使用了 proxy 等特性,所以在使用 webpack 等构建工具时,让务必让 babel 转译此模块。

//  vue.config.js
module.exports = {
  transpileDependencies: ['@boses/async-queue'],
};
module: {
  rules: [
    {
      test: /\.m?js$/,
      exclude: (modulePath) => {
        // 禁止过滤
        if (modulePath.includes('@boses/async-queue')) {
          return false;
        }
        return /node_modules/.test(modulePath);
      },
      use: {
        loader: 'babel-loader',
        options: {
          presets: ['@babel/preset-env'],
        },
      },
    },
  ];
}

其他

如果你有其他建议欢迎反馈和 pr

升级计划

之后会对版本的 options 进行调整,支持以下选项

  • addListener 新增 this,可以通过 this 来修改 options
  • 拓展 waitTime 属性,让其支持函数或者对象
    • ignoreFirst 决定是否忽略首次不执行 waitTime 限制
    • retryIgnore 重试是否受 waitTime 参数影响
  • 调整强制 error 参数为 Error 类型,如果是对象会导致报错信息丢失,改为任意

待完成工作

  • jest 测试用例完善
  • 如果有必要,使用构建工具完成打包

协议

MIT License

Releases

No releases published

Packages

No packages published