In [None]:
# 限制多线程的执行速率
from threading import Lock
import time


class Throttle:
    def __init__(self, rate):
        self._consume_lock = Lock()
        self.rate = rate
        self.tokens = 0
        self.last = 0

    def consume(self, amount=1):
        with self._consume_lock:
            now = time.time()

            # 时间测量在第一令牌请求上初始化以避免冲突
            if self.last == 0:
                self.last = now

            elapsed = now - self.last

            # 确保传递时间的量足够大以添加新的令牌
            if int(elapsed * self.rate):
                self.tokens += int(elapsed * self.rate)
                self.last = now

            # 不要过度填满桶
            self.tokens = (
                self.rate if self.tokens > self.rate else self.tokens
            )

            # 如果可用最终分配令牌
            if self.tokens >= amount:
                self.tokens -= amount
            else:
                amount = 0

            return amount

In [None]:
# 使用队列和线程池并发处理HTTP请求
from gmaps import Geocoding
from threading import Thread
from queue import Queue, Empty

api = Geocoding()

PLACES = (
    'New York', 'Dehli', 'Vien', 'Zadar', 'Venice', 'Reykjavik', 'Berlin'
)
THREAD_POOL_SIZE = 4

# 向Api请求数据
def fetch_place(place):
    geocoded = api.geocode(place)[0]
    return geocoded[0]

def worker(work_queue, results_queue, throttle):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            # 分配令牌
            while not throttle.consume():
                pass

            # 处理工作线程中可能产生的错误，确保主线程不会因为该错误无限期等待
            try:
                result = fetch_place(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()

def main():
    work_queue = Queue()
    results_queue = Queue()
    throttle = Throttle(10)

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue, throttle))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        result = results_queue.get()

        if isinstance(result, Exception):
            raise result


In [None]:
# 多进程
from multiprocessing import Pool

from gmaps import Geocoding


api = Geocoding()
PLACES = (
    'New York', 'Dehli', 'Vien', 'Zadar', 'Venice', 'Reykjavik', 'Berlin'
)
POOL_SIZE = 4

def fetch_place(place):
    return api.geocode(place)[0]

def present_result(geocoded):
    print(geocoded)

def main():
    with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_place, PLACES)

        for result in results:
            present_result(result)

In [None]:
# 异步网络请求
import aiohttp
import asyncio

session = aiohttp.ClientSession()

async def geocode(place):
    params = {
        'sensor': 'false',
        'address': place
    }

    async with session.get('https://maps.googleapis.com/maps/api/geocode/json', params=params) as response:
        result = await response.json()
        return result['results']

PLACES = (
    'New York', 'Dehli', 'Vien', 'Zadar', 'Venice', 'Reykjavik', 'Berlin'
)

async def fetch_place(place):
    return (await geocode(place))[0]

async def present_result(result):
    geocoded = await result
    print(result)

async def main():
    await asyncio.wait(
        [present_result(fetch_place(place)) for place in PLACES]
    )

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # 如果没有关闭ClientSession，aiohttp会抛出问题
    loop.run_until_complete(session.close())
    loop.close()
