Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

有并发限制的Promise.all(ts类型) #147

Open
Sunny-117 opened this issue Nov 3, 2022 · 2 comments
Open

有并发限制的Promise.all(ts类型) #147

Sunny-117 opened this issue Nov 3, 2022 · 2 comments

Comments

@Sunny-117
Copy link
Owner

No description provided.

@fencer-yd
Copy link

class Run {
  tasks = [];
  flag = 0;
  constructor(tasks, num) {
    this.tasks = tasks;
    this.num = num;
    this.flag = 0;
  }

  run() {
    if (!this.num) return;
    new Array(this.num).fill(0).forEach(this.runOne.bind(this));
  }

  async runOne() {
    if (!this.tasks.length) return;
    if (this.flag >= this.num) return;
    const task = this.tasks.shift();
    this.flag++;
    await task();
    if (this.flag > 0) this.flag--;
    await this.runOne();
  }
}

const sleep = (duration) =>
  new Promise((resolve) =>
    setTimeout(() => {
      console.log("task " + duration + " done");
      resolve();
    }, duration)
  );

const tasks = new Array(10).fill(() => sleep(1000 + Math.random() * 3000));

const run = new Run(tasks, 2);

run.run();

@wangjs-jacky
Copy link

wangjs-jacky commented Oct 18, 2023

测试代码:

function sleep(text, delay = 1000) {
  return () => new Promise(resolve => {
    setTimeout(() => {
      console.log(text)
      resolve();
    }, delay);
  });
}

const tasks = [1, 2, 3, 4, 5].map((i) => {
  return sleep(i)
})

测试代码:asyncPool(tasks, 2)

预期结果:

  • 第一次打印 12
  • 第二次打印 34
  • 第三次打印 5

方案一: 基于 async-pool 方案

维护两个数组:

  1. allTasks
  2. poolTasks

取出 tasks[i] 作为当前的任务:

allTaskstask[i]
poolTaskse = task[i].then(()=>{poolTasks.splice(poolTasks.indexOf(e),1)})

通过 Promise.race + await 控制 poolTasks 始终在并发数范围内,最后通过 Promise.all 实现所有任务的并发。

const asyncPool = async (tasks, poolLimit) => {
  /* 所有异步任务执行状态 */
  const allTasks = [];
  /* 正在执行的任务数组 */
  const poolTasks = [];

  for (let i = 0; i < tasks.length; i++) {
    const curTask = Promise.resolve(tasks[i]());
    allTasks.push(curTask);

    /* 当 poolLimit <= tasks.length 时,实现并发控制 */
    if (poolLimit <= tasks.length) {
      /* 在原有异步包裹处理操作 */
      const e = curTask.then(() => {
        /* 成功后,从正在执行的任务数组中删除 */
        poolTasks.splice(poolTasks.indexOf(e), 1)
      })
      poolTasks.push(e);

      /* poolTasks 持续增加会超出限制数量 */
      if (poolTasks.length >= poolLimit) {
        /* 始终控制 poolTasks 的数量 */
        await Promise.race(poolTasks);
      }
    }
  }

  /* 此时 allTasks 中剩余 pending < poolTasks */
  return Promise.all(allTasks);
}

asyncPool(tasks, 2)

参考实现:asyncPool 的使用

方案二: 使用 koa 内的 compose 函数思想,构建 dispatch 函数

对于同步任务的递归为:

const looptask = () => {
  looptask()
}

对于异步任务的递归:

const dispatch = () => {
  asyncFn().then(() => {
    dispatch();
  })
}

基于这个思想,结合 Promise.all 的源码实现。

/* 使用 dispatch 实现 */
const asyncPool = (tasks, poolLimit) => {
  return new Promise((resolve, reject) => {
    const result = [];
    let resolveCount = 0;
    let currentIndex = 0;

    const dispatch = () => {
      const curTask = Promise.resolve(tasks[currentIndex]());
      const index = currentIndex;
      currentIndex++;
      /* 异步任务的递归,通过 .then 实现 */
      curTask.then(res => {
        result[index] = res;
        resolveCount++;
        if (resolveCount === tasks.length) {
          resolve(res);
        }

        /* 递归的触发(currentIndex指针还未触发) */
        if (currentIndex < tasks.length) {
          dispatch();
        }
      });
    }

    for (let i = 0; i < poolLimit && i < tasks.length; i++) {
      dispatch();
    }
  })
}

asyncPool(tasks, 2)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants