### 启动与停止线程
threading库可以在单独的线程中执行任何在python中可以调用的对象,创建一个Thread对象传递可调用对象.通过start,join来控制线程

> 由于全局解释锁(GIL)的原因,Python的线程被限制到同一时刻只允许一个线程执行,如果想要利用多核推荐使用multiprocessing或concurrent.futures.ProcessPoolExecutor.想要同时运行多个I/O这个模型是可以的

In [6]:
from itertools import cycle
import time 
from threading import Thread
import threading
def spin(i):
    for s in cycle(['\\','|','/']):
        print(s)   
        # time.sleep(1)     
        i-=1
        if not i:
            break
# spin(10)

t1=Thread(target=spin,args=[10])
t1.start()
t1.join()
print('join end')
# 获取当线程个数
print(threading.active_count())
# 
threading.current_thread()

\,|,/,\,|,/,\,|,/,\,join end
7


<_MainThread(MainThread, started 8623595008)>

### 判断线程是否已启动
使用threading库下的Event对象,Event对象包含一个可由线程设置的信号标志,它运行线程等待某些事件发,在初始情况下,event对象中标志为假,为假这个线程将会被一直等待,直到为true

> event对象最好单独使用,尽管可以用clear()来清楚

In [8]:
from threading import *

ev=Event()

def print_msg(ev,t):
    print('msg start...')
    ev.set()
    while t:
        print(t)
        import time
        time.sleep(0.5)
        t-=1

t=Thread(target=print_msg,args=[ev,5])

t.start()
print('t start...')

ev.wait()

start...
5
t start...


True

4
3
2
1


### 线程间的通信
从一个线程向另一个线程发送数据最安全方法使用queue库Queue对象,线程通过使用put和get操作来向队列中添加或删除元素

> Queue对象已经包含了锁的所以是线程安全

- Queue.put(item, block=True, timeout=None)
- Queue.put_nowait(item)
- Queue.get_nowait()
- Queue.get(block=True, timeout=None)
- Queue.join():阻塞至队列里所有元素都被接受不和处理完毕

In [13]:
from threading import *
from queue import Queue
import time
q=Queue()

t1=Thread(target=lambda q:q.put(2),args=[q])
t2=Thread(target=lambda q:print(q.get(2)),args=[q])
t1.start()
time.sleep(1)
t2.start()


class BlockQueue:

    def __init__(self,size):
        self._queue=[]
        self._size=size
        self._condi=Condition()

    def push(self,ele):
        with self._condi:
            while len(self._queue)==self._size:
                print('push wait')
                self._condi.wait()
            self._queue.append(ele)
            self._condi.notify_all()
    
    def poll(self):
        with self._condi:
            while len(self._queue)==0:
                self._condi.wait()
            self._condi.notify_all()
            print('poll')
            return self._queue.pop()

b=BlockQueue(1)

def show_pop(queue):
    time.sleep(3)
    queue.poll()
    print('end')

 

t1=Thread(target=show_pop,args=[b])

b.push(1)
t1.start()
print('t1 start')
b.push(2)
print('2')


2t1 start
push wait

poll
end
2


### 加锁
多线程加锁使用的是threading库中Lock对象,Lock对象可以与with语句块一起使用来保证互斥执行

In [None]:
import threading

class Counter:

    # 累锁每次只能一个线程进入该对象
    _lock=threading.Lock()

    def __init__(self) -> None:
        self._i=0

    def incr(self,i=1):
        with Counter._lock:
            self._i+=1
        
    
    def decr(self,i=1):
        with Counter._lock:
            self._i-=i


### 防止死锁的加锁机制
造成死锁的原因由于线程同时获取多个锁造成的,比如一个线程获取一个锁,然后在获取第二锁的时候发生了阻塞,在此上已经有一个线程获取第二锁,阻塞获取第一个锁就会造成死锁现象.

解决死锁的方法:顺序加锁

In [16]:
import threading
import time
def die_lock(left_lock,right_lock):
    with left_lock:
        print(f'step1 lock->{left_lock}',flush=True)
        time.sleep(1) 
        with right_lock:
            print(f'step2 lock->{right_lock}',flush=True)
l=threading.Lock()       
r=threading.Lock()

t1=threading.Thread(target=die_lock,args=[l,r])

t2=threading.Thread(target=die_lock,args=[r,l])

t1.start()
t2.start()

# contextlib.contextmanger 生成器装饰with
from contextlib import contextmanager
# 把锁的最小值存储在该变量上
_thread_local=threading.local()
def acquired(*locks):
    locks=sorted(locks,key=lambda a:id(a)) 
    
    acquired=getattr(_thread_local,'acquired',[])
    if acquired and max(id(acquired) for l in acquired)>=id(local[0]):
        raise RuntimeError('lock order error')
    
    acquired.extend(locks)
    _thread_local.acquired=acquired
    try:
        # 正着加锁
        for l in locks:
            l.acquire()
    finally:
        for l in reversed(locks):
            # 反着解锁
            l.release()
        del acquired[-len(locks):]

step1 lock-><locked _thread.lock object at 0x109018380>
step1 lock-><locked _thread.lock object at 0x1090c4800>


### 保存线程的状态信息
通过threading.local()来创建一个本地线层存储对象,这个对象的属性的保持和读取操作只会对执行线程可见,其他线程不可见.

> local()实例为每个线程维护着一个单独实例字典,也就是说每个线程使用一个独立的字典就可以保证数据隔离

In [22]:
import threading

main_local=threading.local()
main_local.i=10
def print_local(__local):
    print(getattr(__local,'i',0),id(__local))
print_local(main_local)

t1=threading.Thread(target=print_local,args=[main_local])

t1.start()

t1.join()

10 4446460544
0 4446460544


### 线程池
concurrent.futures函数库有一个ThreadPoolExecutor线程池对象

它是父类为Executor对象
- submit(fn,/,*args,**kwargs)方法,返回一个Future对象
- map(fn,*iterables,timeout)类似于map(func,*iterables)
- shutdown(wait=True),wait为True等待所有future对象执行完毕

> ProcessPoolExecutor另外一个子类管理进程的,map类似与fork-join,

> threading.stack_size() 控制栈大小

In [2]:
import requests
from concurrent.futures import ThreadPoolExecutor
def web():
    resp=requests.get('http://www.baidu.com')
    print(resp.status_code)

with ThreadPoolExecutor(max_workers=1) as pool:
    fu=pool.submit(web)
    fu.result()

200


### python全局锁问题

尽管Python完全支持多线程编程， 但是解释器的C语言实现部分在完全并行执行时并不是线程安全的。 实际上，解释器被一个全局解释器锁保护着，它确保任何时候都只有一个Python线程执行。 GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 

In [None]:
#Actor 模型 
import queue
import threading

class Actor():
    def __init__(self):
        self.queue=queue.Queue()
        self.t=threading.Thread(target=self.invoke)

    def invoke(self):
        while True:
            res=self.rev()
            if res==1:
                raise RuntimeError('invoke end')
            func,args=res
            func(args)

    def send(self,act):
        self.queue.put(act)
    
    def rev(self):
        return self.queue.get()
    
    def start(self):
        self.t.start() 

a=Actor()

a.start()

a.send((print,'12'))
a.send((print,'13'))
a.send(1)
a.send((print,'14'))




12

Exception in thread Thread-7 (invoke):
Traceback (most recent call last):
  File "/Users/harden/anaconda3/lib/python3.11/threading.py", line 1038, in _bootstrap_inner



13


    self.run()
  File "/Users/harden/anaconda3/lib/python3.11/threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/m4/fgyn19bn1cb4vzrvd9hbd2xr0000gn/T/ipykernel_929/740105043.py", line 14, in invoke


RuntimeError: invoke end
