Skip to content

Commit

Permalink
Sketching some more stuff out
Browse files Browse the repository at this point in the history
  • Loading branch information
soup-in-boots committed Apr 20, 2019
1 parent c447bb6 commit e31d853
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 20 deletions.
6 changes: 6 additions & 0 deletions lib/adapters/cluster-slave.js
Expand Up @@ -63,6 +63,8 @@ function create_adapter(emitter) {
switch (message.cmd) {
case 'rpc-reply':
return handle_rpc_reply(message);
case 'rpc':
return handle_rpc(message);
case 'deliver':
return deliver(message.payload, false);
}
Expand All @@ -80,6 +82,10 @@ function create_adapter(emitter) {
return resolve(reply.result);
}

async function handle_rpc(message) {
log('rpc : %o', message);
}

async function deliver(message, tryMaster = true) {
log('deliver : %O', message);

Expand Down
42 changes: 25 additions & 17 deletions lib/adapters/cluster.js
Expand Up @@ -3,6 +3,7 @@ const path = require('path');
const os = require('os');
const serialize = require('../serialize');
const debug = require('debug');
const {PID} = require('../types');

const log = debug('open-telecom:adapters:cluster');

Expand All @@ -12,29 +13,36 @@ function create_cluster_manager(cluster = node_cluster) {
this.workers = [];
this.node_ids = 0;
this.cpus = os.cpus();
this.resolvers = new Map();
this.node_workers = new Map();
}

start() {
async start() {
cluster.setupMaster({
exec: path.resolve(__dirname, 'cluster-slave.js'),
exec: path.resolve(__dirname, 'slave-start.js'),
});

for (let cpu of this.cpus) {
const worker = cluster.fork();
this.workers.push(worker);
}

cluster.on('message', (worker, message) => {
log('message : %o', message);
const parsed = serialize.parse(message);
log('parsed : %o', parsed);
this.handle_message(worker, parsed);
});

for (let cpu of this.cpus) {
log('fork');
const worker = cluster.fork();
this.workers.push(worker);
await new Promise(resolve => setTimeout(resolve, 50));
}
}

async load_module(load_module) {
for (let worker in this.workers) {
await this.rpc(worker, 'load_module', [module]);
async load_module(module) {
log('load_module : %o', module);
for (let node_id of this.node_workers.keys()) {
log('load_module : deliver : %o', node_id);
await this.deliver_message(
{to: PID.of(node_id, 0), msg: {cmd: 'load_module', args: [module]}});
}
}

Expand All @@ -49,6 +57,7 @@ function create_cluster_manager(cluster = node_cluster) {
break;
case 'register_node':
const node_id = ++this.node_ids;
this.node_workers.set(node_id, worker);
result = node_id;
break;
default:
Expand All @@ -62,14 +71,13 @@ function create_cluster_manager(cluster = node_cluster) {
worker.send(serialize.stringify({cmd: 'rpc-reply', ref: message.ref, error, result}));
}

deliver_message(message) {}
deliver_message(message) {
log('deliver_message : %o', message);

rpc(worker, cmd, args = []) {
return new Promise((resolve, reject) => {
const ref = this.refs++;
worker.send(serialize.stringify({cmd, args, ref}));
this._resolvers.set(ref, {resolve, reject});
});
const node_id = message.to.node;
const worker = this.node_workers.get(node_id);

worker.send(serialize.stringify({cmd: 'deliver', payload: message}))
}
}

Expand Down
29 changes: 26 additions & 3 deletions lib/otp/supervisor.js
@@ -1,5 +1,8 @@
const proc_lib = require('./proc_lib');

const ONE_FOR_ONE = Symbol.for('$otp:supervisor:one_for_one');
const ONE_FOR_ALL = Symbol.for('$otp:supervisor:one_for_all');

async function start(ctx, callbacks) {
return proc_lib.start((ctx) => init(ctx, callbacks));
}
Expand All @@ -8,11 +11,31 @@ async function start_link() {
return proc_lib.start_link((ctx) => init(ctx, callbacks));
}

async function init(ctx) {}
async function init(ctx, callbacks) {
const {ok, strategy, children} = callbacks.init(ctx);

if (!ok)
throw new Error('invalid initial state');

async function loop(state) {}
const spawned = children.map(descriptor => {
const pid = ctx.spawn_link(descriptor);
return pid;
});

return loop(ctx, {children, spawned, strategy});
}

async function loop(ctx, state) {
let running = true;
while (running) {
const message = ctx.receive();
}
}

module.exports = {
start,
start_link
start_link,

ONE_FOR_ONE,
ONE_FOR_ALL,
};

0 comments on commit e31d853

Please sign in to comment.