# 多进程示例

## apply_async 实现

### 定义一个需要并行的函数

- 注意：`create_engine('mysql+pymysql://root@localhost:3306/rdf?charset=utf8')` 返回的 Engine 对象无法被 pickle 序列化，因此不能作为参数传给子进程；只能传入连接 URL 字符串，由子进程在函数内部自行创建 engine。

In [None]:
def _storedf(dm, tmppath, sql_link):  # differs from the map version: receives the three values as separate arguments
    """Fetch all rows for one index code and cache them as a pickle file.

    Runs inside a pool worker. The engine is created here from the URL
    because a SQLAlchemy Engine cannot be pickled and sent to a worker.

    Args:
        dm: index code (value of the S_INFO_WINDCODE column) to fetch.
        tmppath: output directory; the file is written as ``<dm>.pkl`` in it.
        sql_link: SQLAlchemy database URL string.

    Returns:
        A progress message containing the worker pid, the code and the
        elapsed time; ``apply_async`` hands it to its ``callback``.
    """
    engine = create_engine(sql_link)
    try:
        t0 = time.time()
        # Let the driver bind the code instead of formatting it into the SQL
        # text: robust to quote characters and to MySQL's ANSI_QUOTES mode.
        df = pd.read_sql('SELECT * FROM aindexeodprices WHERE S_INFO_WINDCODE=%s',
                         con=engine, params=(dm,))
        df.to_pickle(os.path.join(tmppath, dm + '.pkl'))
        return '[pid=' + str(os.getpid()) + '] ' + dm + ' 用时：%.2fs' % (time.time() - t0)
    finally:
        # Each call builds its own engine (and connection pool); release it so
        # repeated tasks in a long-lived worker do not accumulate connections.
        engine.dispose()

### 定义一个回调函数（例如用于显示实时进度）。注意回调函数应尽快返回：它在主进程中负责处理结果的线程里执行，耗时过长会阻塞后续结果的处理

- 其中 msg 为并行函数的返回值（即 return 的内容）

In [None]:
def show_process(msg):
    """Progress callback for ``apply_async``.

    Called in the parent process each time a task finishes; ``msg`` is that
    task's return value (a single value, not a list as with ``map_async``).
    It must return quickly because it runs on the pool's result thread.
    """
    line = msg
    print(line)

### 主函数

In [None]:
def search_db(tmppath, sql_link, n_workers=4):
    """Cache the prices of up to 20 index codes in parallel via ``apply_async``.

    One task per code is submitted to a process pool; ``show_process`` prints
    each task's message as it completes.

    Args:
        tmppath: directory the per-code pickle files are written to.
        sql_link: SQLAlchemy URL string (an Engine cannot be pickled to workers).
        n_workers: number of worker processes.
    """
    engine = create_engine(sql_link)
    try:
        dms = pd.read_sql('SELECT DISTINCT S_INFO_WINDCODE FROM AIndexEODPrices LIMIT 20',
                          con=engine)['S_INFO_WINDCODE'].tolist()
    finally:
        # The parent only needs this engine for the one query above; release
        # its connections before the worker pool is started.
        engine.dispose()
    with Pool(n_workers) as pool:
        for dm in dms:
            pool.apply_async(
                _storedf,
                args=(dm, tmppath, sql_link),
                callback=show_process,
                # Without an error_callback a worker exception is stored in the
                # never-read AsyncResult and silently lost. ``dm=dm`` binds the
                # current code (avoids late binding in the loop).
                error_callback=lambda exc, dm=dm: print('[error] %s: %r' % (dm, exc)),
            )
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all tasks and callbacks before the with-block terminates the pool


- 如果需要收集进程池的输出结果：保存每次 `apply_async` 返回的 AsyncResult，在 `join()` 之后调用其 `get()` 取值（若子进程抛出异常，`get()` 会在主进程中重新抛出该异常）

In [None]:
def search_db(tmppath, sql_link, n_workers=4):
    """Cache up to 20 index codes in parallel and collect the workers' messages.

    Args:
        tmppath: directory the per-code pickle files are written to.
        sql_link: SQLAlchemy URL string (an Engine cannot be pickled to workers).
        n_workers: number of worker processes.

    Returns:
        The list of messages returned by ``_storedf``, in submission order.
        Each message is also printed.

    Raises:
        Whatever exception a worker raised: ``AsyncResult.get()`` re-raises it
        in the parent process.
    """
    engine = create_engine(sql_link)
    try:
        dms = pd.read_sql('SELECT DISTINCT S_INFO_WINDCODE FROM AIndexEODPrices LIMIT 20',
                          con=engine)['S_INFO_WINDCODE'].tolist()
    finally:
        # Only needed for the query above; release connections before the pool starts.
        engine.dispose()
    results = []
    with Pool(n_workers) as pool:
        res_l = [
            pool.apply_async(_storedf, args=(dm, tmppath, sql_link), callback=show_process)
            for dm in dms
        ]
        pool.close()
        pool.join()
        for res in res_l:
            msg = res.get()
            print(msg)
            results.append(msg)
    return results

## map_async 实现

In [None]:
def _storedf(xinput):  # unlike the apply version, map passes exactly one argument, so the values arrive packed in one tuple or list
    """Map-style worker: fetch one index code's rows and cache them as a pickle.

    Args:
        xinput: a 3-item sequence ``(dm, tmppath, sql_link)``: the index code,
            the output directory, and the SQLAlchemy URL string (a string
            because an Engine cannot be pickled to the worker). A dict does
            not work here, since unpacking a dict yields its keys.

    Returns:
        A progress message with the worker pid, the code and the elapsed time;
        ``map_async`` gathers these into a list for its callback.
    """
    dm, tmppath, sql_link = xinput
    engine = create_engine(sql_link)
    try:
        t0 = time.time()
        # Driver-side parameter binding instead of formatting the code into the SQL text.
        df = pd.read_sql('SELECT * FROM aindexeodprices WHERE S_INFO_WINDCODE=%s',
                         con=engine, params=(dm,))
        df.to_pickle(os.path.join(tmppath, dm + '.pkl'))
        return '[pid=' + str(os.getpid()) + '] ' + dm + ' 用时：%.2fs' % (time.time() - t0)
    finally:
        # Release this call's private connection pool.
        engine.dispose()

**同时，回调函数要做调整**
- `map_async` 每次调用只触发一次回调，其参数是由所有返回值组成的列表（顺序与输入一致），因此回调函数需要从列表中取值；例如下面每次只映射一个输入时，用 `msg[0]` 即可取得该结果

In [None]:
def show_process(msg):
    """Progress callback for ``map_async``: print every result in the batch.

    ``map_async`` calls its callback once per call, passing the list of all
    return values of the mapped function (in input order). Printing only
    ``msg[0]`` would silently drop the rest when more than one input is mapped,
    and would raise IndexError for an empty batch; iterating handles both.
    Keep it fast: it runs on the pool's result-handling thread.
    """
    for item in msg:
        print(item)

In [None]:
def search_db(tmppath, sql_link, n_workers=4):
    """Cache up to 20 index codes in parallel via ``map_async``.

    Each code is submitted as its own ``map_async`` call over a one-element
    list, so ``show_process`` receives a one-element list of results.

    Args:
        tmppath: directory the per-code pickle files are written to.
        sql_link: SQLAlchemy URL string (an Engine cannot be pickled to workers).
        n_workers: number of worker processes.
    """
    engine = create_engine(sql_link)
    try:
        dms = pd.read_sql('SELECT DISTINCT S_INFO_WINDCODE FROM AIndexEODPrices LIMIT 20',
                          con=engine)['S_INFO_WINDCODE'].tolist()
    finally:
        # Only needed for the query above; release connections before the pool starts.
        engine.dispose()
    with Pool(n_workers) as pool:
        for dm in dms:
            _xinput = (dm, tmppath, sql_link)
            # map_async(func, iterable, ...) has no ``args`` keyword (passing
            # one raises TypeError). Wrapping the tuple in a one-element list
            # makes _storedf receive the whole tuple as its single argument.
            pool.map_async(
                _storedf,
                [_xinput],
                callback=show_process,
                # Report worker failures instead of losing them silently.
                error_callback=lambda exc, dm=dm: print('[error] %s: %r' % (dm, exc)),
            )
        pool.close()
        pool.join()

## 运行

In [None]:
if __name__ == '__main__':
    # Output directory for the per-code .pkl files (keeps its trailing separator).
    tmppath = 'E:\\TEMP\\csvtmp2\\'
    # Pass the database URL string rather than an Engine: an Engine cannot be
    # pickled for the worker processes, which build their own from this URL.
    sql_link = 'mysql+pymysql://root@localhost:3306/rdf?charset=utf8'
    search_db(tmppath=tmppath, sql_link=sql_link)