Skip to content

Commit

Permalink
update AWS image
Browse files Browse the repository at this point in the history
  • Loading branch information
asonnino committed Feb 8, 2024
1 parent 667595d commit 7c385a0
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 100 deletions.
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
> **Note to readers:** MystenLabs is making this codebase production-ready [here](https://github.com/MystenLabs/sui/tree/main/narwhal).

# Narwhal and Tusk
[![build status](https://img.shields.io/github/actions/workflow/status/asonnino/narwhal/rust.yml?branch=main&logo=github&style=flat-square)](https://github.com/asonnino/narwhal/actions)

[![build status](https://img.shields.io/github/actions/workflow/status/asonnino/narwhal/rust.yml?branch=master&logo=github&style=flat-square)](https://github.com/asonnino/narwhal/actions)
[![rustc](https://img.shields.io/badge/rustc-1.51+-blue?style=flat-square&logo=rust)](https://www.rust-lang.org)
[![python](https://img.shields.io/badge/python-3.9-blue?style=flat-square&logo=python&logoColor=white)](https://www.python.org/downloads/release/python-390/)
[![license](https://img.shields.io/badge/license-Apache-blue.svg?style=flat-square)](LICENSE)

This repo provides an implementation of [Narwhal and Tusk](https://arxiv.org/pdf/2105.11827.pdf). The codebase has been designed to be small, efficient, and easy to benchmark and modify. It has not been designed to run in production but uses real cryptography ([dalek](https://doc.dalek.rs/ed25519_dalek)), networking ([tokio](https://docs.rs/tokio)), and storage ([rocksdb](https://docs.rs/rocksdb)).

## Quick Start

The core protocols are written in Rust, but all benchmarking scripts are written in Python and run with [Fabric](http://www.fabfile.org/).
To deploy and benchmark a testbed of 4 nodes on your local machine, clone the repo and install the python dependencies:

```
$ git clone https://github.com/asonnino/narwhal.git
$ cd narwhal/benchmark
$ pip install -r requirements.txt
```

You also need to install Clang (required by rocksdb) and [tmux](https://linuxize.com/post/getting-started-with-tmux/#installing-tmux) (which runs all nodes and clients in the background). Finally, run a local benchmark using fabric:

```
$ fab local
```

This command may take a long time the first time you run it (compiling rust code in `release` mode may be slow) and you can customize a number of benchmark parameters in `fabfile.py`. When the benchmark terminates, it displays a summary of the execution similarly to the one below.

```
-----------------------------------------
SUMMARY:
Expand Down Expand Up @@ -54,9 +61,11 @@ This command may take a long time the first time you run it (compiling rust code
```

## Next Steps
The next step is to read the paper [Narwhal and Tusk: A DAG-based Mempool and Efficient BFT Consensus](https://arxiv.org/pdf/2105.11827.pdf). It is then recommended to have a look at the README files of the [worker](https://github.com/asonnino/narwhal/tree/master/worker) and [primary](https://github.com/asonnino/narwhal/tree/master/primary) crates. An additional resource to better understand the Tusk consensus protocol is the paper [All You Need is DAG](https://arxiv.org/abs/2102.08325) as it describes a similar protocol.

The next step is to read the paper [Narwhal and Tusk: A DAG-based Mempool and Efficient BFT Consensus](https://arxiv.org/pdf/2105.11827.pdf). It is then recommended to have a look at the README files of the [worker](https://github.com/asonnino/narwhal/tree/master/worker) and [primary](https://github.com/asonnino/narwhal/tree/master/primary) crates. An additional resource to better understand the Tusk consensus protocol is the paper [All You Need is DAG](https://arxiv.org/abs/2102.08325) as it describes a similar protocol.

The README file of the [benchmark folder](https://github.com/asonnino/narwhal/tree/master/benchmark) explains how to benchmark the codebase and read benchmarks' results. It also provides a step-by-step tutorial to run benchmarks on [Amazon Web Services (AWS)](https://aws.amazon.com) accross multiple data centers (WAN).

## License

This software is licensed as [Apache 2.0](LICENSE).
199 changes: 102 additions & 97 deletions benchmark/benchmark/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@
class AWSError(Exception):
def __init__(self, error):
assert isinstance(error, ClientError)
self.message = error.response['Error']['Message']
self.code = error.response['Error']['Code']
self.message = error.response["Error"]["Message"]
self.code = error.response["Error"]["Code"]
super().__init__(self.message)


class InstanceManager:
INSTANCE_NAME = 'dag-node'
SECURITY_GROUP_NAME = 'dag'
INSTANCE_NAME = "dag-node"
SECURITY_GROUP_NAME = "dag"

def __init__(self, settings):
assert isinstance(settings, Settings)
self.settings = settings
self.clients = OrderedDict()
for region in settings.aws_regions:
self.clients[region] = boto3.client('ec2', region_name=region)
self.clients[region] = boto3.client("ec2", region_name=region)

@classmethod
def make(cls, settings_file='settings.json'):
def make(cls, settings_file="settings.json"):
try:
return cls(Settings.load(settings_file))
except SettingsError as e:
raise BenchError('Failed to load settings', e)
raise BenchError("Failed to load settings", e)

def _get(self, state):
# Possible states are: 'pending', 'running', 'shutting-down',
Expand All @@ -41,21 +41,15 @@ def _get(self, state):
for region, client in self.clients.items():
r = client.describe_instances(
Filters=[
{
'Name': 'tag:Name',
'Values': [self.INSTANCE_NAME]
},
{
'Name': 'instance-state-name',
'Values': state
}
{"Name": "tag:Name", "Values": [self.INSTANCE_NAME]},
{"Name": "instance-state-name", "Values": state},
]
)
instances = [y for x in r['Reservations'] for y in x['Instances']]
instances = [y for x in r["Reservations"] for y in x["Instances"]]
for x in instances:
ids[region] += [x['InstanceId']]
if 'PublicIpAddress' in x:
ips[region] += [x['PublicIpAddress']]
ids[region] += [x["InstanceId"]]
if "PublicIpAddress" in x:
ips[region] += [x["PublicIpAddress"]]
return ids, ips

def _wait(self, state):
Expand All @@ -69,51 +63,63 @@ def _wait(self, state):

def _create_security_group(self, client):
client.create_security_group(
Description='HotStuff node',
Description="HotStuff node",
GroupName=self.SECURITY_GROUP_NAME,
)

client.authorize_security_group_ingress(
GroupName=self.SECURITY_GROUP_NAME,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 22,
'ToPort': 22,
'IpRanges': [{
'CidrIp': '0.0.0.0/0',
'Description': 'Debug SSH access',
}],
'Ipv6Ranges': [{
'CidrIpv6': '::/0',
'Description': 'Debug SSH access',
}],
"IpProtocol": "tcp",
"FromPort": 22,
"ToPort": 22,
"IpRanges": [
{
"CidrIp": "0.0.0.0/0",
"Description": "Debug SSH access",
}
],
"Ipv6Ranges": [
{
"CidrIpv6": "::/0",
"Description": "Debug SSH access",
}
],
},
{
'IpProtocol': 'tcp',
'FromPort': self.settings.base_port,
'ToPort': self.settings.base_port + 2_000,
'IpRanges': [{
'CidrIp': '0.0.0.0/0',
'Description': 'Dag port',
}],
'Ipv6Ranges': [{
'CidrIpv6': '::/0',
'Description': 'Dag port',
}],
}
]
"IpProtocol": "tcp",
"FromPort": self.settings.base_port,
"ToPort": self.settings.base_port + 2_000,
"IpRanges": [
{
"CidrIp": "0.0.0.0/0",
"Description": "Dag port",
}
],
"Ipv6Ranges": [
{
"CidrIpv6": "::/0",
"Description": "Dag port",
}
],
},
],
)

def _get_ami(self, client):
# The AMI changes with regions.
response = client.describe_images(
Filters=[{
'Name': 'description',
'Values': ['Canonical, Ubuntu, 20.04 LTS, amd64 focal image build on 2020-10-26']
}]
Filters=[
{
"Name": "description",
"Values": [
"Canonical, Ubuntu, 22.04 LTS, amd64 jammy image build on 2023-09-19"
],
}
]
)
return response['Images'][0]['ImageId']
return response["Images"][0]["ImageId"]

def create_instances(self, instances):
assert isinstance(instances, int) and instances > 0
Expand All @@ -124,14 +130,14 @@ def create_instances(self, instances):
self._create_security_group(client)
except ClientError as e:
error = AWSError(e)
if error.code != 'InvalidGroup.Duplicate':
raise BenchError('Failed to create security group', error)
if error.code != "InvalidGroup.Duplicate":
raise BenchError("Failed to create security group", error)

try:
# Create all instances.
size = instances * len(self.clients)
progress = progress_bar(
self.clients.values(), prefix=f'Creating {size} instances'
self.clients.values(), prefix=f"Creating {size} instances"
)
for client in progress:
client.run_instances(
Expand All @@ -141,37 +147,38 @@ def create_instances(self, instances):
MaxCount=instances,
MinCount=instances,
SecurityGroups=[self.SECURITY_GROUP_NAME],
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': [{
'Key': 'Name',
'Value': self.INSTANCE_NAME
}]
}],
TagSpecifications=[
{
"ResourceType": "instance",
"Tags": [{"Key": "Name", "Value": self.INSTANCE_NAME}],
}
],
EbsOptimized=True,
BlockDeviceMappings=[{
'DeviceName': '/dev/sda1',
'Ebs': {
'VolumeType': 'gp2',
'VolumeSize': 200,
'DeleteOnTermination': True
BlockDeviceMappings=[
{
"DeviceName": "/dev/sda1",
"Ebs": {
"VolumeType": "gp2",
"VolumeSize": 200,
"DeleteOnTermination": True,
},
}
}],
],
)

# Wait for the instances to boot.
Print.info('Waiting for all instances to boot...')
self._wait(['pending'])
Print.heading(f'Successfully created {size} new instances')
Print.info("Waiting for all instances to boot...")
self._wait(["pending"])
Print.heading(f"Successfully created {size} new instances")
except ClientError as e:
raise BenchError('Failed to create AWS instances', AWSError(e))
raise BenchError("Failed to create AWS instances", AWSError(e))

def terminate_instances(self):
try:
ids, _ = self._get(['pending', 'running', 'stopping', 'stopped'])
ids, _ = self._get(["pending", "running", "stopping", "stopped"])
size = sum(len(x) for x in ids.values())
if size == 0:
Print.heading(f'All instances are shut down')
Print.heading(f"All instances are shut down")
return

# Terminate instances.
Expand All @@ -180,64 +187,62 @@ def terminate_instances(self):
client.terminate_instances(InstanceIds=ids[region])

# Wait for all instances to properly shut down.
Print.info('Waiting for all instances to shut down...')
self._wait(['shutting-down'])
Print.info("Waiting for all instances to shut down...")
self._wait(["shutting-down"])
for client in self.clients.values():
client.delete_security_group(
GroupName=self.SECURITY_GROUP_NAME
)
client.delete_security_group(GroupName=self.SECURITY_GROUP_NAME)

Print.heading(f'Testbed of {size} instances destroyed')
Print.heading(f"Testbed of {size} instances destroyed")
except ClientError as e:
raise BenchError('Failed to terminate instances', AWSError(e))
raise BenchError("Failed to terminate instances", AWSError(e))

def start_instances(self, max):
size = 0
try:
ids, _ = self._get(['stopping', 'stopped'])
ids, _ = self._get(["stopping", "stopped"])
for region, client in self.clients.items():
if ids[region]:
target = ids[region]
target = target if len(target) < max else target[:max]
size += len(target)
client.start_instances(InstanceIds=target)
Print.heading(f'Starting {size} instances')
Print.heading(f"Starting {size} instances")
except ClientError as e:
raise BenchError('Failed to start instances', AWSError(e))
raise BenchError("Failed to start instances", AWSError(e))

def stop_instances(self):
try:
ids, _ = self._get(['pending', 'running'])
ids, _ = self._get(["pending", "running"])
for region, client in self.clients.items():
if ids[region]:
client.stop_instances(InstanceIds=ids[region])
size = sum(len(x) for x in ids.values())
Print.heading(f'Stopping {size} instances')
Print.heading(f"Stopping {size} instances")
except ClientError as e:
raise BenchError(AWSError(e))

def hosts(self, flat=False):
try:
_, ips = self._get(['pending', 'running'])
_, ips = self._get(["pending", "running"])
return [x for y in ips.values() for x in y] if flat else ips
except ClientError as e:
raise BenchError('Failed to gather instances IPs', AWSError(e))
raise BenchError("Failed to gather instances IPs", AWSError(e))

def print_info(self):
hosts = self.hosts()
key = self.settings.key_path
text = ''
text = ""
for region, ips in hosts.items():
text += f'\n Region: {region.upper()}\n'
text += f"\n Region: {region.upper()}\n"
for i, ip in enumerate(ips):
new_line = '\n' if (i+1) % 6 == 0 else ''
text += f'{new_line} {i}\tssh -i {key} ubuntu@{ip}\n'
new_line = "\n" if (i + 1) % 6 == 0 else ""
text += f"{new_line} {i}\tssh -i {key} ubuntu@{ip}\n"
print(
'\n'
'----------------------------------------------------------------\n'
' INFO:\n'
'----------------------------------------------------------------\n'
f' Available machines: {sum(len(x) for x in hosts.values())}\n'
f'{text}'
'----------------------------------------------------------------\n'
"\n"
"----------------------------------------------------------------\n"
" INFO:\n"
"----------------------------------------------------------------\n"
f" Available machines: {sum(len(x) for x in hosts.values())}\n"
f"{text}"
"----------------------------------------------------------------\n"
)

0 comments on commit 7c385a0

Please sign in to comment.