# 12 并行化执行

**如何同时做许多件事-->如何更快地做一件事**

从计算机或操作系统的角度来看，并行是同时执行多个任务。
从用户的角度来看，并行是为了确定不同代码段和数据之间的依赖关系，使得代码可以更快地执行。

并行性通常能够让计算机更快地处理问题。
在面对多个处理器时，编写、调试、打开文件，甚至打印到屏幕上会变得更加困难。

**使用并行编程的典型原因：**
1.使用正常计算机会创建或需要大量数据。
2.计算耗时太长。
3.使用的算法易于并行化。
4.遇到无法用少量资源模拟的物理行为。

## 12.1 规模和扩展能力

### 规模

**规模：并行计算处理问题的大小。**

衡量方式：
1.进程数量（进程数量P的某个比例）
2.计算速率（每秒浮点运算量）
3.数据大小（查看计算结果生成了多少数据）

**衡量方式角度的要点：**
1．衡量值通常互相之间成一定比例，比如机器的其中一项属性比较大，那么其他属性值也很大。
2．计算规模逐步增加，不要一步到位。

代码应有条不紊地扩展。先在简单的情况下尝试在一个进程上运行代码，接着再尝试10个进程，之后100个以及更多的进程。
不要直接在一百万核心上运行代码。这样肯定会有问题，而且很难跟踪和解决。
慢慢扩大规模可以逐个解决遇到的问题。

### 可扩展性

**可扩展性：扩大规模的难易程度。**
测量可扩展性：在计算方面，衡量运行时性能；指定的一台机器，有强弱两种形式来衡量可扩展性。

**强可扩展：对于固定大小的问题，运行时的效率是随着处理器数量变化的函数。**
通常由加速比值$s$衡量。这是在单个处理器上执行所花费的时间$t_1$与在$P$个处理器上执行所需的时间$t_P$的比值：
$$s(P)=\frac{t_1}{t_P}$$
理想的高效系统中，强可扩展的效率提升是线性的。处理器数量增加一倍能将运行时间减少一半。

**弱可扩展：每个处理器能够解决的问题大小固定，因此运行时的能力由处理器的数量决定。**
通常用所谓的扩大值（sizeup）z 来衡量。
对于大小为N的问题，扩大值定义为：
在理想弱扩展系统中，规模是线性的。处理器数量增加一倍，可以解决的问题大小也将增加一倍。

**阿姆达尔定律：算法的某些部分（称为$\alpha$）无法并行化。因此，P个处理器可能的最大加速或最大扩容值为：
$$\max(s(P))=\frac{1}{\alpha-\frac{1-\alpha}{P}}$$
将此限制扩展到无限多个处理器时，最大可能加速为：
$$\max(s)=\lim\limits_{P\rightarrow\infty}\frac{1}{\alpha-\frac{1-\alpha}{P}}=\frac{1}{\alpha}$$
根据阿姆达尔定律，如果程序的10％是不可并行的（α= 0.1），那么可能实现的最佳加速是10的因子，但实践中很难确定复杂算法中的α值。
α通常远小于0.1，因此可实现的加速度非常大。
阿姆达尔定律指明了并行上限。

## 12.2 问题分类

### 并行性分类

有些算法天然适用于并行性，而有些不行。

**完美并行问题**：问题的任何部分可以独立于任何其他部分求和，然后各部分相加，获得的结果与数组中元素依次相加的结果相同。计算过程是否在同一个处理器上或同一时间计算无关紧要。
**非完美并行问题**：在算法中存在不可避免的瓶颈，靠后的元素取决于靠前的元素。。因此很难编写高效的并行代码，因为每个进程都必须知道前一个进程的结果。

**解决非完美并行问题**
1.大多数情况下可以进行数学变换，暂时降低部分问题的依赖程度。
这些转换通常会在其他方面带来一些计算开销。
由于增加了并行性，解决问题的运行时间变短。
2.在其他情况下，对数据了解越多，并行的可能就越大。
3.可以试图使用可并行化的算法，而不必寻求一个完全通用的算法。

### HPC高性能计算和HTC高吞吐量计算

在大规模并行中，这些分类的名称来自于所运行的机器架构，而不是算法属性。

**高性能计算（HPC）：用于运行非完美并行问题。**
通常HPC系统上的所有节点都是相同的。每个节点具有相同数量的CPU和GPU，以及相同的内存；每个处理器运行相同的操作系统。
节点在预定拓扑（环、tauruses等）中连接在一起，但是节点均匀化产生了一种节点的任何子集与相同大小的节点的其他子集行为相同的错觉。

问题规模很大且不是完美并行的情况下，或者问题很大且需要节点同质化时，应该考虑使用HPC系统。
每个节点的内存在HPC中一直呈下降趋势，因此如果有一个应用程序会占用许多内存时，除了计算并行以外，还需要做额外的工作来使算法数据并行。

**高吞吐量计算（HTC）**：设计用于在很少或没有通信的情况下，尽可能多地执行操作。
HTC非常适合完美并行问题。
HTC系统中的节点不需要相同，在某些情况下甚至不必共享相同的操作系统。
HTC系统不能视为单个机器，而是一个协调的机器网络。当这些机器分散在各地时，就称为**分布式计算**。

## 12.4 非并行

在实现并行算法时，一般先实现和分析简单的传统串行版本。
通常可靠的做法是先写串行版本，然后再用并行重写。
串行算法不等于P = 1的并行算法。虽然两者都在一个处理器上运行，但是性能特性可能差别非常大。

代码示例见示例笔记本。

### 二体问题的推广

在大多数初始情况中，各个物体彼此之间相向加速运动，从而获得速度。
之后，由于动量通常不会正确地对齐，所以物体会彼此环绕，或者以随机方向飞离当前测试空间。
设置重力常数G = 1。在这种情况下合理的时间间隔是dt = 0.001。

直觉：N体问题的复杂度为N^2^。
添加一个新物体，外循环（timestep()函数）和内循环（a()函数）中各添加一个迭代。



## 12.5 线程

**处理一个并行算法：**

在Python中，大多数科学任务都不应该使用线程。

线程是互相不会阻碍、独立工作的对象。
在一个线程中执行代码不会阻止另一个线程执行自己的代码。
线程可以通过各自的状态彼此通信。
线程可以生成子线程，程序可以运行任意个数的线程。
Python程序中至少有一个线程，即主线程。

线程是一次性的。
run()方法运行完线程后，线程就结束了。
程序中并不显式调用run()方法，而是在调用start()方法时隐式调用。
线程不能在外部杀死，而必须让run()方法来完成。例外：守护进程，Python程序本身退出时也会杀死该线程。

通过标准库中的threading模块使用线程。

**Python中所有线程在同一个进程中执行。**
原因：Python是一种解释型语言，Python解释器本身只存在于一个进程中。因此即使线程互相不阻塞，但综合运行速度也不会超过机器上单个处理器的运行速度。数字处理大部分是CPU密集型的操作，即受处理器速度的限制。如果计算机已经以最快速度运行，再添加更多的线程也不会提升速度。事实上，添加更多任务通常会减慢执行时间，此时处理器变得非常拥挤。

全局解释器锁（global interpreter lock，GIL）：
标准Python解释器（CPython）实现限制了Python只能使用单个处理器执行线程的细节。
GIL管理当前正在运行的线程，以及何时从执行一个线程切换到执行另一个线程。

`**multiprocessing可以通过子进程绕过GIL！！！**`

对于IO密集型的高延迟任务，例如从磁盘读取文件或通过网络下载信息，大多数时间程序都在等待返回下一个数据块，然后才能继续。此时可以线程，因为在等待的时间可以执行其他代码。

此处跳过多线程解决方案。

## 12.6 多处理

### 多处理、多任务

多处理（multiprocessing）：以Python的方式代替操作系统完成并行任务调度的任务。
多任务允许所有进程的总和超过计算机的资源限制，因为多任务不一定允许所有进程同时活动。

多任务能力是计算机执行常见任务的核心。
单CPU系统：一次只能在物理层面执行一个进程。
多任务操作系统：可以暂停该进程并将上下文切换到另一进程。

创建或生成新的操作系统级进程由Python处理。
在POSIX（类UNIX系统，如Linux和Mac OS X）机器上，所有进程都是父进程的fork。fork会继承它的父环境，但对fork环境所做的任何修改都不会影响父进程。
fork可以生成自己的fork，后者会获得修改后的环境，而杀死父进程将杀死所有fork。当Python产生更多的进程时，这些进程都能使用主Python进程所具有的环境。

Python中的多处理通过标准库的multiprocessing模块实现。
**该模块为用类似线程的接口来处理进程，但有两个主要区别：**
1．不能直接在交互式解释器中使用multiprocessing，因为主模块（__main__）必须确保其fork能够导入主模块。
2．multiprocessing模块提供了一个Pool类，因此无需自行实现。

### Pool类

Pool.map()为通用处理方法。
Pool.map()拥有与内置map()函数类似的接口。
函数需要两个参数：一个函数和一个传递给该函数的可迭代的参数。
map()返回一个值列表，其顺序与原始迭代器中给出的顺序相同。
完成计算之前，Pool.map()会阻止其他Python代码执行。

Pool.map()的主要限制是作为参数的函数必须只有一个参数。
解决：在调用之前将需要的参数存储在元组或字典中。

### 多处理的扩展性

双核计算机上:期望具有两个进程的池的运行速度比具有一个进程的池快两倍。对于两个以上的进程，预期性能会有一些下降，因为操作系统花费更多的时间来进行上下文切换。

实际并不是正好2倍，而是在1.8倍附近。额外的0.2倍花费在Python本身、处理子线程和N体算法的开销上。
随着核数增加，加速比减小，到1000个处理器后逆转。



In [None]:
# multiprocessing的使用方法
import multiprocessing


# 待执行函数
def my_function(data):
    result = data
    return result


num_processes = 5  # multiprocessing.cpu_count()  # 获取CPU核心数
pool = multiprocessing.Pool(processes=num_processes)  # 创建Pool对象

data = [1, 2, 3, 4, 5]
# 调用Pool对象的map方法，传递任务函数和数据。map方法将自动将数据分配给进程池中的进程进行处理，并返回结果列表。
results = pool.map(my_function, data)
# 关闭进程池
pool.close()
pool.join()

print(results)


## 12.7　MPI

高性能并行的黄金标准。

消息传递接口（Message-Passing Interface，MPI）：相当于一种规范，定义如何在各个进程之间传递信息。
目前有两个实现MPI的主要开源项目：MPICH和Open MPI。由于这两个项目实现了相同的标准，因此在很大程度上是可互换的。这两个项目都非常小心地提供了完整且正确的MPI接口。

MPI是一种并行性的抽象，与具体的机器无关。这允许物理学家（和其他领域专家）学习和编写MPI代码，并使其在任何计算机上工作。
MPI规模能达到数十万到数百万个处理器的水平，MPI也适用于少量的处理器。
超过1000个处理器的更优解。

MPI的基本元素都涉及如何在进程之间进行通信。
MPI有很多种Python接口。最常用的是mpi4py。
MPI术语中，进程称为rank，具有从零起算的整数标识符。
rank可能比实际的物理处理器个数多。
MPI将尽力在可用资源上平均分配rank。
rank 0 是一个特殊的“主”进程，用来控制其他rank。

MPI的核心是各种通信器communicator对象。
从这些对象中可以获得与所处理进程相关的元数据。
通信器还提供一个工具，用于从一个处理器发送消息并通过send()和recv()方法在其他进程上接收消息。

mpi4py包有两种主要的数据通信方式。
发送任意Python对象：较慢但更一般，但需要对象是完全picklable的。
NumPy数组通信：数据已经使用NumPy数组存储的情况下，最合适的做法是让mpi4py使用这些数组。

mpi4py包附带的常用且已经实例化的通信器：COMM_WORLD。
表示MPI以基本的点对点通信方式启动所有进程，允许任何两个进程直接通信。

启动MPI：
<code>$ mpiexec -n 4 python n-body-mpi.py</code>
表示启动4个进程。

在此处实现一个MPI的进程池。

In [1]:
# 导入MPI和辅助函数
from mpi4py import MPI  # MPI主模块
from mpi4py.MPI import COMM_WORLD  # COMM_WORLD通信器
from types import FunctionType  # 用于实现MPI进程池


class Pool(object):  # MPI进程池
    def __init__(self):
        self.f = None  # 对执行函数的引用，进程池开始时没有函数
        self.P = COMM_WORLD.Get_size()  # 处理器的总数
        self.rank = COMM_WORLD.Get_rank()  # 所位于的处理器


    def wait(self):  # 当进程池没有任务时接收数据的方法
        if self.rank == 0:  # 主进程不能等待
            raise RuntimeError("Proc 0 cannot wait!")
        status = MPI.Status()
        while True:
            task = COMM_WORLD.recv(source=0, tag=MPI.ANY_TAG, status=status)  # 从主进程接收新任务
            if not task:
                break  # 无新任务，不再接收
            if isinstance(task, FunctionType):  # 如果任务是一个函数，将其放在对象上，然后继续等待
                self.f = task
                continue
            result = self.f(task)  # 调用此任务上的函数并发送回结果
            COMM_WORLD.isend(result, dest=0, tag=status.tag)


    def map(self, f, tasks):  # map()方法
        N = len(tasks)  # 任务数
        P = self.P  # 处理器数
        Pless1 = P - 1  # 留一个处理器用于调度
        if self.rank != 0:  # 让worker等待主进程发出任务
            self.wait()
            return

        if f is not self.f:
            self.f = f
            requests = []
            for p in range(1, self.P):  # 发送所有worker的函数
                r = COMM_WORLD.isend(f, dest=p)
                requests.append(r)
            MPI.Request.waitall(requests)

        requests = []
        for i, task in enumerate(tasks):  # 将任务平均分配给所有worker
            r = COMM_WORLD.isend(task, dest=(i%Pless1)+1, tag=i)
            requests.append(r)
        MPI.Request.waitall(requests)

        results = []
        for i in range(N):  # 等待worker返回结果
            result = COMM_WORLD.recv(source=(i%Pless1)+1, tag=i)
            results.append(result)
        return results


    def __del__(self):
        if self.rank == 0:
            for p in range(1, self.P):  # 当进程池关闭时，关闭所有worker
                COMM_WORLD.isend(False, dest=p)

以上代码实现了一个用于实现MPI的Pool类。

Pool类的目的是提供一个与多处理线程池上的map()相同的map()方法。
这个类实现了以rank0作为主进程的策略。
主进程为master，其余为worker。
**相同：**
map()方法的使用方式与之前介绍的进程池中的map()相同。
**不同：**
无需告知进程的大小。
每个处理器上都有一个Pool实例，因为MPI在所有地方都运行相同的可执行文件（python）和脚本（n-body-mpi.py）。个进程池应该知道自己的rank，以便它可以确定自己是主进程还是只是一个worker。

wait()方法与12.5节中介绍的Thread.run()相同。
**wait()的三种处理方式：**
1．如果接收到函数，则将该函数分配给属性f以供稍后使用。
2．如果接收到了实际任务，则以该任务作为参数调用f属性。
3．如果任务为False，则停止等待。

主进程不允许等待，因此不能做真正的工作，因此让MPI使用P+1个节点。
map()的参数是一个函数和一个任务列表。将任务平均分配给各worker。
map()方法只能在master上运行，而worker需要等待。

如果传入的函数不等于当前的f属性，则将该函数发送到所有worker。这一步通过“初始发送（initiate send）”（COMM_WORLD.isend（））完成。之后调用MPI.Request.waitall()来确保该函数应用于所有worker。这相当于在发送者和所有接收者之间进行确认。接下来，将任务分发到对应的rank上。最后从worker那里接收结果。

删除主进程池实例时，进程池会自动让worker停止等待，因此能正确清理各worker。

在N体问题中，速度提升十分明显。



## 12.8 小结

列举一些并行系统：

OpenMP：基于预处理器，易于使用，为C、C++、Fortran提供底层并行。
GNU Portable Threads：针对C、C++、Fortan的跨平台线程系统。
IPython Parallel：IPython使用的并行架构，基于ZeroMQ。
Twisted：事件驱动的并行，针对Python Web应用。

**知识点：**
衡量规模的方式有很多种。
有些问题能完美并行，而有些则难以并行。
构建高性能计算系统以处理非完美并行问题。
高吞吐量计算系统最适合用于完美并行或异构问题。
非并行算法比单进程的并行代码更快。
远离Python线程。
多任务适用于处理一千个进程以内的问题。
进一步扩大规模时请使用MPI。