# 简单了解线程

这里面watch函数是一个装饰器,基本上装饰器就这么玩

* 由于有GIL, python线程不适用于计算密集型任务, 但对IO密集型任务有很大作用.

## GIL 仅适用于CPython, 但其他解释器比如Pypy, Jython就没有GIL

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/01_Introduction.html

In [6]:
import time 
from threading import Thread

def countdown(n):
    while n>0:
        print('T-minus',n)
        n-=1
        time.sleep(1)
def fib(n):
    if n==1 or n==2:
        return 1
    return fib(n-1)+fib(n-2)

def watch(func):
    def wrapper(*args,**kwargs):
        print('{} start!'.format(func.__name__))
        import time
        start=time.time()
        func(*args,**kwargs)
        end=time.time()
#         print('Cost: ', end-start)
        print("{} ends, spends {:0,.4f} seconds!".format(func.__name__, end-start))
    return wrapper

@watch
def single_thread():
    i=fib(20)
    ii=fib(20)
    return 1
@watch
def duo_threads():
    t1=Thread(target=fib,args=(35,))
    t2=Thread(target=fib,args=(35,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
if __name__=='__main__':
    single_thread()
    duo_threads()
    

single_thread start!
single_thread ends, spends 0.0080 seconds!
duo_threads start!
duo_threads ends, spends 5.2912 seconds!


## 线程类

一下是一个threading的类, 用用法和函数差不多~

In [18]:
import time
from threading import Thread

class CountDownTask:
    def __init__(self):
        self._running=True
    def terminate(self):
        self._running=False
        print('Threading has been terminated!')
    def run(self,n):
        while self._running and n>0:
            print('T-minus',n)
            n-=1
            time.sleep(1)
            
if __name__ =='__main__':
    c=CountDownTask()
    t=Thread(target=c.run, args=(10,))
#     t=Thread(target=c.run, args=(10,), daemon=True)# 这是后台线程,后台线程无法等待,但会在主线程停止后自动销毁
    t.start()
    time.sleep(1)
    
    if t.is_alive():
        print('Still running!')
    else:
        print('Completed!')
    time.sleep(1)
    c.terminate()
    
    if t.is_alive():
        print('Still running!')
    else:
        print('Completed!')
#     t.join()
    
    if t.is_alive():
        print('Still running!')
    else:
        print('Completed!')

T-minus 10
Still running!
T-minus 9
Still running!
Still running!
T-minus 8
T-minus 7
T-minus 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


## join() & daemon
join() 是子线程用来阻塞主线程的一个函数, 当子线程daemon设置为True时,主线程执行完后自动退出,不会等待子线程. 而随着主线程的退出,子线程也随之强行退出.

当daemon==True 时, joint(timeout)里面的timeout有效, 主线程会等待timeout时间后结束子线程.

当daemon==False 时, join(timeout)里面的timeout无效, 主线程会一直等当子线程结束.


当多个timeout和多个子线程时, 主线程会等待超时时间最长为N*timeout

In [None]:
from threading import Thread

from time import sleep

class CookBook(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.msg='hello!'
        self.daemon=False

    def print_msg(self):
        print(self.msg)

    def run(self):
        print('Thread start!')
        x=0
        while x<10:
            self.print_msg()
            sleep(1)
            x+=1
        print('Thread end!')

print('Main process start!')
t=CookBook()

t.start()
t.join(2)
print('thread end!thread end!thread end!thread end!thread end!thread end!')


## threading.Lock()

锁是用来让子线程访问共享内存的

 防止多线程出现问题的锁.lock=threading.Lock()
 
 作为全局变量出现在程序中. 如果某线程需要改变某个全局变量,就会用lock.acquire() 来获得所
 当有其他线程占用lock时,子线程处于阻塞状态

In [None]:
import threading
import time

balance = 0
lock = threading.Lock()


class AddOne(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        global balance, lock
        while True:
            lock.acquire()
            try:
                balance += 1
                time.sleep(1)

                print('This is {} and balance =={}'.format(self.name, balance))
            except:
                print('This is {} error!'.format(self.name))
                time.sleep(1)
            finally:
                lock.release()


class MinusOne(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        global balance, lock
        while True:
            lock.acquire()
            try:
                balance -= 1
                time.sleep(1)

                print('This is {} and balance =={}'.format(self.name, balance))
            except:
                print('This is {} error!'.format(self.name))
                time.sleep(1)
            finally:
                lock.release()


if __name__ == '__main__':
    t1 = AddOne('add1')
    t2 = MinusOne('Minus1')
    t1.start()
    t2.start()
    t1.join()
    t2.join()

## threading.RLock()

这个比Lock()带劲. Reentrant Lock, 可重复的锁/递归锁.
    * 谁拿到谁释放
    * 同一线程可以多次拿到
    * 拿到多次,必须释放多次, 只有最后一次release才可以让RLock() 变成unblock!
    

In [None]:
import threading
import time

class Box:
    lock=threading.RLock()

    def __init__(self, items=0):
        self.total_items=items

    def execute(self,n):
        Box.lock.acquire()
        self.total_items+=n
        print('remains ',self.total_items)
        Box.lock.release()

    def add(self):
        Box.lock.acquire()
        self.execute(1)
        Box.lock.release()

    def remove(self):
        Box.lock.acquire()
        self.execute(-1)
        Box.lock.release()


def add(box,items):
    while items>0:
        box.add()
        time.sleep(1)
        items-=1
        print('Adding 1 item in the box')

def remove(box, items):
    while items>0:
        box.remove()
        items-=1
        time.sleep(1)
        print('Removing 1 item in the box')


if __name__=='__main__':
    items=5
    print('put {} items in the box'.format(items))
    box=Box()
    t1=threading.Thread(target=add, args=(box,items))
    t2=threading.Thread(target=remove, args=(box,items))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("{} items still remain in the box".format(box.total_items))

## threading.Semaphore()

信号同步

semaphore=threading.Semaphore(0)是一种信号量(可以理解成船上打旗语的人)
如果以生产者消费者模型来解释的话就是:

生产者每生产一个item后, 执行semaphore.release(), 本操作会使得semaphore李阿敏的计数器+1. 

而当semaphore里面的计数器!=0时, 会广播给所有之前因为触发semaphore.acquire()而暂停的消费者子线程通知,然后把item分发给消费者直到计数器重新==0.

当然啦~再有AB两个计数器时,可能会有死锁的情况. 

* 信号量在初始化时必须加入数字, threading.Semaphore(0)!否则会出现问题.默认是1

信号同步更适合控制访问量,比如说单位时间内访问server的次数之类的.

In [None]:
# 一个生产者和3个消费者模型!
import threading
import time
import random

semaphore=threading.Semaphore(0)
items=[]
lock=threading.Lock()
def consumer(name):
    global items
    while True:
        print('Conusmer {} is waiting!'.format(name))
        semaphore.acquire()

        # lock.acquire()
        print(items)
        # if items:
        times=items.pop()
        print('Cosumer {} get the ball!need {} seconds!'.format(name, times))
        time.sleep(times)
        # lock.release()

        print('Consumer {} finish working!'.format(name))

def producer():
    global items
    print("Producer starts working!")
    n=5
    while n>0:
        print('#################round ', 5-n)
        n-=1
        item=random.randint(2,5)
        items.append(item)
        print(' Produce # {}'.format(item))
        semaphore.release()
        time.sleep(1)

if __name__== '__main__':
    threads=[]
    for i in range(3):
        threads.append(threading.Thread(target=consumer, args=(str(i),), daemon=True))
    p=threading.Thread(target=producer)
    for i in threads:
        i.start()
    p.start()
    for i in threads:
        i.join(10)
    p.join()
    print("program terminated!!!!!")

In [None]:
import threading
import time
import random

semaphore=threading.Semaphore(0)

def consumer(name):
    global item
    print('Cosumer {} is waiting!'.format(name))
    semaphore.acquire()
    print('Consumer {} notify: consumed item number {}'.format(name, item))

def producer():
    global item
    time.sleep(5)
    item=random.randint(0,1000)
    print("producer notify: produced item number {}".format(item))
    semaphore.release()

if __name__=='__main__':
    for i in range(5):
        t1=threading.Thread(target=producer)
        t2=threading.Thread(target=consumer, args=('c2',))

        t3=threading.Thread(target=consumer, args=('c3',))
        t1.start()
        t2.start()
        t3.start()
        t1.join(6)
        t2.join(6)
        t3.join(6)
    print("program terminated!!")

## threading.Condition()

条件同步

感觉条件同步比信号同步更适合生产消费者模式// 主动休眠,被动唤醒

condition=threading.Condition()

这个condition有四种方法:
    * condition.aquire()
    * condition.wait()
    * condition.release() 
    * condition.notify()   

一个一个说~

### condition.acquire()
    这个就是拿到锁,然后可以访问共享资源
    
### condition.notify() // condition.release()
    这两个一般可以放一起看, 一个是发出通知唤醒其他的等待线程,然后释放公共资源

### condition.wait()
    这个是线程等待状态,运行到这后线程会被阻塞,知道收到notify()

### condition.notifyAll()
    这个是会唤醒所有在等待的线程, 貌似会忽略唤醒顺序,而notify()会依次唤醒线程

在condition里面是有个以Rlock的, 如果在创建对象时没有主动创建Rlock的话,那 程序自己给自己new一个Rlock

In [None]:
from threading import Thread, Condition
from time import sleep

items=[]
condition =Condition()

class Consumer(Thread):

    def __init__(self):
        Thread.__init__(self)
        self.daemon=True

    def consume(self):
        global condition
        global items
        condition.acquire()
        if not len(items):
            condition.wait()
            print('Consumer notify: no item in basket')
        items.pop()
        print('Consumer {} notify: consume 1 item'.format(self.name))
        print('Consumer {} notify: items to consume are '.format(self.name)+ str(len(items)))

        condition.notify()
        condition.release()

    def run(self):
        for i in range(20):
            sleep(3)
            self.consume()

class Producer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items)==10:
            condition.wait()
            print('Producer notify: items produced are'+str(len(items)))
            print('Producer notify: stop the production!')
        items.append(1)
        print('Producer notify: total items produced '+ str(len(items)))
        condition.notify()
#         condition.notfiyAll()
        condition.release()

    def run(self):
        for i in range(20):
            sleep(1)
            self.produce()
if __name__=='__main__':
    producer=Producer()
    consumer1=Consumer()
    consumer2=Consumer()
    producer.start()
    consumer1.start()
    consumer2.start()
    producer.join()
    consumer1.join(3)
    consumer2.join(3)
    

## threading.Event()

event=threading.Event() 可以理解成一个子线程控制开关

### event.set()/ event.clear()
event在创建时默认是False, (就是说event在创建完成后event.is_set()==False),
可以用event.set() 设置成True. 在设置成True后可以通过event.clear()方法重新恢复成False.

### event.wait()
event.wait(timeout) 类似于join在主线程里的玩法, 这个命令会阻塞子线程的运行.
可以设置timeout阻塞时间, 超过时间还没反应就直接运行.或者在某线程设置event.set() 后运行.

但在pythonCookBook上写到, event()更适合单次使用, Condition 比较适合循环使用.

这个网址有个图片感觉挺好的~解释生产消费者模型

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/10_Thread_synchronization_with_an_event.html

In [None]:
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('worker1 waiting for redis ready..')
    event.wait()
    logging.debug('worker1 redis ready, start working!')
    # time.sleep(1)

def worker2(event):
    while not event.is_set():
        logging.debug('worker2 Waiting for redis ready')
        event.wait(1)
    logging.debug('worker2 redis ready, start working!')
    time.sleep(1)

redis_ready=threading.Event()
t1=threading.Thread(target=worker2, args=(redis_ready,), name='t1', daemon=True)
t1.start()
t2=threading.Thread(target=worker2, args=(redis_ready,), name='t2')
t2.start()

logging.debug('check server!')
for i in range(3):
    time.sleep(1)
    logging.debug('Checking!')

time.sleep(3)
print("Redis has been launched!")
redis_ready.set()
print('waiting for 3 more seconds.')
time.sleep(3)

if t1.is_alive():
    print('t1 is still running!')
else:
    print('t1 down')
if t2.is_alive():
    print('t2 is still running!')
else:
    print('t2 down!')

In [None]:
import time
from threading import Thread, Event
import random
items=[]
event=Event()

class Consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items=items
        self.event=event

    def run(self):
        while True:
            time.sleep(2)
            self.event.wait()
            item=self.items.pop()
            print('Consumer notify: {} popped from list by {}'.format(item, self.name))

class Producer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items=items
        self.event=event

    def run(self):
        global item
        for i in range(100):
            time.sleep(1)
            item=random.randint(0,256)
            self.items.append(item)
            print('Producer notify: item {} appended to list by {}'.format(item, self.name))
            self.event.set()
            print('Produce notify: event cleared by ', self.name)
            self.event.clear()

if __name__=='__main__':
    t1=Producer(items, event)
    t2=Consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()








## with 在threading 中的用法

with 也被叫做上下文管理器

就是有__enter__() 和__exit__()方法.
可以自动加载和退出.相对保险.

https://www.kawabangga.com/posts/2010

In [None]:
import threading
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def threading_with(statement):
    with statement:
        logging.debug('%s acquired via with' % statement)

def threading_not_with(statement):
    statement.acquire()
    try:
        logging.debug('%s acquired directly' % statement)
    finally:
        statement.release()

if __name__=='__main__':
    lock=threading.Lock()
    rlock=threading.RLock()
    condition=threading.Condition()
    mutex=threading.Semaphore(1)
    threading_sync_list=[lock,rlock,condition,mutex]

    for statement in threading_sync_list:
        t1 = threading.Thread(target=threading_with, args=(statement,))
        t2 = threading.Thread(target=threading_not_with, args=(statement,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()


## 用Queue传递信息

queue的用法还是很多的
基本的就是get(), put() 分别对应pop()和append()

还有task_done()和join() 这两个可以用来阻塞线程.

http://www.vuln.cn/8610

https://docs.python.org/3/library/queue.html

In [None]:
from threading import Thread, Event
from queue import Queue
import time
import random

class Producer(Thread):
    def __init__(self,queue):
        Thread.__init__(self)
        self.queue=queue
        self.name='Producer'
    def run(self):
        for i in range(10):
            item=random.randint(0,256)
            self.queue.put(item)
            print('Producer notify: item {} appeneded to queue by {}.'.format(item, self.name))
            time.sleep(1)

class Consumer(Thread):
    def __init__(self, queue,name):
        Thread.__init__(self)
        self.queue=queue
        self.name=name
    def run(self):
        while True:
            item = self.queue.get()
            print('Consumer notify: {} popped from queue by {}.'.format(item, self.name))
        self.queue.task_done()

if __name__=='__main__':
    queue=Queue()
    t1 = Producer(queue)
    t2 = Consumer(queue, 'c1')
    t3 = Consumer(queue, 'c2')
    t4 = Consumer(queue, 'c3')
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()