In [1]:
%%time

import asyncio
import collections
import csv
import functools
import requests

# Input (MovieLens links) and output (scraped plots) CSV paths.
LINKS_CSV = 'movielens/ml-latest-small/links.csv'
PLOTS_CSV = 'movielens/ml-latest-small/plots2.csv'

# OMDb endpoint queried once per movie.
MOVIE_DATA_URL = 'http://www.omdbapi.com'

# Work item for a request worker; `tries` counts how many times it was re-queued.
MovieRequest = collections.namedtuple('MovieRequest', 'movie_id imdb_id tries')
# Row handed from the request workers to the CSV writer.
MovieData = collections.namedtuple('MovieData', 'movie_id plot')

# Bounded queues (maxsize 100) linking the pipeline stages.
request_queue, fail_queue, data_queue = (asyncio.Queue(100) for _ in range(3))

# pipeline_active: set while the stages should keep polling their queues.
# producer_done: set by the producer after it has enqueued every links row.
pipeline_active, producer_done = asyncio.Event(), asyncio.Event()

async def pipeline():
    """Drive the lifetime of the whole pipeline.

    Sets ``pipeline_active`` so every stage begins work, waits for the
    producer to finish and for all three queues to drain, then clears
    ``pipeline_active`` so the polling stages leave their loops.
    """
    print('Pipeline started...')
    pipeline_active.set()
    # Initial grace period before watching for completion.
    await asyncio.sleep(2)
    await producer_done.wait()
    # Order matters: request workers put into data/fail queues before they
    # call task_done() on the request, so once request_queue has joined no
    # further items can appear downstream.
    for queue in (request_queue, fail_queue, data_queue):
        await queue.join()
    pipeline_active.clear()
    # The stages poll with a 1 s timeout; give them time to notice and exit.
    await asyncio.sleep(2)
    print('Pipeline done!')


async def request_producer(filename, request_queue):
    """Enqueue one MovieRequest (with ``tries=0``) per data row of ``filename``.

    The CSV's first row is skipped as a header. Each remaining row is unpacked
    as ``(movie_id, imdb_id, tmdb_id)``; only the first two fields are used.
    Blank lines are skipped. While the queue holds more than 3/4 of its
    ``maxsize`` items, the producer sleeps in 2 s steps. This keeps headroom for
    the retries that request workers put back into the same queue.

    ``producer_done`` is set in a ``finally`` block. ``pipeline()`` is therefore
    released even when reading the file fails, and the exception still propagates.
    """
    await pipeline_active.wait()
    print('Request producer started...')
    maxsize = request_queue.maxsize
    # maxsize <= 0 means an unbounded queue. No throttling is needed, and
    # 3 * 0 // 4 == 0 would otherwise stall 2 s on every single item.
    threshold = 3 * maxsize // 4 if maxsize > 0 else None
    try:
        with open(filename, newline='', encoding='utf-8') as f:
            reader = csv.reader(f)
            next(reader, None)  # skip header; tolerates an empty file
            n = 0
            for row in reader:
                if not row:  # csv.reader yields [] for blank lines
                    continue
                movie_id, imdb_id, _tmdb_id = row
                while threshold is not None and request_queue.qsize() > threshold:
                    await asyncio.sleep(2)
                await request_queue.put(MovieRequest(movie_id, imdb_id, 0))
                n += 1
                if n % 1000 == 0:
                    print('Requests... {:,d}'.format(n))
    finally:
        # Always unblock pipeline(), which waits on this event.
        producer_done.set()
    print('Request producer done!')


# Shared HTTP session used by load_movie_data for every OMDb call.
# `plot=full` is sent as a query parameter on every request made through it.
s = requests.Session()
s.params['plot'] = 'full'
# Adapter with a 20-connection pool. Per the requests docs, max_retries
# re-attempts failed connections (DNS/socket/connect timeouts) up to 3 times.
adapter = requests.adapters.HTTPAdapter(
    pool_connections=20,
    pool_maxsize=20,
    max_retries=3,
)
s.mount(MOVIE_DATA_URL, adapter)

def load_movie_data(imdb_id, timeout=5):
    """Fetch the OMDb record for one IMDb title and return the decoded JSON.

    Args:
        imdb_id: numeric part of the IMDb id, e.g. ``'0114709'`` (str or int).
            Values that lost their leading zeros are re-padded to 7 digits,
            so ``114709`` and ``'0114709'`` both query ``tt0114709``.
        timeout: request timeout in seconds, forwarded to ``requests``.

    Returns:
        The JSON response body (presumably a dict with a "Plot" key on success;
        verify against the OMDb API).

    Raises:
        requests.HTTPError: if OMDb answers with a 4xx/5xx status.
        ValueError: (or a subclass) if the body is not valid JSON.
    """
    imdb_id = str(imdb_id).zfill(7)
    r = s.get(MOVIE_DATA_URL, params={'i': 'tt' + imdb_id}, timeout=timeout)
    # Surface HTTP errors explicitly instead of failing later on r.json()/["Plot"].
    r.raise_for_status()
    return r.json()


async def request_worker(name, loop, request_queue, data_queue, fail_queue, retries=3):
    """Consume MovieRequests, fetch their plots, and route results downstream.

    Each request is fetched with ``load_movie_data`` in ``loop``'s default
    executor, so the blocking HTTP call does not stall the event loop.
    - On success, a ``MovieData(movie_id, plot)`` row is put on ``data_queue``.
    - On any exception, the request is re-queued with ``tries + 1`` while
      ``tries < retries``; after that it goes to ``fail_queue``.

    The worker polls ``request_queue`` with a 1 s timeout and returns once
    ``pipeline_active`` is cleared. On exit it prints three counters:
    requests that produced a row, retries issued, and permanent failures.
    """
    await pipeline_active.wait()
    print('Request worker {} started...'.format(name))
    n, t, f = 0, 0, 0
    while pipeline_active.is_set():
        try:
            req = await asyncio.wait_for(request_queue.get(), 1)
        except asyncio.TimeoutError:
            continue
        try:
            try:
                data = await loop.run_in_executor(
                    None, functools.partial(load_movie_data, req.imdb_id))
                # A response without "Plot" raises KeyError here and is
                # handled like any other failure (retry, then fail_queue).
                await data_queue.put(MovieData(req.movie_id, data["Plot"]))
                n += 1  # count only requests that actually produced a row
            except Exception:
                if req.tries < retries:
                    t += 1
                    # Re-queue before task_done() so the queue's unfinished
                    # count cannot hit zero while a retry is still pending.
                    await request_queue.put(req._replace(tries=req.tries + 1))
                else:
                    f += 1
                    await fail_queue.put(req)
        finally:
            # Always acknowledge the item; otherwise request_queue.join() in
            # pipeline() would wait forever.
            request_queue.task_done()
    print('Request worker {} done! {:,d} requests, {:,d} retries, {:,d} fails'.format(name, n, t, f))


async def csv_writer(filename, data_queue):
    """Write ``(movie_id, plot)`` items from ``data_queue`` to ``filename`` as CSV.

    The file starts with a ``movieId,plot`` header. It is written as UTF-8
    because plot text may contain non-ASCII characters, and the
    platform-default encoding (e.g. cp1252 on Windows) could raise
    UnicodeEncodeError. That error would kill this task and leave the
    upstream queue undrained.

    The writer polls ``data_queue`` with a 1 s timeout, prints progress every
    1000 rows, and returns once ``pipeline_active`` is cleared.
    """
    await pipeline_active.wait()
    print('CSV Writer (filename={}) started...'.format(filename))
    n = 0
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['movieId', 'plot'])

        while pipeline_active.is_set():
            try:
                data = await asyncio.wait_for(data_queue.get(), 1)
            except asyncio.TimeoutError:
                continue
            movie_id, movie_plot = data
            writer.writerow([movie_id, movie_plot])
            n += 1
            data_queue.task_done()

            if n % 1000 == 0:
                print('CSV rows... {:,d}'.format(n))

    print('CSV Writer done! {:,d} rows'.format(n))


async def fail_sink(fail_queue):
    """Consume requests that exhausted their retries and report them.

    Each of the first 100 failures is printed individually, after which only
    a running total is printed at every multiple of 1000. The sink polls
    ``fail_queue`` with a 1 s timeout and returns once ``pipeline_active`` is
    cleared, printing the final count.
    """
    await pipeline_active.wait()
    print('Fail Sink started...')
    lost = 0
    while pipeline_active.is_set():
        try:
            failed_request = await asyncio.wait_for(fail_queue.get(), 1)
        except asyncio.TimeoutError:
            continue
        lost += 1
        # lost >= 1 here, so "<= 100" and "multiple of 1000" never overlap.
        if lost <= 100:
            print('Lost: {}'.format(failed_request))
        elif lost % 1000 == 0:
            print('Lost: {:,d}'.format(lost))
        fail_queue.task_done()
    print('Fail sink done! {:,d} fails'.format(lost))

    
# Run the whole pipeline to completion on the default event loop.
# NOTE(review): in recent Jupyter/ipykernel versions the kernel's own loop is
# already running, and run_until_complete() raises RuntimeError there. This
# cell presumably targets an older kernel; confirm before re-running.
loop = asyncio.get_event_loop()

N_WORKERS = 5  # number of concurrent request workers

# Keep a reference to every task. The event loop holds only weak references
# to tasks, so an unreferenced task can disappear mid-execution.
tasks = [
    loop.create_task(fail_sink(fail_queue)),
    loop.create_task(csv_writer(PLOTS_CSV, data_queue)),
]
for i in range(N_WORKERS):
    tasks.append(loop.create_task(
        request_worker(str(i + 1), loop, request_queue, data_queue, fail_queue)))
tasks.append(loop.create_task(request_producer(LINKS_CSV, request_queue)))

t = loop.create_task(pipeline())

# Wait for pipeline() *and* every stage, so all "done" summaries print. Any
# stage exception is reported here instead of only appearing later as a
# "Task exception was never retrieved" log message.
results = loop.run_until_complete(asyncio.gather(t, *tasks, return_exceptions=True))
for task_result in results:
    if isinstance(task_result, BaseException):
        print('Task failed: {!r}'.format(task_result))

# All three queues are expected to be empty once the pipeline has drained.
print(request_queue.qsize())
print(fail_queue.qsize())
print(data_queue.qsize())

Pipeline started...
Fail Sink started...
CSV Writer (filename=movielens/ml-latest-small/plots2.csv) started...
Request worker 1 started...
Request worker 2 started...
Request worker 3 started...
Request worker 4 started...
Request worker 5 started...
Request producer started...
Requests... 1,000
CSV rows... 1,000
Requests... 2,000
CSV rows... 2,000
Requests... 3,000
CSV rows... 3,000
Requests... 4,000
CSV rows... 4,000
Requests... 5,000
CSV rows... 5,000
Requests... 6,000
CSV rows... 6,000
Requests... 7,000
CSV rows... 7,000
Requests... 8,000
CSV rows... 8,000
Requests... 9,000
CSV rows... 9,000
Request producer done!
Request worker 2 done! 1,838 requests, 3 retries, 0 fails
Fail sink done! 0 fails
Request worker 1 done! 1,813 requests, 2 retries, 0 fails
Request worker 3 done! 1,790 requests, 2 retries, 0 fails
Request worker 4 done! 1,861 requests, 0 retries, 0 fails
Request worker 5 done! 1,823 requests, 1 retries, 0 fails
CSV Writer done! 9,125 rows
Pipeline done!
0
0
0
CPU times: 