Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ULimit #169

Merged
merged 8 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 52 additions & 28 deletions PythonTests/Meros/Meros.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,33 +37,34 @@ class MessageType(
):
Handshake = 0
Syncing = 1
BlockchainTail = 2

PeersRequest = 3
Peers = 4
BlockListRequest = 5
BlockList = 6

BlockHeaderRequest = 8
BlockBodyRequest = 9
SketchHashesRequest = 10
SketchHashRequests = 11
TransactionRequest = 12
DataMissing = 13

Claim = 14
Send = 15
Data = 16

SignedVerification = 19
SignedSendDifficulty = 20
SignedDataDifficulty = 21
SignedMeritRemoval = 23

BlockHeader = 25
BlockBody = 26
SketchHashes = 27
VerificationPacket = 28
Busy = 2
BlockchainTail = 3

PeersRequest = 4
Peers = 5
BlockListRequest = 6
BlockList = 7

BlockHeaderRequest = 9
BlockBodyRequest = 10
SketchHashesRequest = 11
SketchHashRequests = 12
TransactionRequest = 13
DataMissing = 14

Claim = 15
Send = 16
Data = 17

SignedVerification = 20
SignedSendDifficulty = 21
SignedDataDifficulty = 22
SignedMeritRemoval = 24

BlockHeader = 26
BlockBody = 27
SketchHashes = 28
VerificationPacket = 29

#MessageType -> byte.
def toByte(
Expand All @@ -81,6 +82,7 @@ def toByte(
#A zero means custom logic should be used.
live_lengths: Dict[MessageType, List[int]] = {
MessageType.Handshake: [37],
MessageType.Busy: [1, -6],
MessageType.BlockchainTail: [32],

MessageType.Claim: [1, -33, 80],
Expand All @@ -97,10 +99,11 @@ def toByte(

sync_lengths: Dict[MessageType, List[int]] = {
MessageType.Syncing: live_lengths[MessageType.Handshake],
MessageType.Busy: live_lengths[MessageType.Busy],
MessageType.BlockchainTail: [32],

MessageType.PeersRequest: [],
MessageType.Peers: [1, -6],
MessageType.Peers: live_lengths[MessageType.Busy],
MessageType.BlockListRequest: [34],
MessageType.BlockList: [1, -32, 32],

Expand Down Expand Up @@ -204,6 +207,19 @@ def recv(

return result

#Raised when the node is busy.
#This isn't defined in Errors because it has no relation to the test suite.
class BusyError(
Exception
):
def __init__(
self,
msg: str,
handshake: bytes
) -> None:
Exception.__init__(self, msg)
self.handshake: bytes = handshake

class MerosSocket:
#Constructor.
def __init__(
Expand Down Expand Up @@ -232,6 +248,14 @@ def __init__(

#Receive their Handshake.
response: bytes = recv(self.connection, live_lengths if live else sync_lengths)
if MessageType(response[0]) == MessageType.Busy:
#Wrapped in a try/except as this will error out if Meros beats it to the punch.
try:
self.connection.shutdown(socket.SHUT_RDWR)
self.connection.close()
except OSError:
pass
raise BusyError("Node was busy.", response)
if MessageType(response[0]) != (MessageType.Handshake if live else MessageType.Syncing):
raise TestError("Node didn't send the right Handshake for this connection type.")
if response[1] != network:
Expand Down
4 changes: 4 additions & 0 deletions PythonTests/Test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
from PythonTests.Tests.Consensus.HundredSix.HundredSixMeritRemovalsTest import HundredSixMeritRemovalsTest

from PythonTests.Tests.Network.LANPeersTest import LANPeersTest
from PythonTests.Tests.Network.ULimitTest import ULimitTest
from PythonTests.Tests.Network.BusyTest import BusyTest
from PythonTests.Tests.Network.HundredTwentyFiveTest import HundredTwentyFiveTest

#Arguments.
Expand Down Expand Up @@ -133,6 +135,8 @@
HundredSixMeritRemovalsTest,

LANPeersTest,
ULimitTest,
BusyTest,
HundredTwentyFiveTest
]

Expand Down
109 changes: 109 additions & 0 deletions PythonTests/Tests/Network/BusyTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#Blockchain class.
from PythonTests.Classes.Merit.Blockchain import Blockchain

#Meros classes.
from PythonTests.Meros.Meros import MessageType
from PythonTests.Meros.RPC import RPC

#TestError and SuccessError Exceptions.
from PythonTests.Tests.Errors import TestError, SuccessError

#Socket and select standard libs.
import socket
import select

#pylint: disable=too-many-statements
def BusyTest(
rpc: RPC
) -> None:
#Blockchain. Solely used to get the genesis Block hash.
blockchain: Blockchain = Blockchain()

#Handshake with the node.
rpc.meros.syncConnect(blockchain.blocks[0].header.hash)

#Create two new server sockets.
def createServerSocket() -> socket.socket:
result: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result.bind(("127.0.0.1", 0))
result.listen(2)
return result
busyServer: socket.socket = createServerSocket()
server: socket.socket = createServerSocket()

#Receive Syncing until Meros asks for peers.
while True:
res = rpc.meros.sync.recv()
if MessageType(res[0]) == MessageType.Syncing:
rpc.meros.sync.send(MessageType.BlockchainTail.toByte() + blockchain.blocks[0].header.hash)
elif MessageType(res[0]) == MessageType.PeersRequest:
break

#Craft a Peers message of our own server.
rpc.meros.sync.send(
MessageType.Peers.toByte() +
bytes.fromhex("017F000001") +
busyServer.getsockname()[1].to_bytes(2, "big")
)

#Use select to obtain a non-blocking accept.
busy: int = 0
buf: bytes
for _ in select.select([busyServer], [], [], 5000):
#Accept a new connection.
client, _ = busyServer.accept()

#Verify Meros's Handshake.
buf = client.recv(38)
if MessageType(buf[0]) not in {MessageType.Handshake, MessageType.Syncing}:
busyServer.close()
raise TestError("Meros didn't start its connection with a Handshake.")

if buf[1:] != (
(254).to_bytes(1, "big") +
(254).to_bytes(1, "big") +
(128).to_bytes(1, "big") + (rpc.meros.tcp).to_bytes(2, "big") +
blockchain.blocks[0].header.hash
):
busyServer.close()
raise TestError("Meros had an invalid Handshake.")

#Send back Busy.
client.send(
MessageType.Busy.toByte() +
bytes.fromhex("017F000001") +
server.getsockname()[1].to_bytes(2, "big")
)

busy += 1
if busy == 2:
busyServer.close()
break

#Make sure Meros connects to the server we redirected to.
for _ in select.select([server], [], [], 5000):
#Accept a new connection.
client, _ = server.accept()

#Verify Meros's Handshake.
buf = client.recv(38)
if MessageType(buf[0]) not in {MessageType.Handshake, MessageType.Syncing}:
server.close()
raise TestError("Meros didn't start its connection with a Handshake.")

if buf[1:] != (
(254).to_bytes(1, "big") +
(254).to_bytes(1, "big") +
(128).to_bytes(1, "big") + (rpc.meros.tcp).to_bytes(2, "big") +
blockchain.blocks[0].header.hash
):
server.close()
raise TestError("Meros had an invalid Handshake.")

server.close()
raise SuccessError("Meros connected to the server we redirected it to with a Busy message.")

#Raise a TestError.
busyServer.close()
server.close()
raise TestError("Meros didn't connect to the redirected server.")
84 changes: 84 additions & 0 deletions PythonTests/Tests/Network/ULimitTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#Verifies Meros will send Busy when it starts approaching the open file limit.

#Types.
from typing import List

#Blockchain class.
from PythonTests.Classes.Merit.Blockchain import Blockchain

#Exceptions.
from PythonTests.Tests.Errors import TestError

#Meros classes.
from PythonTests.Meros.Meros import MessageType, BusyError, MerosSocket
from PythonTests.Meros.RPC import RPC

#Socket standard lib.
import socket

#Sleep standard function.
from time import sleep

def ULimitTest(
#pylint: disable=unused-argument
rpc: RPC
) -> None:
#Sleep 60 seconds so Meros can correct its FD count.
sleep(60)

#Create a Blockchain so we have the genesis Block hash.
blockchain: Blockchain = Blockchain()

#Create peers until Meros sends us busy.
sockets: List[MerosSocket] = []
while True:
#Only create live sockets to trigger new peers for each socket.
try:
sockets.append(MerosSocket(5132, 254, 254, True, blockchain.blocks[0].header.hash))
except BusyError as e:
if e.handshake != (MessageType.Busy.toByte() + bytes(1)):
raise TestError("Meros sent an invalid Busy message.")
break

#Trigger busy 32 more times to verify Meros doesn't still allocate file handles.
for _ in range(32):
try:
MerosSocket(5132, 254, 254, True, blockchain.blocks[0].header.hash)
except BusyError as e:
if e.handshake != (MessageType.Busy.toByte() + bytes(1)):
raise TestError("Meros sent an invalid Busy message.")
continue
raise TestError("Meros didn't send Busy despite being at capacity.")

#Disconnect the last 50 sockets.
for _ in range(50):
sockets[-1].connection.shutdown(socket.SHUT_RDWR)
sockets[-1].connection.close()
del sockets[-1]

#Send a Handshake over every remaining socket every 20 seconds for a minute.
#Then Meros should update the amount of files it has open and accept 50 new sockets.
for _ in range(3):
for lSocket in sockets:
lSocket.send(
MessageType.Handshake.toByte() +
(254).to_bytes(1, "big") +
(254).to_bytes(1, "big") +
b'\0\0\0' +
blockchain.blocks[0].header.hash
)
sleep(20)

#Connect 50 sockets and verify Meros doesn't think it's still at capacity.
for _ in range(50):
try:
sockets.append(MerosSocket(5132, 254, 254, True, blockchain.blocks[0].header.hash))
except BusyError:
raise TestError("Meros thought it was at capcity when it wasn't.")

#Verify connecting one more socket returns Busy.
try:
MerosSocket(5132, 254, 254, True, blockchain.blocks[0].header.hash)
except BusyError:
return
raise TestError("Meros accepted a socket despite being at capcity.")
6 changes: 5 additions & 1 deletion docs/Protocol/Handshake.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ If a node sends it after connection, the expected response is a `BlockchainTail`

`Syncing` is sent when two nodes form a new connection. It declares the current connection as the Sync socket. It has a message length of 37-bytes; the 1-byte network ID, 1-byte protocol ID, 1-byte supported services, 2-byte server port, and the 32-byte sender's Blockchain's tail Block's hash.

# Busy

`Busy` is sent when a node receives a connection, which it can accept, yet is unwilling to handle it due to the lack of some resource. It's a valid response to either handshake message, yet only to the initial handshake. Beyond the message byte, it is a clone of `Peers` (described in the Syncing documentation), enabling nodes who tried to connect, and failed, to learn of other nodes to try.

# BlockchainTail

`BlockchainTail` is the expected response to a `Syncing` sent after the peers have already performed their initial connection. It has a message length of 32 bytes; the 32-byte sender's Blockchain's tail Block's hash.
`BlockchainTail` is the expected response to a `Handshake` or `Syncing` which was sent after the peers have already performed their initial handshake. It has a message length of 32 bytes; the 32-byte sender's Blockchain's tail Block's hash.
3 changes: 2 additions & 1 deletion docs/Protocol/Header.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The message types are as follows (with their list number being their byte header
<ol start="0">
<li><code>Handshake</code></li>
<li><code>Syncing</code></li>
<li><code>Busy</code></li>
<li><code>BlockchainTail</code></li>
<br>
<li><code>PeersRequest</code></li>
Expand Down Expand Up @@ -41,7 +42,7 @@ The message types are as follows (with their list number being their byte header
<li><code>VerificationPacket</code></li>
</ol>

Every message between `Syncing` and `DataMissing`, as well as everything after `BlockBody` (inclusive), can only be sent over the Sync socket. `Handshake`, as well as every message between `SignedVerification` and `SignedMeritRemoval` can only be sent over the Live socket. Every other message (`BlockchainTail`, `Claim` through `Unlock`, `Checkpoint`, and `BlockHeader`) can be sent over either socket.
Every message between `Syncing` and `DataMissing`, excluding `Busy`, as well as everything after `BlockBody` (inclusive), can only be sent over the Sync socket. `Handshake`, as well as every message between `SignedVerification` and `SignedMeritRemoval` can only be sent over the Live socket. Every other message (`Busy`, `BlockchainTail`, `Claim` through `Unlock`, `Checkpoint`, and `BlockHeader`) can be sent over either socket.

The Live socket is a connection where every message is proactive. When a node rebroadcasts new data, it's sent over the Live Socket. The Sync socket is a connection where every message is reactive. One party makes a request and one party makes a response. Either party can make a request at any point in time, yet the responses must be in the exact same order as the requests.

Expand Down
4 changes: 0 additions & 4 deletions docs/Protocol/Syncing.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# Syncing

### Syncing

`Syncing` is described in the Handshake documentation.

### PeersRequest and Peers

`PeersRequest` is used to request the connection info of other Meros nodes, and has a message length of 0 bytes. The expected response is a `Peers`, which has a variable message length; the 1-byte amount of peers, and for each peer, the 4-byte IPv4 address and 2-byte port. The peers sent in a `Peers` message is completely up to the syncee.
Expand Down
7 changes: 6 additions & 1 deletion src/MainNetwork.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ proc mainNetwork(
panic("Network.broadcast threw an Exception despite not naturally throwing any: " & e.msg)

#Look for new peers if we don't have enough already.
var requestPeersTimer: TimerCallback = nil
proc requestPeersRegularly(
data: pointer = nil
) {.gcsafe, forceCheck: [].} =
Expand All @@ -75,9 +76,13 @@ proc mainNetwork(
except Exception as e:
panic("Couldn't request peers despite requesting peers not raising anything: " & e.msg)

#Clear the existing timer.
if not requestPeersTimer.isNil:
clearTimer(requestPeersTimer)

#Add a new timer to look for peers since this one expired.
try:
discard setTimer(Moment.fromNow(minutes(2)), requestPeersRegularly)
requestPeersTimer = setTimer(Moment.fromNow(minutes(2)), requestPeersRegularly)
except OSError as e:
panic("Couldn't re-set a timer to request peers: " & e.msg)

Expand Down