In [3]:
import os
# set working directory to root 
os.chdir('/home/qiangzibro/2021-math-model-code')
os.getcwd()

'/home/qiangzibro/2021-math-model-code'

## Python中的并发编程鉴赏
数学建模期间频繁用Pandas加载多个表格，直到最后一天才发现原本写的并发编程
速度并不“并发”，借着这个机会来学习一下多线程、多进程的编程。

首先，我们写一个计时器的装饰器

In [9]:
def cal_time(func):
    def inner(*args, **kargs):
        start = time.time()
        result = func(*args, **kargs)
        end = time.time()
        print(f"{round(end-start, 2)}s passed")
        return result
    return inner

table_files = """data/附件1 监测点A空气质量预报基础数据.xlsx
data/附件2 监测点B、C空气质量预报基础数据.xlsx
data/附件3 监测点A1、A2、A3空气质量预报基础数据.xlsx""".split("\n")

In [10]:
import time
import pandas as pd
@cal_time
def load_using_single_thread():
    return  {
        f"附件{i}":pd.read_excel(table_files[i-1],
                               engine='openpyxl',
                               sheet_name=None) 
            for i in range(1,4)
    }

df = load_using_single_thread()

60.64s passed


加载需要60秒左右，理论上这是最长时间。下面来进行加速。
首先想到了Python中的Thread

In [23]:
from threading import Thread

@cal_time
def load_using_threading():
    results = {f"附件{i}":None for i in range(1,4)}
    
    def _load(file, i):
        results[f"附件{i}"] = pd.read_excel(table_files[i-1],
                           engine='openpyxl',
                           sheet_name=None) 
    
    threads = []
    for i, file in enumerate(table_files):
        threads.append(Thread(target=pd.read_excel, args=(file, i)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return df

data_df = load_using_threading()

47.24s passed


我们还可以使用Python的线程池来实现

In [30]:
import concurrent

@cal_time
def load_df_threading():
    results = {f"附件{i}":None for i in range(1,4)}
    
    def _load(i):
        return pd.read_excel(table_files[i-1],
                           engine='openpyxl',
                           sheet_name=None) 
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(_load, i): f"附件{i}" for i in range(1,4)}
        for future in concurrent.futures.as_completed(futures):
            p = futures[future]
            try:
                results[p] = future.result()
            except Exception as exc:
                print(exc)
    return results

data_df = load_df_threading()

154.58s passed


惊人的154秒，比单线程方法还要长了三倍！真是低并发编程！但是，这是为什么呢？

In [36]:
import concurrent
def _load(i):
    return pd.read_excel(table_files[i-1],
                       engine='openpyxl',
                       sheet_name=None) 
@cal_time
def load_using_process():
    df = {f"附件{i}":None for i in range(1,4)}

    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        for i, result in zip(range(1,4), executor.map(_load, range(1,4))):
            p = f"附件{i}"
            df[p] = result
    return df
                             
data_df = load_using_process()

31.84s passed
