In [None]:
# 单例模式——所有生成实例都指向同一个对象

# Method 1
class Singleton(object):
    attr = None     # 类属性
    def __init__(self):
        print("Do something.")
    
    def __new__(cls, *args, **kwargs):  # 重载
        if not cls.attr:
            cls.attr = super(Singleton, cls).__new__(cls)
        return cls.attr

obj1 = Singleton()
obj2 = Singleton()
print(obj1, obj2)

# Method 2 
class Singleton(object):
    def __init__(self):
        print("Do something new.")
    def __new__(cls, *args, **kwargs):
        if not hasattr(Singleton, "_instance"):
            Singleton._instance = object.__new__(cls)
        return Singleton._instance

obj1 = Singleton()
obj2 = Singleton()
print(obj1, obj2)

In [None]:
# 工厂类的示例
import abc
class StandardFactory(object):
    @staticmethod
    def get_factory(factory):
        """ 根据参数找到对实际操作的工厂 """
        if factory == "cat":
            return CatFactory()
        elif factory == "dog":
            return DogFactory()
        raise TypeError("Unkown Factory.")

class Pet(abc.ABC):     # 用于构建抽象类
    @abc.abstractmethod
    def eat(self):
        pass

    def jump(self):
        print("jump...")

class Dog(Pet):
    def eat(self):
        return "Dog eat..."

class Cat(Pet):
    def eat(self):
        return "Cat eat..."


class DogFactory(object):
    def get_pet(self):
        return Dog()

class CatFactory(object):
    def get_pet(self):
        return Cat()

if __name__ == "__main__":
    factory = StandardFactory.get_factory("cat")
    cat = factory.get_pet()
    print(cat.eat())

    factory = StandardFactory.get_factory("dog")
    dog = factory.get_pet()
    print(dog.eat())

    dog.jump()
    cat.jump()

In [None]:
# 观察者模式（发布-订阅模式）
# 多使用一种“注册-通知-撤销注册”的形式
class Subject:
    """ some subject """
    def __init__(self):
        self.observers = []

    def add_observer(self, observer):
        self.observers.append(observer)
        return self

    def remove_observer(self, observer):
        self.observers.remove(observer)
        return self

    def notify(self, msg):
        for observer in self.observers:
            observer.update(msg)

class Observer:
    def __init__(self, name):
        self.name = name
    
    def update(self, msg):
        print(f"{self.name}收到消息: {msg}")

xiaoming = Observer("xiaoming")
lihua = Observer("lihua")

rain = Subject()

# 添加订阅
rain.add_observer(xiaoming)
rain.add_observer(lihua)

rain.notify("下雨了！")

# 取消订阅
rain.remove_observer(lihua)

In [None]:
%%writefile cs.py
# Module: 文件形式保存的功能模组
def fib(n): # write Fibonacci series up to n
    a, b = 0, 1
    while b < n:
        print(b, end=" ")
        a, b = b, a + b
    print()

def fib2(n): # return Fibonacci series up to n
    result = []
    a, b = 0, 1
    while b < n:
        result.append(b)
        a, b = b, a + b
    return result


In [None]:
import cs as fb

fb.fib(4)
print(fb.fib2(3))

from cs import fib

fib(5)

In [None]:
%%writefile cs.py
# Python多进程编程
# jupyter不支持多进程，所以输出到文件
# jupyter内置命令要放在行首
from multiprocessing import Process
import os
import time

def hello(i):
    print("son process id {} - for task {}".format(os.getpid(), i))
    time.sleep(2)
    
if __name__ == "__main__":
    print("current father process {}".format(os.getpid()))
    start = time.time()
    p1 = Process(target=hello, args=(1, ))
    p2 = Process(target=hello, args=(2, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print("Totally take {} seconds".format(end - start))

In [None]:
%run cs.py

In [None]:
%%writefile cs.py
# 进程池管理python多进程
from multiprocessing import Process, Pool
import os
import time

def hello(i):
    print("son process id {} - for task {}".format(os.getpid(), i))
    time.sleep(1)

if __name__ == "__main__":
    print("current father process {}".format(os.getpid()))
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(hello, args=(i, ))
    p.close()
    p.join()
    end = time.time()
    print("Totally take {} seconds".format(end - start))

In [None]:
!python cs.py

In [None]:
%%writefile cs.py
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print("Run task %s (%s)..." % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print("Task %s runs %0.2f seconds." % (name, end - start))

if __name__ == "__main__":
    print("Parent process %s." % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i, ))   # apply_async是异步的，子进程执行的同时，主进程继续向下执行
    print("Waiting for all subprocess done...")
    p.close()
    p.join()
    print("All subprocesses done.")

In [None]:
!python cs.py

In [18]:
%%writefile cs.py
# 多进程模式下的主从通讯
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, "hello"])
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn, ))
    p.start()
    print(parent_conn.recv())
    p.join()

Overwriting cs.py


In [19]:
!python cs.py

[42, None, 'hello']


In [20]:
%%writefile cs.py
# 生产者-消费者模式    
# 数据通常保留在一个消息队列中，提供队列进行数据共享 Multiprocessing.Queue
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码
def producer(q):
    for value in range(5):
        print("Produce %d" % value)
        q.put(value)
        time.sleep(1)

# 读数据进程执行的代码
def consumer(q):
    while True:
        value = q.get(True)
        print("Consume %d" % value)
        time.sleep(1)

if __name__ == "__main__":
    t0 = time.time()
    # 父进程创建Queue，并传给各个子进程
    q = Queue()
    pw = Process(target=producer, args=(q, ))
    pr = Process(target=consumer, args=(q, ))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()  # pr进程里是死循环，无法等待结束，只能强行终止
    print("Take %ss." % (time.time() - t0))

Overwriting cs.py


In [21]:
!python cs.py

Produce 0
Consume 0
Produce 1
Consume 1
Produce 2
Consume 2
Produce 3
Consume 3
Produce 4
Consume 4
Take 5.0938591957092285s.


In [22]:
# python多线程编程
import threading
import time

def hello(i):
    print("thread id: {} for task {}".format(threading.current_thread().name, i))
    time.sleep(2)

if __name__ == "__main__":
    start = time.time()
    t1 = threading.Thread(target=hello, args=(1, ))
    t2 = threading.Thread(target=hello, args=(2, ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    end = time.time()
    print("Take {}s".format(end - start))

thread id: Thread-7 for task 1
thread id: Thread-8 for task 2
Take 2.0064802169799805s


In [23]:
# python迭代器
def fib(n):
    index = 0
    a, b = 0, 1
    while index < n:
        receive = yield b
        print("'fib' receive %d" % receive)
        a, b = b, a + b
        index += 1

fib = fib(20)
print("'fib' yield %d" % fib.send(None)) # 效果等同于print(next(fib))
print("'fib' yield %d" % fib.send(1)) 
print("'fib' yield %d" % fib.send(1)) 
print("'fib' yield %d" % fib.send(1)) 
print("'fib' yield %d" % fib.send(1)) 

'fib' yield 1
'fib' receive 1
'fib' yield 1
'fib' receive 1
'fib' yield 2
'fib' receive 1
'fib' yield 3
'fib' receive 1
'fib' yield 5


In [24]:
# python用迭代器实现协程
def consumer():
    r = ""
    while True:
        n = yield r
        if not n:
            return 
        print("[CONSUMER] Consuming %s..." % n)
        r = "200 OK"

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n += 1
        print("[PRODUCER] Producing %s..." % n)
        r = c.send(n)
        print("[PRODUCER] Consumer return: %s" % r)
    c.close()

c = consumer()
produce(c)

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
