# Recap

In the first lesson we learned to interpret the outermost layer of the Bitcoin Protocol onion: the [message structure](https://en.bitcoin.it/wiki/Protocol_documentation#Message_structure). We learned to send a `version` message to Bitcoin Network peers and listen for their `version` response. We learned to read this response and check that the correct `magic` bytes came at the beginning of the message, to interpret which of the 27 `command` types the message is, to read the `payload` data associated with the command, and to check the payload against the `checksum` given to us by our remote peer.

In the second lesson we learned to peel another layer or two of the onion. We learned to read the [payload](https://en.bitcoin.it/wiki/Protocol_documentation#version) of `version` messages. Along the way we had to figure out how to interpret all the sub-structures of the data, such as variable-length strings, variable-length integers, network addresses, `services` bitfields, Unix timestamps, and big-endian encoded port numbers.

We've come a long way, but we still have a long way to go.

# Getting To Know Each Other

Now that we can talk to our peers, let's be friendly neighbors and introduce ourselves.

In this lesson we will connect to the nearly 10,000 Bitcoin Network peers that operate out in the open. We'll send each a `version` message and record their responses. Our first attempts at this will be far too slow, and we will learn about "concurrent programming" -- a technique that frees our program to work on many things at once, in our case talking to many Bitcoin Network peers.

Lastly, we'll do some "data science" to find patterns in this sea of bytes.

Let's get started!

# bitnodes.earn.com

The first thing we did in the first lesson was to pull up [this website](https://bitnodes.earn.com/nodes/) and look for the IP address of some other node to talk to. 

Now we're going to write some Python code to do this for us.

bitnodes.earn.com offers [a free, unauthenticated API](https://bitnodes.earn.com/api/#list-nodes) to help us do this. You've probably heard this word before -- API -- and you probably don't know exactly what it means. The acronym [API](https://en.wikipedia.org/wiki/Application_programming_interface) stands for "Application Programming Interface". An "Application Programming Interface" is a description of how a programmer can interact with a piece of software. For example, Python has an API for converting `bytes` to `int`s: [int.from_bytes(bytes, byteorder, \*, signed=False)](https://docs.python.org/3/library/stdtypes.html#int.from_bytes). Python defines this exact function so that programmers can accomplish this exact operation. There are multiple different "implementations" of Python -- CPython, PyPy, MicroPython, etc. -- and they all implement this same API.
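To make that concrete, here is `int.from_bytes` in use. The two byte strings below encode the same number, 8333 (Bitcoin's default port), once in big-endian and once in little-endian order; the `byteorder` argument tells the function which convention to apply. Any Python implementation that follows the API should print the same thing.

```python
# int.from_bytes(bytes, byteorder, *, signed=False) from Python's documented API
port_big = int.from_bytes(b"\x20\x8d", "big")        # most significant byte first
port_little = int.from_bytes(b"\x8d\x20", "little")  # least significant byte first
print(port_big, port_little)  # 8333 8333

# The same API also covers the reverse direction: int.to_bytes(length, byteorder)
print((8333).to_bytes(2, "big").hex())  # 208d
```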

So that's the original meaning of the term "application programming interface". But it's most frequently used to describe this sort of thing in a specific domain: web programming. Please read this [explainer](https://medium.freecodecamp.org/what-is-an-api-in-english-please-b880a3214a82) of this narrower definition of the term. The [earn.com API](https://bitnodes.earn.com/api/) is one example of "API" in this sense of the word.

The earn.com API is free and also "unauthenticated", which means we don't have to present any kind of credential in order to use it -- stock market data APIs, for one, aren't so kind!

The API has a specific [List Nodes endpoint](https://bitnodes.earn.com/api/#list-nodes) which will give a list of every node they are aware of, either at present or at some specific point in the past. We specify which snapshot we want in the URL; in this lesson we will always ask for the `latest` one.

To "exercise" this API we need to send an HTTP GET request. This is the same sort of request that your browser sends every time you load a webpage: it just fetches data.

### cURL: A Terminal Utility

Go to your command line and type this in:

```
$ curl -H "Accept: application/json; indent=4" https://bitnodes.earn.com/api/v1/snapshots/latest/
```

(If you get an error, you probably need to install the cURL program. Google it!)

This should spit a huge amount of "JSON" out onto your terminal. This is a complete list of all the Bitcoin Network nodes that earn.com has been able to find.
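JSON is plain text, and Python's built-in `json` module turns it into ordinary dictionaries and lists. The string below is a tiny, made-up stand-in for the real response: it only mimics the one part of its shape that the exercises below rely on (a `"nodes"` key mapping `ip:port` strings to per-node data). The addresses are documentation-only IPs and the per-node values are placeholders, not real bitnodes data.

```python
import json

# Hypothetical, heavily trimmed stand-in for the API response (placeholder values)
raw = """
{
    "nodes": {
        "192.0.2.1:8333": [70015, "/Satoshi:0.16.0/"],
        "198.51.100.7:8333": [70015, "/Satoshi:0.16.1/"]
    }
}
"""

data = json.loads(raw)      # JSON object -> dict, JSON array -> list
print(type(data))           # <class 'dict'>
print(list(data["nodes"]))  # ['192.0.2.1:8333', '198.51.100.7:8333']
```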

### Requests: A Python Library

This is great, but we need to find a way to do this from Python. This is where the `requests` library comes in. Watch [this video](https://www.youtube.com/watch?v=_8HPCToXdAk) to learn how to use `requests`.

##### Exercise #1: Use `requests.get` to make the same HTTPS request we made using cURL above.

Return a dictionary of the JSON response from the API.

hint: `.json()` gets the JSON response

another hint: [Relevant part](https://youtu.be/_8HPCToXdAk?t=3m12s) of the YouTube video above.

```python
%load_ext autoreload
%autoreload 2
```

```python
import requests

def get_bitnodes_api_response():
    BITNODES_URL = "https://bitnodes.earn.com/api/v1/snapshots/latest/"
    ### YOUR CODE ###
    raise NotImplementedError()
```

```python
# Solution
def get_bitnodes_api_response():
    BITNODES_URL = "https://bitnodes.earn.com/api/v1/snapshots/latest/"
    return requests.get(BITNODES_URL).json()
```

```python
import json

# Load a saved copy of an API response to use as test data
with open("ibd/four/response.json") as f:
    nodes_json = f.read()
nodes_dict = json.loads(nodes_json)
```

```python
import ipytest
import requests_mock

def test_get_bitnodes_api_response():
    BITNODES_URL = "https://bitnodes.earn.com/api/v1/snapshots/latest/"
    # Intercept the HTTP request and answer it with our saved response
    with requests_mock.mock() as mock:
        mock.get(BITNODES_URL, json=nodes_dict)
        response = get_bitnodes_api_response()
        assert response == nodes_dict

ipytest.run_tests(doctest=True)
ipytest.clean_tests("test_get_bitnodes_api_response*")
```

##### Exercise #2: Call the bitnodes API and return just the `"nodes"` part of the JSON response

hint: in the relevant part of the YouTube video, the value corresponding to the `name` key is grabbed from the `r.json()` response dictionary. We're doing the same thing in this exercise, just looking up the `nodes` key instead of the `name` key:
```
r = requests.get("http://swapi.co/api/people/1")
r.json()['name']
```

```python
def get_nodes():
    BITNODES_URL = "https://bitnodes.earn.com/api/v1/snapshots/latest/"
    ### YOUR CODE ###
    raise NotImplementedError()
```

```python
# Solution
def get_nodes():
    data = get_bitnodes_api_response()
    return data['nodes']
```

```python
def test_get_nodes():
    BITNODES_URL = "https://bitnodes.earn.com/api/v1/snapshots/latest/"
    with requests_mock.mock() as mock:
        mock.get(BITNODES_URL, json=nodes_dict)
        nodes = get_nodes()
        assert nodes == nodes_dict['nodes']

ipytest.run_tests(doctest=True)
ipytest.clean_tests("test_get_nodes*")
```

##### Exercise #3: Turn the `nodes` object into a list of `ip:port` string addresses

_Notice that the keys of the `nodes` object are `ip:port` strings._

This exercise just asks you to turn a dictionary into a list of its keys. There's a built-in `dict` method to do this. Look it up.

```python
def nodes_to_address_strings(nodes):
    raise NotImplementedError()
```

```python
# Solution: .keys() returns a dict_keys view, so wrap it in list()
def nodes_to_address_strings(nodes):
    return list(nodes.keys())
```

```python
mock_nodes = {
    "192.168.0.1:8333": {}, # ipv4
    "FE80:CD00:0:CDE:1257:0:211E:729C:8333": {}, # ipv6
}

def test_nodes_to_address_strings():
    address_strings = nodes_to_address_strings(mock_nodes)
    solution_set = {"192.168.0.1:8333", "FE80:CD00:0:CDE:1257:0:211E:729C:8333"}
    assert set(address_strings) == solution_set

ipytest.run_tests(doctest=True)
ipytest.clean_tests("test_nodes_to_address_strings*")
```

##### Exercise #4: Turn the `nodes` object into a list of `(ip, port)` tuples where ip is a string and port is an integer

If you recall, [`socket.connect`](https://docs.python.org/3/library/socket.html#socket.socket.connect) takes such a tuple as its argument. This is why I want you to do this. Once we have a list of every such tuple we can iterate across it and connect to every node.

note: this is a challenging exercise


```python
def nodes_to_address_tuples(nodes):
    raise NotImplementedError()
```

```python
# Solution
def nodes_to_address_tuples(nodes):
    address_strings = nodes.keys()
    address_tuples = []
    for address_string in address_strings:
        # split only at the LAST colon, since IPv6 addresses contain colons too
        ip, port = address_string.rsplit(":", 1)
        address_tuple = (ip, int(port))
        address_tuples.append(address_tuple)
    return address_tuples
```
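Why `rsplit(":", 1)` rather than `split(":")`? IPv6 addresses contain colons themselves, so we split only once, at the last colon, which always separates the port. Some tools also wrap IPv6 addresses in square brackets (`[2001:db8::1]:8333`); whether the bitnodes response does so is worth checking against real data. A hypothetical helper that tolerates both forms might look like this:

```python
def parse_address(address_string):
    """Split 'ip:port' into (ip, port), handling IPv6 with or without brackets.

    Hypothetical helper for illustration; not part of the lesson's code.
    """
    # Split once, at the LAST colon: everything after it is the port
    ip, port = address_string.rsplit(":", 1)
    # Drop the square brackets some tools put around IPv6 addresses
    if ip.startswith("[") and ip.endswith("]"):
        ip = ip[1:-1]
    return ip, int(port)

print(parse_address("192.168.0.1:8333"))                       # ('192.168.0.1', 8333)
print(parse_address("FE80:CD00:0:CDE:1257:0:211E:729C:8333"))  # ('FE80:CD00:0:CDE:1257:0:211E:729C', 8333)
print(parse_address("[2001:db8::1]:8333"))                     # ('2001:db8::1', 8333)
```

Note that `socket.socket()` defaults to IPv4 (`socket.AF_INET`), so actually connecting to an IPv6 address would also need a socket created with `socket.AF_INET6`.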

```python
mock_nodes = {
    "192.168.0.1:8333": {}, # ipv4
    "FE80:CD00:0:CDE:1257:0:211E:729C:8333": {}, # ipv6
}
solution_set = {
    ("192.168.0.1", 8333), 
    ("FE80:CD00:0:CDE:1257:0:211E:729C", 8333),
}

def test_nodes_to_address_tuples():
    address_tuples = nodes_to_address_tuples(mock_nodes)
    assert set(address_tuples) == solution_set

ipytest.run_tests(doctest=True)
ipytest.clean_tests("test_nodes_to_address_tuples*")
```

# Calling All Nodes!

Now we have a list of address tuples -- exactly what `socket.connect` expects. Let's iterate over them and download a `version` message from every node.

```python
import socket
from ibd.two.complete import Packet, VersionMessage # get the final version ...

OUR_VERSION = b'\xf9\xbe\xb4\xd9version\x00\x00\x00\x00\x00j\x00\x00\x00\x9b"\x8b\x9e\x7f\x11\x01\x00\x0f\x04\x00\x00\x00\x00\x00\x00\x93AU[\x00\x00\x00\x00\x0f\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0f\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00rV\xc5C\x9b:\xea\x89\x14/some-cool-software/\x01\x00\x00\x00\x01'

def get_version_message(address_tuple, logger=None):
    # FIXME: get rid of this logger. Do this from functions.
    if logger:
        logger()
    # `with` closes the socket even if connecting or reading raises
    with socket.socket() as sock:
        sock.settimeout(1) # only wait 1 second for connections / responses

        sock.connect(address_tuple)

        # initiate the "version handshake"
        sock.sendall(OUR_VERSION)

        # receive their "version" response
        packet = Packet.from_socket(sock)

    version_message = VersionMessage.from_bytes(packet.payload)
    return version_message

def get_version_messages(address_tuples):
    version_messages = []
    exceptions = []
    for address_tuple in address_tuples:
        try:
            version_message = get_version_message(address_tuple)
        except Exception as e:
            exceptions.append(e)
        else:
            version_messages.append(version_message)

        # report progress after every attempt, successful or not
        successes = len(version_messages)
        total = len(address_tuples)
        failures = len(exceptions)
        remaining = total - (successes + failures)
        progress = (successes + failures) / total
        print(f"{successes} Received | {failures} Failures | {remaining} Remaining | {progress:.1%} Complete")
    return version_messages, exceptions
```
        

```python
nodes = get_nodes()
address_tuples = nodes_to_address_tuples(nodes)
get_version_messages(address_tuples)
```

After about 10 seconds of waiting for this cell to finish executing, I hope you've started to wonder whether our code is running too slowly. What's going on? Are we progressing? Are we stuck?

It's time to add a little logging to better understand what's happening.

```python
import time

def get_version_messages_logger(address_tuples, version_messages, exceptions, start_time):
    successes = len(version_messages)
    total = len(address_tuples)
    failures = len(exceptions)
    now = time.time()
    elapsed = now - start_time
    
    remaining = total - (successes + failures)
    progress = (successes + failures) / total
    # at the current rate the whole job takes elapsed / progress seconds;
    # subtract what has already elapsed to get the time remaining
    seconds_remaining = elapsed / progress - elapsed
    minutes_remaining = seconds_remaining / 60
    
    print(f"{successes} Received | {failures} Failures | {remaining} Remaining | {progress:.1%} Complete | ~{minutes_remaining:.0f} Minutes Left")

def get_version_messages(address_tuples, logger=None):
    version_messages = []
    exceptions = []
    start_time = time.time()
    for address_tuple in address_tuples:
        try:
            version_message = get_version_message(address_tuple)
        except Exception as e:
            exceptions.append(e)
        else:
            version_messages.append(version_message)
        # log after every attempt so failures show up in the progress report too
        if logger:
            logger(address_tuples, version_messages, exceptions, start_time)
    return version_messages, exceptions
```
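The time-remaining estimate in the logger is proportional arithmetic: if a fraction `progress` of the work took `elapsed` seconds, the whole job should take about `elapsed / progress` seconds, so what is left is that total minus `elapsed`. The helper below (hypothetical name `estimate_seconds_remaining`) isolates the formula so you can check it by hand. It assumes the average rate so far stays constant, which is only a rough guess when some peers time out and others answer instantly.

```python
def estimate_seconds_remaining(elapsed, progress):
    """Estimate seconds left, assuming work continues at the same average rate.

    elapsed  -- seconds spent so far
    progress -- fraction of the work completed, in (0, 1]
    """
    estimated_total = elapsed / progress
    return estimated_total - elapsed

# 60 seconds to get through a quarter of the nodes -> ~240 s total, ~180 s left
print(estimate_seconds_remaining(60, 0.25))  # 180.0
```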

```python
nodes = get_nodes()
address_tuples = nodes_to_address_tuples(nodes)
get_version_messages(address_tuples, logger=get_version_messages_logger)
```

# Profiling -- Exactly Where is our Code Slow?

Do you feel like waiting around for an hour for all these version messages to download? I don't ...

In order to improve our lot, we first need to understand _why_ our code is slow. Analyzing the speed of a program is one aspect of the discipline of ["profiling"](https://en.wikipedia.org/wiki/Profiling_(computer_programming)).

To figure out exactly why our code is so slow, we're going to use a tool called [line_profiler](https://github.com/rkern/line_profiler/). [Here is a nice tutorial](https://jakevdp.github.io/PythonDataScienceHandbook/01.07-timing-and-profiling.html) that describes a few methods of profiling Python code, including line_profiler. Please read it.

To use `line_profiler`, we first load it as a Jupyter extension. Next, we run our `get_version_message` function through it:

```python
%load_ext line_profiler
```

```python
%lprun -f get_version_message get_version_message(address_tuples[1])
```

You should see something like this at the bottom of your Jupyter window:

![image](images/profiler.png)

If you look in the "% Time" column, you will see that the `sock.connect` and `sock.recv` (called by `Packet.from_socket`) calls each take up about 50% of the time. It's not because these functions are "slow" or "unoptimized" -- no, it's because they're waiting for a response from our peer; they're "blocked". And while this function is blocked, our program can't do any other work.

Concurrent programming techniques offer a way around some of the problems of blocking code. They allow us to chunk our program into bite-sized tasks which the computer can switch between whenever one gets blocked, picking each task back up once it is unblocked (e.g. when our peer accepts the TCP connection and `sock.connect` returns).
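To see why this helps, here is a toy version that needs no network. The hypothetical `fake_request` function stands in for `get_version_message`: it just calls `time.sleep`, which, like a blocking `sock.connect` or `sock.recv`, waits without using the CPU and lets other threads run in the meantime. Run one after another, the waits add up; run in threads, they overlap.

```python
import threading
import time

def fake_request(delay=0.2):
    # Stand-in for a network call: blocks without using the CPU
    time.sleep(delay)

def run_sequentially(n):
    start = time.perf_counter()
    for _ in range(n):
        fake_request()
    return time.perf_counter() - start

def run_in_threads(n):
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_request) for _ in range(n)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # wait for every thread to finish
    return time.perf_counter() - start

sequential = run_sequentially(5)
threaded = run_in_threads(5)
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

Expect roughly 1 second for the sequential run and a little over 0.2 seconds for the threaded one; exact timings vary by machine.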

But concurrent programming (multi-threading being one approach to concurrency) can be very difficult:

![image](./images/this-tall.jpg)

We'll need concurrency, however, when we write our initial-block-downloader, so let's dip our toes into Python concurrency.

Please read [this tutorial](https://code.tutsplus.com/articles/introduction-to-parallel-and-concurrent-programming-in-python--cms-28612), and stop at the "Gevent" section. You'll learn to write a concurrent web-scraper using multiple "threads" and multiple "processes". And take note: the `get_version_messages` function we're trying to speed up is basically a web scraper. Try to anticipate how these techniques apply to our situation. Can you write a multi-threaded or multi-process `get_version_messages` function?

# Multithreading

```python
import os, multiprocessing, threading, time

def concurrent_log():
    print(f"PID: {os.getpid()}", end=" | ")
    print(f"Process Name: {multiprocessing.current_process().name}", end=" | ")
    print(f"Thread Name: {threading.current_thread().name}", end="\n")

def get_version_messages_threaded_demo(addresses):
    start = time.time()
    threads = []

    # spawn one thread per address and start it
    # append the threads to `threads` list so that we can wait for them to finish
    # one problem -- can't get the results!
    for address in addresses:
        # FIXME exceptions not handled
        thread = threading.Thread(target=get_version_message, 
                         args=(address,), 
                         kwargs={"logger": concurrent_log})
        threads.append(thread)
        thread.start()

    for thread in threads:
        # wait for each thread to finish executing
        thread.join()
        
    end = time.time()
    print(f"It took {end - start} seconds")
```

```python
# note: if you run this enough times you may notice that the printing gets messed up. 
# This is because the different threads might print at exactly the same time, 
# making their output interfere
get_version_messages_threaded_demo(address_tuples[:4])
```

```
PID: 31945 | Process Name: MainProcess | Thread Name: Thread-95
PID: 31945 | Process Name: MainProcessPID: 31945 | Thread Name: Thread-96 | Process Name: MainProcess | Thread Name: Thread-97

PID: 31945 | Process Name: MainProcess | Thread Name: Thread-98
It took 0.7151916027069092 seconds
```


# Race Conditions

If you run this enough times you may notice that the printing gets messed up. This bug has occurred in the image below: the second line of the image contains two messages, but the fourth line is empty! This happens because different threads can print at the same time, so their output interleaves. This is called a "race condition", and it's the worst enemy of the multi-threaded program. If you'd like to learn more, check out [this phenomenal talk on concurrency by Python core contributor Raymond Hettinger](https://www.youtube.com/watch?v=Bv25Dwe84g).

![image](images/threading-bug.png)

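We'll use a technique covered in the video to make the race conditions easier to trigger. If we put tiny `time.sleep` calls in our code, things no longer happen in the order we were expecting. Each `concurrent_log` call prints its line in three separate pieces; a random pause between pieces gives the other threads a chance to run and print their own pieces in the middle of our line, so the fragments from different threads get mixed together.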
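The same `time.sleep` trick exposes races on shared data, not just on the terminal. In this sketch (hypothetical names), each thread reads a shared counter, pauses, then writes back the value it read plus one. Because the threads typically all read the counter before any of them writes, most of the increments are lost.

```python
import threading
import time

counter = 0

def unsafe_increment():
    global counter
    current = counter       # 1. read the shared value
    time.sleep(0.05)        # 2. pause, giving the other threads a chance to run
    counter = current + 1   # 3. write back, clobbering the other threads' updates

threads = [threading.Thread(target=unsafe_increment) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"expected 10, got {counter}")
```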

```python
import random

def fuzz():
    time.sleep(random.random() / 10)

def concurrent_log():
    print(f"PID: {os.getpid()}", end=" | ")
    fuzz()
    print(f"Process Name: {multiprocessing.current_process().name}", end=" | ")
    fuzz()
    print(f"Thread Name: {threading.current_thread().name}", end="\n")
```


```python
print("Our Synchronous code has no race conditions")
print("===========================================")
for address in address_tuples[:4]:
    get_version_message(address, logger=concurrent_log)

print("\n")

print("Threaded code easily owned by race conditions")
print("=============================================")

get_version_messages_threaded_demo(address_tuples[:4])
```

```
Our Synchronous code has no race conditions
===========================================
PID: 31945 | Process Name: MainProcess | Thread Name: MainThread
PID: 31945 | Process Name: MainProcess | Thread Name: MainThread
PID: 31945 | Process Name: MainProcess | Thread Name: MainThread
PID: 31945 | Process Name: MainProcess | Thread Name: MainThread


Threaded code easily owned by race conditions
=============================================
PID: 31945 | PID: 31945PID: 31945 |  | PID: 31945 | Process Name: MainProcess | Thread Name: Thread-100
Process Name: MainProcess | Process Name: MainProcess | Process Name: MainProcess | Thread Name: Thread-99
Thread Name: Thread-102
Thread Name: Thread-101
It took 0.9241952896118164 seconds
```


See how neatly the normal, synchronous code prints its little log statements?

See how *exactly the same logging code* produces unreadable output when run in different threads?

How can we fix it?

The answer is to only let one thread print -- the `MainThread`. We can accomplish this by having our logger send every message to the main thread via a simple "queue" instead of printing it.
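Before using it with threads, here is how `queue.Queue` behaves on its own: `put` adds an item, `get` removes items in first-in-first-out order, and `get` with a `timeout` raises `queue.Empty` if nothing arrives in time. `queue.Queue` also does its own locking internally, which is what makes it safe to share between threads.

```python
import queue

q = queue.Queue()
q.put("first")
q.put("second")

first = q.get()   # FIFO: the oldest item comes out first
second = q.get()
print(first, second)  # first second

empty_raised = False
try:
    q.get(timeout=0.1)  # nothing left, so this waits 0.1 s and then gives up
except queue.Empty:
    empty_raised = True
print(empty_raised)  # True
```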

```python
import queue

log_queue = queue.Queue()

def log_producer():
    message = f"PID: {os.getpid()} | Process Name: {multiprocessing.current_process().name} | Thread Name: {threading.current_thread().name}"
    # hand the message to the main thread instead of printing it here
    log_queue.put(message)
    
def log_consumer():
    # print queued messages until the queue stays empty for 1 second
    while True:
        try:
            message = log_queue.get(timeout=1)
        except queue.Empty:
            # Queue is empty
            break
        print(message)
    
def get_version_messages_threaded_demo(addresses):
    start = time.time()
    threads = []

    # spawn one thread per address and start it
    # append the threads to `threads` list so that we can wait for them to finish
    # one problem -- can't get the results!
    for address in addresses:
        # FIXME exceptions not handled
        thread = threading.Thread(target=get_version_message, 
                         args=(address,), 
                         kwargs={"logger": log_producer})
        threads.append(thread)
        thread.start()

    for thread in threads:
        # wait for each thread to finish executing
        thread.join()
        
    end = time.time()
    
    # only the main thread prints
    log_consumer()
    
    print(f"It took {end - start} seconds")
```

```python
print("Our Synchronous code has no race conditions")
print("===========================================")
for address in address_tuples[:4]:
    get_version_message(address, logger=concurrent_log)

print("\n")

print("Queues help eliminate race conditions in concurrent code")
print("========================================================")

get_version_messages_threaded_demo(address_tuples[20:24])
```

```
Our Synchronous code has no race conditions
===========================================
PID: 31945 | Process Name: MainProcess | Thread Name: MainThread
PID: 31945 | Process Name: MainProcess | Thread Name: MainThread
PID: 31945 | Process Name: MainProcess | Thread Name: MainThread
PID: 31945 | Process Name: MainProcess | Thread Name: MainThread


Queues help eliminate race conditions in concurrent code
========================================================
PID: 31945 | Process Name: MainProcess | Thread Name: Thread-135
PID: 31945 | Process Name: MainProcess | Thread Name: Thread-136
PID: 31945 | Process Name: MainProcess | Thread Name: Thread-137
PID: 31945 | Process Name: MainProcess | Thread Name: Thread-138
It took 0.4343259334564209 seconds
```


This looks nice, but we still have another problem: how can we retrieve the version message from a thread when it finishes executing? `threading.Thread` doesn't give us a built-in way to extract a "return value" from the function it runs.

What can we do? Hint: it starts with a "Q"!

Previously we created a `log_queue` to keep track of our logging messages until our main thread got around to printing them out for us.

Now, we will create a `version_message_queue` which allows each thread to submit something like their "return value".

```python
import queue

version_message_queue = queue.Queue()
    
def get_version_messages_threaded_demo(addresses):
    start = time.time()
    threads = []
    
    # FIXME: this style is different from the logging code 
    # in get_version_message choose one style or the other
    def target(*args, **kwargs):
        # wrap get_version_message so its return value goes onto the queue
        q = kwargs.pop("result_queue")
        result = get_version_message(*args, **kwargs)
        q.put(result)
            
    for address in addresses:
        thread = threading.Thread(target=target, 
                         args=(address,), 
                         kwargs={"result_queue": version_message_queue})
        threads.append(thread)
        thread.start()

    for thread in threads:
        # wait for each thread to finish executing
        thread.join()
        
    end = time.time()
    
    log_consumer()
    
    while True:
        try:
            version_message = version_message_queue.get(timeout=1)
        except queue.Empty:
            # Queue is empty
            break
        print(version_message)
    
    print(f"It took {end - start} seconds")
```

```python
get_version_messages_threaded_demo(address_tuples[80:81])
```

```
+------------------+----------------------------------------------------------------------------------+
| VersionMessage   |                                                                                  |
| version          | 70015                                                                            |
+------------------+----------------------------------------------------------------------------------+
| services         | {'NODE_NETWORK': True, 'NODE_GETUTXO': False, 'NODE_BLOOM': True, 'NODE_WITNESS' |
|                  | : True, 'NODE_NETWORK_LIMITED': True}                                            |
+------------------+----------------------------------------------------------------------------------+
| timestamp        | 2018-08-06 20:46:47                                                              |
+------------------+----------------------------------------------------------------------------------+
| addr_recv        | <Address ::ffff:4671:5047:50766>           
```

There we go. Our threaded code is now acceptable: its logging no longer garbles the output, and the main thread can collect `version` message "return values" from all the threads it spawns.
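For comparison, the standard library's `concurrent.futures` module packages up this same threads-plus-results pattern. This is a different tool from the hand-rolled version above, shown only as an alternative; `fake_get_version_message` is a hypothetical stand-in so the sketch runs without a network. `executor.submit` returns a `Future`, and `future.result()` hands back the function's return value or re-raises the exception it raised, so failures are collected instead of silently killing a thread. `max_workers` also caps how many requests run at once, which would matter with thousands of peers.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fake_get_version_message(address_tuple):
    # Hypothetical stand-in for get_version_message: waits, then "fails" on port 0
    time.sleep(0.1)
    ip, port = address_tuple
    if port == 0:
        raise ConnectionRefusedError(f"{ip}:{port} refused")
    return f"version message from {ip}:{port}"

addresses = [("192.0.2.1", 8333), ("192.0.2.2", 0), ("192.0.2.3", 8333)]
results, errors = [], []

with ThreadPoolExecutor(max_workers=10) as executor:
    # map each Future back to the address it was submitted for
    futures = {executor.submit(fake_get_version_message, a): a for a in addresses}
    for future in as_completed(futures):
        try:
            results.append(future.result())  # the worker's return value...
        except Exception as e:
            errors.append((futures[future], e))  # ...or the exception it raised

print(len(results), "received |", len(errors), "failed")  # 2 received | 1 failed
```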



# Multiprocessing

Here's how to accomplish concurrent version message downloads using multiple processes instead of threads:

```python
log_queue = multiprocessing.Queue()
version_message_queue = multiprocessing.Queue()

# FIXME: pass queues as argument so we don't have to redefine when new queues are introduced
def log_producer():
    message = f"PID: {os.getpid()} | Process Name: {multiprocessing.current_process().name} | Thread Name: {threading.current_thread().name}"
    log_queue.put(message)

# FIXME: pass queues as argument so we don't have to redefine when new queues are introduced
def log_consumer():
    while True:
        try:
            message = log_queue.get(timeout=1)
        except queue.Empty:
            # Queue is empty
            break
        print(message)

def version_message_consumer():
    while True:
        try:
            version_message = version_message_queue.get(timeout=1)
        except queue.Empty:
            # Queue is empty
            break
        print("ver", version_message)
    

def get_version_messages_multiprocess_demo(addresses, log=False, results=False):
    start = time.time()
    processes = []
    
    # FIXME: this style is different from the logging code 
    # in get_version_message choose one style or the other
    # NOTE: `target` is a nested function. That works with the "fork" start method
    # (the default on Linux), but the "spawn" start method (the default on Windows,
    # and on macOS since Python 3.8) must pickle it, and nested functions can't be pickled.
    def target(*args, **kwargs):
        q = kwargs.pop("result_queue", None)
        logger = kwargs.pop("logger", None)
        result = get_version_message(*args, **kwargs)
        if logger:
            logger()
        if q is not None:
            q.put(result)
    
    # FIXME one passes producer one passes queue
    # FIXME these set very differently-named keys
    target_kwargs = {}
    if log:
        target_kwargs["logger"] = log_producer
    if results:
        target_kwargs["result_queue"] = version_message_queue
   
    for address in addresses:
        process = multiprocessing.Process(target=target, 
                         args=(address,), 
                         kwargs=target_kwargs)
        processes.append(process)
        process.start()

    for process in processes:
        # wait for each process to finish executing
        # (the multiprocessing docs warn that joining a process that has put items
        # on a queue can deadlock unless those items have already been consumed)
        process.join()
        
    end = time.time()
    
    if log:
        log_consumer()
    if results:
        version_message_consumer()
        
    print(f"It took {end - start} seconds")
```

```python
get_version_messages_multiprocess_demo(address_tuples[9:11], log=True)
```

```
None
None
PID: 4338 | Process Name: Process-29 | Thread Name: MainThread
PID: 4341 | Process Name: Process-30 | Thread Name: MainThread
It took 0.4035048484802246 seconds
```


```python
# FIXME: figure out why this prints a `None` for each process

get_version_messages_multiprocess_demo(address_tuples[:4], results=True)
```

```
None
None
None
None
ver +------------------+----------------------------------------------------------------------------------+
| VersionMessage   |                                                                                  |
| version          | 70015                                                                            |
+------------------+----------------------------------------------------------------------------------+
| services         | {'NODE_NETWORK': True, 'NODE_GETUTXO': False, 'NODE_BLOOM': True, 'NODE_WITNESS' |
|                  | : True, 'NODE_NETWORK_LIMITED': True}                                            |
+------------------+----------------------------------------------------------------------------------+
| timestamp        | 2018-08-06 23:53:10                                                              |
+------------------+----------------------------------------------------------------------------------+
| addr_recv        | <Address ::ffff:467

It took 0.7719259262084961 seconds
```


OK. Now we know how to write threaded and multi-process code. How do they stack up against good old synchronous code?

```python
# FIXME fill in with the x_demo functions we've written

def concurrent_demo(addresses):
    addresses = addresses[:8]

    # 1. serial: one request after another in the main thread
    start_time = time.time()
    for address in addresses:
        get_version_message(address, logger=concurrent_log)
    end_time = time.time()

    print("Serial time=", end_time - start_time)

    # 2. threads: one thread per address, all inside this process
    start_time = time.time()
    threads = [
        threading.Thread(target=get_version_message, args=(address,), kwargs={"logger": concurrent_log})
        for address in addresses
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    end_time = time.time()

    print("Threads time=", end_time - start_time)

    # 3. processes: one process per address
    start_time = time.time()
    processes = [
        multiprocessing.Process(
            target=get_version_message, args=(address,), kwargs={"logger": concurrent_log}
        )
        for address in addresses
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    end_time = time.time()

    print("Parallel time=", end_time - start_time)
```

```python
concurrent_demo(address_tuples[:4])
```

```
PID: 31945, Process Name: MainProcess, Thread Name: MainThread
PID: 31945, Process Name: MainProcess, Thread Name: MainThread
PID: 31945, Process Name: MainProcess, Thread Name: MainThread
PID: 31945, Process Name: MainProcess, Thread Name: MainThread
Serial time= 1.656128168106079
PID: 31945, Process Name: MainProcess, Thread Name: Thread-19
PID: 31945, Process Name: MainProcess, Thread Name: Thread-20
PID: 31945, Process Name: MainProcess, Thread Name: Thread-21
PID: 31945, Process Name: MainProcess, Thread Name: Thread-22
Threads time= 0.4682736396789551
PID: 17717, Process Name: Process-5, Thread Name: MainThread
PID: 17720, Process Name: Process-6, Thread Name: MainThread
PID: 17723, Process Name: Process-7, Thread Name: MainThread
PID: 17724, Process Name: Process-8, Thread Name: MainThread
Parallel time= 0.4868483543395996
```


Look at that!

Using the same code from the tutorial, we are able to speed up our function more than 3x!

Take note:
* The first block of code runs entirely within the same "MainProcess" and "MainThread".
* The second, threaded block of code runs entirely within the "MainProcess", but within 4 different threads attached to that "MainProcess".
* The third, multi-process block of code executes across 4 different processes, but within each process the code executes in the "MainThread".
* Lastly, note that the multi-threaded version is a little faster than the multi-process version. This is because threads are a little "lighter weight" than processes, so they start faster, and we aren't doing any CPU-intensive number crunching where multi-processing is able to spread the work across the multiple cores of your laptop.

Downloader TODO
* make a short list of known-good, always up nodes from the bitnodes leaderboard. use these for the little demos.
* extract start and stop times for ^^ and print using matplotlib
    * wouldn't it be dope if more than 1 task ran at a time?
    * maybe also pull out some other timestamps demonstrating that all time is spent waiting for the remote peer.
* `get_versions_threaded(addrs)`
    * first just prints
    * second uses queue to communicate with main thread
* `get_versions_multiprocess(addrs)`
* Print out the cool display showing how threads / processes are fast and how they're actually doing work in different threads / processes
* Graph how they're doing work concurrently
* (optional) give an example of a cpu-intensive task where multiprocessing excels (fib in a nod to David Beazley?)
* async / await version with `curio`
    * let's avoid having to use TaskGroup initially ... just obscures what's going on
* someday, write a crawler just like bitnodes (we're kind of trusting them right now)

Conclusion
* Some of this may have seemed like a needless tangent, but I assure you it was not. When we finally implement our initial-block-downloader we will need to stay connected to many peers at the same time and concurrently download blocks from each of them. Our code must not have race conditions, and it must have a central, controlling task manager that can supervise the connections to our peers and assemble a valid blockchain. We will spend a lot of time profiling and optimizing our code because initial block download must be as fast as possible.

Homework:
* Make some kind of graphical representation of the data we receive from our peers
* This is the bitnodes crawler: https://github.com/ayeowch/bitnodes. Try to write your own. This would simply involve connecting to a peer, sending them a `getaddr` message, listening for the `addr` response, then doing the `version` handshake with each address contained in the `addr` message, and repeating -- taking care to log each version message you receive. Note that bitnodes skips any nodes running versions < 70001. Can you find some nodes with lower version numbers than that?
* A related idea -- what if you sent `getaddr` messages to every peer that bitnodes gave us, then listened for and saved the contents of each peer's `addr` response? Then you could see whether our peers tell us about any nodes that bitnodes doesn't include -- this is probably the best approach to finding old nodes.
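As a starting point for the crawler homework: a `getaddr` message has an empty payload, so it is nothing but the 24-byte header from lesson one (magic bytes, a 12-byte NUL-padded command, a 4-byte little-endian payload length, and a 4-byte checksum). This sketch assumes mainnet magic bytes and uses a hypothetical `make_message` helper rather than the lesson's `Packet` class.

```python
import hashlib

MAINNET_MAGIC = bytes.fromhex("f9beb4d9")

def make_message(command, payload=b""):
    """Serialize a Bitcoin P2P message: header followed by payload (hypothetical helper)."""
    # Checksum: first 4 bytes of SHA-256 applied twice to the payload
    checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    return (
        MAINNET_MAGIC
        + command.encode("ascii").ljust(12, b"\x00")  # command, NUL-padded to 12 bytes
        + len(payload).to_bytes(4, "little")           # payload length
        + checksum
        + payload
    )

getaddr = make_message("getaddr")
print(len(getaddr))        # 24
print(getaddr[20:].hex())  # 5df6e0e2
```

You would send these bytes over a socket after the `version`/`verack` handshake, then read packets until an `addr` message arrives.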

Data Science TODO