In [17]:
import time
import os
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

### basics

- GIL：Global Interpreter Lock
    - 确保同一时刻只有一个线程执行 Python 字节码。
        - only one thread can execute **Python Bytecode** at a time
    - GIL 的存在导致：伪多线程。**GIL contention across threads**
        - 多个线程无法同时执行 Python 字节码，导致多线程在 CPU 密集型任务中无法提高性能。
            - 名义上可以继续写 multi thread 的代码，但实际上并不会得到预期的效率提升；
        - 在执行 I/O 操作（如文件读写、网络请求）时，线程会释放 GIL，允许其他线程运行。
- 多线程 vs. 多进程
    - 多线程
        - 线程间通信成本低，数据共享方便。
        - 适用于 I/O 密集型任务，如网络请求、文件读写。
    - 多进程
        - 能够充分利用多核 CPU，实现真正的并行计算。
        - 适用于 CPU 密集型任务，如复杂计算、数据处理。
    - 涉及到拿到函数（线程化/进程化）执行后的返回值时；
        - 线程共享同一进程的内存空间
            - 在同一进程内运行多个线程，线程之间共享内存空间和全局变量。
        - 在多进程中，每个进程都有自己独立的内存空间。
- `concurrent.futures`：多线程/多进程管理及更方便地取出线程及进程的返回值
    - 当你使用 `with ThreadPoolExecutor(...) as executor:` 时，with 块会在退出时自动调用 `executor.shutdown(wait=True)`。shutdown 方法会等待所有正在执行的任务完成。
- `threading` => `concurrent.futures.ThreadPoolExecutor`
- `multiprocessing` => `concurrent.futures.ProcessPoolExecutor`

### DP vs. DDP (torch)

- https://pytorch.org/tutorials/beginner/ddp_series_theory.html


| DataParallel                                    | DistributedDataParallel                   |
|-------------------------------------------------|-------------------------------------------|
| More overhead; model is replicated and destroyed at each forward pass | Model is replicated only once             |
| Only supports single-node parallelism           | Supports scaling to multiple machines     |
| Slower; uses multithreading on a single process and runs into Global Interpreter Lock (GIL) contention | Faster (no GIL contention) because it uses multiprocessing |


### io vs. cpu


- I/O 密集型任务主要受限于 I/O 操作的速度，任务的执行时间主要取决于外部设备（如磁盘、网络）的响应速度。
    - 需要频繁的磁盘读写、网络请求或其他外部设备交互。
    - CPU 大部分时间处于等待状态，等待 I/O 操作完成。
    - 性能瓶颈在于 I/O 设备的速度而非 CPU。
- CPU-bound tasks
    - CPU 密集型任务主要消耗 CPU 的计算资源，任务的执行时间主要取决于 CPU 的运算速度。
    - 需要大量的数学计算或数据处理。
    - 任务过程中很少或没有 I/O 操作（如磁盘读写、网络通信）。
    - 性能瓶颈在于 CPU 的处理能力。

In [35]:
import requests

def fetch_url(url):
    response = requests.get(url)
    print(f"访问 {url}，状态码：{response.status_code}")

In [40]:
urls = [
    'https://www.example.com',
    'https://www.python.org',
    'https://www.github.com',
    # 可以添加更多的 URL
]

In [42]:
t0 = time.time()
for url in urls:
    status = fetch_url(url)
print(f"总耗时：{time.time() - t0:.2f} 秒")

访问 https://www.example.com，状态码：200
访问 https://www.python.org，状态码：200
访问 https://www.github.com，状态码：200
总耗时：2.24 秒


In [45]:
t0 = time.time()
threads = []
for url in urls:
    t = threading.Thread(target=fetch_url, args=(url,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"总耗时：{time.time() - t0:.2f} 秒")

访问 https://www.python.org，状态码：200
访问 https://www.github.com，状态码：200
访问 https://www.example.com，状态码：200
总耗时：0.90 秒


In [44]:
def is_prime(n):
    if n <= 1:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

def count_primes(limit):
    count = 0
    for num in range(2, limit):
        if is_prime(num):
            count += 1
    return count

In [48]:
t0 = time.time()
prime_count = count_primes(10000000)
print(f"找到素数数量：{prime_count}, 耗时：{time.time() - t0:.2f} 秒")

找到素数数量：664579, 耗时：68.84 秒


In [49]:
def count_primes_in_range(start, end):
    count = 0
    for num in range(start, end):
        if is_prime(num):
            count += 1
    return count

def count_primes_parallel(limit):
    pool = multiprocessing.Pool()
    num_processes = multiprocessing.cpu_count()
    ranges = []
    step = limit // num_processes
    for i in range(num_processes):
        start = i * step
        end = limit if i == num_processes - 1 else (i + 1) * step
        ranges.append((start, end))
    results = pool.starmap(count_primes_in_range, ranges)
    return sum(results)

In [52]:
limit = 10000000
t0 = time.time()
prime_count = count_primes_parallel(limit)
print(f"找到素数数量：{prime_count}, 耗时：{time.time() - t0:.2f} 秒")

找到素数数量：664579, 耗时：3.97 秒


### threading

In [6]:
# cpu 
def count(n):
    process_id = os.getpid()
    thread_id = threading.get_ident()
    res = 0
    print(f"Process: {process_id} Thread: {thread_id} start")
    while n > 0:
        res += n
        n -= 1
    print(f"Process: {process_id} Thread: {thread_id} end")
    return res

In [5]:
NUM1 = 100000000
NUM2 = 100000000

In [9]:
def single_thread():
    start = time.time()
    res1 = count(NUM1)
    res2 = count(NUM2)
    end = time.time()
    print(f"单线程耗时：{end - start:.2f} 秒, res1: {res1}, res2: {res2}")

In [11]:
import numpy as np
np.sum(np.arange(1, 100000000+1))

5000000050000000

In [10]:
single_thread()

Process: 825838 Thread: 130371915138880 start
Process: 825838 Thread: 130371915138880 end
Process: 825838 Thread: 130371915138880 start
Process: 825838 Thread: 130371915138880 end
单线程耗时：11.26 秒, res1: 5000000050000000, res2: 5000000050000000


In [13]:
def multi_thread():
    start = time.time()
    res1 = None
    res2 = None
    
    def thread_func1(num):
        # 线程共享同一进程的内存空间
        nonlocal res1
        res1 = count(num)
    
    def thread_func2(num):
        nonlocal res2
        res2 = count(num)
    
    t1 = threading.Thread(target=thread_func1, args=(NUM1,))
    t2 = threading.Thread(target=thread_func2, args=(NUM2,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    end = time.time()
    print(f"多线程耗时：{end - start:.2f} 秒, res1: {res1}, res2: {res2}")

In [14]:
multi_thread()

Process: 825838 Thread: 130370871752256 start
Process: 825838 Thread: 130370863359552 start
Process: 825838 Thread: 130370863359552 end
Process: 825838 Thread: 130370871752256 end
多线程耗时：39.29 秒, res1: 5000000050000000, res2: 5000000050000000


In [15]:
def multi_thread_2():
    start = time.time()
    executor = ThreadPoolExecutor(max_workers=2)
    res1 = executor.submit(count, n=NUM1)
    res2 = executor.submit(count, n=NUM2)
    
    submit_time = time.time()
    print(f"多线程2提交任务耗时：{submit_time - start:.2f} 秒")
    
    # Wait for the result (this will block until the tasks are done)
    result1 = res1.result()
    result2 = res2.result()
    end = time.time()
    
    print(f"多线程2总耗时：{end - start:.2f} 秒， res1: {result1}, res2: {result2}")

In [16]:
multi_thread_2()

Process: 825838 Thread: 130370871752256 start
Process: 825838 Thread: 130370863359552 start
多线程2提交任务耗时：0.01 秒
Process: 825838 Thread: 130370871752256 end
Process: 825838 Thread: 130370863359552 end
多线程2总耗时：42.97 秒， res1: 5000000050000000, res2: 5000000050000000


In [18]:
def multi_thread_3():
    start = time.time()
    with ThreadPoolExecutor(max_workers=2) as executor:
        res1 = executor.submit(count, n=NUM1)
        res2 = executor.submit(count, n=NUM2)
    submit_time = time.time()
    print(f"多线程3提交任务耗时：{submit_time - start:.2f} 秒")
    
    # Wait for the result (this will block until the tasks are done)
    result1 = res1.result()
    result2 = res2.result()
    end = time.time()
    
    print(f"多线程3总耗时：{end - start:.2f} 秒， res1: {result1}, res2: {result2}")

In [19]:
multi_thread_3()

Process: 825838 Thread: 130370863359552 start
Process: 825838 Thread: 130370871752256 start
Process: 825838 Thread: 130370863359552 end
Process: 825838 Thread: 130370871752256 end
多线程3提交任务耗时：44.27 秒
多线程3总耗时：44.27 秒， res1: 5000000050000000, res2: 5000000050000000


In [24]:
def thread_share():
    start = time.time()
    shared_list = []

    def thread_func1(num):
        for i in range(num):
            shared_list.append(f"Thread1-{i}")
            time.sleep(0.1)

    def thread_func2(num):
        for i in range(num):
            shared_list.append(f"Thread2-{i}")

    t1 = threading.Thread(target=thread_func1, args=(10,))
    t2 = threading.Thread(target=thread_func2, args=(5,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    end = time.time()
    print(f"多线程耗时：{end - start:.2f} 秒, shared_list: {shared_list}")

In [25]:
thread_share()

多线程耗时：1.01 秒, shared_list: ['Thread1-0', 'Thread2-0', 'Thread2-1', 'Thread2-2', 'Thread2-3', 'Thread2-4', 'Thread1-1', 'Thread1-2', 'Thread1-3', 'Thread1-4', 'Thread1-5', 'Thread1-6', 'Thread1-7', 'Thread1-8', 'Thread1-9']


### processes

In [29]:
res1 = None
res2 = None

def process_func1(num):
    # 在多进程中,每个进程都有自己独立的内存空间。
    global res1
    res1 = count(num)

def process_func2(num):
    global res2
    res2 = count(num)

def multi_process():
    start = time.time()
    
    p1 = multiprocessing.Process(target=process_func1, args=(NUM1,))
    p2 = multiprocessing.Process(target=process_func2, args=(NUM2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print(f"多进程耗时：{end - start:.2f} 秒, res1: {res1}, res2: {res2}")

In [30]:
multi_process()

Process: 6581 Thread: 130371915138880 start
Process: 6579 Thread: 130371915138880 start
Process: 6579 Thread: 130371915138880 end
Process: 6581 Thread: 130371915138880 end
多进程耗时：5.60 秒, res1: None, res2: None


In [32]:
def multi_process_2():
    start = time.time()
    with ProcessPoolExecutor(max_workers=2) as pool:
        res1 = pool.submit(count, n=NUM1)
        res2 = pool.submit(count, n=NUM2)
    end = time.time()
    # print(f"多进程2耗时：{end - start:.2f} 秒， res1: {res1.result()}, res2: {res2.result()}")
    print(f"多进程2 耗时：{end - start:.2f} 秒")
    print(f"多进程2 res1: {res1.result()}, res2: {res2.result()}")

In [33]:
multi_process_2()

Process: 7071 Thread: 130371915138880 startProcess: 7072 Thread: 130371915138880 start

Process: 7072 Thread: 130371915138880 end
Process: 7071 Thread: 130371915138880 end
多进程2 耗时：5.42 秒
多进程2 res1: 5000000050000000, res2: 5000000050000000
