Skip to content

Commit

Permalink
Proper message sequence implementation for Response and Pong messages…
Browse files Browse the repository at this point in the history
…. Record round-trip time at connection level! Reply timeout. Close #13 #5

Fixes.
  • Loading branch information
Kleissner committed Apr 19, 2021
1 parent 00db57a commit b2db811
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func contactArbitraryPeer(publicKey *btcec.PublicKey, addresses []*net.UDPAddr)
}

for _, address := range addresses {
sendAllNetworks(publicKey, &PacketRaw{Command: CommandAnnouncement, Payload: packets[0]}, address)
sendAllNetworks(publicKey, &PacketRaw{Command: CommandAnnouncement, Payload: packets[0], Sequence: msgArbitrarySequence(publicKey)}, address)
}
}

Expand Down
30 changes: 6 additions & 24 deletions Commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ const respondClosesContactsCount = 5

// cmdAnouncement handles an incoming announcement
func (peer *PeerInfo) cmdAnouncement(msg *MessageAnnouncement) {
added := false
var added bool
if peer == nil {
// The added check is required due to potential race condition; initially the client may receive multiple incoming announcement from the same peer via different connections.
if peer, added = PeerlistAdd(msg.SenderPublicKey, msg.connection); !added {
return
}

peer, added = PeerlistAdd(msg.SenderPublicKey, msg.connection)
fmt.Printf("Incoming initial announcement from %s\n", msg.connection.Address.String())
}

Expand Down Expand Up @@ -165,26 +161,16 @@ func (peer *PeerInfo) cmdResponse(msg *MessageResponse) {
info.ActiveNodesSub(1)
info.Terminate() // file was found, terminate the request.
}

// check if incoming response to FIND_SELF
/*for _, hash2peer := range msg.Hash2Peers {
if !bytes.Equal(hash2peer.ID.Hash, nodeID) {
for _, closePeer := range hash2peer.Closest {
// Initiate contact. Once a response comes back, the peer is actually added to the list.
contactArbitraryPeer(closePeer.PublicKey, closePeer.IP, closePeer.Port)
}
}
}*/
}

// cmdPing handles an incoming ping message
func (peer *PeerInfo) cmdPing(msg *MessageRaw) {
if peer == nil {
// Unexpected incoming ping, reply with announce message
// TODO
return
peer, _ = PeerlistAdd(msg.SenderPublicKey, msg.connection)
peer.sendAnnouncement(true, true, nil, nil, nil)
}
peer.send(&PacketRaw{Command: CommandPong})
peer.send(&PacketRaw{Command: CommandPong, Sequence: msg.Sequence})
//fmt.Printf("Incoming ping from %s on %s\n", msg.connection.Address.String(), msg.connection.Address.String())
}

Expand All @@ -207,12 +193,8 @@ func (peer *PeerInfo) cmdLocalDiscovery(msg *MessageAnnouncement) {
return
}

var added bool
if peer == nil {
// The added check is required due to potential race condition; initially the client may receive multiple incoming announcement from the same peer via different connections.
if peer, added = PeerlistAdd(msg.SenderPublicKey, msg.connection); !added {
return
}
peer, _ = PeerlistAdd(msg.SenderPublicKey, msg.connection)

fmt.Printf("Incoming initial local discovery from %s\n", msg.connection.Address.String())
//} else {
Expand Down
15 changes: 8 additions & 7 deletions Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
// Connection is an established connection between a remote IP address and a local network adapter.
// New connections may only be created in case of successful INCOMING packets.
type Connection struct {
Network *Network // network which received the packet
Address *net.UDPAddr // address of the sender or receiver
LastPacketIn time.Time // Last time an incoming packet was received.
LastPacketOut time.Time // Last time an outgoing packet was attempted to send.
LastPingOut time.Time // Last ping out.
Expires time.Time // Inactive connections only: Expiry date. If it does not become active by that date, it will be considered expired and removed.
Status int // 0 = Active established connection, 1 = Inactive, 2 = Removed, 3 = Redundant
Network *Network // network which received the packet
Address *net.UDPAddr // address of the sender or receiver
LastPacketIn time.Time // Last time an incoming packet was received.
LastPacketOut time.Time // Last time an outgoing packet was attempted to send.
LastPingOut time.Time // Last ping out.
Expires time.Time // Inactive connections only: Expiry date. If it does not become active by that date, it will be considered expired and removed.
Status int // 0 = Active established connection, 1 = Inactive, 2 = Removed, 3 = Redundant
RoundTripTime time.Duration // Full round-trip time of last reply.
}

// Connection status
Expand Down
4 changes: 2 additions & 2 deletions Message Encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ createPacketLoop:

// pingConnection sends a ping to the target peer via the specified connection
func (peer *PeerInfo) pingConnection(connection *Connection) {
err := peer.sendConnection(&PacketRaw{Command: CommandPing}, connection)
err := peer.sendConnection(&PacketRaw{Command: CommandPing, Sequence: peer.msgNewSequence()}, connection)
connection.LastPingOut = time.Now()

if (connection.Status == ConnectionActive || connection.Status == ConnectionRedundant) && IsNetworkErrorFatal(err) {
Expand All @@ -756,7 +756,7 @@ func (peer *PeerInfo) sendAnnouncement(sendUA, findSelf bool, findPeer []KeyHash
packets, err := msgEncodeAnnouncement(sendUA, findSelf, findPeer, findValue, files)

for _, packet := range packets {
peer.send(&PacketRaw{Command: CommandAnnouncement, Payload: packet})
peer.send(&PacketRaw{Command: CommandAnnouncement, Payload: packet, Sequence: peer.msgNewSequence()})
}

return err
Expand Down
127 changes: 127 additions & 0 deletions Message Sequence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
File Name: Message Sequence.go
Copyright: 2021 Peernet s.r.o.
Author: Peter Kleissner
Records and verifies message sequences.
Advantages:
* This secures against replay and poisoning attacks.
* If used correctly it can also deduplicate messages (which occurs when 2 peers have multiple registered connections to each other but none are active and subsequent fallback to broadcast).
* The round-trip time can be measured and used to determine the connection quality.
* (future) It can be used to detect missed and lost replies.
*/

package core

import (
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/btcec"
)

// sequences stores all sequence numbers that are valid at the moment. The value represents the time the sequence number was used.
// Key = Peer ID + Sequence Number
var sequences map[string]*sequenceExpiry
var sequencesMutex sync.Mutex

type sequenceExpiry struct {
created time.Time // When the sequence was created.
expires time.Time // When the sequence expires. This can be extended on the fly!
counter int // How many replies used the sequence. Multiple Response messages may be returned for a single Announcement one.
}

func initMessageSequence() {
sequences = make(map[string]*sequenceExpiry)

// auto-delete worker to remove expired sequences
go func() {
for {
time.Sleep(time.Duration(ReplyTimeout) * time.Second)
now := time.Now()

sequencesMutex.Lock()
for key, sequence := range sequences {
if sequence.expires.Before(now) {
delete(sequences, key)
}
}
sequencesMutex.Unlock()
}
}()
}

// msgNewSequence returns a new sequence and registers is
// Use only for Announcement and Ping messages.
func (peer *PeerInfo) msgNewSequence() (sequence uint32) {
sequence = atomic.AddUint32(&peer.messageSequence, 1)

key := string(peer.PublicKey.SerializeCompressed()) + strconv.FormatUint(uint64(sequence), 10)

// Add the sequence to the list. Sequences are unique enough that collisions are unlikely and negligible.
sequencesMutex.Lock()
sequences[key] = &sequenceExpiry{
created: time.Now(),
expires: time.Now().Add(time.Duration(ReplyTimeout) * time.Second),
}
sequencesMutex.Unlock()

return sequence
}

// msgArbitrarySequence returns an arbitrary sequence to be used for uncontacted peers
func msgArbitrarySequence(publicKey *btcec.PublicKey) (sequence uint32) {
sequence = rand.Uint32()

key := string(publicKey.SerializeCompressed()) + strconv.FormatUint(uint64(sequence), 10)

// Add the sequence to the list. Sequences are unique enough that collisions are unlikely and negligible.
sequencesMutex.Lock()
sequences[key] = &sequenceExpiry{
created: time.Now(),
expires: time.Now().Add(time.Duration(ReplyTimeout) * time.Second),
}
sequencesMutex.Unlock()

return sequence
}

// msgValidateSequence validates the sequence number of an incoming message
func msgValidateSequence(raw *MessageRaw) (valid bool, rtt time.Duration) {
// Only Response and Pong
if raw.Command != CommandResponse && raw.Command != CommandPong {
return true, rtt
}

key := string(raw.SenderPublicKey.SerializeCompressed()) + strconv.FormatUint(uint64(raw.Sequence), 10)

sequencesMutex.Lock()
defer sequencesMutex.Unlock()

// lookup the sequence
sequence, ok := sequences[key]
if !ok {
return false, rtt
}

// Initial reply: Store latest roundtrip time. That value might be distorted on Response vs Pong since Response messages might send data
// up to 64 KB which obviously would be transmitted slower than an empty Pong reply. However, for the real world this is good enough.
if sequence.counter == 0 {
rtt = time.Since(sequence.created)
}

sequence.counter++

// Special case CommandResponse: Extend validity in case there are follow-up responses by half of the round-trip time since they will be sent one-way.
if raw.Command == CommandResponse {
sequence.expires = time.Now().Add(time.Duration(ReplyTimeout) * time.Second / 2)
}

return sequence.expires.After(time.Now()), rtt
}

// TODO: Manual invalidation of sequence number from high-level (once information request is considered handled).
11 changes: 11 additions & 0 deletions Network.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ var networksMutex sync.RWMutex
// Default ports to use. This may be randomized in the future to prevent fingerprinting (and subsequent blocking) by corporate and ISP firewalls.
const defaultPort = 'p' // 112

// ReplyTimeout is the round-trip timeout.
var ReplyTimeout = 20

// AutoAssignPort assigns a port for the given IP. Use port 0 for zero configuration.
func (network *Network) AutoAssignPort(ip net.IP, port int) (err error) {
networkA := "udp6"
Expand Down Expand Up @@ -155,6 +158,14 @@ func packetWorker(packets <-chan networkWire) {
// process the packet
raw := &MessageRaw{SenderPublicKey: senderPublicKey, PacketRaw: *decoded, connection: connection}

// Response, Pong: Validate sequence number which prevents unsolicited responses.
if valid, rtt := msgValidateSequence(raw); !valid {
log.Printf("packetWorker message with invalid sequence %d command %d from %s\n", raw.Sequence, raw.Command, raw.connection.Address.String()) // Only log for debug purposes.
continue
} else if rtt > 0 {
connection.RoundTripTime = rtt
}

switch decoded.Command {
case CommandAnnouncement: // Announce
if announce, _ := msgDecodeAnnouncement(raw); announce != nil {
Expand Down
2 changes: 1 addition & 1 deletion Packet Encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func PacketEncrypt(senderPrivateKey *btcec.PrivateKey, receiverPublicKey *btcec.
raw[4] = packet.Protocol
raw[5] = packet.Command

binary.LittleEndian.PutUint16(raw[6:10], uint16(packet.Sequence))
binary.LittleEndian.PutUint32(raw[6:10], uint32(packet.Sequence))
binary.LittleEndian.PutUint16(raw[10:12], uint16(len(packet.Payload)))
copy(raw[12:], packet.Payload)
copy(raw[12+len(packet.Payload):12+len(packet.Payload)+len(garbage)], garbage)
Expand Down
6 changes: 4 additions & 2 deletions Peer ID.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package core
import (
"encoding/hex"
"log"
"math/rand"
"net"
"os"
"sync"
Expand Down Expand Up @@ -82,6 +83,7 @@ type PeerInfo struct {
connectionInactive []*Connection // List of former connections that are no longer valid. They may be removed after a while.
connectionLatest *Connection // Latest valid connection.
sync.RWMutex // Mutex for access to list of connections.
messageSequence uint32 // Sequence number. Increased with every message.

// statistics
StatsPacketSent uint64 // Count of packets sent
Expand All @@ -105,7 +107,7 @@ func PeerlistAdd(PublicKey *btcec.PublicKey, connections ...*Connection) (peer *
return peer, false
}

peer = &PeerInfo{PublicKey: PublicKey, connectionActive: connections, connectionLatest: connections[0], NodeID: publicKey2NodeID(PublicKey)}
peer = &PeerInfo{PublicKey: PublicKey, connectionActive: connections, connectionLatest: connections[0], NodeID: publicKey2NodeID(PublicKey), messageSequence: rand.Uint32()}
peerList[publicKey2Compressed(peer.PublicKey)] = peer

// add to Kademlia
Expand Down Expand Up @@ -175,7 +177,7 @@ func record2Peer(record PeerRecord, network *Network) (peerN *PeerInfo) {

// Create temporary peer which is not added to the global list and not added to Kademlia.
connection := &Connection{Network: network, Address: &net.UDPAddr{IP: record.IP, Port: int(record.Port)}, Status: ConnectionActive}
return &PeerInfo{PublicKey: record.PublicKey, connectionActive: []*Connection{connection}, connectionLatest: connection, NodeID: publicKey2NodeID(record.PublicKey)}
return &PeerInfo{PublicKey: record.PublicKey, connectionActive: []*Connection{connection}, connectionLatest: connection, NodeID: publicKey2NodeID(record.PublicKey), messageSequence: rand.Uint32()}
}

// records2Nodes translates infoPeer structures to nodes
Expand Down
1 change: 1 addition & 0 deletions Peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package core
func Init() {
initPeerID()
initKademlia()
initMessageSequence()
initMulticastIPv6()
initBroadcastIPv4()
initNetwork()
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ Above limits are constants and can be adjusted in the code via `pingTime`, `conn

The routing table has a bucket size of 20 and the size of keys 256 bits (blake3 hash). Nodes within buckets are sorted by least recently seen. The number of nodes to contact concurrently in DHT lookups (also known as alpha number) is set to 5.

### Timeouts

* The default reply timeout (round-trip time) is 20 seconds set in `ReplyTimeout`. This applies to Response and Pong messages. The RTT timeout implies an average minimum connection speed between peers of about 6.4 KB/s for files of 64 KB size.
* Separate timeouts for file transfers will be established.

## Contributing

Please note that by contributing code, documentation, ideas, snippets, or any other intellectual property you agree that you have all the necessary rights and you agree that we, the Peernet organization, may use it for any purpose.
Expand Down

0 comments on commit b2db811

Please sign in to comment.