In [1]:
import socket
import binascii
import socket
import struct
import codecs
import time
import random
import hashlib
from datetime import datetime
import io
from test_framework.messages import *

#### Methods slightly adjusted from https://github.com/bitcoin/bitcoin/tree/master/test/functional/test_framework

#### Settings

In [2]:
settings_dict = {
    'magic'          : 0x0709110B,

    #dstAddr         = "8.9.3.218"
    'dstAddr'         : "82.197.160.8",   # node Thush
    'dstPort'         : 18333,

    'max_n_bits'      : 0x1d00ffff,
    'version'        : 70015,
    'buff_size'       : 0x02000000,  # https://github.com/bitcoin/bitcoin/blob/60abd463ac2eaa8bc1d616d8c07880dc53d97211/src/serialize.h#L23
}

In [3]:
# from p2p.py

MESSAGEMAP = {
    b"addr": msg_addr,
    b"addrv2": msg_addrv2,
    b"block": msg_block,
    b"blocktxn": msg_blocktxn,
    b"cfcheckpt": msg_cfcheckpt,
    b"cfheaders": msg_cfheaders,
    b"cfilter": msg_cfilter,
    b"cmpctblock": msg_cmpctblock,
    b"feefilter": msg_feefilter,
    b"filteradd": msg_filteradd,
    b"filterclear": msg_filterclear,
    b"filterload": msg_filterload,
    b"getaddr": msg_getaddr,
    b"getblocks": msg_getblocks,
    b"getblocktxn": msg_getblocktxn,
    b"getdata": msg_getdata,
    b"getheaders": msg_getheaders,
    b"headers": msg_headers,
    b"inv": msg_inv,
    b"mempool": msg_mempool,
    b"merkleblock": msg_merkleblock,
    b"notfound": msg_notfound,
    b"ping": msg_ping,
    b"pong": msg_pong,
    b"sendaddrv2": msg_sendaddrv2,
    b"sendcmpct": msg_sendcmpct,
    b"sendheaders": msg_sendheaders,
    b"tx": msg_tx,
    b"verack": msg_verack,
    b"version": msg_version,
    b"wtxidrelay": msg_wtxidrelay,
}

In [4]:
class Message:
    __slots__ = ("addrFrom", "nPortFrom", "addrTo", "nPortTo", "nNonce", "relay", "nServices",
                 "nStartingHeight", "nTime", "nVersion", "strSubVer","bMagic", "bCommand","timestamp","with_payload")
    
    def __init__(self, bCommand = b'undefined', addrTo = '0.0.0.0', nPortTo = 0, bMagic= 0x0709110B, nVersion= 70015, nServices= 0,with_payload=True):
        self.nNonce = random.getrandbits(64)
        self.nStartingHeight = 596306
        self.relay = False
        self.nPortFrom = 0
        self.addrFrom = ""
        self.bCommand = bCommand
        self.addrTo = addrTo
        self.nPortTo = nPortTo
        self.with_payload = with_payload
        self.bMagic = bMagic
        self.nVersion = nVersion
        self.nServices  = nServices

    def serialize(self):
        # header 
        header     = b""
        header     += struct.pack("i",self.bMagic)                    # magic value                           
        header     += struct.pack("<12s",self.bCommand)                     # command
        
        # payload
        payload    = b""
        if self.with_payload:
            payload    += struct.pack("i", self.nVersion)
            payload    += struct.pack("Q", self.nServices)
            payload    += struct.pack("q", int(time.time()))
            payload    += struct.pack("Q",0);

            payload    +=b"\x00" * 10 + b"\xff" * 2
            payload    += socket.inet_aton(self.addrTo)
            payload    += struct.pack(">H",self.nPortTo)                    # Receiver Port
            payload    += struct.pack("Q",0)
            payload    += struct.pack(">16s",bytes(self.addrFrom,'utf-8'))  # Sender IP Address
            payload    += struct.pack(">H",self.nPortFrom)                  # Sender Port
        if self.with_payload or self.bCommand == b'pong' :
            payload    += struct.pack("<Q", self.nNonce)
        if self.with_payload:
            payload    += struct.pack("B",0)                                # Bytes in version string
            payload    += struct.pack("i", self.nStartingHeight)
            payload    += struct.pack("?", self.relay)
        
        # length and checksum
        length     = struct.pack("I", len(payload));
        checksum   = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4];
        
        return header + length + checksum + payload

In [5]:
import asyncio
import queue

class SimpleGetAddrClient(asyncio.Protocol):
    def __init__(self,settings_dict, on_con_lost):
        self.on_con_lost = on_con_lost
        self.transport = None
        self.recvbuf = b""
        self.settings_dict = settings_dict
        self.peers = []

    def connection_made(self, transport):
        # safe transport connection
        self.transport = transport
        
        # write version message on connection
        message = Message(bCommand=b'version', addrTo=self.settings_dict['dstAddr'],nPortTo=self.settings_dict['dstPort'])
        print(f"Sending {message.bCommand}")
        transport.write(message.serialize())

    def data_received(self, data):
        if(len(data) > 0):
            self.recvbuf += data
            self.on_data()
        
    def on_data(self):
        try:
            while True:
                if len(self.recvbuf) < 4:
                    return
                
                # Check if magic bytes match (mainnet, testnet3 or regtest)
                if self.recvbuf[:4] != self.settings_dict['magic'].to_bytes(4, 'little'):
                    raise ValueError("magic bytes mismatch: {} != {}".format(repr(self.settings_dict['magic']), repr(self.recvbuf)))
                    
                # Message with headers is 4 + 12 + 4 + 4 bytes long
                if len(self.recvbuf) < 4 + 12 + 4 + 4:
                    return
                
                # Extract message type
                msg_type = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
                msg_len = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0]
                checksum = self.recvbuf[4+12+4:4+12+4+4]
                if len(self.recvbuf) < 4 + 12 + 4 + 4 + msg_len:
                    return
                
                # Remove headers from message
                msg = self.recvbuf[4+12+4+4:4+12+4+4+msg_len]
                
                # Check if payload is not corrupted
                th = hashlib.sha256(msg).digest()
                h = hashlib.sha256(th).digest()
                if checksum != h[:4]:
                    raise ValueError("got bad checksum " + repr(self.recvbuf))
                    
                # Received Message from buffer
                self.recvbuf = self.recvbuf[4+12+4+4+msg_len:]
                if msg_type not in MESSAGEMAP:
                    raise ValueError("Received unknown msgtype from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, msg_type, repr(msg)))
                f = io.BytesIO(msg)
                t = MESSAGEMAP[msg_type]()
                t.deserialize(f)
                print(f"Received {msg_type}")
                #self._log_message("receive", t)
                self.on_message(t)
        except Exception as e:
            #logger.exception('Error reading message:', repr(e))
            print(f"Exception {e}")
            raise
            
            
    def on_message(self, message):
        try:
            msgtype = message.msgtype.decode('ascii')
            getattr(self, 'on_' + msgtype)(message)
        except:
            print(f"ERROR delivering {repr(message)}")
            raise
            
            
    def on_version(self, message):
        msg = Message(bCommand=b'verack', addrTo=self.settings_dict['dstAddr'],nPortTo=self.settings_dict['dstPort'])
        print(f"Sending {msg.bCommand}")
        self.transport.write(msg.serialize())
        
        
    def on_verack(self, message):
        msg = Message(bCommand=b'getaddr', addrTo=self.settings_dict['dstAddr'],nPortTo=self.settings_dict['dstPort'],with_payload=False)
        print(f"Sending {msg.bCommand}")
        self.transport.write(msg.serialize())
        
        
    def on_sendheaders(self, message):
        pass
    
    def on_ping(self, message):
        msg = Message(bCommand=b'pong', addrTo=self.settings_dict['dstAddr'],nPortTo=self.settings_dict['dstPort'],with_payload=False)
        print(f"Sending {msg.bCommand}")
        self.transport.write(msg.serialize())
    
    def on_pong(self, message):
        pass
    
    def on_addr(self, message):
        
        for addr in message.addrs:
            self.peers.append(addr)

    def on_feefilter(self,message):
        pass
    
    def on_sendcmpct(self, message):
        pass
    
    def on_inv(self, message):
        pass

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)

    

async def main(client_, loop_):
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    #loop = asyncio.get_running_loop()

    #on_con_lost = loop.create_future()

    transport, protocol = await loop_.create_connection(
        lambda: client_,
                settings_dict['dstAddr'], settings_dict['dstPort'])

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()

        
loop = asyncio.get_event_loop()        
on_con_lost = loop.create_future()
client = SimpleGetAddrClient(settings_dict, on_con_lost)


future = asyncio.run_coroutine_threadsafe(main(client, loop), loop)

Sending b'version'
Received b'version'
Sending b'verack'
Received b'verack'
Sending b'getaddr'
Received b'sendheaders'
Received b'sendcmpct'
Received b'sendcmpct'
Received b'ping'
Sending b'pong'
Received b'addr'
Received b'feefilter'
Received b'addr'


In [6]:
ips = []
ports = []

for peer in client.peers:
    ips.append(peer.ip)
    ports.append(peer.port)
    
    

In [7]:
import pandas as pd
import numpy as np

df = pd.DataFrame(list(zip(ips, ports)),
               columns =['IP', 'Port'])
df = df.iloc[np.argsort(list(map(socket.inet_aton,df['IP'])))]

for i in range(df.shape[0]):
    print(f"{df.iloc[i]['IP']:>16}:{df.iloc[i]['Port']}")

         0.0.0.0:18333
         0.0.0.1:18333
         0.0.0.1:18333
         0.0.0.1:18333
         0.0.0.1:18333
         0.0.0.1:18333
         0.0.0.2:18333
         0.0.0.2:18333
         0.0.0.2:18333
         0.0.0.2:18333
         0.0.0.2:18333
         0.0.0.2:18333
         0.0.0.2:18333
        0.0.0.10:18333
        0.0.0.11:18333
        0.0.0.24:18333
        0.0.0.29:18333
        0.0.0.34:18333
        0.0.0.36:18333
        0.0.0.44:18333
        0.0.0.67:18333
        0.0.1.35:18333
       0.0.2.226:18333
       0.0.6.110:18333
       0.0.8.161:18333
      0.0.13.222:18333
       0.0.24.73:18333
      0.0.49.178:18333
     0.0.117.151:18333
     0.0.126.112:18333
     0.0.130.222:18333
     0.0.149.160:18333
     0.0.153.174:18333
     0.0.205.196:18333
     0.0.216.139:18333
     0.0.243.227:18333
        0.3.96.1:18333
       0.146.2.2:18333
     0.157.176.1:8888
         1.1.1.1:18333
      1.37.87.73:18333
    1.145.64.164:18333
   1.162.125.119:18333
       2.2.2

In [8]:
future.cancel()

True

Exception in callback _SelectorSocketTransport._call_connection_lost(None)
handle: <Handle _SelectorSocketTransport._call_connection_lost(None)>
Traceback (most recent call last):
  File "/Users/chris/miniforge3/envs/Deeplearning/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/chris/miniforge3/envs/Deeplearning/lib/python3.9/asyncio/selector_events.py", line 978, in _call_connection_lost
    super()._call_connection_lost(exc)
  File "/Users/chris/miniforge3/envs/Deeplearning/lib/python3.9/asyncio/selector_events.py", line 736, in _call_connection_lost
    self._protocol.connection_lost(exc)
  File "/var/folders/cw/ywlpl1tx6m5dyzys99jgnjph0000gn/T/ipykernel_62089/3583971664.py", line 120, in connection_lost
    self.on_con_lost.set_result(True)
asyncio.exceptions.InvalidStateError: invalid state


The server closed the connection


In [None]:
binascii.hexlify(Message(b'version',addrTo = '192.168.0.20', nPortTo = 1883).serialize())