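# Reading and Processing Large Amounts of Data in Python
Goal: to process the data in parallel, we split the task into sub-units. This increases the number of jobs the program can work on at the same time and reduces the overall processing time.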

In [1]:
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
import string
import pandas as pd

In [2]:
# Number of worker processes: twice the number of CPU cores
n_workers = 2*mp.cpu_count()
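`mp.cpu_count()` reports every CPU in the machine. That is not necessarily the set this process is allowed to use, and doubling it oversubscribes the cores for CPU-bound work. Below is a minimal sketch. It assumes a Linux-like system where `os.sched_getaffinity` exists. That call is not available on every platform, so the code falls back to the total count. The helper name `usable_cpus` is hypothetical.

```python
import os

def usable_cpus() -> int:
    """Return how many CPUs this process may run on (hypothetical helper)."""
    try:
        # Linux and some other Unix systems: respects affinity masks (taskset, containers)
        return len(os.sched_getaffinity(0))
    except AttributeError:
        # Platforms without sched_getaffinity: fall back to the machine-wide count
        return os.cpu_count() or 1

# For CPU-bound work, one worker per usable CPU is a common starting point
n_workers_cpu_bound = usable_cpus()
print(n_workers_cpu_bound)
```

The notebook reports 4 cores further down. On such a machine this would typically give 4 workers, rather than the 8 produced by `2*mp.cpu_count()`.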

In [3]:
%%time
file_name = "/home/gavin/Machine/data/data/data_format2/train_format2.csv"
df = pd.read_csv(file_name)
df

CPU times: user 9.74 s, sys: 984 ms, total: 10.7 s
Wall time: 14.2 s


Unnamed: 0,user_id,age_range,gender,merchant_id,label,activity_log
0,34176,6.0,0.0,944,-1,408895:1505:7370:1107:0
1,34176,6.0,0.0,412,-1,17235:1604:4396:0818:0#954723:1604:4396:0818:0#275437:1604:4396:0818:0#548906:1577:4396:1031:0#368206:662:4396:0818:0#480007:1604:4396:0818:0#954723:1604:4396:0818:0#236488:1505:4396:1024:0
2,34176,6.0,0.0,1945,-1,231901:662:2758:0818:0#231901:662:2758:0818:0#108465:662:2758:0820:0#231901:662:2758:0820:0#231901:662:2758:0820:0#840446:1142:2758:0820:0#231901:662:2758:0819:0
3,34176,6.0,0.0,4752,-1,174142:821:6938:1027:0
4,34176,6.0,0.0,643,-1,716371:1505:968:1024:3
...,...,...,...,...,...,...
7030718,229247,4.0,2.0,2000,-1,610483:737:3894:0516:2#610483:737:3894:0524:0
7030719,229247,4.0,2.0,579,-1,179514:420:3610:1014:0#1007071:420:3610:1014:0#1105120:420:3610:1014:0
7030720,229247,4.0,2.0,1860,-1,980351:1130:918:1016:3#153613:1130:918:1016:0#980351:1130:918:1016:0
7030721,229247,4.0,2.0,4140,0,308321:737:5697:1111:0#308321:737:5697:1111:0#308321:737:5697:1111:2#761143:737:5697:1110:0#434017:737:5697:1110:0#442402:737:5697:1110:0#308321:737:5697:1110:0#308321:737:5697:1110:0#1062252:737:5697:1110:0#477933:737:5697:1110:0#162415:737:5697:1110:0#194824:737:5697:1110:0#203563:737:5697:1110:0#308321:737:5697:1110:0#495083:737:5697:1110:0#357685:737:5697:1110:0#308321:737:5697:1110:0#162415:737:5697:1110:0#507163:737:5697:1110:0#194824:737:5697:1110:0#665864:737:5697:1110:0#417458:737:5697:1110:0#308321:737:5697:1110:0#538779:737:5697:1110:0


In [10]:
def clean_text(x):
    # Placeholder transformation: adds 1 to each value (stands in for real text cleaning)
    return x + 1

In [62]:
# Stack the data on itself to double the row count for the benchmark (the index is duplicated)
df = pd.concat([df, df], axis=0)

In [63]:
%%time
tqdm.pandas()  # registers Series.progress_apply; the plain .apply below shows no progress bar
df["age_range"] = df["age_range"].apply(clean_text)

CPU times: user 3.79 s, sys: 1.75 s, total: 5.54 s
Wall time: 15.1 s


In [64]:
%%time
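with mp.Pool(n_workers) as p:
    # tqdm wraps the input, so the bar tracks items dispatched to the pool, not finished results
    df["age_range"] = p.map(clean_text, tqdm(df["age_range"]))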

  0%|          | 0/14061446 [01:57<?, ?it/s]

CPU times: user 9.42 s, sys: 3.46 s, total: 12.9 s
Wall time: 2min 27s
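`Pool.map` already splits its input into chunks before sending them to the workers. When `chunksize` is omitted, CPython's `multiprocessing.pool` picks about four chunks per worker process. The sketch below reproduces that rule so you can see what this cell used. It mirrors the CPython source rather than a documented API, so treat it as an illustration that may differ between versions. `default_chunksize` is a hypothetical name.

```python
def default_chunksize(n_items: int, n_processes: int) -> int:
    """Mimic the chunksize CPython's Pool.map chooses when none is given."""
    if n_items == 0:
        return 0
    chunksize, extra = divmod(n_items, n_processes * 4)
    if extra:
        chunksize += 1  # round up so every item lands in some chunk
    return chunksize

# 14,061,446 rows (after the concat) spread over 8 worker processes
print(default_chunksize(14061446, 8))  # 439421
```

`tqdm.contrib.concurrent.process_map`, used further below, defaults to `chunksize=1` instead. That is why passing an explicit `chunksize` there matters for millions of rows.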


In [65]:
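def text_parallel_clean_text(array):
    # One delayed() call per element: millions of tiny tasks, each with scheduling and pickling overhead
    result = Parallel(n_jobs=n_workers, backend="multiprocessing")(
        delayed(clean_text)(text) for text in tqdm(array)
    )
    return result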

In [None]:
%%time
df["age_range"] = text_parallel_clean_text(df["age_range"])

  0%|          | 0/14061446 [00:00<?, ?it/s]

In [None]:
from typing import List

## Splitting the data into batches

In [None]:
def proc_batch(batch):
    # Apply clean_text to every item of one batch inside a single worker
    return [clean_text(text) for text in batch]

In [None]:
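import math

def batch_file(array, n_workers: int) -> List:
    """Split a large dataset into batches.

    Args:
        array: the original data (any sliceable sequence, e.g. a list or pandas Series).
        n_workers: number of worker processes.

    Returns:
        batches: the batched data, stored as a list.
    """
    file_len = len(array)
    # Round up so there are at most n_workers batches; max(1, ...) avoids a zero step
    batch_size = max(1, math.ceil(file_len / n_workers))
    batches = [array[ix: ix + batch_size] for ix in tqdm(range(0, file_len, batch_size))]

    return batches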
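Sizing batches with `round(len / n_workers)` can undershoot. Python rounds halves to the nearest even integer, so 10 items over 4 workers gives `round(2.5) == 2`, which produces five batches instead of four. With fewer items than half the workers, the size becomes 0 and `range` raises `ValueError`. Rounding up avoids both problems. Below is a small pure-Python sketch on a plain list. The name `split_batches` is hypothetical.

```python
import math

def split_batches(items, n_workers):
    """Split items into at most n_workers contiguous batches (hypothetical helper)."""
    batch_size = max(1, math.ceil(len(items) / n_workers))
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

data = list(range(10))
print(round(10 / 4))           # 2 -> batches of 2 would give 5 batches for 4 workers
print(split_batches(data, 4))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```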

In [None]:
batches = batch_file(df["age_range"], n_workers)

In [None]:
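# Process each batch as one worker task
batch_output = Parallel(n_jobs=n_workers, backend="multiprocessing")(
    delayed(proc_batch)(batch) for batch in tqdm(batches)
)
# Flatten the per-batch lists back into one list, in the original order
df["age_range"] = [value for batch in batch_output for value in batch]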
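With whole batches, each worker receives one task instead of millions. The result is a list of lists, though, and it must be flattened in the original order before it can be assigned back to a column. Below is a minimal sketch. It uses the standard library's `concurrent.futures.ProcessPoolExecutor` as a stand-in for joblib, because its `map` returns results in input order. `process_batch` and `run_batched` are hypothetical names. The pool is created only under the `__main__` guard so that the script also works with the spawn start method.

```python
import math
from concurrent.futures import ProcessPoolExecutor
from itertools import chain

def process_batch(batch):
    """Apply the per-item transformation to one batch (placeholder: add 1)."""
    return [x + 1 for x in batch]

def run_batched(items, n_workers, mapper=map):
    """Split items into batches, map process_batch over them, flatten in order."""
    batch_size = max(1, math.ceil(len(items) / n_workers))
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    # mapper may be the built-in map or an executor's map; both keep input order
    return list(chain.from_iterable(mapper(process_batch, batches)))

if __name__ == "__main__":
    data = list(range(10))
    with ProcessPoolExecutor(max_workers=4) as executor:
        print(run_batched(data, 4, mapper=executor.map))  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Passing the built-in `map` runs the same pipeline serially. This is handy for checking the batching and flattening logic before adding processes.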

In [11]:
%%time
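from tqdm.contrib.concurrent import process_map

# Roughly one chunk per worker; process_map otherwise sends items one at a time (chunksize=1)
batch = max(1, round(len(df) / n_workers))
df['age_range'] = process_map(clean_text, df['age_range'], max_workers=n_workers, chunksize=batch)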

  0%|          | 0/7030723 [00:00<?, ?it/s]

CPU times: user 7.32 s, sys: 789 ms, total: 8.11 s
Wall time: 8.3 s


## Multiprocessing and Multithreading

In [3]:
import time
import os

In [4]:
def long_time_task():
    print('Current process: {}'.format(os.getpid()))
    time.sleep(2)
    print("Result: {}".format(8 ** 20))

if __name__ == "__main__":
    print('Current parent process: {}'.format(os.getpid()))
    start = time.time()
    # Run the task twice, one after the other, in the same process
    for i in range(2):
        long_time_task()

    end = time.time()
    print("Elapsed: {} s".format((end-start)))

Current parent process: 355497
Current process: 355497
Result: 1152921504606846976
Current process: 355497
Result: 1152921504606846976
Elapsed: 4.004072189331055 s


In [5]:
from multiprocessing import Process

In [6]:
def long_time_task(i):
    print('Child process: {} - task {}'.format(os.getpid(), i))
    time.sleep(2)
    print("Result: {}".format(8 ** 20))


if __name__=='__main__':
    print('Current parent process: {}'.format(os.getpid()))
    start = time.time()
    p1 = Process(target=long_time_task, args=(1,))
    p2 = Process(target=long_time_task, args=(2,))
    print('Waiting for all child processes to finish.')
    p1.start()
    p2.start()
    # join() blocks until each child process exits
    p1.join()
    p2.join()
    end = time.time()
    print("Total time: {} s".format((end - start)))

Current parent process: 355497
Waiting for all child processes to finish.
Child process: 355653 - task 1
Child process: 355656 - task 2
Result: 1152921504606846976
Result: 1152921504606846976
Total time: 2.117201805114746 s
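The heading also names multithreading. `time.sleep` releases the GIL, so the same two tasks overlap just as well in two threads of a single process. Threads help with waiting (sleep, I/O), not with pure-Python CPU work. Below is a sketch with `threading.Thread` that mirrors the `Process` example, with the duration shortened to 0.5 s to keep it quick.

```python
import os
import threading
import time

def long_time_task(i, seconds=0.5):
    # Every thread prints the same PID: threads share one process
    print('Process {} / {} - task {}'.format(os.getpid(), threading.current_thread().name, i))
    time.sleep(seconds)  # sleeping releases the GIL, so the other thread can run

start = time.time()
threads = [threading.Thread(target=long_time_task, args=(i,)) for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for both threads, like Process.join()
elapsed = time.time() - start
print('Total time: {:.2f} s'.format(elapsed))  # close to 0.5 s, not 1.0 s
```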


In [7]:
from multiprocessing import Pool, cpu_count

In [11]:
cpu_count()

4

In [8]:
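def long_time_task(i):
    print('Child process: {} - task {}'.format(os.getpid(), i))
    time.sleep(2)
    print("Result: {}".format(8 ** 20))


if __name__=='__main__':
    print("CPU cores: {}".format(cpu_count()))
    print('Current parent process: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    # Submit 5 tasks to a pool of 4 worker processes
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all child processes to finish.')
    p.close()  # no more tasks will be submitted
    p.join()   # wait for the workers to finish
    end = time.time()
    print("Total time: {} s".format((end - start)))

CPU cores: 4
Current parent process: 355497
Child process: 355814 - task 2Child process: 355813 - task 1Child process: 355815 - task 3


Child process: 355812 - task 0
Waiting for all child processes to finish.
Result: 1152921504606846976Result: 1152921504606846976
Result: 1152921504606846976
Child process: 355815 - task 4

Result: 1152921504606846976
Result: 1152921504606846976
Total time: 4.309876918792725 s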
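The timings follow from simple arithmetic. A pool with `n_workers` processes runs at most `n_workers` tasks at once, so `n_tasks` equal tasks need `ceil(n_tasks / n_workers)` rounds. With 5 tasks and `Pool(4)`, the fifth task waits for a free worker. That gives two rounds of 2 s, about 4 s, and the remainder of the measured time is process start-up and scheduling overhead. Below is a small helper that ignores that overhead. The name `estimated_wall_time` is hypothetical.

```python
import math

def estimated_wall_time(n_tasks: int, n_workers: int, task_seconds: float) -> float:
    """Lower-bound wall time for equal-length tasks on a fixed-size pool (ignores overhead)."""
    rounds = math.ceil(n_tasks / n_workers)
    return rounds * task_seconds

print(estimated_wall_time(2, 1, 2))  # sequential loop: 4
print(estimated_wall_time(2, 2, 2))  # two Process objects: 2
print(estimated_wall_time(5, 4, 2))  # Pool(4) with 5 tasks: 4
```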


In [9]:
from multiprocessing import Process, Queue
import os, time, random

# Code executed by the writer process:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# Code executed by the reader process:
def read(q):
    print('Process to read:{}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # The parent process creates the Queue and passes it to each child process:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start child process pw, which writes:
    pw.start()
    # Start child process pr, which reads:
    pr.start()
    # Wait for pw to finish:
    pw.join()
    # pr runs an infinite loop and never finishes on its own, so it must be terminated:
    pr.terminate()

Process to write: 356056Process to read:356057

Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
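The reader above loops forever and has to be stopped with `terminate()`. The `multiprocessing` documentation warns that terminating a process while it uses a queue can leave the queue corrupted. A common alternative is a sentinel. The writer puts a special value (here `None`) after its data, and the reader returns when it sees that value, so the reader can be joined normally. The sketch below shows the pattern with threads and `queue.Queue` so that it runs anywhere. The same structure applies to `multiprocessing.Process` with `multiprocessing.Queue`. The sentinel value and the extra `values`/`received` parameters are assumptions made for illustration.

```python
import queue
import threading

SENTINEL = None  # marker meaning "no more data"; must never be a real value

def write(q, values):
    for value in values:
        print('Put %s to queue...' % value)
        q.put(value)
    q.put(SENTINEL)  # tell the reader to stop

def read(q, received):
    while True:
        value = q.get()  # blocks until an item is available
        if value is SENTINEL:
            break  # clean exit instead of terminate()
        print('Get %s from queue.' % value)
        received.append(value)

q = queue.Queue()
received = []
pw = threading.Thread(target=write, args=(q, ['A', 'B', 'C']))
pr = threading.Thread(target=read, args=(q, received))
pw.start()
pr.start()
pw.join()
pr.join()  # returns because the reader saw the sentinel
print(received)  # ['A', 'B', 'C']
```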
