In [2]:
# 在多线程中更改全局变量必须加锁，而局部变量不用
# 但是局部变量传递起来很麻烦
# 因此ThreadLocal应运而生

In [4]:
import threading

# 创建全局ThreadLocal对象
local_school = threading.local()

def process_student():
    # 获取当前线程关联的student
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))
    
def process_thread(name):
    # 绑定ThreadLocal的student
    local_school.student = name
    process_student()
    
t1 = threading.Thread(target=process_thread, args=('Alice',), name = 'Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name = 'Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)


In [5]:
# local_school是一个ThreadLocal对象，每个Thread对他都可以读写student属性，但互不影响
# local_school是一个全局变量，但是对于每个线程的局部变量，可以任意读写，互不干扰，也不用管理锁的问题
# ThreadLocal常用于线程绑定数据库、HTTP请求、用户身份信息等。
# 这样就解决了参数在一个线程的各个函数之间传递的问题

In [6]:
# 进程 Vs. 线程

In [7]:
# 多进程模式优点
# 多进程的有点是稳定性高，因为一个子进程崩溃了，不会影响主进程和其他子进程。
# 主进程只负责分配任务，挂掉的可能性低。著名的Apache最早就是采用多进程模式

In [8]:
# 多进程模式缺点
# 创建线程的代价太大，操作系统由于内存和CPU的限制，能同时运行的进程数有限

In [9]:
# 多线程模式优点
# 速度快，但快不了多少

In [10]:
# 多线程模式缺点
# 任何一个线程挂掉都可能直接造成整个进程崩溃，因为所有线程共享进程的内存

In [11]:
# 计算密集型 Vs. IO密集型
# 计算密集型很吃CPU资源，因此采用C语言这种高效率的语言很重要
# IO密集型的CPU消耗很少，任务的大部分时间都是等待IO操作完成，因此最适合开发效率最高（代码量最少）的语言，脚本语言首选

In [12]:
# 分布式进程

In [13]:
# 在Process和Thread中有限选Process，不仅稳定，而且可以分布到多台机器上
# multiprocessing的managers模块封装了网络通信的细节，可以很容易讲任务分不到其他多个进程中，编写分布式多进程程序

In [14]:
# 将当前的Queue通过网络暴露出去

In [1]:
import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列
task_queue = queue.Queue()
# 接收结果的队列
result_queue= queue.Queue()

# 继承BaseManager
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上，callable参数关联了Queue对象
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000,设置验证码'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue
manager.start()
# 获得通过网络访问的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去
for i in range(10):
    n = random.randint(0, 10000)
    print('put task %d...' % n)
    task.put(n)
# 从result队列读取结果
print('Try get results....')
for i in range(10):
    r = result.get(timeout=10)
    print('Result:%s' % r)
# 关闭
manager.shutdown()
print('master exit.')


put task 945...
put task 8409...
put task 8787...
put task 4155...
put task 5624...
put task 1461...
put task 3574...
put task 922...
put task 7400...
put task 2009...
Try get results....


Empty: 

In [18]:
# 在另一台机器上部署接收信号，并处理的程序

In [2]:
import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue，所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器，也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')

Connect to server 127.0.0.1...
run task 945 * 945...
run task 8409 * 8409...
run task 8787 * 8787...
run task 4155 * 4155...
run task 5624 * 5624...
run task 1461 * 1461...
run task 3574 * 3574...
run task 922 * 922...
run task 7400 * 7400...
run task 2009 * 2009...
worker exit.


In [None]:
# 通过类似的部署方式，即可实现主服务器负责部署任务，其他服务其负责处理任务
# 注意Queue的作用是用来传递任务和接收结果，每个任务的描述数据量要尽量小。
# 比如发送一个处理日志文件的任务，就不要发送几百兆的日志文件本身，而是发送日志文件存放的完整路径，由Worker进程再去共享的磁盘上读取文件。