In [1]:
import collections
import queue
import random
import time
# Spacing, in simulated time units, between taxi start times: main_taxis passes
# `i * DEPARTURE_INTERVAL` as the start_time of taxi i.
DEPARTURE_INTERVAL = 5

In [2]:
# Simulation event record: `time` is the simulated time of the event, `proc` the id
# of the process that issued it, and `action` a text label for the state change.
# Being a tuple, events order by time first, which the simulator's priority queue relies on.
Event = collections.namedtuple('Event', 'time proc action')

In [3]:
def taxi_process(ident, trips, start_time=0):
    """Generator modelling one taxi; yields an Event at every state change.

    The first event ('leave garage') is stamped with ``start_time``. After each
    yield the caller must ``send()`` the simulated time at which the next state
    change happens; that value becomes the timestamp of the next event.

    ident      -- id stored in the ``proc`` field of every event
    trips      -- number of pick-up/drop-off pairs before going home
    start_time -- timestamp of the initial 'leave garage' event
    """
    now = yield Event(start_time, ident, 'leave garage')
    for _ in range(trips):
        # Each trip is a pick-up followed by a drop-off, each timed by the caller.
        for action in ('pick up passenger', 'drop off passenger'):
            now = yield Event(now, ident, action)

    # Last event; the generator finishes on the following send().
    yield Event(now, ident, 'going home')

In [4]:
class Simulator:
    """Discrete-event loop driving a set of generator-based processes.

    Each process is a generator that yields ``(time, proc_id, action)`` events
    and receives, via ``send()``, the simulated time of its next state change.
    """

    def __init__(self, procs_map):
        """Copy ``procs_map`` (process id -> generator) and create the event queue."""
        self.procs = dict(procs_map)
        self.events = queue.PriorityQueue()

    def run(self, end_time):
        """Schedule and display events until time is up"""
        # Prime every process, in id order, and queue the first event of each.
        for ident in sorted(self.procs):
            self.events.put(next(self.procs[ident]))

        line = 'taxi: {proc_id:<3}{indent}{event}'
        clock = 0
        while clock < end_time:
            if self.events.empty():
                print('*** End of events ***')
                break

            # Earliest pending event; the clock jumps straight to its time.
            event = self.events.get()
            clock, ident, action = event
            print(line.format(indent=' ' * ident, proc_id=ident, event=event))

            proc = self.procs[ident]
            resume_at = clock + compute_duration(action)
            try:
                follow_up = proc.send(resume_at)
            except StopIteration:
                # The process finished: forget it.
                del self.procs[ident]
            else:
                self.events.put(follow_up)
        else:
            # Loop condition failed (no break): simulated time ran out.
            pending = self.events.qsize()
            print('*** end of simulation time: {} events pending ***'.format(pending))


In [5]:
# Mean duration, in simulated time units, of the interval that follows each action.
DURATIONS = {
    'leave garage': 5,
    'drop off passenger': 5,
    'pick up passenger': 20,
    'going home': 1,
}


def compute_duration(previous_event):
    """Draw a random whole-number duration (>= 1) for the interval after an action.

    ``previous_event`` is an action label and must be a key of DURATIONS
    (KeyError otherwise). The value is sampled from an exponential distribution
    whose mean is the configured duration, truncated to an int and shifted by one
    so that time always advances.
    """
    mean = DURATIONS[previous_event]
    sample = random.expovariate(1 / mean)
    return 1 + int(sample)

def main_taxis(end_time=180, num_taxis=3, seed=None):
    """Build ``num_taxis`` taxi processes and run them through a Simulator.

    Taxi ``i`` makes ``(i + 1) * 2`` trips and starts at ``i * DEPARTURE_INTERVAL``.
    When ``seed`` is given, the ``random`` module is seeded first so that the
    run is repeatable. The simulation stops at ``end_time``.
    """
    if seed is not None:
        random.seed(seed)

    fleet = {}
    for ident in range(num_taxis):
        trips = 2 * (ident + 1)
        departure = ident * DEPARTURE_INTERVAL
        fleet[ident] = taxi_process(ident, trips, departure)

    Simulator(fleet).run(end_time)

In [6]:
# Simulate 3 taxis with end_time=500. No seed is passed, so the random durations
# (and therefore the output below) differ from run to run.
main_taxis(500, 3)

taxi: 0  Event(time=0, proc=0, action='leave garage')
taxi: 0  Event(time=4, proc=0, action='pick up passenger')
taxi: 1   Event(time=5, proc=1, action='leave garage')
taxi: 1   Event(time=9, proc=1, action='pick up passenger')
taxi: 2    Event(time=10, proc=2, action='leave garage')
taxi: 2    Event(time=14, proc=2, action='pick up passenger')
taxi: 2    Event(time=23, proc=2, action='drop off passenger')
taxi: 2    Event(time=28, proc=2, action='pick up passenger')
taxi: 2    Event(time=37, proc=2, action='drop off passenger')
taxi: 2    Event(time=42, proc=2, action='pick up passenger')
taxi: 2    Event(time=44, proc=2, action='drop off passenger')
taxi: 2    Event(time=48, proc=2, action='pick up passenger')
taxi: 2    Event(time=50, proc=2, action='drop off passenger')
taxi: 2    Event(time=62, proc=2, action='pick up passenger')
taxi: 1   Event(time=64, proc=1, action='drop off passenger')
taxi: 1   Event(time=65, proc=1, action='pick up passenger')
taxi: 0  Event(time=68, proc=0

In [21]:
import os, time, sys, requests, pathlib, shutil, collections

# Upper-cased two-letter country codes (20 entries) used as the flag download workload.
POP20_CC = ('cn in us id br pk ng bd ru jp mx ph vn et eg de ir tr cd fr').upper().split()
# Root URL; flag images are requested as '<BASE_URL>/<cc>/<cc>.gif' with a lower-case cc.
BASE_URL = 'http://flupy.org/data/flags'
# Local output directory; main_flags removes and recreates it at the start of each run.
DEST_DIR = pathlib.Path('flags/')

def save_flag(img, filename):
    """Write the bytes ``img`` to the file ``filename`` inside DEST_DIR."""
    (DEST_DIR / filename).write_bytes(img)
        
def get_flag(cc):
    """Download the flag image for country code ``cc`` and return its bytes.

    The image is requested from '<BASE_URL>/<cc>/<cc>.gif' using the lower-cased
    code.

    Raises requests.exceptions.HTTPError when the server answers with a 4xx/5xx
    status, so that an error page is never returned (and later saved) as if it
    were a GIF.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=code)
    resp = requests.get(url)
    # Without this check the body of a 404/500 response would be treated as image data.
    resp.raise_for_status()
    return resp.content

def show(text):
    """Print ``text`` followed by a space (no newline) and flush immediately.

    Because no newline is written, a line-buffered stdout would otherwise hold
    the text back; flushing makes it usable as a live progress indicator.
    """
    print(text, end=' ', flush=True)
   
def single_download(cc):
    """Fetch the flag for ``cc``, store it as '<cc lower-cased>.gif', return ``cc``."""
    payload = get_flag(cc)
    target_name = '{}.gif'.format(cc.lower())
    save_flag(payload, target_name)
    return cc
    
def download_many_procedural(cc_list):
    """Download the flags for ``cc_list`` one at a time, in sorted order.

    Returns the number of completed downloads.
    """
    completed = 0
    for cc in sorted(cc_list):
        single_download(cc)
        completed += 1
    return completed

    
def main_flags(download_many):
    """Run ``download_many(POP20_CC)`` against a fresh DEST_DIR and report timing.

    ``download_many`` is a callable taking the list of country codes and
    returning how many flags it downloaded. DEST_DIR is emptied first; the
    function prints a start banner and a summary line and returns None.
    """
    shutil.rmtree(str(DEST_DIR), ignore_errors=True)
    # rmtree ignores failures, so tolerate a directory that is still present,
    # and create missing parents instead of raising.
    DEST_DIR.mkdir(parents=True, exist_ok=True)
    print('STARTING {}'.format(download_many.__name__))
    # perf_counter is monotonic; time.time() can jump if the system clock is adjusted.
    t0 = time.perf_counter()
    count = download_many(POP20_CC)
    elapsed = time.perf_counter() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

# Baseline run: the flags are fetched one after another.
main_flags(download_many_procedural)

STARTING download_many_procedural

20 flags downloaded in 3.59s


In [8]:
from concurrent import futures
# Upper bound on the pool size used by download_many_threads and download_many_proc.
MAX_WORKERS = 50 

def download_many_threads(cc_list):
    """Download the flags for ``cc_list`` concurrently with a thread pool.

    The pool has one thread per country code, capped at MAX_WORKERS. Accepts
    any iterable of codes. Returns the number of completed downloads; an empty
    input returns 0 without creating a pool (an executor cannot be created
    with zero workers).
    """
    codes = sorted(cc_list)
    if not codes:
        return 0
    workers = min(MAX_WORKERS, len(codes))
    with futures.ThreadPoolExecutor(workers) as executor:
        # Consuming the iterator re-raises any exception from a worker.
        results = list(executor.map(single_download, codes))
    return len(results)

# Same workload, downloaded through a thread pool.
main_flags(download_many_threads)

STARTING download_many_threads

20 flags downloaded in 0.69s


In [9]:
def download_many_proc(cc_list):
    """Download the flags for ``cc_list`` concurrently with a process pool.

    The pool has one worker process per country code, capped at MAX_WORKERS.
    Accepts any iterable of codes. Returns the number of completed downloads;
    an empty input returns 0 without creating a pool (an executor cannot be
    created with zero workers).
    """
    codes = sorted(cc_list)
    if not codes:
        return 0
    workers = min(MAX_WORKERS, len(codes))
    with futures.ProcessPoolExecutor(workers) as executor:
        # Consuming the iterator re-raises any exception from a worker.
        results = list(executor.map(single_download, codes))
    return len(results)

# Same workload, downloaded through a process pool.
main_flags(download_many_proc)

STARTING download_many_proc

20 flags downloaded in 0.97s


In [10]:
def download_many_threads_v2(cc_list, max_workers=2):
    """Download flags with explicit futures, printing each scheduling step and result.

    Every country code (in sorted order) is submitted to a thread pool of
    ``max_workers`` threads. The default of 2 is deliberately small so that most
    futures are still pending when they are printed. Results are then reported
    in completion order.

    Returns the number of completed downloads. An exception raised by a
    download propagates from ``future.result()``.
    """
    with futures.ThreadPoolExecutor(max_workers) as ex:
        to_do = []
        for cc in sorted(cc_list):
            future = ex.submit(single_download, cc)
            to_do.append(future)
            print('Scheduled for {}: {}'.format(cc, future))

        results = []
        # as_completed yields futures as they finish, not in submission order.
        for future in futures.as_completed(to_do):
            res = future.result()
            print('{} result: {!r}'.format(future, res))
            results.append(res)

    return len(results)
            
# Same workload through explicit futures; the scheduling and result lines are printed below.
main_flags(download_many_threads_v2)

STARTING download_many_threads_v2
Scheduled for BD: <Future at 0x7f9815c70080 state=running>
Scheduled for BR: <Future at 0x7f9815c73438 state=running>
Scheduled for CD: <Future at 0x7f9815c736d8 state=pending>
Scheduled for CN: <Future at 0x7f9815c73be0 state=pending>
Scheduled for DE: <Future at 0x7f9815cbbc18 state=pending>
Scheduled for EG: <Future at 0x7f9815cbba20 state=pending>
Scheduled for ET: <Future at 0x7f9815cbb898 state=pending>
Scheduled for FR: <Future at 0x7f98145f3eb8 state=pending>
Scheduled for ID: <Future at 0x7f98145f3400 state=pending>
Scheduled for IN: <Future at 0x7f98145f34a8 state=pending>
Scheduled for IR: <Future at 0x7f98145f30b8 state=pending>
Scheduled for JP: <Future at 0x7f98145f32e8 state=pending>
Scheduled for MX: <Future at 0x7f98145f3780 state=pending>
Scheduled for NG: <Future at 0x7f98145f3c88 state=pending>
Scheduled for PH: <Future at 0x7f98145f39e8 state=pending>
Scheduled for PK: <Future at 0x7f98145f3048 state=pending>
Scheduled for RU: <Fut

In [20]:
from tqdm import tqdm
def sleep_for_a_while(procid):
    """Block for a random, exponentially distributed time (rate 5, mean 0.2 s).

    ``procid`` is accepted so the function can be mapped over a range of ids,
    but it is not used. Returns None.
    """
    time.sleep(random.expovariate(5))

# Map sleep_for_a_while over 1000 ids on a pool of 200 threads. ex.map returns
# a lazy iterator, so wrapping it in tqdm (with the known total) advances the
# progress bar as list() consumes the results.
with futures.ThreadPoolExecutor(200) as ex:
    res = ex.map(sleep_for_a_while, range(1000))
    res = tqdm(res, total=1000)
    list(res)

100%|██████████| 1000/1000 [00:01<00:00, 543.77it/s]


In [33]:
from enum import Enum
from collections import namedtuple
# Outcome of one download attempt, as returned by single_download_v2.
# NOTE(review): the second field is named `msg`, but single_download_v2 stores
# the country code in it — confirm which is intended.
Result = namedtuple('Result', 'status msg')
class HTTPStatus(Enum):
    """Download outcomes reported by single_download_v2."""
    ok = 'ok'
    not_found = 'not found'

In [48]:
# Two-letter country codes, written as whitespace-separated text and split into
# a list; main_flags_v2 downloads only the first 10 entries.
ALL_CC = """
AD AE AF AG AL AM AO AR AT AU AZ BA BB BD BE BF BG BH BI BJ BN BO BR BS BT
BW BY BZ CA CD CF CG CH CI CL CM CN CO CR CU CV CY CZ DE DJ DK DM DZ EC EE
EG ER ES ET FI FJ FM FR GA GB GD GE GH GM GN GQ GR GT GW GY HN HR HT HU ID
IE IL IN IQ IR IS IT JM JO JP KE KG KH KI KM KN KP KR KW KZ LA LB LC LI LK
LR LS LT LU LV LY MA MC MD ME MG MH MK ML MM MN MR MT MU MV MW MX MY MZ NA
NE NG NI NL NO NP NR NZ OM PA PE PG PH PK PL PT PW PY QA RO RS RU RW SA SB
SC SD SE SG SI SK SL SM SN SO SR SS ST SV SY SZ TD TG TH TJ TL TM TN TO TR
TT TV TW TZ UA UG US UY UZ VA VC VE VN VU WS YE ZA ZM ZW
""".split()

def get_flag_v2(cc, base_url=None):
    """Download the flag image for ``cc`` and return its bytes.

    The image is requested from '<base_url>/<cc>/<cc>.gif' using the lower-cased
    code; ``base_url`` defaults to the module-level BASE_URL when omitted.

    Raises requests.exceptions.HTTPError for a 4xx/5xx response.
    """
    if base_url is None:
        base_url = BASE_URL
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    # No-op for successful responses; raises HTTPError for 4xx/5xx.
    resp.raise_for_status()
    return resp.content

def single_download_v2(cc, base_url, verbose=False):
    """Download and save one flag, mapping a 404 to a 'not found' result.

    cc       -- country code; the image is saved as '<cc lower-cased>.gif'
    base_url -- root URL the flag is fetched from
    verbose  -- when true, print the country code and the outcome

    Returns ``Result(status, cc)`` where status is HTTPStatus.ok or
    HTTPStatus.not_found. Any HTTP error other than 404 is re-raised.
    """
    try:
        # Pass base_url through so the caller's choice of server is honoured.
        image = get_flag_v2(cc, base_url)
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 404:
            raise
        status = HTTPStatus.not_found
        msg = 'not found'
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:
        print(cc, msg)

    # The second Result field carries the country code.
    return Result(status, cc)
    
def download_many_procedural_v2(cc_list, base_url, verbose, max_req):
    """Download flags sequentially and tally the outcome of each attempt.

    cc_list  -- country codes; processed in sorted order
    base_url -- root URL passed on to single_download_v2
    verbose  -- when true, per-country messages and errors are printed;
                otherwise a tqdm progress bar is shown instead
    max_req  -- unused here; kept so the signature matches what main_flags_v2 passes

    Returns a collections.Counter keyed by the status of each download.
    HTTPStatus has no member for failures, so HTTP errors (other than the 404
    handled by single_download_v2) and connection errors are counted under the
    plain string key 'error'.
    """
    error_key = 'error'
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = single_download_v2(cc, base_url, verbose)
        except requests.exceptions.HTTPError as err:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=err.response)
            # Previously left unassigned, which raised UnboundLocalError or
            # re-counted the status of the previous country.
            status = error_key
        except requests.exceptions.ConnectionError:
            error_msg = 'Connection error'
            status = error_key
        else:
            error_msg = ''
            status = res.status
        counter[status] += 1
        if verbose and error_msg:
            print('*** Error for {}: {}'.format(cc, error_msg))

    return counter

    
    
# Passed to download_many as its `verbose` argument.
VERBOSE = False
# Passed to download_many as its `max_req` argument.
MAX_PROC = 1

def main_flags_v2(download_many):
    """Run ``download_many`` on the first 10 codes of ALL_CC and report its duration.

    ``download_many`` is called as
    ``download_many(ALL_CC[:10], BASE_URL, VERBOSE, MAX_PROC)`` against a freshly
    emptied DEST_DIR; its return value is not used. Returns the string
    '<function name> completed in <seconds>s'.
    """
    shutil.rmtree(str(DEST_DIR), ignore_errors=True)
    # rmtree ignores failures, so tolerate a directory that is still present,
    # and create missing parents instead of raising.
    DEST_DIR.mkdir(parents=True, exist_ok=True)
    # perf_counter is monotonic; time.time() can jump if the system clock is adjusted.
    t0 = time.perf_counter()
    download_many(ALL_CC[:10], BASE_URL, VERBOSE, MAX_PROC)
    duration = time.perf_counter() - t0
    return '{} completed in {}s'.format(download_many.__name__, duration)

    
# Sequential run with error handling; the returned summary string is the cell's output.
main_flags_v2(download_many_procedural_v2)

100%|██████████| 10/10 [00:00<00:00, 10.94it/s]


'download_many_procedural_v2 completed in 0.9044125080108643s'