Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

实现有并行限制的 Promise 调度器 #149

Open
Sunny-117 opened this issue Nov 3, 2022 · 10 comments
Open

实现有并行限制的 Promise 调度器 #149

Sunny-117 opened this issue Nov 3, 2022 · 10 comments

Comments

@Sunny-117
Copy link
Owner

No description provided.

@lzxjack
Copy link
Contributor

lzxjack commented Jan 11, 2023

Scheduler调度器:

class Scheduler {
  constructor(max) {
    // 最大可并发任务数
    this.max = max;
    // 当前并发任务数
    this.count = 0;
    // 阻塞的任务队列
    this.queue = [];
  }

  async add(fn) {
    if (this.count >= this.max) {
      // 若当前正在执行的任务,达到最大容量max
      // 阻塞在此处,等待前面的任务执行完毕后将resolve弹出并执行
      await new Promise(resolve => this.queue.push(resolve));
    }
    // 当前并发任务数++
    this.count++;
    // 使用await执行此函数
    const res = await fn();
    // 执行完毕,当前并发任务数--
    this.count--;
    // 若队列中有值,将其resolve弹出,并执行
    // 以便阻塞的任务,可以正常执行
    this.queue.length && this.queue.shift()();
    // 返回函数执行的结果
    return res;
  }
}

使用:

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time).then(() => console.log(val));
  });
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

@bearki99
Copy link

class Schedular {
    constructor(limit) {
        this.limit = limit;
        this.queue = [];
        this.runCounts = 0;
    }
    add(time, order) {
        const mypromise = () => {
            return new Promise((resolve, reject)=>{
                setTimeout(()=>{
                    console.log(order); //执行order
                    resolve();
                }, time);
            })
        }
        this.queue.push(mypromise);
    }
    taskStart() {
        for(let i = 0; i < this.limit; i++){
            this.request();
        }
    }
    request() {
        if(!this.queue || !this.queue.length || this.runCounts >= this.limit) return;
        this.runCounts++;
        this.queue.shift()().then((res)=>{
            this.runCounts--;
            this.request();
        })
    }
}
const scheduler = new Schedular(2)
const addTask = (time, order) => {
  scheduler.add(time, order)
}
addTask(1000, '1')
addTask(500, '2')
addTask(300, '3')
addTask(400, '4')
scheduler.taskStart()

@2239351258
Copy link

题一

题意

并发控制Promise,要求:实现Scheduler

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time).then(() => console.log(val));
  });
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

实现

class Scheduler {
  constructor(max) {
    // 最大可并发任务数
    this.max = max;
    // 当前并发任务数
    this.count = 0;
    // 任务队列
    this.queue = [];
  }

  add(fn) {
    this.queue.push(fn)
    this.run()
  }
  run() {
    if (this.count >= this.max || this.queue.length === 0) return
    this.count++
    Promise.resolve(this.queue.shift()()).finally(() => {
      this.count--
      this.run()
    })
  }
}
// ------------test-------------------
// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time).then(() => console.log(val));
  });
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

题二

题意

并发控制Promise,要求:实现Scheduler

注意 Scheduler.add()返回一个Promise(与题一的区别)

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time)
  }).then(() => console.log(val));
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

题解

class Scheduler {
  constructor(max) {
    // 最大可并发任务数
    this.max = max;
    // 当前并发任务数
    this.count = 0;
    // 任务队列
    this.queue = [];
  }

  add(fn) {
    return new Promise((resolve, reject) => {
      this.queue.push({ fn, resolve, reject })
      this.run()
    })
  }
  run() {
    if (this.count >= this.max || this.queue.length === 0) return
    this.count++
    const { fn, resolve, reject } = this.queue.shift()
    return fn().then(() => {
      resolve()
      this.count--
      this.run()
    })
  }
}
// ---------------------test-------------------
// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time)
  }).then(() => console.log(val));
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

题三

题意

并发控制Promise,要求:实现Scheduler

与题二题一的区别:scheduler.add()传入的不是一个返回Promise的函数

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {sleep(time)}).then(() => console.log(val));
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 500ms 打印2
// 800ms 打印3
// 1000ms 打印1
// 1200ms 打印4

题解

题没写出来,不知道是不是因为题有问题

@cscty
Copy link

cscty commented Jun 19, 2023

class scheduler {
constructor(max) {
this.max = max;
// 当前并发数
this.count = 0;
// 阻塞队列
this.queue = [];
}
async add(fn) {
// 判断并发数是否大于最大数
if (this.count >= this.max) {
// 是就暂停,通过resolve开启下次调用
await new Promise((resolve) => this.queue.push(resolve));
// console.log(this.queue.length);
}
// 并发数+1
this.count++;
fn().then(() => {
//执行完毕
// 并发数-1
this.count--;
// 看看是否有阻塞队列是否存在,是就停止阻塞
if (this.queue.length) this.queue.shift()();
});
}
}

@cscty
Copy link

cscty commented Jul 3, 2023

let tasks = [];
for (let i = 0; i < 8; i++) {
tasks.push(() => {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(i);
console.log(i);
}, 1000);
});
});
}
function tdq(tasks, max) {
// 当前并发数
let count = 0;
// 阻塞队列
let queue = [];
async function add(task) {
if (count >= max) {
// 阻塞
await new Promise((resolve) => {
queue.push(resolve);
});
}
count++;
task().then((data) => {
count--;
if (queue.length) {
queue.shift()();
}
});
}
for (let i = 0; i < tasks.length; i++) {
add(tasks[i]);
}
}
tdq(tasks, 3);

1 similar comment
@cscty
Copy link

cscty commented Jul 3, 2023

let tasks = [];
for (let i = 0; i < 8; i++) {
tasks.push(() => {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(i);
console.log(i);
}, 1000);
});
});
}
function tdq(tasks, max) {
// 当前并发数
let count = 0;
// 阻塞队列
let queue = [];
async function add(task) {
if (count >= max) {
// 阻塞
await new Promise((resolve) => {
queue.push(resolve);
});
}
count++;
task().then((data) => {
count--;
if (queue.length) {
queue.shift()();
}
});
}
for (let i = 0; i < tasks.length; i++) {
add(tasks[i]);
}
}
tdq(tasks, 3);

@ZhangYedi-cmd
Copy link

function AsyncPool(limit) {
  this.limit = limit
  this.executingQueue = []
  this.queue = []
  this.finished = []
}

AsyncPool.prototype.add = function (fn) {
  this.queue.push(fn)
}

AsyncPool.prototype.run = async function () {
  let {queue, executingQueue, limit, finished} = this
  for (let i = 0; i < queue.length; i++) {
    let task = queue[i]
    let p = task()
    finished.push(p)
    if (queue.length >= limit) {
      p = p.then(() => {
        executingQueue.splice(executingQueue.indexOf(p), 1)
      })
      executingQueue.push(p)
      if (executingQueue.length >= limit) {
        await Promise.race(executingQueue)
      }
    }
  }
  return Promise.all(finished)
}

let pool = new AsyncPool(2)
for (let i = 1; i <= 5; i++) {
  pool.add(
    () => new Promise(resolve => setTimeout(() => {
      console.log(i * 1000)
      resolve(i * 1000)
    },i * 1000))
  )
}

pool.run().then(res => {
  console.log(res)
})

@bobocomeon
Copy link

/**
 * @desc 实现有并行限制的 Promise 调度
 * JS 实现一个带并发限制的异步调度器 Scheduler,保证同时运行的任务最多有两个。
例如目前有 4 个任务,完成时间分别为,1000ms、500ms、300ms、400ms
那么在该调度器中的执行完成顺序应该为 2、3、1、4
分析:因为1、2先进入队列中,2完成则输出2,3进入,3完成输出3,此时为800ms,4进入后的200ms,1完成输出1,而后4完成输出 4
 */
class Scheduler {
  constructor(limit) {
    this.limit = limit;
    this.queue = [];
    this.running = 0;
  }
  createTask(duration, fn) {
    return () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(fn());
      }, duration);
    });
  }
  addTask(callback, duration) {
    const task = this.createTask(duration, callback);
    this.queue.push(task);
  }
  start() {
    for (let i = 0; i < this.limit; i++) {
      this.scheduler();
    }
  }
  scheduler() {
    if (this.queue.length === 0 || this.running > this.limit) {
      return;
    }
    this.running++;
    const task = this.queue.shift();
    task().then((() => {
      this.running--;
      this.scheduler();
    }));
  }
}

// 实例化一个调度器
const scheduler = new Scheduler(2);

// 添加任务
scheduler.addTask(() => {
  console.log("任务1");
}, 1000);
scheduler.addTask(() => {
  console.log("任务2");
}, 500);
scheduler.addTask(() => {
  console.log("任务3");
}, 300);
scheduler.addTask(() => {
  console.log("任务4");
}, 400);
// 任务执行
scheduler.start();

@topulikeweb
Copy link

topulikeweb commented Dec 20, 2023

class Scheduler {
  constructor (max) {
    // 最大并发量
    this.max = max
    // 当前并发数
    this.count = 0
    // 执行队列
    this.queue = []
    // 是否可以运行
    this.isRunning = true
  }
  
  // 添加任务
  addTask (time, callback) {
    this.queue.push({ time, callback })
    if (this.isRunning) {
      this.runTask()
    }
  }
  
  // 执行任务
  runTask () {
    if (this.queue.length === 0) {
      this.isRunning = false;
      return;
    }
    // 达到最大并发
    // this.queue.length === this.max(因为queue中的任务会出去执行,因此判断会有问题)
    if (this.count >= this.max) {
      this.isRunning = false
      return
    }
    // 未到达最大并发
    const { time, callback } = this.queue.shift()
    // 释放出空间,
    this.count++
    setTimeout(() => {
      callback()
      this.count--
      // 继续执行下一个任务
      this.runTask()
    }, time)
  }
}



// 示例用法
const scheduler = new Scheduler(2);

scheduler.addTask(4000, () => {
  console.log('Task 1 executed after 1000ms');
});

scheduler.addTask(4000, () => {
  console.log('Task 2 executed after 2000ms');
});

scheduler.addTask(4000, () => {
  console.log('Task 3 executed after 1000ms');
});

@wangyang-o
Copy link

Scheduler调度器:

class Scheduler {
  constructor(max) {
    // 最大可并发任务数
    this.max = max;
    // 当前并发任务数
    this.count = 0;
    // 阻塞的任务队列
    this.queue = [];
  }

  async add(fn) {
    if (this.count >= this.max) {
      // 若当前正在执行的任务,达到最大容量max
      // 阻塞在此处,等待前面的任务执行完毕后将resolve弹出并执行
      await new Promise(resolve => this.queue.push(resolve));
    }
    // 当前并发任务数++
    this.count++;
    // 使用await执行此函数
    const res = await fn();
    // 执行完毕,当前并发任务数--
    this.count--;
    // 若队列中有值,将其resolve弹出,并执行
    // 以便阻塞的任务,可以正常执行
    this.queue.length && this.queue.shift()();
    // 返回函数执行的结果
    return res;
  }
}

使用:

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {>     return sleep(time).then(() => console.log(val));
  });
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

兄弟,你这代码gpt写的吗,还是哪个仓库看见的,这种写法应该是我第一次写。注释都是我的注释🙉。我只给字节面试官写过一次。链接

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

9 participants