In [None]:
import os
import sys

# Make a local cypher-school checkout importable (presumably the home of the
# `ibd` package imported further down -- verify against your layout).
# FIXME: the fallback is a machine-specific absolute path; set the
# CYPHER_SCHOOL_PATH environment variable to point at your own checkout.
_repo_path = os.environ.get('CYPHER_SCHOOL_PATH', '/home/justin/dev/cypher-school/')
if _repo_path not in sys.path:  # re-running this cell must not pile up duplicates
    sys.path.append(_repo_path)

In [None]:
%load_ext line_profiler
%load_ext autoreload
%autoreload 2

import os, requests, json, ipytest, requests_mock, socket, queue, random, time, multiprocessing, threading, collections
from ipaddress import ip_address
from sys import stdout
import numpy as np
import matplotlib.pyplot as plt
import curio

from ibd.two.complete import *

In [None]:
def get_nodes(url="https://bitnodes.earn.com/api/v1/snapshots/latest/", timeout=30):
    """Fetch a node snapshot over HTTP and return its ``"nodes"`` entry.

    Args:
        url: Snapshot endpoint to query. Defaults to the URL this notebook
            has always used.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely.

    Returns:
        The value stored under the ``"nodes"`` key of the JSON response.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.Timeout: if the server does not respond within ``timeout``.
        KeyError: if the JSON body has no ``"nodes"`` key.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page.
    response.raise_for_status()
    return response.json()["nodes"]

def nodes_to_address_tuples(nodes):
    """Turn the ``"host:port"`` keys of *nodes* into ``(host, port)`` tuples.

    Each key is split at its last colon; the part after it becomes an ``int``
    port and every square bracket is removed from the part before it.
    Returns the tuples as a list, in the mapping's key order.
    """
    def parse(address_string):
        host, port_text = address_string.rsplit(":", 1)
        # FIXME: blunt bracket removal (e.g. "[::1]" -> "::1")
        host = host.replace('[', '').replace(']', '')
        return (host, int(port_text))

    return [parse(address_string) for address_string in nodes.keys()]

def get_addresses():
    """Return the result of ``nodes_to_address_tuples`` applied to ``get_nodes()``."""
    return nodes_to_address_tuples(get_nodes())

In [None]:
def get_version_messages_logger(address_tuples, version_messages, exceptions, start_time):
    """Print a one-line progress report for a batch of connection attempts.

    Args:
        address_tuples: Every address being attempted; its length is the total.
        version_messages: Results collected so far; its length counts successes.
        exceptions: Errors collected so far; its length counts failures.
        start_time: ``time.time()`` value taken when the batch started.

    The report shows successes, failures, remaining attempts, percent complete
    and a rough estimate of the minutes left.
    """
    successes = len(version_messages)
    failures = len(exceptions)
    total = len(address_tuples)
    completed = successes + failures
    remaining = total - completed

    # An empty batch has nothing left to do: report it as complete rather
    # than dividing by zero.
    progress = completed / total if total else 1.0

    elapsed = time.time() - start_time
    # A coarse clock can report no elapsed time on the very first call.
    rate = completed / elapsed if elapsed > 0 else 0
    # Floor the rate at one completion per second so the estimate below never
    # divides by zero (this also caps the estimate when the real rate is lower).
    rate = max(1, rate)

    minutes_remaining = remaining / rate / 60

    print(f"{successes} Received | {failures} Failures | {remaining} Remaining | {progress*100:.3f}% Complete | ~{minutes_remaining:.1f} Minutes Left")


In [None]:
from ibd.two.complete import *

# Hard-coded bytes that connect_async sends to a peer right after connecting.
# The literal embeds the ASCII command name "version" and a "/some-cool-software/"
# string; presumably a pre-built version message -- TODO confirm its fields are
# still acceptable to peers.
VERSION = b'\xf9\xbe\xb4\xd9version\x00\x00\x00\x00\x00j\x00\x00\x00\x9b"\x8b\x9e\x7f\x11\x01\x00\x0f\x04\x00\x00\x00\x00\x00\x00\x93AU[\x00\x00\x00\x00\x0f\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0f\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00rV\xc5C\x9b:\xea\x89\x14/some-cool-software/\x01\x00\x00\x00\x01'

async def async_read_magic(sock):
    """Receive up to 4 bytes from *sock* and decode them with ``bytes_to_int``."""
    return bytes_to_int(await sock.recv(4))


async def async_read_command(sock):
    """Receive up to 12 bytes from *sock* and return them with all zero bytes removed."""
    field = await sock.recv(12)
    # strip every b"\x00" from the field, wherever it occurs
    return field.replace(b"\x00", b"")


async def async_read_length(sock):
    """Receive up to 4 bytes from *sock* and decode them with ``bytes_to_int``."""
    return bytes_to_int(await sock.recv(4))


async def async_read_checksum(sock):
    """Receive up to 4 bytes from *sock* and return them undecoded.

    FIXME: the protocol documentation says this field should be an integer,
    but the raw bytes are handed back as-is.
    """
    return await sock.recv(4)


async def async_read_payload(sock, length):
    """Read *length* payload bytes from *sock*, tolerating short reads.

    A single ``recv`` on a stream socket may return fewer bytes than were
    requested, so this keeps receiving until the requested amount has arrived.

    Args:
        sock: Object with an awaitable ``recv(n)`` method.
        length: Number of bytes to read.

    Returns:
        ``bytes`` of size *length*, or fewer if ``recv`` returned an empty
        result first (the peer closed the connection early). A zero *length*
        returns ``b""`` without touching the socket.
    """
    chunks = []
    remaining = length
    while remaining > 0:
        chunk = await sock.recv(remaining)
        if not chunk:
            # End of stream: hand back what arrived and let the caller's
            # length check report the shortfall.
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


async def async_read_message(sock):
    """Read one whole message from *sock* and return its raw bytes.

    The fields are received in wire order: 4 bytes (magic), 12 bytes
    (command), 4 bytes (payload length), 4 bytes (checksum), then the payload.
    They are concatenated undecoded, because the decoded magic and length are
    integers and cannot be added to the byte fields.

    NOTE(review): returning the raw concatenation is inferred from the
    original ``+`` chain -- confirm against the callers' expectations.
    """
    magic = await sock.recv(4)
    command = await sock.recv(12)
    raw_length = await sock.recv(4)
    checksum = await sock.recv(4)
    # Only the length has to be decoded, to know how much payload follows.
    payload = await async_read_payload(sock, bytes_to_int(raw_length))
    return magic + command + raw_length + checksum + payload


class Packet:
    """One protocol message, reduced to its command and payload.

    ``command`` and ``payload`` are stored exactly as passed to the
    constructor. ``from_socket`` / ``async_from_socket`` build a packet by
    reading and validating a message from a socket.
    """

    def __init__(self, command, payload):
        self.command = command
        self.payload = payload

    @staticmethod
    def _require_network_magic(magic):
        """Raise ``RuntimeError`` unless *magic* equals ``NETWORK_MAGIC``."""
        if magic != NETWORK_MAGIC:
            raise RuntimeError(f'Network magic "{magic}" is wrong')

    @classmethod
    def _from_parts(cls, command, payload_length, checksum, payload):
        """Validate fields already read from a socket and build the packet.

        Raises:
            RuntimeError: if fewer payload bytes arrived than announced, or
                if the payload's checksum differs from the one received.
        """
        received = len(payload)
        # Check the length first: a truncated payload would also fail the
        # checksum, and "checksum mismatch" would hide the real cause.
        if payload_length != received:
            raise RuntimeError(
                f"Tried to read {payload_length} bytes, only received {received} bytes"
            )

        if calculate_checksum(payload) != checksum:
            raise RuntimeError("Checksums don't match")

        return cls(command, payload)

    @classmethod
    def from_socket(cls, sock):
        """Read one message from a blocking *sock* and return it as a packet.

        Raises:
            RuntimeError: on wrong network magic, short payload, or checksum
                mismatch.
        """
        # Bail out before consuming anything else if the magic is wrong.
        cls._require_network_magic(read_magic(sock))

        command = read_command(sock)
        payload_length = read_length(sock)
        checksum = read_checksum(sock)
        payload = read_payload(sock, payload_length)

        return cls._from_parts(command, payload_length, checksum, payload)

    @classmethod
    async def async_from_socket(cls, sock):
        """Awaitable counterpart of ``from_socket`` using the ``async_read_*`` helpers.

        Raises:
            RuntimeError: on wrong network magic, short payload, or checksum
                mismatch.
        """
        # Bail out before consuming anything else if the magic is wrong.
        cls._require_network_magic(await async_read_magic(sock))

        command = await async_read_command(sock)
        payload_length = await async_read_length(sock)
        checksum = await async_read_checksum(sock)
        payload = await async_read_payload(sock, payload_length)

        return cls._from_parts(command, payload_length, checksum, payload)


async def _exchange_version(sock, addr):
    """Connect *sock* to *addr*, send ``VERSION`` and parse the first reply packet."""
    await sock.connect(addr)
    await sock.send(VERSION)
    pkt = await Packet.async_from_socket(sock)
    return VersionMessage.from_bytes(pkt.payload)


async def connect_async(addr, timeout=None):
    """Connect to one peer and return the version message parsed from its reply.

    Args:
        addr: Address passed straight to ``sock.connect``.
        timeout: Optional number of seconds allowed for the whole exchange.
            ``None`` (the default) applies no limit, as before.

    Returns:
        ``VersionMessage.from_bytes`` applied to the payload of the first
        packet received after sending ``VERSION``.

    Raises:
        curio.TaskTimeout: if *timeout* is given and expires.
        Whatever connecting, sending, reading or parsing raises; the socket
        is closed in every case.
    """
    sock = curio.socket.socket()
    try:
        if timeout is None:
            return await _exchange_version(sock, addr)
        return await curio.timeout_after(timeout, _exchange_version, sock, addr)
    finally:
        # Close on failure too, otherwise each unreachable peer leaks a socket.
        await sock.close()


    
    
async def connect_many_async(addrs):
    """Run ``connect_async`` for every address concurrently and collect the outcomes.

    Args:
        addrs: Sized iterable of addresses; one task is spawned per entry.

    Returns:
        ``(version_messages, exceptions)``: the values returned by the tasks
        that succeeded, and the ``curio.TaskError`` raised when joining each
        task that failed (the original error is its ``__cause__``).

    A progress line is printed after every finished task, successful or not.
    """
    start_time = time.time()
    version_messages = []
    exceptions = []

    async with curio.TaskGroup() as group:
        for addr in addrs:
            await group.spawn(connect_async, addr)

        async for task in group:
            try:
                version_messages.append(await task.join())
            except curio.TaskError as e:
                exceptions.append(e)
            # Report after failures as well; otherwise a run of failing peers
            # prints nothing and the failure count shown lags behind.
            get_version_messages_logger(addrs, version_messages, exceptions, start_time)

    return version_messages, exceptions

def run_connect_many_async(addrs):
    """Blocking wrapper: hand ``connect_many_async`` and *addrs* to ``curio.run`` and return its result."""
    outcome = curio.run(connect_many_async, addrs)
    return outcome

In [None]:
# Fetch the address list only if this kernel has not defined it yet, so that
# re-running the cell does not hit the network again.
try:
    addresses
except NameError:
    # Catch only "name not defined"; a bare except would also swallow
    # KeyboardInterrupt and unrelated errors.
    addresses = get_addresses()

In [None]:
# Try the exchange against the first 10 addresses only.
version_messages, exceptions = run_connect_many_async(addresses[:10])

In [None]:
# Display the results returned by the run above.
version_messages

In [None]:
# Display the errors returned by the run above.
exceptions

In [None]:
# Print the chained cause (`__cause__`) of every collected error.
for task_error in exceptions:
    print(task_error.__cause__)