# Implementing Multithreading with the threading Module

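A thread is the smallest unit of execution, and a process consists of at least one thread. How processes and threads are scheduled is decided entirely by the operating system; a program cannot choose when it runs or for how long. Multi-process and multi-threaded programs also have to deal with synchronization and data sharing, which makes them harder to write.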

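Multithreading means running several threads concurrently within one process. For I/O-bound tasks such as web crawling, it can substantially improve overall crawl throughput, because one thread can make progress while the others wait on the network. Python's module for multithreading is threading, which ships with the standard library.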
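
As a rough illustration of the point above, the sketch below simulates I/O waits with time.sleep and compares running three tasks one after another with running them in threads. The names fake_download, run_sequential, and run_threaded are hypothetical, and the sleep is only a stand-in for a real network request.

```python
import threading
import time

def fake_download(delay, results, index):
    # Hypothetical stand-in for a network request: the sleep simulates I/O wait.
    time.sleep(delay)
    results[index] = f"page-{index}"

def run_sequential(delays):
    # Run the tasks one after another and measure the total time.
    results = [None] * len(delays)
    start = time.perf_counter()
    for i, d in enumerate(delays):
        fake_download(d, results, i)
    return results, time.perf_counter() - start

def run_threaded(delays):
    # Run each task in its own thread, then wait for all of them.
    results = [None] * len(delays)
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_download, args=(d, results, i))
               for i, d in enumerate(delays)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, time.perf_counter() - start

delays = [0.2, 0.2, 0.2]
seq_results, seq_elapsed = run_sequential(delays)
thr_results, thr_elapsed = run_threaded(delays)
print(f"sequential: {seq_elapsed:.2f}s, threaded: {thr_elapsed:.2f}s")
```

Because the waits overlap, the threaded run should take roughly as long as the single longest task rather than the sum of all of them.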

## Method 1: Create child threads directly

In [57]:
import threading
import time

def target(second):
    print(f"Threading {threading.current_thread().name} is running")
    print(f"Threading {threading.current_thread().name} sleep {second}s")
    time.sleep(second)
    print(f"Threading {threading.current_thread().name} is ended")
    
print(f"Threading {threading.current_thread().name} is running")

for i in [1,5]:
    t = threading.Thread(target=target,args=[i])
    t.start()
    
print(f"Threading {threading.current_thread().name} is ended")
    

Threading MainThread is running
Threading Thread-149 is running
Threading Thread-149 sleep 1s
Threading Thread-150 is running
Threading Thread-150 sleep 5s
Threading MainThread is ended
Threading Thread-149 is ended
Threading Thread-150 is ended


In [12]:
import threading
import time

def target(second):
    print(f"Threading {threading.current_thread().name} is running")
    print(f"Threading {threading.current_thread().name} sleep {second}s")
    time.sleep(second)
    print(f"Threading {threading.current_thread().name} is ended")
    
print(f"Threading {threading.current_thread().name} is running")

for i in [1,5]:
    t = threading.Thread(target=target,args=[i])
    t.start()
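    # join() blocks the main thread until this child thread finishes.
    # Called inside the loop, it also makes the threads run one after another.
    t.join()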
    
print(f"Threading {threading.current_thread().name} is ended")

Threading MainThread is running
Threading Thread-14 is running
Threading Thread-14 sleep 1s
Threading Thread-14 is ended
Threading Thread-15 is running
Threading Thread-15 sleep 5s
Threading Thread-15 is ended
Threading MainThread is ended
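
The output above shows the two threads running strictly in sequence, because join() is called right after each start(). A common alternative, sketched here under the assumption that the threads should overlap, is to start every thread first and only then join them all. The sleep times are shortened to 0.3 and 0.1 seconds, and the finished list is a hypothetical helper that records completion order.

```python
import threading
import time

finished = []  # records the order in which the threads finish

def target(second):
    time.sleep(second)
    finished.append(second)  # list.append is thread-safe in CPython

threads = []
for i in [0.3, 0.1]:
    t = threading.Thread(target=target, args=(i,))
    t.start()            # start every thread before waiting on any of them
    threads.append(t)

for t in threads:
    t.join()             # the main thread now waits for all children

print(finished)
```

Here the thread with the shorter sleep is expected to finish first even though it was started second, which indicates that the two threads really ran concurrently.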


## Method 2: Create child threads by subclassing Thread

In [23]:
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, second):
        super().__init__()
        self.second = second
    
    def run(self):
        print(f"Threading {threading.current_thread().name} is running")
        print(f"Threading {threading.current_thread().name} sleep {self.second}s")
        time.sleep(self.second)
        print(f"Threading {threading.current_thread().name} is ended")

print(f"Threading {threading.current_thread().name} is running")

for i in [1,5]:
    t = MyThread(i)
    t.start()
    t.join()

print(f"Threading {threading.current_thread().name} is ended")

Threading MainThread is running
Threading Thread-36 is running
Threading Thread-36 sleep 1s
Threading Thread-36 is ended
Threading Thread-37 is running
Threading Thread-37 sleep 5s
Threading Thread-37 is ended
Threading MainThread is ended
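
One thing a Thread subclass makes convenient is keeping a result on the instance, since run() cannot return a value to the caller. The sketch below is a hypothetical example (the class SquareThread and its result attribute are not from the original notebook): each thread stores its result on itself, and the main thread reads it after join().

```python
import threading
import time

class SquareThread(threading.Thread):
    """Hypothetical example: computes n * n after a short simulated wait."""

    def __init__(self, n):
        super().__init__()
        self.n = n
        self.result = None  # filled in by run()

    def run(self):
        time.sleep(0.1)               # simulated work
        self.result = self.n * self.n

threads = [SquareThread(n) for n in [1, 5]]
for t in threads:
    t.start()
for t in threads:
    t.join()                          # results are safe to read after join()

results = [t.result for t in threads]
print(results)
```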


## Practice:


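A call to fork() copies the current process (the parent) into a new process (the child) and then returns in both of them. It always returns 0 in the child, and it returns the child's process ID in the parent. The reason is that one parent can fork many children, so the parent has to record each child's ID, whereas a child only needs to call getppid() to get its parent's ID. getpid() returns the ID of the calling process itself. Python's os module wraps common system calls, including fork, so creating a child process from a Python program is straightforward.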

**Unix/Linux operating systems provide the fork system call; Windows does not.**

In [83]:
import os

print(f'Process {os.getpid()} start...')
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print(f'I am child process {os.getpid()} and my parent is {os.getppid()}.')
else:
    print(f'I {os.getpid()} just created a child process {pid}.')

Process 57370 start...
I 57370 just created a child process 61360.
I am child process 61360 and my parent is 57370.


**The multiprocessing module is the cross-platform way to use multiple processes. It provides a Process class that represents a process object.**

In [85]:
from multiprocessing import Process
import os

# Code executed by the child process
def run_proc(name):
    print(f'Run child process {name} {os.getpid()}...')

if __name__=='__main__':
    print(f'Parent process {os.getpid()}.')
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

Parent process 57370.
Child process will start.
Run child process test 61434...
Child process end.


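To create a child process, pass a target function and its arguments to create a Process instance, then call start() to launch it. This is even simpler than using fork().
join() waits for the child process to finish before execution continues, and is commonly used to synchronize processes.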

**Pool**<br>
To start a large number of child processes, create them in batches with a process pool:

In [93]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print(f'Run task {name} {os.getpid()}...' )
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print(f'Task {name} runs {(end - start):.2f} seconds.' )

if __name__=='__main__':
    print(f'Parent process {os.getpid()}.')
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

Parent process 57370.
Run task 1 61572...
Run task 0 61571...
Run task 2 61573...
Run task 3 61574...
Waiting for all subprocesses done...
Task 0 runs 0.17 seconds.
Run task 4 61571...
Task 3 runs 1.55 seconds.
Task 1 runs 1.82 seconds.
Task 4 runs 2.21 seconds.
Task 2 runs 2.89 seconds.
All subprocesses done.


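*Calling join() on a Pool object waits for all child processes to finish. close() must be called before join(), and once close() has been called no new tasks can be submitted to the pool. Because the pool was created with 4 workers, task 4 starts only after one of tasks 0 to 3 has finished, as the output above shows.*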

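**Child processes**<br>
Often the child process is not a copy of the program itself but an external program. After creating it, we also need to control its input and output. The subprocess module makes it easy to start a child process and then control its input and output.
The example below runs the command nslookup www.python.org from Python code, which has the same effect as running it directly on the command line.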

In [111]:
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

$ nslookup www.python.org
Exit code: 0
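
subprocess.call returns only the exit code; the child writes its output to the inherited standard output, which is presumably why the notebook cell above shows nothing but the exit code. If the text itself is needed, one option is subprocess.run with capture_output=True and text=True (available since Python 3.7). The sketch below is an assumption-laden stand-in: instead of nslookup it runs a one-line Python program through sys.executable so that it does not depend on the network or on nslookup being installed.

```python
import subprocess
import sys

# Run a tiny child program and capture what it prints.
completed = subprocess.run(
    [sys.executable, "-c", "print('hello from child')"],
    capture_output=True,  # collect stdout and stderr instead of inheriting them
    text=True,            # decode the captured bytes to str
)
print("Exit code:", completed.returncode)
print("Output:", completed.stdout.strip())
```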


If the child process also needs input, pass it through the communicate() method:

In [109]:
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

$ nslookup
Server:		192.168.200.1
Address:	192.168.200.1#53

Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
org	nameserver = b0.org.afilias-nst.org.
org	nameserver = c0.org.afilias-nst.info.
org	nameserver = a2.org.afilias-nst.info.
org	nameserver = a0.org.afilias-nst.info.
org	nameserver = d0.org.afilias-nst.org.
org	nameserver = b2.org.afilias-nst.org.
mail.python.org	internet address = 188.166.95.178
a0.org.afilias-nst.info	internet address = 199.19.56.1
a0.org.afilias-nst.info	has AAAA address 2001:500:e::1
a2.org.afilias-nst.info	internet address = 199.249.112.1
a2.org.afilias-nst.info	has AAAA address 2001:500:40::1
b0.org.afilias-nst.org	internet address = 199.19.54.1
b0.org.afilias-nst.org	has AAAA address 2001:500:c::1
b2.org.afilias-nst.org	internet address = 199.249.120.1
b2.org.afilias-nst.org	has AAAA address 2001:500:48::1
c0.org.afilias-nst.info	internet address = 199.19.53.1
c0.org.afilias-nst.info	has
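
The same communicate() pattern can be tried without nslookup. In the sketch below, the child is a hypothetical one-line Python program (child_code) that upper-cases whatever it reads from standard input; it is used here only so the example runs on any machine with Python installed.

```python
import subprocess
import sys

# Hypothetical child program: read all of stdin and echo it in upper case.
child_code = "import sys; print(sys.stdin.read().upper(), end='')"

p = subprocess.Popen(
    [sys.executable, "-c", child_code],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
# communicate() sends the bytes, closes stdin, and waits for the child to exit.
output, err = p.communicate(b"set q=mx\npython.org\n")
print(output.decode("utf-8"))
print("Exit code:", p.returncode)
```

communicate() reads stdout and stderr while the child runs, which avoids the deadlock that can occur when a pipe buffer fills up and nobody is reading from it.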

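**Inter-process communication**<br>
Processes usually need to communicate with each other, and the operating system offers many mechanisms for this. Python's multiprocessing module wraps the low-level mechanisms and provides Queue, Pipes, and other ways to exchange data.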

In [118]:
from multiprocessing import Process, Queue
import os, time, random

# Code executed by the writer process:
def write(q):
    print(f'Process to write: {os.getpid()}')
    for value in ['A', 'B', 'C']:
        print(f'Put {value} to queue...')
        q.put(value)
        time.sleep(random.random())

# Code executed by the reader process:
def read(q):
    print(f'Process to read: {os.getpid()}')
    while True:
        value = q.get(True)
        print(f'Get {value} from queue.')

if __name__=='__main__':
    # The parent process creates the Queue and passes it to each child process:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start child process pw, the writer:
    pw.start()
    # Start child process pr, the reader:
    pr.start()
    # Wait for pw to finish:
    pw.join()
    # pr runs an infinite loop and can never be joined, so terminate it forcibly:
    pr.terminate()

Process to write: 61978
Put A to queue...
Process to read: 61979
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.


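multiprocessing also provides a few other useful functions: cpu_count() returns the number of CPUs (logical cores) on the current machine, and active_children() returns the child processes of the current process that are still alive.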

In [116]:
from multiprocessing import cpu_count,active_children

print(f'cpu number:{cpu_count()}')
print(f'process:{active_children()}')

cpu number:4
process:[]


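Multitasking can be achieved with multiple processes or with multiple threads inside one process; the latter is implemented with the threading module. To start a thread, pass a function when creating a Thread instance and then call start() to begin execution: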

In [120]:
import time, threading

# Code executed by the new thread:
def loop():
    print(f'thread {threading.current_thread().name} is running...' )
    n = 0
    while n < 5:
        n = n + 1
        print(f'thread {threading.current_thread().name} >>> {n}' )
        time.sleep(1)
    print(f'thread {threading.current_thread().name} ended.' )

print(f'thread {threading.current_thread().name} is running...')
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print(f'thread {threading.current_thread().name} ended.')

thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.


In [145]:
import time, threading

# Suppose this is your bank balance:
balance = 0
lock = threading.Lock()
def change_it(n):
    # Deposit, then withdraw; the result should be 0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        # Acquire the lock first:
        lock.acquire()
        try:
            # Safe to modify the balance now:
            change_it(n)
        finally:
            # Always release the lock when done:
            lock.release()
        

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

0
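
The lock in the cell above ensures that only one thread at a time executes change_it, so the two-step update of balance cannot be interleaved with another thread's update. A Lock also works as a context manager, so the acquire/try/finally/release pattern can be written more compactly with a with statement. The sketch below is an equivalent variant of the same example, with the loop count reduced to 50000 to keep it quick.

```python
import threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # Deposit, then withdraw; the net effect should be zero.
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for _ in range(50000):
        # "with lock" acquires the lock and releases it on exit,
        # even if change_it raises an exception.
        with lock:
            change_it(n)

threads = [threading.Thread(target=run_thread, args=(n,)) for n in (5, 8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(balance)
```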

