Skip to content

Commit

Permalink
Sketching the ZMQ Protobuf solution
Browse files Browse the repository at this point in the history
  • Loading branch information
hisorange committed Jul 27, 2021
1 parent 3683b76 commit 3d19a7e
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 8 deletions.
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ asyncio
aio-udp-server
pycryptodome
msgpack
coloredlogs
coloredlogs
pyzmq
pymitter
35 changes: 35 additions & 0 deletions src/coordination.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
syntax = "proto2";

package scale;

message Interface {
required string name = 1;
required string address = 2;
required string mask = 3;
required string broadcast = 4;
required string gateway = 4;
required string mac = 4;
}

message Join {
required string hostname = 1;
required int32 port = 2;
repeated Interface interfaces = 3;
}

message Leave {
required string hostname = 1;
required int32 port = 2;
}

message NetworkMap {
message Node {
required string hostname = 1;
required int32 port = 2;
required boolean neighbor = 3;

repeated Interface interfaces = 4;
}

repeated Node nodes = 1;
}
33 changes: 33 additions & 0 deletions src/scale/network/cordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio

import zmq
import zmq.asyncio
from pymitter import EventEmitter


class Coordinator:
def __init__(self, loop: asyncio.AbstractEventLoop, event_bus: EventEmitter):
self.loop = loop
self.event_bus = event_bus

def connect(self):
self.context = zmq.asyncio.Context.instance()
self.socket = self.context.socket(zmq.ROUTER)
self.socket.bind(self.address)

self.subscribe()

def subscribe(self):
self.loop.ensure_futures(self.handle_incoming(), self.loop)
self.loop.ensure_futures(self.publish(), self.loop)

async def handle_incoming(self,):
msg = await self.sock.recv_multipart()
# Decode protobuf
print(msg)
# Dispatch to listeners
self.event_bus.emit('message.message_type', msg)

async def send(self, reply):
# Encode protobuf
await self.socket.send_multipart(reply)
4 changes: 4 additions & 0 deletions src/scale/scale.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio

from pymitter import EventEmitter

from scale.config import Config
from scale.dependencymanager import DependencyManager
from scale.discovery import Discovery
Expand All @@ -11,6 +13,8 @@ async def start_scale(loop: asyncio.AbstractEventLoop):
config = Config()
config.load()

event_bus = EventEmitter()

DependencyManager()

vpn = VPNManager(config)
Expand Down
14 changes: 7 additions & 7 deletions src/scale/vpn.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import string
from configparser import ConfigParser

import netifaces

from scale.logger import create_logger
from scale.network.node import Node

Expand All @@ -15,6 +17,10 @@ def __init__(self, config):
self.iface = self.config.network['interface']
pass

# Read the interfaces from the host
def get_interfaces(self):
return netifaces.interfaces()

def bootstrap(self):
# Check if the VPN is running
if (len(self.config.network['passKey']) == 0):
Expand All @@ -31,13 +37,7 @@ def bootstrap(self):
self.logger.info('Bootstrapped VPN...')

def check_if_wg_running(self):
result = os.system('ip a | grep {}'.format(
self.iface))

if (result == 0):
return True
else:
return False
return self.get_interfaces().__contains__(self.iface)

def generate_wg_config(self) -> None:
config_path = os.path.join(
Expand Down

0 comments on commit 3d19a7e

Please sign in to comment.