diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 000000000..081f7e51d --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,44 @@ +package main + +import ( + log "github.com/sirupsen/logrus" + + "github.com/dusk-network/dusk-blockchain/pkg/p2p/kadcast" + "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" +) + +func main() { + log.Infoln("Starting Kadcast Node!") + // Our node info. + var port uint16 = 25519 + ip := [4]byte{62, 57, 180, 247} + router := kadcast.MakeRouter(ip, port) + log.Infoln("Router was created Successfully.") + + // Create buffer. + queue := ring.NewBuffer(500) + + // Launch PacketProcessor rutine. + go kadcast.ProcessPacket(queue, &router) + + // Launch a listener for our node. + go kadcast.StartUDPListener("udp", queue, router.MyPeerInfo) + + // Create BootstrapNodes Peer structs + var port1 uint16 = 25519 + ip1 := [4]byte{157, 230, 219, 77} + boot1 := kadcast.MakePeer(ip1, port1) + var bootstrapNodes []kadcast.Peer + bootstrapNodes = append(bootstrapNodes[:], boot1) + + // Start Bootstrapping process. + err := kadcast.InitBootstrap(&router, bootstrapNodes) + if err != nil { + log.Panic("Error during the Bootstrap Process. Job terminated.") + } + + // Once the bootstrap succeeded, start the network discovery. + kadcast.StartNetworkDiscovery(&router) + + select {} +} diff --git a/go.mod b/go.mod index 44331f8ec..aa87bebb5 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( golang.org/x/net v0.0.0-20190926025831-c00fd9afed17 // indirect golang.org/x/sys v0.0.0-20191010194322-b09406accb47 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + gotest.tools v2.2.0+incompatible ) go 1.13 diff --git a/go.sum b/go.sum index a4ab79c4e..67778a04a 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,7 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/graphql-go/graphql v0.7.8 h1:769CR/2JNAhLG9+aa8pfLkKdR0H+r5lsQqling5WwpU= @@ -122,6 +123,7 @@ github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/robpike/ivy v0.0.0-20180326033303-3dd8a2d16657 h1:gKsXF1HCXqryC0W9Y2W4Nw4R/GAPj/SjnsXby0jZ62s= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= @@ -229,4 +231,6 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= +gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/p2p/kadcast/bucket.go b/pkg/p2p/kadcast/bucket.go new file mode 100644 index 000000000..5d93f2498 --- /dev/null +++ b/pkg/p2p/kadcast/bucket.go @@ -0,0 +1,99 @@ +package kadcast + +// MaxBucketPeers represents the maximum +//number of peers that a `bucket` can hold. +var MaxBucketPeers uint8 = 25 + +// bucket stores peer info of the peers that are at a certain +// distance range to the peer itself. +type bucket struct { + idLength uint8 + peerCount uint8 + totalPeersPassed uint64 + // Should always be less than `MaxBucketPeers` + entries []Peer + // This map keeps the order of arrivals for LRU + lru map[Peer]uint64 + // This map allows us to quickly see if a Peer is + // included on a entries set without iterating over + // it. + lruPresent map[Peer]bool +} + +// Allocates space for a `bucket` and returns a instance +// of it with the specified `idLength`. +func makeBucket(idlen uint8) bucket { + return bucket{ + idLength: idlen, + totalPeersPassed: 0, + peerCount: 0, + entries: make([]Peer, 0, MaxBucketPeers), + lru: make(map[Peer]uint64), + lruPresent: make(map[Peer]bool), + } +} + +// Finds the Least Recently Used Peer on the entries set +// of the `bucket` and returns it's index on the entries +// set and the `Peer` info that is hold on it. +func (b bucket) findLRUPeerIndex() (int, uint64) { + var val = b.totalPeersPassed + i := 0 + for index, p := range b.entries { + if b.lru[p] <= val { + val = b.lru[p] + i = index + } + } + return i, val +} + +// Remove a `Peer` from the entries set without +// caring about the order. +// It also maps the `Peer` to false on the LRU map. +// The resulting slice of entries is then returned. +func (b *bucket) removePeerAtIndex(index int) []Peer { + // Remove peer from the lruPresent map. + b.lruPresent[b.entries[index]] = false + + b.entries[index] = b.entries[len(b.entries)-1] + // We do not need to put s[i] at the end, as it will be discarded anyway + return b.entries[:len(b.entries)-1] +} + +// Adds a `Peer` to the `bucket` entries list. +// It also increments the peerCount all according +// the LRU policy. +func (b *bucket) addPeer(peer Peer) { + // Check if the entries set can hold more peers. + if len(b.entries) < int(MaxBucketPeers) { + // Insert it into the set if not present + // on the current entries set. + if b.lruPresent[peer] == false { + b.entries = append(b.entries, peer) + b.peerCount++ + b.lruPresent[peer] = true + } + // Store recently used peer. + b.lru[peer] = b.totalPeersPassed + b.totalPeersPassed++ + return + } + // If the entries set is full, we perform + // LRU and remove a peer to include the new one. + // + // Check if peer is not already present into the + // entries set + if b.lruPresent[peer] == false { + // Search for the least recently used peer. + var index, _ = b.findLRUPeerIndex() + // Remove it from the entries set and from + // the lruPresent map. + b.entries = b.removePeerAtIndex(index) + // Add the new peer to the entries set. + b.entries = append(b.entries, peer) + b.lruPresent[peer] = true + b.totalPeersPassed++ + } + b.lru[peer] = b.totalPeersPassed +} diff --git a/pkg/p2p/kadcast/network.go b/pkg/p2p/kadcast/network.go new file mode 100644 index 000000000..7ff9d0bfa --- /dev/null +++ b/pkg/p2p/kadcast/network.go @@ -0,0 +1,65 @@ +package kadcast + +import ( + log "github.com/sirupsen/logrus" + "net" + "time" + + "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" +) + +// StartUDPListener listens infinitely for UDP packet arrivals and +// executes it's processing inside a gorutine by sending +// the packets to the circularQueue. +func StartUDPListener(netw string, queue *ring.Buffer, MyPeerInfo Peer, ) { + + lAddr := getLocalUDPAddress() + // Set listening port. + lAddr.Port = int(MyPeerInfo.port) +PacketConnCreation: + // listen to incoming udp packets + pc, err := net.ListenUDP(netw, &lAddr) + if err != nil { + log.Panic(err) + } + // Set initial deadline. + pc.SetDeadline(time.Now().Add(time.Minute)) + + // Instanciate the buffer + buffer := make([]byte, 1024) + for { + // Read UDP packet. + byteNum, uAddr, err := pc.ReadFromUDP(buffer) + + if err != nil { + log.WithError(err).Warn("Error on packet read") + pc.Close() + goto PacketConnCreation + } + // Set a new deadline for the connection. + pc.SetDeadline(time.Now().Add(5 * time.Minute)) + // Serialize the packet. + encodedPack := encodeRedPacket(uint16(byteNum), *uAddr, buffer[0:byteNum]) + // Send the packet to the Consumer putting it on the queue. + queue.Put(encodedPack) + } +} + +// Gets the local address of the sender `Peer` and the UDPAddress of the +// reciever `Peer` and sends to it a UDP Packet with the payload inside. +func sendUDPPacket(netw string, addr net.UDPAddr, payload []byte) { + localAddr := getLocalUDPAddress() + conn, err := net.DialUDP(netw, &localAddr, &addr) + if err != nil { + log.WithError(err).Warn("Could not stablish a connection with the dest Peer.") + return + } + defer conn.Close() + + // Simple write + _, err = conn.Write(payload) + if err != nil { + log.WithError(err).Warn("Error while writting to the filedescriptor.") + return + } +} \ No newline at end of file diff --git a/pkg/p2p/kadcast/packet.go b/pkg/p2p/kadcast/packet.go new file mode 100644 index 000000000..f89638780 --- /dev/null +++ b/pkg/p2p/kadcast/packet.go @@ -0,0 +1,261 @@ +package kadcast + +import ( + "encoding/binary" + "net" + + log "github.com/sirupsen/logrus" + + "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" +) + +// Packet represents a Kadcast packet which is +// the payload of the TCP or UDP packet received. +type Packet struct { + headers [24]byte + payload []byte +} + +// Builds a `Packet` from the headers and the payload. +func makePacket(headers [24]byte, payload []byte) Packet { + return Packet{ + headers: headers, + payload: payload, + } +} + +// -------- General Packet De/Serialization tools -------- // + +// Gets a stream of bytes and slices it between headers of Kadcast +// protocol and the payload. +func getPacketFromStream(stream []byte) Packet { + var headers [24]byte + copy(headers[:], stream[0:23]) + return Packet{ + headers: headers, + payload: stream[23:], + } +} + +// Deserializes the packet into an slice of bytes. +func (pac Packet) asBytes() []byte { + hl := len(pac.headers) + l := hl + len(pac.payload) + byteRepr := make([]byte, l) + copy(byteRepr, pac.headers[:]) + copy(byteRepr[hl:], pac.payload[:]) + return byteRepr +} + +// Returns the headers info sliced into three pieces: +// Packet type, SenderId, IdNonce and senderPort. +func (pac Packet) getHeadersInfo() (byte, [16]byte, [4]byte, [2]byte) { + // Construct type, senderID and Nonce + typ := pac.headers[0] + var senderID [16]byte + copy(senderID[:], pac.headers[1:17]) + var nonce [4]byte + copy(nonce[:], pac.headers[17:21]) + var peerPort [2]byte + copy(peerPort[:], pac.headers[21:23]) + return typ, senderID, nonce, peerPort +} + +// Gets the Packet headers parts and puts them into the +// header attribute of the Packet. +func (pac *Packet) setHeadersInfo(tipus byte, router Router, destPeer Peer) { + headers := make([]byte, 24) + // Add `Packet` type. + headers[0] = tipus + // Add MyPeer ID + copy(headers[1:17], router.MyPeerInfo.id[0:16]) + // Attach IdNonce + idNonce := getBytesFromUint32(router.myPeerNonce) + copy(headers[17:21], idNonce[0:4]) + // Attach Port + port := getBytesFromUint16(destPeer.port) + copy(headers[21:23], port[0:2]) + + // Build headers array from the slice. + var headersArr [24]byte + copy(headersArr[:], headers[0:23]) + + pac.headers = headersArr +} + +// -------- NODES Packet De/Serialization tools -------- // + +// Builds the payload of a `NODES` message by collecting, +// deserializing and adding to the packet's payload the +// peerInfo of the `K` closest Peers in respect to a certain +// target Peer. +func (pac *Packet) setNodesPayload(router Router, targetPeer Peer) int { + // Get `K` closest peers to `targetPeer`. + kClosestPeers := router.getXClosestPeersTo(K, targetPeer) + // Compute the amount of Peers that will be sent and add it + // as a two-byte array. + count := getBytesFromUint16(uint16(len(kClosestPeers))) + pac.payload = append(pac.payload[:], count[:]...) + // Serialize the Peers to get them in `wire-format`, + // basically, represented as bytes. + for _, peer := range kClosestPeers { + pac.payload = append(pac.payload[:], peer.deserialize()...) + } + return len(kClosestPeers) +} + +// Analyzes if the announced number of Peers included on the +// `NODES` message payload is the same as the recieved one. +// Returns `true` if it is correct and `false` otherways. +func (pac Packet) checkNodesPayloadConsistency(byteNum int) bool { + // Get number of Peers announced. + peerNum := binary.BigEndian.Uint16(pac.payload[0:2]) + // Get peerSlice length subtracting headers and count. + peerSliceLen := byteNum - (len(pac.headers) + 2) + + return int(peerNum)*PeerBytesSize == peerSliceLen +} + +// Gets a `NODES` message and returns a slice of the +// `Peers` found inside of it +func (pac Packet) getNodesPayloadInfo() []Peer { + // Get number of Peers recieved. + peerNum := int(binary.BigEndian.Uint16(pac.payload[0:2])) + // Create Peer-struct slice + var peers []Peer + // Slice the payload into `Peers` in bytes format and deserialize + // every single one of them. + var i, j int = 3, PeerBytesSize + 1 + for m := 0; m < peerNum; m++ { + // Get the peer structure from the payload and + // append the peer to the returned slice of Peer structs. + peers = append(peers[:], serializePeer(pac.payload[i:j])) + + i += PeerBytesSize + j += PeerBytesSize + } + return peers +} + +// ProcessPacket recieves a Packet and processes it according to +// it's type. It gets the packets from the circularqueue that +// connects the listeners with the packet processor. +func ProcessPacket(queue *ring.Buffer, router *Router) { + // Instantiate now the variables to not pollute + // the stack. + var err error + var byteNum int + var senderAddr *net.UDPAddr + var udpPayload []byte + var packet Packet + for { + // Get all of the packets that are now on the queue. + queuePackets, _ := queue.GetAll() + for _, item := range queuePackets { + // Get items from the queue packet taken. + byteNum, senderAddr, udpPayload, err = decodeRedPacket(item) + if err != nil { + log.WithError(err).Warn("Error decoding the packet taken from the ring.") + continue + } + // Build packet struct + packet = getPacketFromStream(udpPayload[:]) + // Extract headers info. + tipus, senderID, nonce, peerRecepPort := packet.getHeadersInfo() + + // Verify IDNonce + err = verifyIDNonce(senderID, nonce) + // If we get an error, we just skip the whole process since the + // Peer was not validated. + if err := verifyIDNonce(senderID, nonce); err != nil { + log.WithError(err).Warn("Incorrect packet sender ID. Skipping its processing.") + continue + } + + // Build Peer info and put the right port on it subsituting the one + // used to send the message by the one where the peer wants to receive + // the messages. + ip, _ := getPeerNetworkInfo(*senderAddr) + port := binary.LittleEndian.Uint16(peerRecepPort[:]) + peerInf := MakePeer(ip, port) + + // Check packet type and process it. + switch tipus { + case 0: + log.WithField( + "Source-IP", peerInf.ip[:], + ).Infoln("Recieved PING message") + handlePing(peerInf, router) + case 1: + log.WithField( + "Source-IP", peerInf.ip[:], + ).Infoln("Recieved PONG message") + handlePong(peerInf, router) + + case 2: + log.WithField( + "Source-IP", peerInf.ip[:], + ).Infoln("Recieved FIND_NODES message") + handleFindNodes(peerInf, router) + + case 3: + log.WithField( + "Source-IP", peerInf.ip[:], + ).Infoln("Recieved NODES message") + handleNodes(peerInf, packet, router, byteNum) + } + } + } +} + +// Processes the `PING` packet info sending back a +// `PONG` message and adding the sender to the buckets. +func handlePing(peerInf Peer, router *Router) { + // Process peer addition to the tree. + router.tree.addPeer(router.MyPeerInfo, peerInf) + // Send back a `PONG` message. + router.sendPong(peerInf) +} + +// Processes the `PONG` packet info and +// adds the sender to the buckets. +func handlePong(peerInf Peer, router *Router) { + // Process peer addition to the tree. + router.tree.addPeer(router.MyPeerInfo, peerInf) +} + +// Processes the `FIND_NODES` packet info sending back a +// `NODES` message and adding the sender to the buckets. +func handleFindNodes(peerInf Peer, router *Router) { + // Process peer addition to the tree. + router.tree.addPeer(router.MyPeerInfo, peerInf) + // Send back a `NODES` message to the peer that + // send the `FIND_NODES` message. + router.sendNodes(peerInf) +} + +// Processes the `NODES` packet info sending back a +// `PING` message to all of the Peers announced on the packet +// and adding the sender to the buckets. +func handleNodes(peerInf Peer, packet Packet, router *Router, byteNum int) { + // See if the packet info is consistent: + // peerNum announced <=> bytesPerPeer * peerNum + if !packet.checkNodesPayloadConsistency(byteNum) { + // Since the packet is not consisten, we just discard it. + log.Info("NODES message recieved with corrupted payload. Packet ignored.") + return + } + + // Process peer addition to the tree. + router.tree.addPeer(router.MyPeerInfo, peerInf) + + // Deserialize the payload to get the peerInfo of every + // recieved peer. + peers := packet.getNodesPayloadInfo() + + // Send `PING` messages to all of the peers to then + // add them to our buckets if they respond with a `PONG`. + for _, peer := range peers { + router.sendPing(peer) + } +} diff --git a/pkg/p2p/kadcast/peer.go b/pkg/p2p/kadcast/peer.go new file mode 100644 index 000000000..507e0f311 --- /dev/null +++ b/pkg/p2p/kadcast/peer.go @@ -0,0 +1,115 @@ +package kadcast + +import ( + "encoding/binary" + "net" + + "golang.org/x/crypto/sha3" +) + +// PeerBytesSize represents the ammount of bytes +// necessary to represent . +const PeerBytesSize int = 22 + +// Peer stores the info of a peer which consists on: +// - IP of the peer. +// - Port to connect to it. +// - The ID of the peer. +type Peer struct { + ip [4]byte + port uint16 + id [16]byte +} + +// MakePeer constructs a `Peer` by setting it's IP, Port +// and computing and setting it's ID. +func MakePeer(ip [4]byte, port uint16) Peer { + id := computePeerID(ip) + peer := Peer{ip, port, id} + return peer +} + +// Deserializes a `Peer` structure as an array of bytes +// that allows to send it through a wire. +func (peer Peer) deserialize() []byte { + serPeer := make([]byte, 22) + // Add Peer IP. + copy(serPeer[0:4], peer.ip[0:4]) + // Serialize and add Peer port. + portByt := getBytesFromUint16(peer.port) + copy(serPeer[4:6], portByt[0:2]) + // Add Peer ID. + copy(serPeer[6:22], peer.id[0:16]) + return serPeer +} + +// Serializes an array of bytes that contains a Peer +// on it returning a `Peer` structure. +func serializePeer(peerBytes []byte) Peer { + // Get Ip + var ip [4]byte + copy(ip[:], peerBytes[0:4]) + // Get Port + port := binary.LittleEndian.Uint16(peerBytes[4:6]) + // Get Id + var id [16]byte + copy(id[:], peerBytes[6:22]) + + return Peer{ + ip: ip, + port: port, + id: id, + } +} + +// The function receives the user's `Peer` and computes the +// ID nonce in order to be able to join the network. +// +// This operation is basically a PoW algorithm that ensures +// that Sybil attacks are more costly. +func (peer Peer) computePeerNonce() uint32 { + var nonce uint32 = 0 + var hash [32]byte + data := make([]byte, 20) + id := peer.id + for { + bytesUint := getBytesFromUint32(nonce) + copy(data[0:16], id[0:16]) + copy(data[16:20], bytesUint[0:4]) + hash = sha3.Sum256(data) + if (hash[31]) == 0 { + return nonce + } + nonce++ + } +} + +// Computes the XOR distance between two Peers. +func (peer Peer) computeDistance(otherPeer Peer) uint16 { + return idXor(peer.id, otherPeer.id) +} + +// Reads the network info of a `Peer` and returns its +// corresponding `UDPAddr` struct. +func (peer Peer) getUDPAddr() net.UDPAddr { + return net.UDPAddr{ + IP: peer.ip[:], + Port: int(peer.port), + Zone: "N/A", + } +} + +// Builds the Peer info from a UPDAddress struct. +func getPeerNetworkInfo(udpAddress net.UDPAddr) ([4]byte, uint16) { + var ip [4]byte + copy(ip[:], udpAddress.IP[:]) + return ip, uint16(udpAddress.Port) +} + +// PeerSort is a helper type to sort `Peers` +type PeerSort struct { + ip [4]byte + port uint16 + id [16]byte + xorMyPeer [16]byte +} diff --git a/pkg/p2p/kadcast/protocol.go b/pkg/p2p/kadcast/protocol.go new file mode 100644 index 000000000..b1a447daa --- /dev/null +++ b/pkg/p2p/kadcast/protocol.go @@ -0,0 +1,69 @@ +package kadcast + +import ( + "errors" + "time" + + log "github.com/sirupsen/logrus" +) + +// InitBootstrap inits the Bootstrapping process by sending +// a `PING` message to every bootstrapping node repeatedly. +// If it tried 3 or more times and no new `Peers` were added, +// it panics. +// Otherways, it returns `nil` and logs the Number of peers +// the node is connected to at the end of the process. +func InitBootstrap(router *Router, bootNodes []Peer) error { + log.Info("Bootstrapping process started.") + // Get PeerList ordered by distance so we can compare it + // after the `PONG` arrivals. + initPeerNum := router.tree.getTotalPeers() + for i := 0; i <= 3; i++ { + + actualPeers := router.pollBootstrappingNodes(bootNodes, time.Second*5) + if actualPeers <= initPeerNum { + if i == 3 { + return errors.New("Maximum number of attempts achieved. Please review yor connection settings") + } + log.WithFields(log.Fields{ + "Retries": i, + }).Warn("Bootstrapping nodes were not added.\nTrying again..") + } else { + break + } + } + log.WithFields(log.Fields{ + "connected_nodes": router.tree.getTotalPeers(), + }).Info("Bootstrapping process finished") + return nil +} + +// StartNetworkDiscovery triggers the network discovery process. +// The node basically sends `FIND_NODES` messages to the nodes it +// is currently connected to and evaluates the `Peers` that were added +// on each iteration. +// If the closest peer to ours is the same during two iterations of the +// `FIND_NODES` message, we finish the process logging the ammout of peers +// we are currently connected to. +// Otherways, if the closest Peer on two consecutive iterations changes, we +// keep queriyng the `alpha` closest nodes with `FIND_NODES` messages. +func StartNetworkDiscovery(router *Router) { + // Get closest actual Peer. + previousClosestArr := router.getXClosestPeersTo(1, router.MyPeerInfo) + previousClosest := previousClosestArr[0] + + // Ask for new peers, wait for `PONG` arrivals and get the + // new closest `Peer`. + actualClosest := router.pollClosestPeer(5 * time.Second) + + // Until we don't get a peer closer to our node on each poll, + // we look for more nodes. + for actualClosest != previousClosest { + previousClosest = actualClosest + actualClosest = router.pollClosestPeer(5 * time.Second) + } + + log.WithFields(log.Fields{ + "peers_connected": router.tree.getTotalPeers(), + }).Info("Network Discovery process finished.") +} diff --git a/pkg/p2p/kadcast/router.go b/pkg/p2p/kadcast/router.go new file mode 100644 index 000000000..fae0796a3 --- /dev/null +++ b/pkg/p2p/kadcast/router.go @@ -0,0 +1,215 @@ +package kadcast + +import ( + "net" + "sync" + "sort" + "time" +) + +// K is the number of peers that a node will send on +// a `NODES` message. +const K int = 20 + +// Alpha is the number of nodes to which a node will +// ask for new nodes with `FIND_NODES` messages. +const Alpha int = 3 + +// Router holds all of the data needed to interact with +// the routing data and also the networking utils. +type Router struct { + // Tree represents the routing structure. + tree Tree + // Even the port and the IP are the same info, the difference + // is that one IP has type `IP` and the other `[4]byte`. + // Since we only store one tree on the application, it's worth + // to keep both in order to avoid convert the types continuously. + myPeerUDPAddr net.UDPAddr + MyPeerInfo Peer + // Holds the Nonce that satisfies: `H(ID || Nonce) < Tdiff`. + myPeerNonce uint32 +} + +// MakeRouter allows to create a router which holds the peerInfo and +// also the routing tree information. +func MakeRouter(externIP [4]byte, port uint16) Router { + myPeer := MakePeer(externIP, port) + return Router{ + tree: makeTree(myPeer), + myPeerUDPAddr: myPeer.getUDPAddr(), + MyPeerInfo: myPeer, + myPeerNonce: myPeer.computePeerNonce(), + } +} + +// --------------------------------------------------// +// // +// Tools to get sorted Peers in respect to a certain // +// PeerID in terms of XOR-distance. // +// // +// --------------------------------------------------// + +// Returns the complete list of Peers in order to be sorted +// as they have the xor distance in respec to a Peer as a parameter. +func (router Router) getPeerSortDist(refPeer Peer) []PeerSort { + var peerList []Peer + for buckIdx, bucket := range router.tree.buckets { + // Skip bucket 0 + if buckIdx != 0 { + peerList = append(peerList[:], bucket.entries[:]...) + } + } + var peerListSort []PeerSort + for _, peer := range peerList { + // We don't want to return the Peer struct of the Peer + // that is the reference. + if peer != refPeer { + peerListSort = append(peerListSort[:], + PeerSort{ + ip: peer.ip, + port: peer.port, + id: peer.id, + xorMyPeer: xor(refPeer.id, peer.id), + }) + } + } + return peerListSort +} + +// ByXORDist implements sort.Interface based on the IdDistance +// respective to myPeerId. +type ByXORDist []PeerSort + +func (a ByXORDist) Len() int { return len(a) } +func (a ByXORDist) Less(i int, j int) bool { + return !xorIsBigger(a[i].xorMyPeer, a[j].xorMyPeer) +} +func (a ByXORDist) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// Returns a list of the selected number of closest peers +// in respect to a certain `Peer`. +func (router Router) getXClosestPeersTo(peerNum int, refPeer Peer) []Peer { + var xPeers []Peer + peerList := router.getPeerSortDist(refPeer) + sort.Sort(ByXORDist(peerList)) + + // Get the `peerNum` closest ones. + for _, peer := range peerList { + xPeers = append(xPeers[:], + Peer{ + ip: peer.ip, + port: peer.port, + id: peer.id, + }) + if len(xPeers) >= peerNum { + break + } + } + return xPeers +} + +// Sends a `FIND_NODES` messages to the `alpha` closest peers +// the node knows and waits for a certain time in order to wait +// for the `PONG` message arrivals. +// Then looks for the closest peer to the node itself into the +// buckets and returns it. +func (router Router) pollClosestPeer(t time.Duration) Peer { + var wg sync.WaitGroup + var ps []Peer + wg.Add(1) + router.sendFindNodes() + + timer := time.AfterFunc(t, func() { + ps = router.getXClosestPeersTo(1, router.MyPeerInfo) + wg.Done() + }) + + wg.Wait() + timer.Stop() + return ps[0] +} + +// Sends a `PING` messages to the bootstrap nodes that +// the node knows and waits for a certain time in order to wait +// for the `PONG` message arrivals. +// Returns back the new number of peers the node is connected to. +func (router Router) pollBootstrappingNodes(bootNodes []Peer, t time.Duration) uint64 { + var wg sync.WaitGroup + var peerNum uint64 + + wg.Add(1) + for _, peer := range bootNodes { + router.sendPing(peer) + } + + timer := time.AfterFunc(t, func() { + peerNum = uint64(router.tree.getTotalPeers()) + wg.Done() + }) + + wg.Wait() + timer.Stop() + return peerNum +} + +// ------- Packet-sending utilities for the Router ------- // + +// Builds and sends a `PING` packet +func (router Router) sendPing(receiver Peer) { + // Build empty packet. + var packet Packet + // Fill the headers with the type, ID, Nonce and destPort. + packet.setHeadersInfo(0, router, receiver) + + // Since return values from functions are not addressable, we need to + // allocate the receiver UDPAddr + destUDPAddr := receiver.getUDPAddr() + // Send the packet + sendUDPPacket("udp", destUDPAddr, packet.asBytes()) +} + +// Builds and sends a `PONG` packet +func (router Router) sendPong(receiver Peer) { + // Build empty packet. + var packet Packet + // Fill the headers with the type, ID, Nonce and destPort. + packet.setHeadersInfo(1, router, receiver) + + // Since return values from functions are not addressable, we need to + // allocate the receiver UDPAddr + destUDPAddr := receiver.getUDPAddr() + // Send the packet + sendUDPPacket("udp", destUDPAddr, packet.asBytes()) +} + +// Builds and sends a `FIND_NODES` packet. +func (router Router) sendFindNodes() { + // Get `Alpha` closest nodes to me. + destPeers := router.getXClosestPeersTo(Alpha, router.MyPeerInfo) + // Fill the headers with the type, ID, Nonce and destPort. + for _, peer := range destPeers { + // Build the packet + var packet Packet + packet.setHeadersInfo(2, router, peer) + // We don't need to add the ID to the payload snce we already have + // it in the headers. + // Send the packet + sendUDPPacket("udp", peer.getUDPAddr(), packet.asBytes()) + } +} + +// Builds and sends a `NODES` packet. +func (router Router) sendNodes(receiver Peer) { + // Build empty packet + var packet Packet + // Set headers + packet.setHeadersInfo(3, router, receiver) + // Set payload with the `k` peers closest to receiver. + peersToSend := packet.setNodesPayload(router, receiver) + // If we don't have any peers to announce, we just skip sending + // the `NODES` messsage. + if peersToSend == 0 { + return + } + sendUDPPacket("udp", receiver.getUDPAddr(), packet.asBytes()) +} diff --git a/pkg/p2p/kadcast/tree.go b/pkg/p2p/kadcast/tree.go new file mode 100644 index 000000000..2af53d2cc --- /dev/null +++ b/pkg/p2p/kadcast/tree.go @@ -0,0 +1,42 @@ +package kadcast + +// Tree stores `L` buckets inside of it. +// This is basically the routing info of every peer. +type Tree struct { + buckets [128]bucket +} + +// Allocates space for a tree and returns an empty intance of it. +// +// It also sets our `Peer` info in the lowest order bucket. +func makeTree(myPeer Peer) Tree { + var bucketList [128]bucket + for i := 0; i < 128; i++ { + bucketList[i] = makeBucket(uint8(i)) + } + // Add my `Peer` info on the lowest `bucket`. + bucketList[0].addPeer(myPeer) + return Tree{ + buckets: bucketList, + } +} + +// Classifies and adds a Peer to the routing storage tree. +func (tree *Tree) addPeer(myPeer Peer, otherPeer Peer) { + idl := myPeer.computeDistance(otherPeer) + if idl == 0 { + return + } + tree.buckets[idl].addPeer(otherPeer) +} + +// Returns the total amount of peers that a `Peer` is connected to. +func (tree *Tree) getTotalPeers() uint64 { + var count uint64 = 0 + for i, bucket := range tree.buckets { + if i != 0 { + count += uint64(bucket.peerCount) + } + } + return count +} diff --git a/pkg/p2p/kadcast/utils.go b/pkg/p2p/kadcast/utils.go new file mode 100644 index 000000000..f10ccea9c --- /dev/null +++ b/pkg/p2p/kadcast/utils.go @@ -0,0 +1,185 @@ +package kadcast + +import ( + "encoding/binary" + "errors" + "github.com/sirupsen/logrus" + "net" + + "golang.org/x/crypto/sha3" + + // Just for debugging purposes + _ "fmt" +) + +// ------------------ DISTANCE UTILS ------------------ // + +// Computes the XOR between two [16]byte arrays. +func xor(a [16]byte, b [16]byte) [16]byte { + distance := [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + + for i := 0; i < 16; i++ { + distance[i] = a[i] ^ b[i] + } + return distance +} + +// Computes the XOR distance between 2 different +// ids and classifies it between the range 0-128. +func idXor(a [16]byte, b [16]byte) uint16 { + distance := xor(a, b) + return classifyDistance(distance) +} + +// This function gets the XOR distance as a byte-array +// and collapses it to classify the distance on one of the +// 128 buckets. +func classifyDistance(arr [16]byte) uint16 { + var collDist uint16 = 0 + for i := 0; i < 16; i++ { + collDist += countSetBits(arr[i]) + } + return collDist +} + +// Counts the number of setted bits in the given byte. +func countSetBits(byt byte) uint16 { + var count uint16 = 0 + for byt != 0 { + count += uint16(byt & 1) + byt >>= 1 + } + return count +} + +// Evaluates if an XOR-distance of two peers is +// bigger than another. +func xorIsBigger(a [16]byte, b [16]byte) bool { + for i := 15; i > 0; i-- { + if a[i] < b[i] { + return false + } + } + return true +} + +// ------------------ HASH KEY UTILS ------------------ // + +// Performs the hash of the wallet public +// IP address and gets the first 16 bytes of +// it. +func computePeerID(externIP [4]byte) [16]byte { + var halfLenID [16]byte + doubleLenID := sha3.Sum256(externIP[:]) + copy(halfLenID[:], doubleLenID[0:16]) + return halfLenID +} + +// This function is a middleware that allows the peer to verify +// other Peers nonce's and validate them if they are correct. +func verifyIDNonce(id [16]byte, nonce [4]byte) error { + idPlusNonce := make([]byte, 20) + copy(idPlusNonce[0:16], id[0:16]) + copy(idPlusNonce[16:20], nonce[0:4]) + hash := sha3.Sum256(idPlusNonce) + if (hash[31]) == 0 { + return nil + } + return errors.New("Id and Nonce are not valid parameters") //TODO: Create error type. +} + +// ------------------ NET UTILS ------------------ // + +// Gets the local IP address of the machine where +// the node is running in `net.UDPAddr` format. +// +// Panics if it there's not connection. +func getLocalUDPAddress() net.UDPAddr { + conn, err := net.Dial("udp", "8.8.8.8:80") + if err != nil { + logrus.WithError(err).Warn("Network Unreachable.") + } + defer conn.Close() + + localAddr := conn.LocalAddr().(*net.UDPAddr) + return *localAddr +} + +// Gets the local IP address of the machine where +// the node is running in `net.UDPAddr` format. +// +// Panics if it there's not connection. +func getLocalTCPAddress() net.TCPAddr { + conn, err := net.Dial("tcp", "8.8.8.8:80") + if err != nil { + logrus.WithError(err).Warn("Network Unreachable.") + } + defer conn.Close() + + localAddr := conn.LocalAddr().(*net.TCPAddr) + return *localAddr +} + + +// ------------------ ENC/DEC UTILS ------------------ // + +// Set a `uint32` in bytes format. +func getBytesFromUint32(num uint32) [4]byte { + res := [4]byte{0, 0, 0, 0} + for i := 0; num > 0; i++ { + res[i] = byte(num & 255) + num = num >> 8 + } + return res +} + +// Set a `uint16` in bytes format. +func getBytesFromUint16(num uint16) [2]byte { + res := [2]byte{0, 0} + for i := 0; num > 0; i++ { + // Cut the input to byte range. + res[i] = byte(num & 255) + // Shift it to subtract a byte from the number. + num = num >> 8 + } + return res +} + +// Encode a received packet to send it through the +// Ring to the packetProcess rutine. +func encodeRedPacket(byteNum uint16, peerAddr net.UDPAddr, payload []byte) []byte { + encodedLen := len(payload) + 8 + enc := make([]byte, encodedLen) + // Get numBytes as slice of bytes. + numBytes := getBytesFromUint16(byteNum) + // Append it to the resulting slice. + copy(enc[0:2], numBytes[0:2]) + // Append Peer IP. + copy(enc[2:6], peerAddr.IP[0:4]) + // Append Port + port := getBytesFromUint16(uint16(peerAddr.Port)) + copy(enc[6:8], port[0:2]) + // Append Payload + copy(enc[8:encodedLen], payload[0:len(payload)]) + return enc +} + +// Decode a CircularQueue packet and return the +// elements of the original received packet. +func decodeRedPacket(packet []byte) (int, *net.UDPAddr, []byte, error) { + redPackLen := len(packet) + byteNum := int(binary.LittleEndian.Uint16(packet[0:2])) + if (redPackLen) != (byteNum + 8) { + return 0, nil, nil, errors.New("Packet's length taken from the ring differs from expected") + } + ip := packet[2:6] + port := int(binary.LittleEndian.Uint16(packet[6:8])) + payload := packet[8:] + + peerAddr := net.UDPAddr { + IP: ip, + Port: port, + Zone: "N/A", + } + return byteNum, &peerAddr, payload, nil +} diff --git a/pkg/p2p/kadcast/utils_test.go b/pkg/p2p/kadcast/utils_test.go new file mode 100644 index 000000000..6b2dddc83 --- /dev/null +++ b/pkg/p2p/kadcast/utils_test.go @@ -0,0 +1,22 @@ +package kadcast + +import ( + "encoding/binary" + "fmt" + "testing" +) + +func testConv(t *testing.T) { + var slce []byte = []byte{0, 0} + peerNum := binary.BigEndian.Uint16(slce) + fmt.Printf("%v", peerNum) +} +func testPOW(t *testing.T) { + a := Peer{ + ip: [4]byte{192, 169, 1, 1}, + port: 25519, + id: [16]byte{22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22}, + } + + println(a.computePeerNonce()) +}