### 0. Data structure
#### 0.1 heap & 优先队列
* 首先是队列
* 其次其访问次序不是FIFO, 而是根据定义的优先级来排序

##### 0.1.1 应用模式
```
server       <----           client requests
  |                                 |
scheduler  <--- priority queue   <-- importance priority   
```
* 优先队列需要动态的维护优先级次序，
* 会有很多插入，删除，排序
* 接口：get/insert(put)/delete
* stack/queue 都是优先队列的特殊情况

##### 0.1.2 基本实现
|结构|get_max| del_max |insert|
|----|---|---|---|
|vector|O(n)|O(n)|O(1)|
|sorted vector|O(1)|O(1)|O(n)
|list|O(n)|O(n)|O(1)
|BBST|log(n)|log(n)|log(n)
|完全二叉树/堆|O(1)|log(n)|log(n)

* blanced binary tree 的效率其实是被浪费了，因为永远只get or del **MAX**, 而bbst始终维护全序关系，只用了2/3的BBST

##### 0.1.3 完全二叉树实现 <=> 完全二叉堆
* 逻辑上，等同于完全二叉树
* 物理上用vector实现

![Complete binary tree - vector impletementation](img/complete_bt.png)

###### 0.1.4 堆序性
* H[I] <= H[Parent(i)]
* 堆序性保证，根节点是最大的节点。
* get_max 就肯定是vector[0]

##### 0.1.5 完全二叉堆 插入与堆序性
* popup, 如大于parent，则递归的和parent互换
* log(n)

##### 0.1.6 完全二叉堆 删除与堆序性
* top-down 下滤，把末尾元素popup到定，然后把top元素向下换

In [7]:
import heapq
nums = [3,10,1000,-99,4,100]

* heapify 拿一个iterable 变成一个逻辑上的二叉堆

In [8]:
heapq.heapify(nums)
print(nums)

[-99, 3, 100, 10, 4, 1000]


In [9]:
heapq.heappop(nums)

-99

#### 0.2 双向队列 - deque
* deque是双端队列，可以在两端扩展和收缩
* 允许元素随机读取


* deque 像是一个linked-list 和 vector的混合产品，双端插入删除效率高，且支持随机读取

In [11]:
from collections import deque

dq = deque()
dq.extend([2, 3, 4])

dq.append(5)
dq.appendleft(1)

* deque 随机存取

In [12]:
dq[2]

3

In [13]:
print(dq.pop())
print(dq.popleft())

5
1


#### 0.3 ordereddict - BST

* 利用ordereddict实现LRU

In [14]:
from collections import OrderedDict
class LRU(OrderedDict):
    'Limit size, evicting the least recently looked-up key when full'

    def __init__(self, maxsize=128, *args, **kwds):
        self.maxsize = maxsize
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        if len(self) > self.maxsize:
            oldest = next(iter(self))
            del self[oldest]

#####  0.4 bisect
模块能够提供保持list元素序列的支持。它使用了二分法完成大部分的工作。它在向一个list插入元素的同时维持list是有序的。在某些情况下，这比重复的对一个list进行排序更为高效，并且对于一个较大的list来说，对每步操作维持其有序也比对其排序要高效。

##### 0.5 counter

##### 0.6 Weakref
weakref模块能够帮助我们创建Python引用，却不会阻止对象的销毁操作

### I. 元编程
#### 1.1 装饰器
##### 1.1.1 装饰函数
下面两个例子等价

In [15]:
class A:
    @classmethod
    def method(cls):
        pass
    
class B:
    def method(cls):
        pass
    method = classmethod(method)

* 用wraps decorator保留函数的元信息，名字，文档，签名etc

In [16]:
from functools import wraps
import time

def timethis(func):
    
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print("Running time ", time.time() - start)
        return result
    
    return wrapper

In [17]:
@timethis
def run(app_name):
    '''start a app'''
    time.sleep(1)
    return app_name

In [18]:
run("app")
print(run.__name__, run.__doc__)

Running time  1.0000245571136475
run start a app


#### 1.1.2 装饰一个类

In [11]:
import types

class Profiled:
    def __init__(self, func):
        self.wrapped = wraps(func)(self)
        self.ncalls = 0
    
    def __call__(self, *args, **kwargs):
        self.ncalls += 1
        return self.wrapped(*args, **kwargs)
    
    def __get__(self, instance, cls):
        if instance is None:
            return self 
        else:
            return types.MethodType(self, instance)

In [12]:
@Profiled
def add(x, y):
    return x + y

In [13]:
# add(1, 2)

#### 1.2 Descriptor
描述符是实现了特定协议的类，包括：\__get\__, \__set\__, \__delete\__

在实现 保护属性不受修改 、 属性类型检查 的基本功能，同时有大大提高代码的复用率。

##### 1.2.1作用
* 弥补property的不足, 如果用property实现下面的功能，那么我们需要2组setter去分别检查weight，price，不能复用
* 设置属性不能被删除？那定义_delete_方法，并raise 异常。 
* 还可以设置只读属性，\__set\__, raise exception


In [14]:
class Quantity(object):

    def __init__(self, storage_name):
        self.storage_name = storage_name

    def __get__(self, instance, owner):
        # return instance.__dict__[self.storage_name]
        return getattr(self, self.storage_name)

    def __set__(self, instance, value):
        if value <= 0:
            raise ValueError("value must possive")

        # instance.__dict__[self.storage_name] = value
        setattr(self, self.storage_name, value)


class LineItem(object):
    weight = Quantity("weight")
    price = Quantity("price")

    def __init__(self, weight, price):
        self.weight = weight
        self.price = price

In [15]:
l1 = LineItem(1, 2)
print(l1.weight, l1.price)

# l2 = LineItem(-1, 0)   ValueError: value must possive

1 2


#### 1.3 元类
* metaclass这个词，模糊地描述了一种高于类，而又超乎类的概念。
* 我们把python的class语句转义为元类，令其在定义类型时都会提供独特的行为

##### 1.3.1 用元类来验证子类
* 通过元类，可以再生成子类对象之前，定义类时，先验证子类的定义是否合乎规范
* py2/3语法不同
* python把子类的整个class语句体处理完，就会调用元类的\__new\__方法

In [19]:
class Meta(type):
    def __new__(meta, name, bases, class_dict):
        print("In Meta: ", meta, name, bases, class_dict)
        
        if "foo" not in class_dict:
            print("Invalid class for Meta")
            
        return type.__new__(meta, name, bases, class_dict)
    
class MyClass(object, metaclass=Meta):
    print("BEFORE")
    def foo(self):
        pass
    print("AFTER")

BEFORE
AFTER
In Meta:  <class '__main__.Meta'> MyClass (<class 'object'>,) {'__module__': '__main__', '__qualname__': 'MyClass', 'foo': <function MyClass.foo at 0x00000176B70ED2F0>}


In [20]:
class MyClass2(object, metaclass=Meta):
    pass

In Meta:  <class '__main__.Meta'> MyClass2 (<class 'object'>,) {'__module__': '__main__', '__qualname__': 'MyClass2'}
Invalid class for Meta


##### 1.3.2 用元类来注册子类
* 可以在程序中自动注册类型，在需要方向查找(reverse lookup)的场合，非常有用
* 可以在简单的标识符与对应的类之间建立映射关系


* 在下例中需要一个registry去维护需要被deserialize的类名，
* 就可以不用 BetterPoint2D.deserialize(data) 这种形式，而是更generic的形式来调用deserialize方法
* 需要把register_class 放在一个地方，不会被忘记调用

In [21]:
import json

registry = {}

def register_class(target_class):
    registry[target_class.__name__] = target_class
    
    
class Meta(type):
    def __new__(meta, name, bases, class_dict):
        # 真的在内存里实例化这个实例
        cls = type.__new__(meta, name, bases, class_dict)
        register_class(cls)         # 注册类！
        return cls

    
def deserialize(data):
    params = json.loads(data)
    target_class = registry[params["class"]]
    return target_class(*params["args"])
    
    
class Serializable(object):
    
    def __init__(self, *args):
        self.args = args
    
    def serialize(self):
        return json.dumps({
                'class': self.__class__.__name__,
                'args':self.args})
    
       
class Point2D(Serializable, metaclass=Meta):
    
    def __init__(self, x, y):
        super(Point2D, self).__init__(x, y)
        self.x = x
        self.y = y
        
    def __repr__(self):
        return "Point({}, {})".format(self.x, self.y)

* Point2D 继承Serizlizable，但是元类时Meta

In [22]:
point = Point2D(2, 4)
data = point.serialize()
print(data)

{"class": "Point2D", "args": [2, 4]}


In [23]:
print("After deserialize", deserialize(data))

After deserialize Point(2, 4)


##### 1.3.2 用元类来注解类的属性

### II. python 技术
#### 2.1 迭代器和生成器

#### 2.2 contextmanager

##### 2.3 asyncio 异步编程

https://www.cnblogs.com/zhaof/p/8490045.html

https://zhuanlan.zhihu.com/p/25228075

https://www.cnblogs.com/earendil/p/7411115.html

https://zhuanlan.zhihu.com/p/28993491

* 实现异步有两个底层的东西
    * select - OS提供方式 告诉应用程序，我有空闲
        * 判断非阻塞调用是否就绪如果 OS 能做，是不是应用程序就可以不用自己去等待和判断了，就可以利用这个空闲去做其他事情以提高效率。所以OS将I/O状态的变化都封装成了事件，如可读事件、可写事件。并且提供了专门的系统模块让应用程序可以接收事件通知。这个模块就是select。让应用程序可以通过select注册文件描述符和回调函数。当文件描述符的状态发生变化时，select 就调用事先注册的回调函数。

        * select因其算法效率比较低，后来改进成了poll，再后来又有进一步改进，BSD内核改进成了kqueue模块，而Linux内核改进成了epoll模块。这四个模块的作用都相同，暴露给程序员使用的API也几乎一致，区别在于kqueue 和 epoll 在处理大量文件描述符时效率更高。鉴于 Linux 服务器的普遍性，以及为了追求更高效率，所以我们常常听闻被探讨的模块都是 epoll 。
    *  操作系统的Interrupts和Traps机制
        * 操作系统挂起进程，保存上下文，切换回来自动运行
        
* 而所谓的异步编程，是给用户主动挂起，主动切换程序执行的方法
    * 协程 - 协程级别的切换比Thread更细，它并没有spawn一个新的线程，但实现了 OS run/trap的切换
    * 线程，通过线程实现，但是线程的启动通信是有cost的，并且由于python GIL的存在，线程不能实现CPU并行，因为python同一时间只能有一个线程运行。所以又不能CPU并行，cost又高，在IO密集操作中，它并没有协程好用！！
    * 进程

* python的支持
    * select.select
    * yield
    * yield from
    * async/await
    * asyncio

![OS executes](img/os.png)

##### 2.3.1 Non-blocking
###### 2.3.1.1 epoll
判断非阻塞调用是否就绪如果 OS 能做，是不是应用程序就可以不用自己去等待和判断了，就可以利用这个空闲去做其他事情以提高效率。

所以OS将I/O状态的变化都封装成了事件，如可读事件、可写事件。并且提供了专门的系统模块让应用程序可以接收事件通知。这个模块就是select。让应用程序可以通过select注册文件描述符和回调函数。当文件描述符的状态发生变化时，select 就调用事先注册的回调函数。

select因其算法效率比较低，后来改进成了poll，再后来又有进一步改进，Linux内核改进成了epoll模块。

###### 2.3.1.2 回调 - Callback
把I/O事件的等待和监听任务交给了 OS，那 OS 在知道I/O状态发生改变后（例如socket连接已建立成功可发送数据），它又怎么知道接下来该干嘛呢？回调。

需要我们将发送数据与读取数据封装成独立的函数，让epoll代替应用程序监听socket状态时，得告诉epoll：“如果socket状态变为可以往里写数据（连接建立成功了），请调用HTTP请求发送函数。如果socket 变为可以读数据了（客户端已收到响应），请调用响应处理函数。

###### 2.3.1.3 事件循环 - Event Loop
为了解决上述问题，我们写一个循环，去访问selector模块，等待它告诉我们当前是哪个事件发生了，应该对应哪个回调。这个等待事件通知的循环，称之为事件循环。

selector.select()是一个阻塞调用，因为如果事件不发生，那应用程序就没事件可处理，所以就干脆阻塞在这里等待事件发生。所以，selector机制(后文以此称呼代指epoll/kqueue)是设计用来解决大量并发连接的。当系统中有大量非阻塞调用，能随时产生事件的时候，selector机制才能发挥最大的威力。

##### 2.3.1.4 原始的selector实现

Python标准库提供的selectors模块是对底层select/poll/epoll/kqueue的封装。

DefaultSelector类会根据 OS 环境自动选择最佳的模块，

那在 Linux 2.5.44 及更新的版本上都是epoll了。

In [3]:
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()


def recieve_line(sock):
    chars = []
    while True:
        c = sock.recv(1)
        if not c:
            break
        
        chars.append(c)
        if c == b'\n':
            break
    print("Recieved data ", chars)
    return b''.join(chars)


class Server(object):
    
    def __init__(self, ip, port):
        self.addr = (ip, port)
        self.start()
        
    def start(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        
        sock.bind(self.addr)
        sock.listen(64)                                                                                                      
        print("Server::: Listen at {}\n".format(self.addr))                                                                        

        selector.register(sock, EVENT_READ, self.accept)
        
    
    def accept(self, sock, mask):
        conn, socketname = sock.accept()
        print("server sock fileno", sock.fileno(), " file no accepted ", conn.fileno())
        print("Accepted connection ", conn, socketname)
        conn.setblocking(False)
        selector.register(conn, EVENT_READ, self.connected)
        
    def connected(self, conn, mask):
        print(conn, mask)
        data = recieve_line(conn)
        print(data)
        if not data:
            print("Closing connection ", conn)
            selector.unregister(conn)
            conn.close()

# get event loop
def run_forever():
    while True:
        events = selector.select()
        if not events:
            raise SelectTimeout('轮询超时')
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key.fileobj, event_mask)

In [4]:
s = Server("127.0.0.1", 9283)

Server::: Listen at ('127.0.0.1', 9283)



In [None]:
run_forever()

###### 2.3.1.5 asyncio
异步io的本质是对系统的基本的异步io函数epoll, selector的封装。 但凡异步必将涉及事件-回调，而裸的事件回调会使代码复杂，比如又要考虑出现异常怎么办，回调中套回调， 回调间数据怎么共享，怎么判断谁触发回调都很麻烦。
asyncio这个库就是使用python的yield这个可以打断保存当前函数的上下文的机制， 封装好了selector 摆脱掉了复杂的回调关系

In [None]:
class Future:
    
    def __init__(self):
        self.result = None
        self._callbacks = []
    
    def add_done_callback(self, func):
        self._callbacks.append(func)
    
    def set_result(self, result):
        self.result = result
        for func in self._callbacks:
            func(self)
            
class Task:
    
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)
    
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return 
        
        next_future.add_done_callback(self.step)

In [1]:
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)
        
loop = asyncio.get_event_loop()
loop.run_until_complete(display_date(loop))
loop.close()

2019-06-12 11:02:55.542137
2019-06-12 11:02:56.542492
2019-06-12 11:02:57.542523
2019-06-12 11:02:58.542563
2019-06-12 11:02:59.543231


In [1]:
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    

async def say():
    
    print(f"started at time {time.strftime('%X')}")
    
    await say_after(1, "hello")
    await say_after(2, "world")
    
    print(f"finished at {time.strftime('%X')}")

* create and manage event loops, which provide asynchronous APIs for networking, running subprocesses, handling OS signals, etc;
* implement efficient protocols using transports;
* bridge callback-based libraries and code with async/await syntax.

In [3]:
import asyncio

class AsyncioServer(asyncio.Protocol):
    
    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.data = b''
        print("Accept connection from {}".format(self.address))
    
    def data_received(self, data):
        self.data += data
        if self.data.endswith(b'?'):
            anwser = self.data.replace(b'?', b'.')
            print("Answer ", anwser)
            self.transport.write(anwser)
            self.data = b''
    
    def connection_lost(self, exc):
        if exc:
            print("Client {} error: {}".format(self.address, exc))
        elif self.data:
            print("Client {} sent {} but then closed".format(self.address, self.data))
        else:
            print("Client {} closed socket".format(self.address))
            
def serve(address, server):
    loop = asyncio.get_event_loop()
    coro = loop.create_server(server, *address)
    server = loop.run_until_complete(coro)
    print("Listening at {}".format(address))
    try:
        loop.run_forever()
    finally:
        server.colse()
        loop.stop()

In [80]:
address = ("127.0.0.1", 9823)
serve(address, AsyncioServer)

NameError: name 'serve' is not defined

### III. 并发编程
#### 3.0 概念
##### 3.0.1 
###### 并发 vs 并行
* 并发：concurrent 在一个时间段内有几个程序在一个CPU上运行，但任意时刻只有一个程序在CPU上运行
* 并行：parallel，多个CPU

###### 同步 vs 异步
* 同步： 必须等待IO操作完成，再下一步，线性调用
* 异步： 不必等待IO造作完成，就返回的调用方式

###### blocking vs non-blocking
* blocking：调用函数是被挂起
* non-blocking：调用函数是线程不会被挂起，而是立即返回


#### 3.0.2 异步的重要应用 - 非阻塞IO 

#### 3.0.3 异步的重要应用 - select 来实现非阻塞IO
* 一般不直接用select，而是用包装好的selector
* DefaultSelector会根据OS来选择使用epoll，还是poll


* selector.register(fileno, EVENT_READ/EVENT_WRITE, callback)
* select 本身不支持register模式，是selector封装的
* socket状态变化以后的callback是程序员实现的


* key, mask = selector.select()
    * key 是你register的key object， 
    * callback 在key.data 里存着
    
见2.3.1.4 

* eventloop模式
    * 所有的异步都是一个模式 -- 事件循环
    * 不停的请求socket的状态并调用相应的callback

#### 3.1 协程 - coroutine 
##### 3.1.0 回调模式的痛点
* callback + evet loop
    * 程序可读性差
    * 共享状态管理困难，不同协程中怎么共享数据
    * 异常处理困难

* 协程可以解决这个问题
    * 回调模式编码复杂度高
    * 同步模式的并发性不高
    * 如果用多线程来做，需要用lock，开销大
* 方案
    * 采用同步方式去编写异步的代码
    * 使用单线程去切换任务
        * 线程是由操作系统切换的，单线程意味着程序员自己去调度任务
        * 不需要锁，并发性高
        * 在线程内切换函数，性能远高于线程之间切换效率更高，并发性更高
        
* 效果：
    * 在单线程中，自己通过程序来挂起/恢复函数
    * => coroutine

##### 什么是协程
* 如何利用8核CPU, 64G 内存，在10gbps的网络上维持100W并发连接



* coroutine 更多的是挂起一个操作，来实现异步！！！
* 绿色线程，轻量级的线程。协程，又称作Coroutine。从字面上来理解，即协同运行的例程，它是比是线程（thread）更细量级的用户态线程，特点是允许用户的主动调用和主动退出，挂起当前的例程然后返回值或去执行其他任务，接着返回原来停下的点继续执行。
* 实现原理
    * 通过用yield语句。其实这里我们要感谢操作系统（OS）为我们做的工作，因为它具有getcontext和swapcontext这些特性，通过系统调用，我们可以把上下文和状态保存起来，切换到其他的上下文，这些特性为coroutine的实现提供了底层的基础。操作系统的Interrupts和Traps机制则为这种实现提供了可能性

In [2]:
import time

def get_html(url):
    time.sleep(3)

    return "mock data"
    

def parse_url(html):
    return html

# blocking way
def get_url(url):
    
    html = get_html(url)  # io cost high, 传统模式，blocking等待在这里
    
    # 希望可以在 get_html可以挂起，等到get_html返回值了之后，再切换回来
    # 继续运行后面的
    
    urls = parse_url(html)  # CPU cost high

###### 利用generator 来把上面的代码改造成协程

* attempt 1 failed, blocking

In [20]:
import time
import threading
from queue import Queue

queue = Queue()


def some_io_heavy_func():
    time.sleep(3)
    queue.put("mock data")
    

def get_html(url):
    print("Getting URL {} ".format(url))
    t1 = threading.Thread(target=some_io_heavy_func)
    t1.start()
    
def non_blocking_get_url(urls):
    for url in urls:
        yield from get_html(url)
    
    

def main():
    urls = ["baidu", "google", "fb"]
    for msg in non_blocking_get_url(urls):
        print(msg)
        # parse(data)

In [19]:
main()

Getting URL baidu 
baidu finished
Getting URL google 
google finished
Getting URL fb 
fb finished


##### generator 的一些特性
* 不仅可以产生值
* 还可以接收值

* 启动生成器支持的方法
    * next
    * send
    * close, close之后再next会报stopIteration异常
    * throw

* generatorEXIT继承自BaseException， 如果 except Exception, 抓不到异常的

In [54]:
def gen_func():
    # generator还可以接收返回值
    # 接受的值就是调用方法传回来的
    html = yield "http://baidu.com"  
    print("Generator get value ", html)
        
    yield 2
    return 4

gen = gen_func()
html = next(gen)
print(u"Send 也收到返回值", gen.send(html))


Generator get value  http://baidu.com
Send 也收到返回值 2


In [58]:
def gen_func():
    # generator还可以接收返回值
    # 接受的值就是调用方法传回来的
    try:
        html = yield "http://baidu.com"  
        print("Generator get value: ", html)
    except Exception as e:
        print("Got exception ", e)
        
    yield 2
    yield 3
    return 4

gen = gen_func()
next(gen)
gen.throw(Exception, "Mock Error") 
gen.close()

Got exception  Mock Error


* generator 可以给它发送值，他可以返回值，也可以抛出异常，
* 这就满足了对程序控制的基本需求

###### yield from
* yield from 一个generator

* chain 那多个可迭代对象 chain在一起

In [136]:
from itertools import chain

a_list = ["hello", "world"]
a_dict = {"a" : "ab"}

def gen():
    try:
        for i in chain(a_list, a_dict, range(5, 6)):
            recieved = yield i
            
            if recieved:
                print(recieved)
    except Exception as e:
        print("Some one send me exception: ", e)

def g1():
    yield from gen()
    
    
def main():
    g = g1()
    g.send(None)
    g.send(u"Send from 调用方")
    
    print(u"Call next() on 委托生成器:", next(g))
    
    try:
        g.throw(Exception, "Mock")
    except StopIteration as e:
        print(u"throw Exception, 该generator应该stop")

* main: 调用方
* g1: 委托生成器
* gen: 子生成器

* yield from 会在**调用方**和**子生成器**中间建立一个双向通道


* 所以调用方 send 是直接发给子生成器的，跳过了G1
* 所以调用方 throw 给子生成器异常

In [137]:
main()

Send from 调用方
Call next() on 委托生成器: a
Some one send me exception:  Mock
throw Exception, 该generator应该stop


In [79]:
for value in simple_case():
    print(value)

0
1


* 示例1，用coroutine代替线程

In [1]:
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
    print('Blastoff!')
    
    
def countup(n):
    x = 0
    while x < n:
        print('Conuting up', x)
        yield
        x += 1

In [2]:
from collections import deque

class TaskScheduler:
    
    def __init__(self):
        self._task_queue = deque()
        
    def new_task(self, task):
        '''Admit a newly started task to the scheduler'''
        self._task_queue.append(task)
    
    def run(self):
        while self._task_queue:
            task = self._task_queue.popleft()
            
            try:
                next(task)
                self._task_queue.append(task) # 轮询的运行queue里的协程
            except StopIteration:
                pass

In [3]:
scheduler = TaskScheduler()
scheduler.new_task(countdown(5))
scheduler.new_task(countup(6))
scheduler.run()

T-minus 5
Conuting up 0
T-minus 4
Conuting up 1
T-minus 3
Conuting up 2
T-minus 2
Conuting up 3
T-minus 1
Conuting up 4
Blastoff!
Conuting up 5


* 示例2

In [6]:
class ActorScheduler:
    
    def __init__(self):
        self._actors = {}
        self._msg_queue = deque()
        
    def new_actor(self, name, actor):
        self._msg_queue.append((actor, None))
        self._actors[name] = actor
       
    def send(self, name, msg):
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))
            
    def run(self):
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            
            try:
                actor.send(msg)
                print("Sending ", msg, " to ", actor.__name__)
            except StopIteration:
                pass

In [7]:
def printer():
    while True:
        msg = yield
        print('Got: ', msg)

        
def counter(sched):
    
    while True:
        n = yield
        if n == 0:
            break
        
        sched.send('printer', n)
        
        sched.send('counter', n-1)

In [8]:
sched = ActorScheduler()
sched.new_actor('printer', printer())
sched.new_actor('counter', counter(sched))

sched.send('counter', 3)
sched.run()

Sending  None  to  printer
Sending  None  to  counter
Sending  3  to  counter
Got:  3
Sending  3  to  printer
Sending  2  to  counter
Got:  2
Sending  2  to  printer
Sending  1  to  counter
Got:  1
Sending  1  to  printer


###### SELECT
* 要理解select.select模块其实主要就是要理解它的参数, 以及其三个返回值。
* select()方法接收并监控3个通信列表， 第一个是所有的输入的data,就是指外部发过来的数据，第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息


###### 3.1.2 用协程实现produce/consume

```python
import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result)) 
```

```
>>> asyncio.run(print_sum(1, 2))
Compute 1 + 2 ...
1 + 2 = 3
```


#### 3.2 多线程

##### 3.2.0 线程之间共享的资源有哪些

###### 3.2.0.0  共享的资源
* a. 堆  
    * 由于堆是在进程空间中开辟出来的，所以它是理所当然地被共享的；因此new出来的都是共享的（16位平台上分全局堆和局部堆，局部堆是独享的）
* b. 全局变量 
    * 它是与具体某一函数无关的，所以也与特定线程无关；因此也是共享的
* c. 静态变量 
    * 虽然对于局部变量来说，它在代码中是“放”在某一函数中的，但是其存放位置和全局变量一样，存于堆中开辟的.bss和.data段，是共享的
* d. 文件等公用资源  
    * 这个是共享的，使用这些公共资源的线程必须同步。Win32 提供了几种同步资源的方式，包括信号、临界区、事件和互斥体。

###### 3.2.0.1 独享的资源
* a. 栈 
    * 栈是独享的
* b. 寄存器  
    * 这个可能会误解，因为电脑的寄存器是物理的，每个线程去取值难道不一样吗？其实线程里存放的是副本，包括程序计数器PC
 
###### 3.2.0.2 线程共享的环境包括：
* 进程代码段
* 进程的公有数据
    * (利用这些共享的数据，线程很容易的实现相互之间的通讯)
* 进程打开的文件描述符、信号的处理器、
* 进程的当前目录和进程用户ID与进程组ID。
 
###### 3.2.0.2 线程的个性
进程拥有这许多共性的同时，还拥有自己的个性。有了这些个性，线程才能实现并发性。


* 1.线程ID 
    * 每个线程都有自己的线程ID，这个ID在本进程中是唯一的。进程用此来标识线程。
* 2.寄存器组的值 
    * 由于线程间是并发运行的，每个线程有自己不同的运行线索，当从一个线 程切换到另一个线程上 时，必须将原有的线程的寄存器集合的状态保存，以便将来该线程在被重新切换到时能得以恢复。
* 3.线程的堆栈 
    * 堆栈是保证线程独立运行所必须的。线程函数可以调用函数，而被调用函数中又是可以层层嵌套的，所以线程必须拥有自己的函数堆栈， 使得函数调用可以正常执行，不受其他线程的影响。
* 4.错误返回码 
    * 由于同一个进程中有很多个线程在同时运行，可能某个线程进行系统调用 后设置了errno值，而在该 线程还没有处理这个错误，另外一个线程就在此时被调度器投入运行，这样错误值就有可能被修改。所以不同线程应该有自己的错误返回码变量。
* 5.线程的信号屏蔽码 
    * 由于每个线程所感兴趣的信号不同，所以线程的信号屏蔽码应该由线程自己管理。但所有的线程都 共享同样的信号处理器。
* 6.线程的优先级 
    * 由于线程需要像进程那样能够被调度，那么就必须要有可供调度使用的参数，这个参数就是线程的 优先级。

##### 3.2.1 threading.Thread

threading 库可以在单独的线程中执行任何的在Python 中可以调用的对象。你可以创建一个Thread 对象并将你要执行的对象以target 参数的形式提供给该对

Python 中的线程会在一个单独的系统级线程中执行（比如说一个POSIX 线程或者一个Windows 线程），这些线程将由操作系统来全权管理。

* python线程的问题
    * Python中的多线程因为GIL的存在，它们并不能利用CPU多核优势，一个Python进程中，只允许有一个线程处于运行状态
    * 在IO 密集的操作中，比如socket。在做阻塞的系统调用时，例如sock.connect(),sock.recv()时，当前线程会释放GIL，让别的线程有执行机会。但是单个线程内，在阻塞调用上还是阻塞的。
    * 所以IO 密集操作线程可以提高效率
    * 但是由于线程有切换开销，状态共享的问题，所以IO操作最好选择协程
* 竞态条件：
    * 线程是被OS调度，调度策略是抢占式的，以保证同等优先级的线程都有均等的执行机会，那带来的问题是：并不知道下一时刻是哪个线程被运行，也不知道它正要执行的代码是什么。所以就可能存在竞态条件。
    

In [111]:
import time

def count_down(n):
    while n>0:
        print("T-minus {}".format(n))
        n -= 1
        time.sleep(1)

* start
    * threads can only be started once
    * 线程一旦启动，将独立执行直到目标函数返回。
* is_alive
    * 你可以查询一个线程对象的状态，看它是否还在执行

In [120]:
from threading import Thread

t = Thread(target=count_down, args=(3,))
t.start()
print("\nStill running {}\n".format(t.is_alive()))

T-minus 3
Still running True


T-minus 2
T-minus 1


In [121]:
t.is_alive()

False

* join
    * 你也可以将一个线程加入到当前线程，并等待它终止：

In [122]:
t2 = Thread(target=count_down, args=(3,))
t2.start()
t2.join()
print("Waiting until finish")

T-minus 3
T-minus 2
T-minus 1
Waiting until finish


* daemon 后台线程
    * Python 解释器在所有线程都终止后才继续执行代码剩余的部分。对于需要长时间运行的线程或者需要一直运行的后台任务，你应当考虑使用后台线程。
    * 后台线程无法等待，不过，这些线程会在主线程终止时自动销毁
    

In [126]:
t_daemon = Thread(target=count_down, args=(3,), daemon=True)
t_daemon.start()

T-minus 3
T-minus 5
T-minus 4T-minus 2

T-minus 1T-minus 3

T-minus 2
T-minus 1


##### 3.2.2 线程简单通信
你无法结束一个线程，无法给它发送信号，无法调整它的调度，也无法执行其他高级操作。如果需要这些特性，你需要自己
添加。比如说，如果你需要终止线程，那么这个线程必须通过编程在某个特定点轮询来退出。你可以像下边这样把线程放入一个类

In [127]:
class CountdownTask(object):
    
    def __init__(self):
        self._running = True
    
    def terminate(self):
        self._running = False
    
    def run(self, n):
        while self._running and n>0:
            print("T-minus {}".format(n))
            n -= 1
            time.sleep(1)

In [129]:
c = CountdownTask()
t_could_terminate = Thread(target=c.run, args=(10, ))
t_could_terminate.start()
c.terminate()  # send signal, 一般情况，对于信号 或者任何多进程状态下修改数据，都要加锁！！！
t.join()

T-minus 10


##### 3.2.3 GIL
由于全局解释锁（GIL）的原因，Python 的线程被限制到同一时刻只允许一个线程执行这样一个执行模型。所以，Python 的线程更适用于处理I/O 和其他需要并发执行的阻塞操作（比如等待I/O、等待从数据库获取数据等等），而不是需要多处理器并行
的计算密集型任务

##### 3.2.4 线程同步机制
* 线程同步原语 -- 操作系统级别 ？？
    * lock/rlock/semaphore

* locks
* rlocks
* semaphores
* conditions
* events
* queues

https://www.cnblogs.com/Security-Darren/p/4732914.html

###### 3.2.4.1 locks
锁有两种状态：被锁（locked）和没有被锁（unlocked）。拥有acquire()和release()两种方法，并且遵循一下的规则：

###### 互斥锁
* 如果一个锁的状态是unlocked，调用acquire()方法改变它的状态为locked；
* 如果一个锁的状态是locked，acquire()方法将会阻塞，直到另一个线程调用release()方法释放了锁；
* 如果一个锁的状态是unlocked调用release()会抛出RuntimeError异常；
* 如果一个锁的状态是locked，调用release()方法改变它的状态为unlocked

###### 锁 + with 上下文管理
Lock 对象和with 语句块一起使用可以保证互斥执行，就是每次只有一个线程可以执行with 语句包含的代码块。with 语句会在这个代码块执行前自动获取锁，在执行结束后自动释放锁。
* 在with \__enter\__ 时调用 lock.acquire()
* 在with \__exit\__ 时调用 lock.release()

In [33]:
import threading 

class SharedCounter(object):
    
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()
    
    # 方式一： 显式调用 aquire/release，获取和释放锁
    def incr(self, delta=1):
        self._value_lock.acquire()
        self._value += delta
        self._value_lock.release()
    
    # 方式二：利用上下文管理，with语句来释放/获取锁
    def decr(self, delta=1):
        with self._value_lock:
            self._value -= delta

* 二比一号是因为，RAII rule 如果中间有异常发生，则会导致release没有被调用 而造成死锁

In [35]:
shared_c = SharedCounter()
t1 = threading.Thread(target=shared_c.incr, args=(4,))
t2 = threading.Thread(target=shared_c.decr, args=(2,))

t1.start()
t2.start()

print(shared_c._value)
print("after 2 sec", time.sleep(2))
print(shared_c._value)

4
after 2 sec None
2


###### 3.2.4.2 防止死锁 -- 简单的死锁情况
* 两个线程，因为每个获取多个锁，1 获取a等待b， 2获取b等待a，造成死锁

* 当然如果每个进程只用一个锁，互斥锁 - mutex可以完美的解决问题
* 绝对不要嵌套调用with 去获取多个锁

** contextmanager**

* 我们可以通过给一个try…finally…结构的函数头部加上@contextmanager
* 就可以通过with…as…结构来调用它了
* 这样try块中yield的数据被as出来，finally块中的数据在with..as..块结束的时候被执行。
        

In [1]:
import threading
from contextlib import contextmanager

_local = threading.local()

@contextmanager
def acquire(*locks):
    # 给锁排序，按顺序获取和释放
    locks = sorted(locks, key=lambda x: id(x))
    
    acquired = getattr(_local, "acquired", [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError("Lock Order Violation")
        
    # 获取所有锁
    acquired.extend(locks)
    _local.acquired = acquired
    
    try:
        for lock in locks:
            lock.acquire()
        
        yield  # with... as, 则as获取的值为None
        
    finally:
        # 释放所有锁
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]

In [2]:
import time

x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    counter = 0
    while True and counter < 3:
        with acquire(x_lock, y_lock) as a_value:
            
            print("Yield None, so should get ", a_value)
            print("Thread-1 Got lock")
            counter += 1
            time.sleep(1)
        

def thread_2():
    counter = 0
    while True and counter < 3:
        with acquire(y_lock, x_lock):
            print("Thread-2 Got lock")
            counter += 1

In [3]:
t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

Yield None, so should get  None
Thread-1 Got lock
Thread-2 Got lock
Thread-2 Got lock
Thread-2 Got lock
Yield None, so should get  None
Thread-1 Got lock
Yield None, so should get  None
Thread-1 Got lock


###### 3.2.5 rlock
可重入锁RLock
为解决同一线程种不能多次请求同一资源的问题，python提供了“可重入锁”：threading.RLock，RLock内部维护着一个Lock和一个counter变量，counter记录了acquire的次数，从而使得资源可以被多次require。直到一个线程所有的acquire都被release，其他的线程才能获得资源 。用法和threading.Lock类相同。
将上面迭代死锁的代码改写一下，就不会发生死锁，但注意，调用acquire和release的次数必须相等。

链接：https://www.jianshu.com/p/fc031cb03e29


##### 3.2.6 condition

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量，除了提供与Lock类似的acquire和release方法外，还提供了wait和notify方法。线程首先acquire一个条件变量，然后判断一些条件。如果条件不满足则wait；如果条件满足，进行一些处理改变条件后，通过notify方法通知其他线程，其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程，从而解决复杂的同步问题。



#### 3.2.7 进程间通信 -- queue.Queue
* 注意deque不是线程安全的，queue.Queue是

#### 3.2.8 线程池

#### 3.3 多进程
* multiprocessing
* concurrent.futures
    * ThreadPoolExecutor
        * 来做一个线程池
    * ProcessPoolExecutor
        * 可被用来在一个单独的Python 解释器中执行计算密集型函数。