## 控制进程数量的两种方式

1. 进程池（`Pool`）：固定工作进程的数量
2. 信号量（`Semaphore`）：用计数器限制同时运行的进程数（不是互斥锁）

In [None]:
import pandas as pd 
import numpy as np 
import multiprocessing as mp
import time 
from multiprocessing import Process, Pool, Queue, Semaphore
mp.set_start_method('fork')
from queue import Empty

In [None]:
def genDf(df_queue):
    """Build a random 3x3 DataFrame and push it onto *df_queue*.

    Args:
        df_queue: Any queue-like object exposing ``put`` (``main`` passes a
            Manager queue proxy).

    Returns:
        0, as a simple completion flag for the caller.
    """
    frame = pd.DataFrame(data=np.random.rand(3, 3))
    df_queue.put(frame)
    return 0

def write_df(df_queue: Queue, csv_name: str, timeout: float = 5):
    """Drain DataFrames from *df_queue* into numbered CSV files.

    Each DataFrame taken from the queue is written to
    ``f"{csv_name}_{num}.csv"`` with ``num`` counting up from 0. There is no
    end-of-stream marker: the loop stops once the queue has stayed empty for
    *timeout* seconds. Producers must therefore not pause longer than that
    between items, or later items will be left unwritten.

    Args:
        df_queue: Queue of DataFrames (any object whose ``get(timeout=...)``
            raises ``queue.Empty`` on timeout).
        csv_name: Path prefix for the output files; its parent directory is
            created if it does not exist.
        timeout: Seconds to wait for the next item before stopping.

    Returns:
        0 once the queue has gone quiet.
    """
    from pathlib import Path

    # Create the output directory up front so the first to_csv call cannot
    # fail with FileNotFoundError (e.g. for the "data/temp/test" prefix).
    Path(csv_name).parent.mkdir(parents=True, exist_ok=True)

    num = 0
    while True:
        try:
            df = df_queue.get(timeout=timeout)
        except Empty:
            break
        df.to_csv(f"{csv_name}_{num}.csv")
        num += 1
        print("write 1 data")
    return 0

def main(csv_name: str = "data/temp/test", n_jobs: int = 20, processes: int = 4):
    """Generate DataFrames on a process pool and write them to CSV.

    Demonstrates capping concurrency with a fixed-size ``Pool``. One worker
    runs ``write_df`` as the consumer. The remaining workers run ``genDf``
    jobs that push DataFrames into a shared Manager queue.

    Args:
        csv_name: Path prefix passed to ``write_df`` for the output files.
        n_jobs: Number of ``genDf`` jobs to submit.
        processes: Pool size. Must be at least 2, because the writer occupies
            one worker for its whole run.

    Raises:
        ValueError: If *processes* is less than 2.
    """
    if processes < 2:
        # With a single worker the writer would hold it until its timeout
        # expired; every DataFrame produced afterwards would never be written.
        raise ValueError(
            "processes must be >= 2 (one worker is reserved for the writer)"
        )

    # A Manager queue proxy can be passed to pool workers as a task argument
    # (a plain multiprocessing.Queue cannot). The with-block shuts the
    # manager's server process down explicitly.
    with mp.Manager() as manager:
        df_queue = manager.Queue()
        # Pool.__exit__ calls terminate(), which cleans up the workers if a
        # job raises; on success we close()/join() first for an orderly exit.
        with mp.Pool(processes) as pool:
            writer = pool.apply_async(write_df, (df_queue, csv_name))
            jobs = [pool.apply_async(genDf, (df_queue,)) for _ in range(n_jobs)]
            for job in jobs:
                job.get()  # re-raises any exception raised in a worker
            writer.get()
            pool.close()
            pool.join()
    


In [None]:
def genDf(df_queue, sema: "Semaphore | None" = None, delay: float = 5):
    """Build a random 3x3 DataFrame and put it on *df_queue*, gated by *sema*.

    The semaphore bounds how many callers can be inside the body at once.
    The ``delay`` sleep keeps the slot held, so the throttling is visible in
    the printed output.

    Args:
        df_queue: Queue-like object exposing ``put``.
        sema: Semaphore limiting concurrent callers; ``None`` runs ungated.
            It is optional so that the single-argument call
            ``genDf(df_queue)`` made by ``main`` keeps working after this
            later definition shadows the earlier one.
        delay: Seconds to sleep while still holding the semaphore slot.

    Returns:
        0, as a simple completion flag.
    """
    from contextlib import nullcontext

    # `with` releases the slot (increments the counter) even if put() or
    # sleep() raises. A bare acquire()/release() pair would leak the slot and
    # could leave the remaining processes blocked forever.
    with (sema if sema is not None else nullcontext()):
        print(f"Process {mp.current_process().name} starting")
        df = pd.DataFrame(np.random.rand(3, 3))
        df_queue.put(df)
        time.sleep(delay)
    return 0

def write_df(df_queue: Queue, csv_name: str, timeout: float = 10):
    """Drain DataFrames from *df_queue* into numbered CSV files.

    Each DataFrame taken from the queue is written to
    ``f"{csv_name}_{num}.csv"`` with ``num`` counting up from 0. There is no
    end-of-stream marker: the loop stops once the queue has stayed empty for
    *timeout* seconds. Producers must therefore not pause longer than that
    between items, or later items will be left unwritten.

    Args:
        df_queue: Queue of DataFrames (any object whose ``get(timeout=...)``
            raises ``queue.Empty`` on timeout).
        csv_name: Path prefix for the output files; its parent directory is
            created if it does not exist.
        timeout: Seconds to wait for the next item before stopping. The
            default of 10 s is presumably chosen to exceed the 5 s hold in the
            semaphore demo, so the writer does not quit between batches.

    Returns:
        0 once the queue has gone quiet.
    """
    from pathlib import Path

    # Create the output directory up front so the first to_csv call cannot
    # fail with FileNotFoundError (e.g. for the "data/temp/test" prefix).
    Path(csv_name).parent.mkdir(parents=True, exist_ok=True)

    num = 0
    while True:
        try:
            df = df_queue.get(timeout=timeout)
        except Empty:
            break
        df.to_csv(f"{csv_name}_{num}.csv")
        num += 1
        print("write 1 data")
    return 0

def main2():
    """Run 20 producer processes, at most 3 active at once, plus one writer.

    Concurrency is limited by a ``Semaphore(3)`` that each producer acquires
    inside ``genDf``, not by a pool; all 20 processes are started up front.
    DataFrames flow through a ``multiprocessing.Queue`` to a single
    ``write_df`` process, which writes files with the ``data/temp/test``
    prefix.
    """
    limiter = Semaphore(3)
    df_q = Queue()

    writer_proc = Process(target=write_df, args=(df_q, "data/temp/test"))
    writer_proc.start()

    producers = []
    for idx in range(20):
        proc = Process(target=genDf, args=(df_q, limiter), name=str(idx))
        producers.append(proc)
        proc.start()

    # Wait for every producer, then for the writer to time out and exit.
    for proc in producers:
        proc.join()
    writer_proc.join()

In [None]:
if __name__ == "__main__":
    # Guard the entry point. Under the "spawn"/"forkserver" start methods
    # each child re-imports the main module, and an unguarded call would
    # re-run the demo in every child. The guard also stops an import of the
    # exported script from launching processes.
    main2()