In [1]:
# Goal: wait for a huge number of network events efficiently
# (asynchronous I/O).

In [None]:
# A web crawler waits for many responses, but does little computation.
# If it devotes a thread to each in-flight request, then as the number of
# concurrent requests rises, it will run out of memory or other
# thread-related resources before it runs out of sockets.

# It avoids the need for threads by using asynchronous I/O.


In [None]:
# 1. Show an async event loop, and sketch a crawler that uses the event loop
# with callbacks.

# 2. Python coroutines are both efficient and extensible.
# We implement coroutines in Python using generators.

# 3. We use Python's standard 'asyncio' library, and coordinate coroutines
# using an async queue.


In [None]:
# A crawler finds and downloads all pages on a website, perhaps to archive
# or index them. Beginning with a root URL, it fetches each page, parses it
# for links to unseen pages, and adds these to a queue. It stops when it
# fetches a page with no unseen links and the queue is empty.

# We can hasten this process by downloading many pages concurrently.
# As the crawler finds new links, it launches simultaneous fetch operations
# on separate sockets. It parses responses as they arrive, adding new links
# to the queue.

In [9]:
import socket, parse_links

ImportError: No module named 'parse_links'

In [10]:
# Value substituted into the request line of the HTTP requests built below.
# NOTE(review): it ends up as the request target ("GET xkcd.com HTTP/1.0");
# a server normally expects a path such as '/' there — confirm.
url = 'xkcd.com'

In [12]:
# Blocking fetch: connect, send one HTTP/1.0 request, then read until recv()
# returns an empty bytes object.
# The `with` block closes the socket even if an error interrupts the transfer.
with socket.socket() as sock:
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
    # sendall() keeps writing until the whole request is sent; a bare send()
    # may transmit only part of the buffer.
    sock.sendall(request.encode('ascii'))
    # Collect the pieces and join once instead of repeatedly copying bytes.
    chunks = []
    chunk = sock.recv(4096)
    while chunk:
        chunks.append(chunk)
        chunk = sock.recv(4096)
response = b''.join(chunks)

In [14]:
request

'GET xkcd.com HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'

In [7]:
# Traditionally, we create a thread pool.
def fetch(url):
    """Download `url` from xkcd.com over a blocking socket and queue its links.

    Sends an HTTP/1.0 GET for `url`, reads the response until recv() returns
    an empty bytes object, then passes the raw bytes to `parse_links` and
    adds the result to `q`.

    NOTE(review): `parse_links` and `q` are not defined in this cell; they
    are looked up in the enclosing namespace when the function runs.
    """
    # The `with` block closes the socket on success and on error.
    with socket.socket() as sock:
        sock.connect(('xkcd.com', 80))
        request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
        # sendall() writes the whole request; send() may stop part-way.
        sock.sendall(request.encode('ascii'))
        chunks = []
        chunk = sock.recv(4096)
        while chunk:
            chunks.append(chunk)
            chunk = sock.recv(4096)
    response = b''.join(chunks)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

In [8]:
fetch('xkcd.com')

NameError: name 'parse_links' is not defined

In [15]:
# Create a socket and switch it to non-blocking mode before connecting.
sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    # Deliberately ignored: with the socket in non-blocking mode, connect()
    # raises this even when the connection attempt is proceeding normally.
    pass



In [16]:
sock

<socket.socket fd=61, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('10.14.48.212', 61550), raddr=('151.101.192.67', 80)>

In [None]:
# Irritatingly, a non-blocking socket throws an exception from connect,
# even when it is working normally. This exception only signals that the
# connection attempt is still in progress, so it is safe to ignore.

In [17]:
# Build the request once, then busy-wait: keep retrying the send until it
# stops raising OSError.
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
encoded = request.encode('ascii')

while True:
    try:
        sock.send(encoded)
    except OSError:
        # Not ready yet; spin and try again.
        continue
    else:
        break

print('sent')
    

sent


In [18]:
# Python 3.4's DefaultSelector uses the best select-like function.

from selectors import DefaultSelector, EVENT_WRITE

# Selector used below to watch the socket for readiness events.
selector = DefaultSelector()
sock = socket.socket()
sock.setblocking(False)

try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    # Ignored: raised by connect() on the non-blocking socket.
    pass

def connected():
    """Callback: stop watching the socket's file descriptor and report."""
    fd = sock.fileno()
    selector.unregister(fd)
    print('connected!')
    
# Watch the socket's file descriptor for EVENT_WRITE and attach `connected`
# as the data stored on the resulting selector key.
selector.register(sock.fileno(), EVENT_WRITE, connected)

SelectorKey(fileobj=60, fd=60, events=2, data=<function connected at 0x10a2fdf28>)

In [None]:
def loop():
    """Run forever: block in select(), then call each ready key's callback.

    Every registered key is expected to carry a zero-argument callable in
    its ``data`` attribute.
    """
    while True:
        for ready_key, _mask in selector.select():
            ready_key.data()

In [None]:
# The connected callback is stored as event_key.data, which we retrieve and
# execute once the non-blocking socket is connected.

# Unlike in our fast-spinning loop above, the call to select here pauses,
# awaiting the next I/O events. Then the loop runs the callbacks that are
# waiting for these events.

# We built a tiny system that does overlapping I/O. It doesn't actually utilize
# multiple cores to execute computation in parallel. But then, this system is
# designed for I/O-bound problems, not CPU-bound ones.

# So our event loop is efficient at concurrent I/O because it does not devote
# thread resources to each connection.

In [None]:
# Callbacks

In [None]:
# With the runty framework we have built so far, how can we build a web crawler?
# We begin with global sets of the URLs we have yet to fetch, and the URLs we
# have seen:
    
    

In [23]:
# Both sets start out holding only the root path.
urls_todo = {'/'}
seen_urls = {'/'}

In [24]:
urls_todo

{'/'}

In [None]:
# Fetching a page will require a series of callbacks. The connected callback
# fires when a socket is connected, and sends a GET request to the server.


In [25]:
class Fetcher:
    """Starts a non-blocking connection to xkcd.com for a single URL.

    NOTE(review): fetch() registers ``self.connected`` as the callback, but no
    ``connected`` method is defined on this class as shown — calling fetch()
    would fail on that attribute lookup until one is added.
    """

    def __init__(self, url):
        """Remember `url` and start with an empty response and no socket."""
        self.response = b''  # presumably filled in as data arrives — TODO confirm
        self.url = url
        self.sock = None

    def fetch(self):
        """Open a non-blocking socket, begin connecting, and register it.

        The socket's file descriptor is registered with the module-level
        ``selector`` for EVENT_WRITE, with ``self.connected`` as its data.
        """
        connection = socket.socket()
        self.sock = connection
        connection.setblocking(False)

        try:
            connection.connect(('xkcd.com', 80))
        except BlockingIOError:
            # Ignored: raised by connect() on the non-blocking socket.
            pass

        selector.register(connection.fileno(), EVENT_WRITE, self.connected)
    

In [61]:
# Global Interpreter Lock
# GIL

import sys
# Bind four names to one and the same list object.
a = []
b = c = d = a

In [62]:
sys.getrefcount(a)

5

In [60]:
sys.getrefcount(a)

4

In [50]:
e = d

In [51]:
e

4

In [64]:
# CPU-bound program
import time
from threading import Thread

# Number of loop iterations used for the timing run.
COUNT = 50000000

def countdown(n):
    """Decrement a counter starting at n until it is no longer positive.

    Pure-Python busy loop with no I/O; returns None.
    """
    remaining = n
    while remaining > 0:
        remaining -= 1
        
# Time one single-threaded run of the whole countdown.
# perf_counter() is the clock meant for measuring elapsed intervals;
# time.time() follows the wall clock, which can be adjusted mid-run.
start = time.perf_counter()
countdown(COUNT)
end = time.perf_counter()

print('time in sec: ', end-start)

time in sec:  8.432587146759033


In [67]:
# Multithreaded Python (with GIL)

import time
from threading import Thread

# Total iterations, split evenly between the two threads below.
COUNT = 50000000

# NOTE(review): same definition as the `countdown` in the CPU-bound cell;
# this later definition rebinds the name.
def countdown(n):
    """Decrement a counter starting at n until it is no longer positive.

    Pure-Python busy loop with no I/O; returns None.
    """
    ticks_left = n
    while ticks_left > 0:
        ticks_left -= 1
        
# Split the total work across two threads, each counting down half of COUNT.
t1 = Thread(target=countdown, args=(COUNT//2,))
t2 = Thread(target=countdown, args=(COUNT//2,))

# perf_counter() is the clock meant for measuring elapsed intervals;
# time.time() follows the wall clock, which can be adjusted mid-run.
start = time.perf_counter()
t1.start()
t2.start()
# Wait for both threads so the measurement covers all of the work.
t1.join()
t2.join()
end = time.perf_counter()
print(end-start)

9.159382104873657


In [66]:
# Show the type of the `args` value handed to Thread (a one-element tuple).
type((COUNT//2,))

tuple

In [70]:
vin_list = ['111', '222', '333', '444', '555', '666', '222']

In [71]:
from queue import Queue
# Queue created with the default maxsize of 0.
q = Queue()

# One fresh, empty dict per entry of vin_list, in the same order.
results = [dict() for _ in vin_list]

In [72]:
results

[{}, {}, {}, {}, {}, {}, {}]

In [None]:
# Multithreaded VIN decoding: configuration and job-queue setup.
# Number of worker threads to start.
num_theads = 500

tstart = time.time()

from threading import Thread
from queue import Queue

# Queue that will hold one (index, vin) job per entry of vin_list.
q = Queue(maxsize=0)

# One placeholder per VIN; results_full starts as an independent deep copy.
# NOTE(review): `copy` is not imported in the cells shown — confirm it is
# available in the kernel.
results = [{} for _ in vin_list]
results_full = copy.deepcopy(results)

# Each job carries its index so a worker can store the answer at the
# matching position of the result lists.
for job in enumerate(vin_list):
    q.put(job)
    
    
# Threaded function for queue processing.
def vin_lu_threaded(q, result):
    """Drain `q`, decode each VIN over HTTP, and store the outcome in `result`.

    Args:
        q: queue of ``(index, vin)`` tuples.
        result: list indexed by job index. Each processed slot receives
            ``(make, model, year, chromeStyleId)`` on success, or
            ``('', '', 0)`` if anything fails for that VIN.

    Returns:
        True once no more jobs can be taken from the queue.

    Side effects:
        On success also stores the three VIPR ids in the module-level
        ``results_full`` list at the same index.

    NOTE(review): the failure placeholder has three fields while a success
    tuple has four — confirm consumers handle both shapes.
    """
    from queue import Empty  # local import: the cell only imports Queue

    url = 'https://vin-decoder.dev.pod.tc/decode/vipr'
    # NOTE(review): the token is hardcoded; consider reading it from the
    # environment instead of keeping it in the notebook.
    headers = {'Content-Type': 'application/json','Accept': 'application/json','token': '123'}

    while True:
        try:
            # get_nowait() closes the race in `while not q.empty(): q.get()`,
            # where another worker can take the last job between the two
            # calls and leave this thread blocked in get() forever.
            i, vin = q.get_nowait()
        except Empty:
            break
        try:
            payload = {"vin": vin}
            r = requests.post(url, data=json.dumps(payload), headers=headers)
            r_json = r.json()
            make, model, year, chromeStyleId = (
                r_json['make'].upper(),
                r_json['model'].upper(),
                r_json['modelYear'],
                r_json['chromeStyleId'],
            )
            # Write through the `result` parameter (not the global list) so
            # success and failure land in the same container.
            result[i] = (make, model, year, chromeStyleId)
            results_full[i] = tuple(r_json[x] for x in ['viprMakeId', 'viprModelId', 'viprStyleId'])
        except Exception:
            # Best effort: any per-VIN failure yields a placeholder, but
            # KeyboardInterrupt / SystemExit are no longer swallowed.
            result[i] = ('','',0)
        finally:
            # Always signal completion so q.join() cannot hang on this job.
            q.task_done()
    return True

#Starting worker threads on queue processing
for i in range(num_theads):
    # logging applies %-style formatting to its extra arguments, so the
    # message needs a placeholder for the thread number.
    logging.debug('Starting thread %s', i)
    # daemon=True allows the main program to exit eventually even if a
    # worker does not finish correctly (replaces the deprecated setDaemon()).
    worker = Thread(target=vin_lu_threaded, args=(q,results), daemon=True)
    worker.start()
#now we wait until the queue has been processed
q.join()