In [2]:
import time
import asyncio
import requests
import aiohttp
import grequests
import threading

import warnings
warnings.filterwarnings('ignore')

In [3]:
addresses = ['http://127.0.0.1:{}/end'.format(8000 + i) for i in range(3)]
addr = addresses[0]

## `requests` 并发请求

### 后端用 `async` 视图函数 (with `await asyncio.sleep`)

In [14]:
def func():
    res = requests.get(addr)
    print(res.status_code, res.json())

In [17]:
threading.Thread(target=func).start()
threading.Thread(target=func).start()

200200 {'result': 'success', 'client': 'A'}
 {'result': 'success', 'client': 'A'}


### 后端不用 `async` 视图函数 (with `time.sleep`)

In [16]:
threading.Thread(target=func).start()
threading.Thread(target=func).start()
# 也是几乎同时出结果

200 {'result': 'success', 'client': 'A'}
200 {'result': 'success', 'client': 'A'}


以上说明后端视图函数都是并发的

## `grequests` 并发

### `grequests.map` 并发后返回结果是同步阻塞的，结果列表是有序的

In [10]:
def greq_async_map():
    start = time.time()
    res_list = grequests.map([
        grequests.get(addr) for addr in addresses * 5
    ])
    print('---')
    end = time.time()
    print(end - start)
    print('sending completed')
    return res_list

In [11]:
res_list = greq_async_map()

---
2.0179731845855713
sending completed


In [39]:
print(res_list)

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]


### `grequests.imap` 并发后是及时返回结果，直到使用到结果时才阻塞

In [44]:
def greq_async_imap():
    start = time.time()
    res_list = grequests.imap([
        grequests.get(addr) for addr in addresses * 5
    ])
    end = time.time()
    print(end - start)
    print('sending completed at', time.time())
    print('---')
    return res_list

def fff():
    print('another function finished at', time.time())

In [45]:
res_list = greq_async_imap()
fff()

0.00045299530029296875
sending completed at 1675885141.692812
---
another function finished at 1675885141.6929028


In [33]:
for res in res_list:
    print(res.status_code, res.json()) # 这就又变成了同步阻塞

200 {'result': 'success', 'client': 'B'}
200 {'result': 'success', 'client': 'A'}
200 {'result': 'success', 'client': 'C'}
200 {'result': 'success', 'client': 'A'}
200 {'result': 'success', 'client': 'B'}
200 {'result': 'success', 'client': 'C'}
200 {'result': 'success', 'client': 'A'}
200 {'result': 'success', 'client': 'B'}
200 {'result': 'success', 'client': 'C'}
200 {'result': 'success', 'client': 'A'}
200 {'result': 'success', 'client': 'C'}
200 {'result': 'success', 'client': 'B'}
200 {'result': 'success', 'client': 'A'}
200 {'result': 'success', 'client': 'B'}
200 {'result': 'success', 'client': 'C'}


In [36]:
# print([res.status_code for res in res_list]) # 这样还好点

[200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200]


相比之下，`grequests.map` 才是真正意义上的并发

## `aiohttp` 并发

需要借助于 `asyncio` 的 coroutine/task 原语实现，否则是『伪并发』

**反面示例 1**

In [52]:
async def async_requests():
    start = time.time()
    async with aiohttp.ClientSession() as client:
        for addr in addresses * 3:
            async with client.get(addr) as resp:
                res = await resp.json()
                print('{} requesting completed'.format(addr), res)
    end = time.time()
    print('time consumption', end - start)

await async_requests()

http://127.0.0.1:8000/end requesting completed {'result': 'success', 'client': 'A'}
http://127.0.0.1:8001/end requesting completed {'result': 'success', 'client': 'B'}
http://127.0.0.1:8002/end requesting completed {'result': 'success', 'client': 'C'}
http://127.0.0.1:8000/end requesting completed {'result': 'success', 'client': 'A'}
http://127.0.0.1:8001/end requesting completed {'result': 'success', 'client': 'B'}
http://127.0.0.1:8002/end requesting completed {'result': 'success', 'client': 'C'}
http://127.0.0.1:8000/end requesting completed {'result': 'success', 'client': 'A'}
http://127.0.0.1:8001/end requesting completed {'result': 'success', 'client': 'B'}
http://127.0.0.1:8002/end requesting completed {'result': 'success', 'client': 'C'}
time consumption 18.083763122558594


In [53]:
async def async_requests():
    start = time.time()
    async with aiohttp.ClientSession() as client:
        for addr in addresses * 3:
            client.get(addr)

    end = time.time()
    print('time consumption', end - start)


await async_requests() # 内部调用没有阻塞，啥也没用

time consumption 0.00026917457580566406


**反面示例 1**

In [55]:
async def async_requests():
    start = time.time()
    for addr in addresses * 3:
        async with aiohttp.ClientSession() as client:
            async with client.get(addr) as resp:
                res = await resp.json()
                print('{} requesting completed'.format(addr), res)
    end = time.time()
    print('time consumption', end - start)

await async_requests() # 每一个地址都要创建一个session，这样理论上比上面更慢

http://127.0.0.1:8000/end requesting completed {'result': 'success', 'client': 'A'}
http://127.0.0.1:8001/end requesting completed {'result': 'success', 'client': 'B'}
http://127.0.0.1:8002/end requesting completed {'result': 'success', 'client': 'C'}
http://127.0.0.1:8000/end requesting completed {'result': 'success', 'client': 'A'}
http://127.0.0.1:8001/end requesting completed {'result': 'success', 'client': 'B'}
http://127.0.0.1:8002/end requesting completed {'result': 'success', 'client': 'C'}
http://127.0.0.1:8000/end requesting completed {'result': 'success', 'client': 'A'}
http://127.0.0.1:8001/end requesting completed {'result': 'success', 'client': 'B'}
http://127.0.0.1:8002/end requesting completed {'result': 'success', 'client': 'C'}
time consumption 18.149924993515015


**真正并发 1** (create task, gather, await result)

In [68]:
async def async_requests():
    async with aiohttp.ClientSession() as client:
        start = time.time()
        tasks = [asyncio.create_task(client.get(addr)) for addr in addresses * 3]
        await asyncio.gather(*tasks)
        middle = time.time()
        print('up to now, time consumption', middle - start)
        for task in tasks:
            res_content = await task.result().json()
            print('{} requesting completed'.format(task), res_content)
        end = time.time()
        print('time consumption', end - start)

await async_requests()

Unclosed connection
client_connection: Connection<ConnectionKey(host='127.0.0.1', port=8000, is_ssl=False, ssl=None, proxy=None, proxy_auth=None, proxy_headers_hash=None)>
Unclosed connection
client_connection: Connection<ConnectionKey(host='127.0.0.1', port=8001, is_ssl=False, ssl=None, proxy=None, proxy_auth=None, proxy_headers_hash=None)>


up to now, time consumption 2.0872039794921875
<Task finished name='Task-224' coro=<<_RequestContextManager without __name__>()> result=<ClientRespon...ation/json')>
> requesting completed {'result': 'success', 'client': 'A'}
<Task finished name='Task-225' coro=<<_RequestContextManager without __name__>()> result=<ClientRespon...ation/json')>
> requesting completed {'result': 'success', 'client': 'B'}
<Task finished name='Task-226' coro=<<_RequestContextManager without __name__>()> result=<ClientRespon...ation/json')>
> requesting completed {'result': 'success', 'client': 'C'}
<Task finished name='Task-227' coro=<<_RequestContextManager without __name__>()> result=<ClientRespon...ation/json')>
> requesting completed {'result': 'success', 'client': 'A'}
<Task finished name='Task-228' coro=<<_RequestContextManager without __name__>()> result=<ClientRespon...ation/json')>
> requesting completed {'result': 'success', 'client': 'B'}
<Task finished name='Task-229' coro=<<_RequestContextManag

**真正并发 1** (by coroutine)

In [None]:

def func_1():
    print('start')

    async def async_request(addr):
        start = time.time()
        async with aiohttp.ClientSession() as client:
            resp = await client.get(addr)
            print('{} requesting completed'.format(addr))
        # res = requests.get(addr) 这条不行
        # time.sleep(2) 这条也不行

    addresses = ['http://127.0.0.1:{}/start'.format(8000 + i) for i in range(3)]
    asyncio.run(asyncio.wait([async_request(addr) for addr in addresses]))


In [66]:

async def single_async_request(addr):
    async with aiohttp.ClientSession() as client:
        resp = await client.get(addr)
        print('{} requesting completed'.format(addr))

In [67]:
start = time.time()
await asyncio.wait([single_async_request(addr) for addr in addresses * 3])
end = time.time()
print('time consumption', end - start)

http://127.0.0.1:8001/end requesting completed
http://127.0.0.1:8000/end requesting completed
http://127.0.0.1:8000/end requesting completed
http://127.0.0.1:8001/end requesting completed
http://127.0.0.1:8001/end requesting completed
http://127.0.0.1:8000/end requesting completed
http://127.0.0.1:8002/end requesting completed
http://127.0.0.1:8002/end requesting completed
http://127.0.0.1:8002/end requesting completed
time consumption 2.040360927581787


---

## 其他示例

**demo 1**

In [80]:
async def func_a():
    print('suspending func a')
    await asyncio.sleep(3)
    print('resuming func a')

async def func_b():
    await asyncio.sleep(2)
    print('in function b')

#     # await task_a
#     # await task_b
#     await asyncio.gather(task_a, task_b)
#     # await func_a()
#     # await func_b()




In [81]:
async def main():
    task_a = asyncio.create_task(func_a())
    task_b = asyncio.create_task(func_b())
    await task_a
    await task_b
    
await main()

suspending func a
in function b
resuming func a


In [82]:
async def main():
    task_a = asyncio.create_task(func_a())
    task_b = asyncio.create_task(func_b())
    await asyncio.gather(task_a, task_b)
    
await main()

suspending func a
in function b
resuming func a


以上两种方法是等价的