# 多线程
 - www.cnblogs.com/jokerbj/p/7460260.html

## 多线程 vs 多进程
- 程序: 一堆代码以文本形式存入一个文档
- 进程: 程序运行的一个状态
    - 包含地址空间, 内存, 数据栈等
    - 每个进程又自己完全毒里的运行环境, 多过程共享数据是一个问题
- 线程: 
    - 一个进程的独立运行片段, 一个进程可以有多个线程
    - 轻量化的进程
    - 一个进程的多个线程共享数据和上下文运行环境
    - 共享互斥问题
- 全局解释锁(GIL)
    - Python 代码的执行是由python虚拟机进行控制
    - 在主循环中只能有一个控制线程在执行
- python 包
    - thread 有问题, python 3 改成了_thread
    - threading, 通行的包


In [6]:
# 多线程案例
# 时间比较短 
import time
import _thread as thread
def loop1():
    print("线程1启动", time.ctime())
    time.sleep(4)
    print("线程1结束", time.ctime())
    
def loop2():
    print("线程2启动", time.ctime())
    time.sleep(2)
    print("线程2结束", time.ctime())
    
def main():
    print("主程序启动", time.ctime())
    thread.start_new_thread(loop1, ())
    thread.start_new_thread(loop2, ())
    print("主程序结束", time.ctime())

if __name__ == "__main__":
    main()


主程序启动 Sun Sep  2 20:22:25 2018
主程序结束 Sun Sep  2 20:22:25 2018
线程1启动线程2启动 Sun Sep  2 20:22:25 2018
 Sun Sep  2 20:22:25 2018
线程2结束 Sun Sep  2 20:22:27 2018
线程1结束 Sun Sep  2 20:22:29 2018


In [4]:
# 顺序执行 时间比较长
import time
def loop1():
    print("线程1启动", time.ctime())
    time.sleep(4)
    print("线程1结束", time.ctime())
    
def loop2():
    print("线程2启动", time.ctime())
    time.sleep(2)
    print("线程2结束", time.ctime())
    
def main():
    print("主程序启动", time.ctime())
    loop1()
    loop2()
    print("主程序结束", time.ctime())

if __name__ == "__main__":
    main()

主程序启动 Sun Sep  2 20:19:09 2018
线程1启动 Sun Sep  2 20:19:09 2018
线程1结束 Sun Sep  2 20:19:13 2018
线程2启动 Sun Sep  2 20:19:13 2018
线程2结束 Sun Sep  2 20:19:15 2018
主程序结束 Sun Sep  2 20:19:15 2018


## threading 的使用
   - 直接利用threadong.Thread 生成Thread实例
        1. t = threading.Thread(target=xxx, args=(xxx,))
        2. t.start(): 启动多线程
        3. t.join(): 等待多线程执行完成
        - 守护线程 daemon
            - 如果在程序中将子线程设置成守护线程, 则子线程会在主线程结束的时候自动退出
            - 一般认为, 守护线程不重要, 或者不允许离开主线程独立运行
            - 守护线程案例能否有效果与环境有关
        - 线程常用属性
            - threading.currentThread : 返回当前线程变量
            - threading.enumerate: 返回一个包含正在运行的线程的list, 正在运行的线程值得是线程启动后谁子啊跑
            - threading.activeCount: 返回正在运行的线程的数量,
            - thr.setName: 给线程设置名字
            - thr.getNnme: 得到线程的名字
   - 直接继承自 threading.Thread
        - 直接继承Thread
        - 重新run 方法
        - 类实例可以直接运行

In [14]:
# threading 案例
import time
import threading
def loop1():
    print("线程1启动", time.ctime())
    time.sleep(4)
    print("线程1结束", time.ctime())
    
def loop2():
    print("线程2启动", time.ctime())
    time.sleep(2)
    print("线程2结束", time.ctime())
    
def main():
    print("主程序启动", time.ctime())
    t1 = threading.Thread(target=loop1, args=())
    t1.start()
    t2 = threading.Thread(target=loop2, args=())
    t2.start()
    
    t1.join()
    t2.join()
    print("主程序结束", time.ctime())

if __name__ == "__main__":
    main()


主程序启动 Sun Sep  2 20:44:33 2018
线程1启动 Sun Sep  2 20:44:33 2018
线程2启动 Sun Sep  2 20:44:33 2018
线程2结束 Sun Sep  2 20:44:35 2018
线程1结束 Sun Sep  2 20:44:37 2018
主程序结束 Sun Sep  2 20:44:37 2018


In [21]:
# 守护线程 案例
import time
import threading
def func():
    print("线程启动")
    time.sleep(2)
    print("线程结束")
print("主程序启动")    
t1 = threading.Thread(target=func, args=())
t1.start()
time.sleep(1)
print("主程序结束")


主程序启动
线程启动
主程序结束
线程结束


In [20]:
# 环境有误
import time
import threading
def func():
    print("线程启动")
    time.sleep(2)
    print("线程结束")
print("主程序启动")    
t1 = threading.Thread(target=func, args=())
t1.setDaemon(True)
t1.start()
time.sleep(1)
print("主程序结束")


主程序启动
线程启动
主程序结束
线程结束


In [22]:
import time
import threading
def loop1():
    print("线程1启动")
    time.sleep(4)
    print("线程1结束")
    
def loop2():
    print("线程2启动")
    time.sleep(2)
    print("线程2结束")
    
def loop3():
    print("线程3启动")
    time.sleep(6)
    print("线程3结束")
def main():
    print("主程序启动")
    t1 = threading.Thread(target=loop1, args=())
    t1.setName("线程1")
    t1.start()
    t2 = threading.Thread(target=loop2, args=())
    t2.start()
    t2.setName("线程2")
    t3 = threading.Thread(target=loop3, args=())
    t3.start()
    t3.setName("线程3")
    time.sleep(3)
    for thr in threading.enumerate():
        print("当前运行的线程有{}".format(thr.getName()))
    print("运行的线程数量为{}".format(threading.activeCount()))
    
    
    t1.join()
    t2.join()
    t3.join()
    
    print("主程序结束", time.ctime())

if __name__ == "__main__":
    main()
    while True:
        time.sleep(10)

主程序启动
线程1启动
线程2启动
线程3启动
线程2结束
当前运行的线程有<_MainThread(MainThread, started 5324)>
当前运行的线程有<Thread(Thread-4, started daemon 7392)>
当前运行的线程有<Heartbeat(Thread-5, started daemon 1472)>
当前运行的线程有<HistorySavingThread(IPythonHistorySavingThread, started 8228)>
当前运行的线程有<ParentPollerWindows(Thread-3, started daemon 6308)>
当前运行的线程有<Thread(线程1, started 6372)>
当前运行的线程有<Thread(线程3, started 6920)>
运行的线程数量为7
线程1结束
线程3结束
主程序结束 Sun Sep  2 21:14:09 2018


KeyboardInterrupt: 

In [31]:
# 工业风案例
import threading
from time import ctime, sleep

#loop = [4, 2]
class ThreadFunc:
    def __init__(self, name):
        self.name = name
    def loop(self, nloop, nsec):
        print("开始线程", nloop, ctime())
        sleep(nsec)
        print("结束线程", nloop, ctime())
    
def main():
    print("主程序启动", ctime())
    
    t = ThreadFunc("loop")
    t1 = threading.Thread(target = t.loop, args=("LOOP1", 4))
    t2 = threading.Thread(target = ThreadFunc("loop").loop, args=("LOOP2", 2))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("主程序结束", ctime())
    
if __name__ == "__main__":
    main()


主程序启动 Sun Sep  2 21:41:32 2018
开始线程 LOOP1 Sun Sep  2 21:41:32 2018
开始线程 LOOP2 Sun Sep  2 21:41:32 2018
结束线程 LOOP2 Sun Sep  2 21:41:34 2018
结束线程 LOOP1 Sun Sep  2 21:41:36 2018
主程序结束 Sun Sep  2 21:41:36 2018


## 共享变量
 - 共享变量: 当多个线程访问一个变量的时候, 会产生共享变量的问题
 - 解决变量问题: 锁, 信号灯
     - 锁
         - 是一个标志, 表示一个线程在占用一些资源
         - 使用方法
             - 上锁
             - 使用共享资源, 放心的用
             - 取消锁, 释放锁
             - 看下面案例
             - 锁不是锁住谁 而是一个令牌
 - 线程安全问题:
     - 如果一个资源/变量, 他对与多线程来讲, 不用加锁也不会引起任何问题, 则称为线程安全
     - 线程不安全变量类型: list, set, dict
     - 线程安全变量类型: queue
 - 生产者, 消费者问题
     - 一个模型 用来搭建消息队列
     - queue 是一个用来存放变量的数据结构, 特点是先进先出, 内部元素排队, 可以理解成一个特殊的list
 - 死锁问题
 - 锁的等待时间问题
 - semaphore
     - 允许一个资源最多有几个线程同时使用

 

In [52]:
# 共享变量问题
import time
Sum = 0
loopsum = 60
def myAdd():
    global Sum, loopsum
    for i in range(loopsum):
        Sum += 1
        time.sleep(0.1)
 
def myMinu():
    global Sum, loopsum
    for i in range(loopsum):
        Sum -= 1
        time.sleep(0.1)

if __name__ == "__main__":
    t = time.time()
    print(Sum)
    myAdd()
    myMinu()
    print(Sum,time.time() - t)

0
0 13.012259244918823


In [36]:
import threading
# 共享变量问题
# 发现结果变乱了
Sum = 0
loopsum = 1000000
def myAdd():
    global Sum, loopsum
    for i in range(loopsum):
        Sum += 1
    print(Sum)    
def myMinu():
    global Sum, loopsum
    for i in range(loopsum):
        Sum -= 1
    print(Sum)
if __name__ == "__main__":
    print(Sum)
    t1 = threading.Thread(target=myAdd, args=())
    t2 = threading.Thread(target=myMinu, args=())
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(Sum)

0
-712486
-413208
-413208


In [53]:
# 上锁案例
import threading
import time
Sum = 0
loopsum = 60
lock = threading.Lock()
def myAdd():
    global Sum, loopsum
    for i in range(loopsum):
        lock.acquire()
        Sum += 1
        lock.release()
        time.sleep(0.1)
      
def myMinu():
    global Sum, loopsum
    for i in range(loopsum):
        lock.acquire()
        Sum -= 1
        lock.release()
        time.sleep(0.1)
if __name__ == "__main__":
    t = time.time()
    print(Sum)
    t1 = threading.Thread(target=myAdd, args=())
    t2 = threading.Thread(target=myMinu, args=())
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(Sum, time.time() - t)

0
0 6.523836851119995


In [66]:
# 消费者, 生产者问题
import queue
import threading
import time

class Producer(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count +=1
                    msg = "生成产品" + str(count)
                    # put 向queue里放入一个值
                    queue.put(msg)
                    print(msg)
            time.sleep(0.5)

class Consumer(threading.Thread):
    def rum(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    # queue.get() 取出一个值
                    msg = self.name + "消费了" + queue.get()
                    print(msg)
            time.sleep(1)
            
if __name__ == "__main__":
    queue = queue.Queue()
    for i in range(500):
        queue.put("初始产品", i)
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()
    

生成产品1
生成产品2
生成产品3
生成产品4
生成产品5
生成产品6
生成产品7
生成产品8
生成产品1生成产品9
生成产品10
生成产品11
生成产品12
生成产品13
生成产品14
生成产品15
生成产品16
生成产品17
生成产品18
生成产品19
生成产品20

生成产品2
生成产品3
生成产品4
生成产品5生成产品21
生成产品22
生成产品23
生成产品24
生成产品25
生成产品26
生成产品27
生成产品28
生成产品29
生成产品30
生成产品31
生成产品32
生成产品33
生成产品34
生成产品35
生成产品36
生成产品37
生成产品38
生成产品39
生成产品40
生成产品41
生成产品42
生成产品43
生成产品44
生成产品45
生成产品46
生成产品47
生成产品48
生成产品49
生成产品50
生成产品51
生成产品52
生成产品53
生成产品54
生成产品55
生成产品56
生成产品57
生成产品58
生成产品59
生成产品60
生成产品61
生成产品62
生成产品63
生成产品64
生成产品65
生成产品66
生成产品67
生成产品68
生成产品69
生成产品70
生成产品71
生成产品72
生成产品73
生成产品74
生成产品75
生成产品76
生成产品77
生成产品78
生成产品79
生成产品80
生成产品81
生成产品82
生成产品83
生成产品84
生成产品85
生成产品86
生成产品87
生成产品88
生成产品89
生成产品90
生成产品91
生成产品92
生成产品93
生成产品94
生成产品95
生成产品96
生成产品97
生成产品98
生成产品99
生成产品100

生成产品6
生成产品7
生成产品8
生成产品9
生成产品10
生成产品11
生成产品12
生成产品13
生成产品14
生成产品15
生成产品16
生成产品17
生成产品18
生成产品19
生成产品20
生成产品21
生成产品22
生成产品23
生成产品24
生成产品25
生成产品26
生成产品27
生成产品28
生成产品29
生成产品30
生成产品31
生成产品32
生成产品33
生成产品34
生成产品35
生成产品36
生成产品37
生成产品38
生成产品39
生成产品40
生成产品41
生成产品42
生成产品43
生成产品44
生成产品45
生成

In [None]:
# 死锁 问题
import threading
import time
lock_1 = threading.Lock()
lock_2 = threading.Lock()
def func1():
    print("程序1启动")
    lock_1.acquire()
    print("锁1申请")
    time.sleep(2)
    lock_2.acquire()
    print("锁2申请")
    lock_2.release()
    print("锁2解锁")
    lock_1.release()
    print("锁1解锁")
    print("程序1结束")
    
def func2():
    print("程序2启动")
    lock_2.acquire()
    print("锁2申请")
    time.sleep(4)
    lock_1.acquire()
    print("锁1申请")
    lock_1.release()
    print("锁1解锁")
    lock_2.release()
    print("锁2解锁")
    print("程序2结束")
        
if __name__ == "__main__":
    print("主程序启动")
    t1 = threading.Thread(target = func1, args=())
    t2 = threading.Thread(target = func2, args=())
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("主程序结束")

主程序启动
程序1启动
锁1申请
程序2启动
锁2申请


In [None]:
# 锁等待时间问题
import threading
import time
lock_1 = threading.Lock()
lock_2 = threading.Lock()
def func1():
    print("程序1启动")
    lock_1.acquire(timeout=6)
    print("锁1申请")
    time.sleep(2)
    
    rst = lock_2.acquire(timeout=6)
    if rst:
        print("锁2申请")
        
        lock_2.release()
        print("锁2解锁")
    else:
        print("锁2没有申请到")
    lock_1.release()
    print("锁1解锁")
    print("程序1结束")
    
def func2():
    print("程序2启动")
    lock_2.acquire()
    print("锁2申请")
    time.sleep(4)
    lock_1.acquire()
    print("锁1申请")
    lock_1.release()
    print("锁1解锁")
    lock_2.release()
    print("锁2解锁")
    print("程序2结束")
        
if __name__ == "__main__":
    print("主程序启动")
    t1 = threading.Thread(target = func1, args=())
    t2 = threading.Thread(target = func2, args=())
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("主程序结束")

In [None]:
import threading
import time
semp = threading.Semaphore(3)
# 最多同时允许三个 线程运行
def func():
    if semp.acquire():
        print(threading.currentThread().getName() + "get")
        time.sleep(10)
        semp.release()
        print(threading.currentThread().getName() + "release ")

if __name__ == "__main__":        
    for i in range(8):
        t1 = threading.Thread(target = func)
        t1.start()

### threading.Timer()
 - 在指定的秒数后启动一个功能

In [None]:
import threading
import time
def func():
    print("启动")
    time.sleep(4)
    print("结束")
if __name__ == "__main__":
    t1 = threading.Timer(5, func)
    t1.start()
    i = 0
    while i < 10:
        print("{}------".format(i))
        time.sleep(1)
        i += 1

### 可重入锁
 - 一个锁, 可以被一个线程多次申请
 - 主要解决递归调用的时候, 需要申请锁的情况
 

In [None]:
# 可重入锁问题
import time
import threading

num = 0
lock_1 = threading.Lock()
class MyThr(threading.Thread):
    def run(self):
        global num
        time.sleep(1)
        if lock_1.acquire():
            num += 1
            msg = selg.name + "---" + str(num)
            print(msg)
            lock_1.acquire()
            lock_1.release()
            lock_1.release()
 
def main():
    for i in range(5):
        t1 = MyThr()
        t1.start()

if __name__ == "__main__":
    main()
    

In [None]:
# 问题解决
import time
import threading

num = 0
lock_1 = threading.RLock()
class MyThr(threading.Thread):
    def run(self):
        global num
        time.sleep(1)
        if lock_1.acquire():
            num += 1
            msg = self.name + "---" + str(num)
            print(msg)
            lock_1.acquire()
            lock_1.release()
            lock_1.release()
 
def main():
    for i in range(5):
        t1 = MyThr()
        t1.start()

if __name__ == "__main__":
    main()
    

###  线程替代方法
 - subprocess
     - 完全跳过线程, 使用进程
     - 是派生进程的主要替代方案
     - python2.4 引入
 - multiprocessing
     - 使用threading接口派生, 使用子进程
     - 允许为多核或者多cpu 派生进程, 接口跟threading 非常相似
     - python 2.6
     
 - concurrent.futures
     - 新的异步执行模块
     - 任务级别操作
     - python 3.2 引入
## 多进程
- 进程之间通讯(InterprocessCommunication, IPC)
- 进程之间无任何共享状态
- 进程的创建
    - 案例 直接生成实例对象,
    - 生成派生对象
    

In [19]:
# 多进程案例
from multiprocessing import Process
from time import ctime , sleep

def clock(interval):
    while True:
        print("现在的时间是", ctime())
        sleep(interval)

  
if __name__ == "__main__":
    t1 = Process(target = clock, args=(5,))
    t1.start()

    
    

In [21]:
# 多进程派生对象
import multiprocessing 
from time import ctime, sleep
class clockProcess(multiprocessing.Process):
    
    def __init__(self, interval):
        super().__init__()
        self.interval = interval
    def run(self):
        while True:
            print("现在的时间是", ctime())
            sleep(self.interval)
            
if __name__ == "__main__":
    t1 = clockProcess(5)
    t1.start()
            
        

### 在os 中 查看pid, ppid 以及他们的关系
    - 案例

In [22]:
# 案例
from multiprocessing import Process
import os
def info(title):
    print(title)
    print("现在是", __name__)
    print("进程父ID", os.getppid())
    print("进程ID", os.getpid())
    
def func(name):
    info("func 函数")
    print("你好", name)
    
if __name__ == "__main__":
    info("main line")
    t1 = Process(target = func, args=("bob", ))
    t1.start()
    t1.join()

main line
现在是 __main__
进程父ID 3616
进程ID 4384


### 生产者消费者模型
 - JoinableQueue
     - 队列中哨兵使用
     - 加入一个None
     - 多少哨兵 加入
  

In [None]:
# 案例
import multiprocessing
from time import ctime
def consumer(input_q):
    print("消费者进程启动", ctime())
    while True:
        item = input_q.get()
        print("拿出来从", item, ctime())
    print("结束消费者进程")
    
def producer(sequence, output_q):
    print("生产者进程启动", ctime())
    for item in sequence:
        output_q.put(item)
        print("put", item, "into q")
    print("输出完毕", ctime())
    
if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()
    cons_q = multiprocessing.Process(target=consumer, args=(q, ))
    cons_q.daemon = True
    cons_q.start()
    sequence = [1, 2, 3, 4]
    producer(sequence, q)
    q.join()

生产者进程启动 Mon Sep  3 16:22:17 2018
put 1 into q
put 2 into q
put 3 into q
put 4 into q
输出完毕 Mon Sep  3 16:22:17 2018


In [None]:
import multiprocessing
from time import ctime


def consumer(input_q):
    print("消费者进程启动", ctime())
    while True:

        item = input_q.get()
        if item is None:
            break
        print("拿出来从", item, ctime())
    print("结束消费者进程")


def producer(sequence, output_q):
    print("生产者进程启动", ctime())
    for item in sequence:
        output_q.put(item)
        print("put", item, "into q")
    print("输出完毕", ctime())


if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()
    cons_q = multiprocessing.Process(target=consumer, args=(q,))
    cons_q.daemon = True
    cons_q.start()

    sequence = [1, 2, 3, 4]
    producer(sequence, q)
    q.put(None)
    cons_q.join()

In [None]:
# 多个消费者 设置哨兵
import multiprocessing
from time import ctime


def consumer(input_q):
    print("消费者进程启动", ctime())
    while True:

        item = input_q.get()
        if item is None:
            break
        print("拿出来从", item, ctime())
    print("结束消费者进程")


def producer(sequence, output_q):
    print("生产者进程启动", ctime())
    for item in sequence:
        output_q.put(item)
        print("put", item, "into q")
    print("输出完毕", ctime())


if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()
    cons_q = multiprocessing.Process(target=consumer, args=(q,))
    cins_q1 = multiprocessing.Process(target=consumer, args=(q,))
    cons_q.daemon = True
    cons_q.start()
    cons_q1.daemon = True
    cons_q1.start()

    sequence = [1, 2, 3, 4]
    producer(sequence, q)
    q.put(None)
    q.put(None)
    cons_q.join()
    cons_q1.join