In [1]:
# 1.python并发
# 1.多线程，2.多进程，3.协程

import threading
import time
from threading import Thread

def task(seconds):
    print('start to sleep:', threading.current_thread().getName(), seconds)
    time.sleep(seconds)
    print('end sleeping:', threading.current_thread().getName(), seconds)
    return seconds+10


In [2]:
# 1.1.线程：计算机可以分配cpu调度的最小单元（车间里的工人）
# IO不需要计算多核，线程即可。多线程适合IO密集型、网络爬虫，不怎么用CPU。

# 当前主进程下创建多个子线程，线程并行在某些共享内存的数据结构上操作结果难以预料
t1 = Thread(target=task, args=[1+1])
t1.setName('睡2秒线程') # 放在start之前
t2 = Thread(target=task, args=[2+1])
t2.setName('睡3秒线程') # 放在start之前

# 放在start之前。是否是守护线程，默认为否。是则主线程执行完毕关闭该守护线程
t1.setDaemon(True)
# t2.setDaemon(False)

# 线程设置放在start之前
t1.start() # 线程准备就绪，等待cpu调度，具体时间由cpu决定
t2.start() # 线程准备就绪，等待cpu调度，具体时间由cpu决定

# join：等待当前线程执行完毕才继续执行，如果没有则主线程继续执行不等待
# t1.join() # 由于t1较短，不等t2执行完，只输出了t1的end
# t2.join() # 由于t2较长，t2执行完时t1也执行完，输出t1和t2的end
print('创建多个线程')
# 

start to sleep:start to sleep: 睡3秒线程 3
创建多个线程
 睡2秒线程 2
end sleeping: 睡2秒线程 2
end sleeping: 睡3秒线程 3


In [7]:
# 1.2. 线程类

# 继承线程类
class MyThread(Thread):
    def run(self) -> None:
        name, seconds = self._args
        print('start to sleep:', name)
        time.sleep(seconds)
        print('end sleeping:', name)

t1 = MyThread(args=['睡2秒线程', 2])
t2 = MyThread(args=['睡3秒线程', 3])

t1.start() # 线程准备就绪，等待cpu调度，具体时间由cpu决定
t2.start() # 线程准备就绪，等待cpu调度，具体时间由cpu决定

t2.join()


start to sleep: 睡2秒线程
start to sleep: 睡3秒线程
end sleeping: 睡2秒线程
end sleeping: 睡3秒线程


In [8]:
# 1.3. 线程锁或threading.Lock()threading.RLock()，上述都是不安全，特别是共享内存数据部分修改不可控，不过python里有一些数据结构自身带锁线程安全的。
from threading import Lock, RLock

# 加同一把锁后，只有一个占用
lock = Lock() # 同步锁，不支持嵌套，只能“锁-解-锁-解-...”
# lock = RLock() # 递归锁，更强大，支持复杂多人同时开发,可以“锁-锁-解-解”嵌套

# 继承线程类
class MyThread(Thread):
    def run(self) -> None:
        # 方式一：lock.acquire() 和 lock.release()
        # lock.acquire() # 申请锁
        # name, seconds = self._args
        # print('start to sleep:', name)
        # time.sleep(seconds)
        # print('end sleeping:', name)
        # lock.release() # 释放锁
        # 方式二：with
        with lock: # 默认执行lock.acquire() 和 lock.release()
            name, seconds = self._args
            print('start to sleep:', name)
            time.sleep(seconds)
            print('end sleeping:', name)

t1 = MyThread(args=['睡2秒线程', 2])
t2 = MyThread(args=['睡3秒线程', 3])

t1.start() # 线程准备就绪，等待cpu调度，具体时间由cpu决定
t2.start() # 线程准备就绪，等待cpu调度，具体时间由cpu决定

t1.join()
t2.join()

start to sleep: 睡2秒线程
end sleeping: 睡2秒线程
start to sleep: 睡3秒线程
end sleeping: 睡3秒线程


In [9]:
# 1.4. 死锁：相互占用资源不释放
from threading import Lock
lock = Lock() 

# 继承线程类:自定义线程类
class MyThread(Thread):
    def run(self) -> None:
        name, seconds = self._args
        print('run', name)
        lock.acquire() # 申请锁
        lock.acquire() # 申请锁，死锁了，程序没有终止
        print('start to sleep:', name)
        time.sleep(seconds)
        print('end sleeping:', name)
        lock.release() # 释放锁

t1 = MyThread(args=['睡2秒线程', 2])
t2 = MyThread(args=['睡3秒线程', 3])

t1.start() # 线程准备就绪，等待cpu调度，具体时间由cpu决定
t2.start() # 线程准备就绪，等待cpu调度，具体时间由cpu决定

t1.join()
t2.join()


run 睡2秒线程
run 睡3秒线程


KeyboardInterrupt: 

In [6]:
# 1.5.1 线程池：回调函数
from concurrent.futures import ThreadPoolExecutor

# 创建大小为3的线程池，最多维护3个线程
threadpool = ThreadPoolExecutor(3)

# 返回值自动封装在response中
def done(response):
    print('response.result()', response.result())

# 提交一个任务给线程池，让线程池分配一个线程去执行该任务，自主安排，没有空闲线程则等着知道有新的空闲线程
future1 = threadpool.submit(task, 2)
future1.add_done_callback(done) # 执行线程后回调函数，返回线程执行return结果
future2 = threadpool.submit(task, 3)
future2.add_done_callback(done) # 执行线程后回调函数，返回线程执行return结果

print('threadpool start')
threadpool.shutdown(wait=True) # 等线程池中所有线程执行完毕，没有则不等待。类似于前面join的作用
print('threadpool end')

start to sleep: ThreadPoolExecutor-0_0 2
start to sleep: ThreadPoolExecutor-0_1 3
threadpool start
end sleeping: ThreadPoolExecutor-0_0 2
response.result() 12
end sleeping: ThreadPoolExecutor-0_1 3
response.result() 13
threadpool end


In [9]:
# 1.5.1 线程池：统一获取结果
from concurrent.futures import ThreadPoolExecutor

# 创建大小为3的线程池，最多维护3个线程
threadpool = ThreadPoolExecutor(3)

# 返回值自动封装在response中
def done(response):
    print('response.result()', response.result())

# 提交一个任务给线程池，让线程池分配一个线程去执行该任务，自主安排，没有空闲线程则等着知道有新的空闲线程
futures = []
future1 = threadpool.submit(task, 2)
futures.append(future1)
future2 = threadpool.submit(task, 3)
futures.append(future2)

print('threadpool start')
threadpool.shutdown(wait=True) # 等线程池中所有线程执行完毕，没有则不等待。类似于前面join的作用
print('threadpool end')

# 统一获取线程返回结果
for future in futures:
    print(future.result())


start to sleep: ThreadPoolExecutor-6_0 2
start to sleep: ThreadPoolExecutor-6_1 3
threadpool start
end sleeping: ThreadPoolExecutor-6_0 2
end sleeping: ThreadPoolExecutor-6_1 3
threadpool end
12
13


In [11]:
# 1.6. 单例设计模式在多线程中不安全
# 单例模式：每次实例化类的对象时，都是最开始创建的那个对象，不再重复创建对象。保证全局该类下只有一个实例。
# 1.6.1.普通情况下，地址相同
class Singleton:
    instance = None
    def __init__(self, name):
        self.name = name
    def __new__(cls, *args, **kwargs):
        if cls.instance:
            return cls.instance
        cls.instance = object.__new__(cls) # 创建一个空对象
        return cls.instance
o1 = Singleton('a')
o2 = Singleton('b')
# 以下输出是同一个地址
print(o1)
print(o2)

<__main__.Singleton object at 0x000002390163A760>
<__main__.Singleton object at 0x000002390163A760>


In [16]:
# 1.6. 单例设计模式在多线程中不安全
# 1.6.2.多线程下的单例模式不安全（出问题）
from threading import Lock
lock = Lock() 

class Singleton:
    instance = None
    def __init__(self, name):
        self.name = name
    def __new__(cls, *args, **kwargs):
        # 不安全
        if cls.instance:
            return cls.instance
        time.sleep(2) # 执行此时各线程时间不同，需要在这个函数的代码块进行加锁（如"with lock:"）
        cls.instance = object.__new__(cls) # 创建一个空对象
        return cls.instance
        # 安全
        # with lock:
        #     if cls.instance:
        #         return cls.instance
        #     time.sleep(2) # 执行此时各线程时间不同，需要在这个函数的代码块进行加锁（如"with lock:"）
        #     cls.instance = object.__new__(cls) # 创建一个空对象
        #     return cls.instance
def task(name):
    o = Singleton(name)
    print(o) # 地址存在不同
for x in 'abcdedfg':
    t = Thread(target=task, args=[x])
    t.start()
t.join()


<__main__.Singleton object at 0x000002390163AFD0>
<__main__.Singleton object at 0x0000023901608880>
<__main__.Singleton object at 0x00000239016089A0><__main__.Singleton object at 0x0000023901608C10>
<__main__.Singleton object at 0x0000023901608370><__main__.Singleton object at 0x0000023901608A00>
<__main__.Singleton object at 0x0000023901608B20>

<__main__.Singleton object at 0x0000023901608EE0>


In [1]:
# 2.进程：计算机分配资源的最小单元（车间），比线程开销大，相互之间独立。CPython 全局解释锁（GIL）导致其在并行计算上优于多线程。
# 2.1 进程三种模式、函数
# 计算机多核优势适合多进程，适合计算密集型。
import multiprocess, threading
import time, os
from multiprocess import Process

# windows/linux/mac底层创建进程的方式不同，但在main函数里头都能成功
# fork: 自动把原来进程资源(包括文件对象和线程锁)拷贝复制一份，任意位置开始，快，os.fork()。unix
# spawn：手动传入参数必备资源(不支持文件对象和线程锁传参，需要自己创建这些资源)，main代码块开始，慢。window只支持这种模式。unix/windows
# forkserver：手动传入参数必备资源（不支持文件对象和线程锁传参，需要自己创建这些资源），main代码块开始. 部分unix

def task():
    print(1,name)
    print(1,multiprocess.current_process().name, multiprocess.current_process().pid)
    print(1, os.getpid(), os.getppid())
    name.append('lichy')

if __name__=='__main__':
    # 1.fork模式：自动拷贝
    multiprocess.set_start_method('fork')
    name = ['fubin']
    p = Process(target=task)
    p.name = 'fb'
    # 守护进程
    p.daemon = False # False：等子进程执行完毕，关闭该主进程；True: 不等子进程执行完毕，主进程执行完毕关闭子进程
    p.start()
    
    # 2.spawn模式：手动传参
    # multiprocess.set_start_method('spawn')
    # name = ['fubin']
    # p = Process(target=task,args=(name,)) # 参数传递
    # p.start()
    
    # 3.forsever模式：手动传参
    # multiprocess.set_start_method('forksever')
    # name = ['fubin']
    # p = Process(target=task,args=(name,)) # 参数传递
    # p.start()
    
    time.sleep(3)
    # 获取当前进程的名字、id、父进程id
    print(2,multiprocess.current_process().name, multiprocess.current_process().pid, os.getpid(), os.getppid())
    # 获取当前进程的子线程个数
    print(2,threading.enumerate())
    # 查看cpu个数(核数)
    print(2,multiprocess.cpu_count())
    print(2,name)


2 MainProcess 33238 33238 7334
2 [<_MainThread(MainThread, started 8213470720)>, <Thread(IOPub, started daemon 6120517632)>, <Heartbeat(Heartbeat, started daemon 6137344000)>, <Thread(Thread-3, started daemon 6155317248)>, <Thread(Thread-4, started daemon 6172143616)>, <ControlThread(Control, started daemon 6188969984)>, <HistorySavingThread(IPythonHistorySavingThread, started 6205796352)>, <ParentPollerUnix(Thread-2, started daemon 6223196160)>]
2 8
2 ['fubin']


In [11]:
# 2.2 自定义进程类
from multiprocess import Process

class MyProcess(Process):
    def run(self):
        print("进程：", self._args)

if __name__=='__main__':
    p = MyProcess(args=('fb',))
    p.start()
    print('主进程执行')

主进程执行
进程： ('fb',)


In [2]:
# 2.3 进程之间数据共享
# 2.3.1 默认不共享
from multiprocess import Process

def task(data):
    data.append('lichy')

if __name__=='__main__':
    data = []
    p = Process(target=task, args=(data,))
    p.start()
    p.join()
    
    print(data)

[]


In [5]:
# 2.3.2 进程之间数据共享  方式一： 一些特殊共享数据结构：Value,Array数字
from multiprocess import Process, Value, Array

def task(data1,data3,data4):
    data1.value = 17
    data3.value = 16.01
    data4[0] = -1
    print(11,data1.value,data2.value,data3.value, data4[:])

if __name__=='__main__':
    # c++风格，还有其他
    data1 = Value('i',18) # int 
    data3 = Value('f',18.) # double
    data4 = Array('i',[1,2,3,4]) # list
    print(1,data1.value,data3.value,data4[:])
    p = Process(target=task, args=(data1,data3,data4))
    p.start()
    p.join()
    
    print(2,data1.value,data3.value,data4[:])

1 18 18.0 [1, 2, 3, 4]
11 17 b'\x00' 16.010000228881836 [-1, 2, 3, 4]
2 17 16.010000228881836 [-1, 2, 3, 4]


In [18]:
# 2.3.2 进程之间数据共享  方式二： manager， Queue （FIFO）
from multiprocess import Process, Manager, Queue

def task(d, l, q):
    d['bf'] = -1
    l.append('lcy')
    q.put(2)
    print(11, d,l)

if __name__=='__main__':
    # 
    with Manager() as manager:
        d = manager.dict()
        l = manager.list()
        d['fb'] = 1
        l.append('fb')
        
        queue = Queue()
        queue.put(1)
        
        print(1, d,l,queue.qsize)
        p = Process(target=task, args=(d,l,queue))
        p.start()
        p.join()
        
        print(2, d, l, queue.qsize)
        print(dir(queue))

1 {'fb': 1} ['fb'] <bound method Queue.qsize of <multiprocess.queues.Queue object at 0x1076ff460>>
11 {'fb': 1, 'bf': -1} ['fb', 'lcy']
2 {'fb': 1, 'bf': -1} ['fb', 'lcy'] <bound method Queue.qsize of <multiprocess.queues.Queue object at 0x1076ff460>>
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_after_fork', '_buffer', '_close', '_closed', '_feed', '_finalize_close', '_finalize_join', '_ignore_epipe', '_joincancelled', '_jointhread', '_maxsize', '_notempty', '_on_queue_feeder_error', '_opid', '_poll', '_reader', '_recv_bytes', '_reset', '_rlock', '_sem', '_send_bytes', '_start_thread', '_thread', '_wlock', '_writer', 'cancel_join_thread', 'close', 'empty', '

In [21]:
# 2.3.2 进程之间数据共享  方式三： pipe
from multiprocess import Process, Pipe

def task(conn):
    time.sleep(2)
    # 发送
    print('子进程发送：', [1,2,4])
    conn.send([1,2,4])
    # 接收
    data = conn.recv() # 阻塞
    print('子进程接收：', data)
    time.sleep(2)

if __name__=='__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=task, args=(child_conn,))
    p.start()
    # 接收
    data = parent_conn.recv() # 阻塞
    print('父进程接收：', data)
    # 发送
    print('父进程发送：', 123)
    parent_conn.send(123)

子进程发送： [1, 2, 4]
父进程接收： [1, 2, 4]
父进程发送： 123
子进程接收： 123


In [None]:
# 总结：实际上，除了上述python内部数据结构实现外，还可以采用外部资源来实现，如文件、Mysql和redis。

In [22]:
# 2.4 进程锁：同线程锁类似，好多进程同时修改同一份共享资源（数据或文件）
from multiprocess import Process, RLock
import time

def task(lock):
    # 进程锁同线程锁类似，锁定代码块
    lock.acquire()
    print(os.getpid())
    lock.release()

if __name__=='__main__':
    # 进程锁
    lock = RLock()
    
    for i in range(3):
        p = Process(target=task, args=(lock,)) # 进程锁可以传递给不同进程，这里和线程锁有所不同
        p.start()
    
    time.sleep(8)

33344
33345
33346


In [2]:
# 2.5 进程池：同线程池使用类似
import multiprocess, time
from concurrent.futures import ProcessPoolExecutor

def task(num):
    print('执行：',num)
    time.sleep(2)
    return num

# 构建回调函数的操作内容
def done(res):
    print(1,multiprocess.current_process().pid)
    time.sleep(1)
    print(1,res.result())
    time.sleep(2)

if __name__=="__main__":
#     multiprocess.set_start_method('fork')
    processpool = ProcessPoolExecutor(3)
    for i in range(3):
        future = processpool.submit(task, i)
        future.add_done_callback(done) # 设置回调函数，和多线程一样
    print(2, multiprocess.current_process().pid)
    processpool.shutdown(wait=True)

2 33425
1 33425


Process SpawnProcess-4:
Process SpawnProcess-5:
Process SpawnProcess-6:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
Traceback (most recent call last):
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fubin/o

1 33425


exception calling callback for <Future at 0x103ff7f40 state=finished raised BrokenProcessPool>
Traceback (most recent call last):
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 330, in _invoke_callbacks
    callback(self)
  File "/var/folders/6l/8jb84y794gz98hv20bv2jlmw0000gn/T/ipykernel_33425/3186177949.py", line 14, in done
    print(1,res.result())
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 330, in _invoke_callbacks
    callback(self)
  File "/var/folders/6l/8jb84y794gz98hv20bv2jlmw0000gn/T/ipykernel_33425/3186177949.py", line 14, in done
    print(1,res.result())
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 439

1 33425


exception calling callback for <Future at 0x103fd3bb0 state=finished raised BrokenProcessPool>
Traceback (most recent call last):
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 330, in _invoke_callbacks
    callback(self)
  File "/var/folders/6l/8jb84y794gz98hv20bv2jlmw0000gn/T/ipykernel_33425/3186177949.py", line 14, in done
    print(1,res.result())
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 330, in _invoke_callbacks
    callback(self)
  File "/var/folders/6l/8jb84y794gz98hv20bv2jlmw0000gn/T/ipykernel_33425/3186177949.py", line 14, in done
    print(1,res.result())
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 439

In [3]:
# 2.5 进程池中的进程锁：同线程池使用类似，但是只能使用multiprocessing.Manager.RLock，不能使用multiprocessing.RLock
import multiprocess
from concurrent.futures import ProcessPoolExecutor
from multiprocess import Manager

def task(num, lock):
    lock.acquire()
    print('执行：',num)
    time.sleep(2)
    lock.release()
    return num

if __name__=='__main__':
#     multiprocess.set_start_method('fork')
    # 这里进程池只能使用multiprocess.Manager.RLock，不能使用multiprocess.RLock
    lock = Manager().RLock()
    processpool = ProcessPoolExecutor(3)
    for i in range(3):
        future = processpool.submit(task, (i, lock))
    print(2, multiprocess.current_process().pid)
    processpool.shutdown(wait=True)

2 33452


Process SpawnProcess-8:
Traceback (most recent call last):
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fubin/opt/anaconda3/lib/python3.9/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'task' on <module '__main__' (built-in)>
Process SpawnProcess-9:
Traceback (most recent call last):
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/fubin/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  Fi

In [4]:
# 3. 协程(异步编程，coroutine)：通过代码实现的东西，计算机中并不存在（一般不太重要，进程和线程是计算机内真实存在的）
# 协程也称为微线程，是一种用户态内上下文切换技术，通过一个线程实现代码块相互切换执行（来回跳着执行）
# 用处：处理io请求等待时间还想干点其他事情，通过一个线程实现并发操作
# 3.1 第三方实现包如greenlet实现协程：用户主动设置切换，效率低

from greenlet import greenlet

# gl1
def fun1():
    print(1)
    gl2.switch() # 跳转fun2
    print(2)
    gl2.switch() # 跳转fun2
# gl2
def fun2():
    print(3)
    gl1.switch() # 跳转fun1
    print(4)
    gl1.switch() # 跳转fun1

gl1 = greenlet(fun1)
gl2 = greenlet(fun2)

# 开始执行fun1
gl1.switch()

1
3
2
4


In [5]:
# 3.2 yield实现协程：用户主动设置切换，效率低
def fun1():
    yield 1
    yield from fun2()
    yield 2
def fun2():
    yield 3
    # yield from fun1()  输出 1,3,1,3,1,3,1,3,1,3,...
    yield 4

f1 = fun1()
for i in f1:
    print(i)

1
3
4
2


In [1]:
# 3.3 自动切换，效率高，采用asyncio模块加python的async、await关键字语法来实现协程。
# 切换-微观不并行宏观并行-异步编程。利用中间等待时间执行其他代码。
# 在ipynb执行会报错，跟jupyter notebook机制有关
# await本质是一个生成器，返回异步执行结果
import asyncio

async def fun1():
    print(1)
    await asyncio.sleep(2) # 等待时间或其他IO任务，让出cpu自动切换到其他任务，且能够自动切换回来。非阻塞式睡眠。
    # time.sleep(2) # 阻塞式睡眠
    print(2)
async def fun2():
    print(3)
    await asyncio.sleep(2) # 等待时间或其他IO任务，让出cpu自动切换到其他任务，且能够自动切换回来
    print(4)

# 包裹成任务
tasks = [asyncio.ensure_future(fun1()), asyncio.ensure_future(fun2())]

loop = asyncio.get_event_loop() # 轮询，一般程序入口
# 内部一个线程来执行
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# cmd下执行输出：
# 1
# 3
# 2
# 4

RuntimeError: This event loop is already running

In [4]:
# 协程例子二:cmd下执行不报错
# 背后有event loop
import asyncio

async def xprint(x: int, s: str):
    await asyncio.sleep(x)
    print(s)
    return s
async def main():
#     task1 = asyncio.create_task(xprint(1,'fu'))
#     task2 = asyncio.create_task(xprint(2,'bin'))
#     result1 = await task1
#     result2 = await task2
#     results = await asyncio.gather(task1,task2) # 一下获取所有
    results = await asyncio.gather(xprint(1,'fu'),xprint(2,'bin')) # 自动封装成任务
asyncio.run(main())


RuntimeError: asyncio.run() cannot be called from a running event loop

In [None]:
# 3.4 自动切换，采用aiohttp实现爬虫，io等待时候干点别的事情（aiohttp：http协程接口）
# 3.4.1 多线程协同
# 3.4.2 多进程协同（速度远远大于其他）

import aiohttp, asyncio

# 协程要比线程更加合适，节约资源，完成并发。协程本质是一个线程，不适合做计算处理。数据回来了，让其他线程来处理。
# 后期项目开发基于框架，框架内部已经使用了这些东西。多进程+协程实现很多高并发。
# 非阻塞和协程。

# 协程（单核cpu，开销最小） -> 线程（单核cpu） -> 进程(多核cpu)