Permalink
Browse files

waitpid and fork seem to be functioning smoothly now. On to figuring …

…out pipes.
  • Loading branch information...
ashgti committed Mar 11, 2011
1 parent 8c819c4 commit 780a12445ae3ecd0b7834eeb2ff85c86ec3166cc
Showing with 117 additions and 41 deletions.
  1. +62 −12 multiprocessing.cc
  2. +18 −3 multiprocessing.js
  3. +37 −26 test/sanity.js
View
@@ -2,8 +2,11 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
+#include <fcntl.h>
#include <errno.h>
#include <iostream>
+#include <cstdio>
+#include <ev.h>
using namespace v8;
using namespace std;
@@ -18,18 +21,57 @@ static Handle<Value> _fork(const Arguments& args) {
Local<Function> fn = Local<Function>::Cast(args[0]);
switch(pid = fork()) {
case -1: /* Error */
- perror("fork()");
+ perror("fork");
_exit(256); /* Looking for a unique exit code... */
- return Undefined();
+ return scope.Close(Undefined());
break;
case 0: /* Child */
- return Integer::New(pid);
+ {
+ /* Fix ev */
+ ev_loop_fork(EV_DEFAULT);
+ /* update PID */
+ Local<Object> global = v8::Context::GetCurrent()->Global();
+ Local<Value> process_v = global->Get(String::NewSymbol("process"));
+ Local<Object> process = process_v->ToObject();
+ process->Set(String::NewSymbol("pid"), Integer::New(getpid()));
+ return scope.Close(Integer::New(pid));
+ }
break;
default: /* Parent */
- return Integer::New(pid);
+ return scope.Close(Integer::New(pid));
}
}
+/* This is used in the core of node so, I copied it to emulate
+ * the same behaviour in my module.
+ */
+static inline int SetCloseOnExec(int fd) {
+ int flags = fcntl(fd, F_GETFD, 0);
+ int r = fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
+ if (r != 0) {
+ perror("SetCloseOnExec");
+ }
+ return r;
+}
+
+/* Create a pipe */
+static Handle<Value> _pipe(const Arguments& args) {
+ HandleScope scope;
+ int fds[2];
+ if (pipe(fds) == -1) {
+ return ThrowException(ErrnoException(errno, "pipe"));
+ }
+
+ SetCloseOnExec(fds[0]);
+ SetCloseOnExec(fds[1]);
+
+ Local<Array> results = Array::New(2);
+ results->Set(0, Integer::New(fds[0]));
+ results->Set(1, Integer::New(fds[0]));
+ return scope.Close(results);
+}
+
+/* waitpid */
static Handle<Value> _waitpid(const Arguments& args) {
HandleScope scope;
pid_t pid;
@@ -53,7 +95,7 @@ static Handle<Value> _waitpid(const Arguments& args) {
results = Array::New(2);
results->Set(0, Integer::New(r));
results->Set(1, Integer::New(status));
- return results;
+ return scope.Close(results);
}
}
@@ -64,61 +106,67 @@ static Handle<Value> _WIFSIGNALED(const Arguments& args) {
return ThrowException(Exception::Error(String::New("Bad argument.")));
}
int val = args[0]->Int32Value();
- return Boolean::New(WIFSIGNALED(val));
+ return scope.Close(Boolean::New(WIFSIGNALED(val)));
}
#endif
#ifdef WEXITSTATUS
static Handle<Value> _WEXITSTATUS(const Arguments& args) {
+ HandleScope scope;
if (args.Length() < 1) {
return ThrowException(Exception::Error(String::New("Bad argument.")));
}
int val = args[0]->Int32Value();
- return Boolean::New(WEXITSTATUS(val));
+ return scope.Close(Boolean::New(WEXITSTATUS(val)));
}
#endif
#ifdef WSTOPSIG
static Handle<Value> _WSTOPSIG(const Arguments& args) {
+ HandleScope scope;
if (args.Length() < 1) {
return ThrowException(Exception::Error(String::New("Bad argument.")));
}
int val = args[0]->Int32Value();
- return Boolean::New(WSTOPSIG(val));
+ return scope.Close(Boolean::New(WSTOPSIG(val)));
}
#endif
#ifdef WIFCONTINUED
static Handle<Value> _WIFCONTINUED(const Arguments& args) {
+ HandleScope scope;
if (args.Length() < 1) {
return ThrowException(Exception::Error(String::New("Bad argument.")));
}
int val = args[0]->Int32Value();
- return Boolean::New(WIFCONTINUED(val));
+ return scope.Close(Boolean::New(WIFCONTINUED(val)));
}
#endif
#ifdef WIFEXITED
static Handle<Value> _WIFEXITED(const Arguments& args) {
+ HandleScope scope;
if (args.Length() < 1) {
return ThrowException(Exception::Error(String::New("Bad argument.")));
}
int val = args[0]->Int32Value();
- return Boolean::New(WIFEXITED(val));
+ return scope.Close(Boolean::New(WIFEXITED(val)));
}
#endif
#ifdef WTERMSIG
static Handle<Value> _WTERMSIG(const Arguments& args) {
+ HandleScope scope;
if (args.Length() < 1) {
return ThrowException(Exception::Error(String::New("Bad argument.")));
}
int val = args[0]->Int32Value();
- return Boolean::New(WTERMSIG(val));
+ return scope.Close(Boolean::New(WTERMSIG(val)));
}
#endif
#ifdef WCOREDUMP
static Handle<Value> _WCOREDUMP(const Arguments& args) {
+ HandleScope scope;
if (args.Length() < 1) {
return ThrowException(Exception::Error(String::New("Bad argument.")));
}
int val = args[0]->Int32Value();
- return Boolean::New(WCOREDUMP(val));
+ return scope.Close(Boolean::New(WCOREDUMP(val)));
}
#endif
@@ -160,5 +208,7 @@ init (Handle<Object> target) {
NODE_SET_METHOD(target, "fork", multiprocessing::_fork);
NODE_SET_METHOD(target, "waitpid", multiprocessing::_waitpid);
+
+ scope.Close(target);
}
View
@@ -1,12 +1,28 @@
var mp_native = require('./build/default/multiprocessing.node');
+/* Multiprocessing object, represents a 2nd process.
+ */
function mp(target) {
this.target = target;
this.args = arguments.slice(1);
this.pid = -1;
this.exitcode = -1; // Has not yet exited
+
+ var internal = this._internal = new InternalChildProcess();
+
+ var stdin = this.stdin = new Stream();
+ var stdout = this.stdout = new Stream();
+ var stderr = this.stderr = new Stream();
+
+ var stderrClosed = false;
+ var stdoutClosed = false;
+
+ stderr.addListener('close', function () {
+ stderrClosed = true;
+ });
}
+/* */
mp.prototype.start = function () {
this.pid = mp._fork();
if (pid == 0) {
@@ -44,9 +60,8 @@ exports.WIFCONTINUED = mp_native.WIFCONTINUED;
exports.WIFEXITED = mp_native.WIFEXITED;
exports.WTERMSIG = mp_native.WTERMSIG;
exports.WCOREDUMP = mp_native.WCOREDUMP;
-exports.sleep = function () {
-
-}
+
+/* IO Multiplexing... Do I need this or can I pull it from the core of node-js? */
exports.select = function () {
}
View
@@ -1,34 +1,45 @@
var mp = require('../multiprocessing.js');
-var r = mp.fork();
+var r = mp.fork();
if (r == 0) {
- console.log("process.pid: " + process.pid);
+ console.log("child process.pid: " + process.pid);
+ var finished = false;
+ var timer = (function() {
+ return setTimeout(function () {
+ console.log("callback called!!!?");
+ finished = true;
+ console.log("Dead... now");
+ process.exit(0);
+ }, 3500);
+ })();
- setTimeout(function () {
- process.exit(0);
- }, 3500);
-
- while(1) { }
+ for (var x in timer) {
+ console.log("x:" +x+ " :v: " +timer[x]);
+ }
+ console.log("Timer id: " + timer);
+ console.log("started child timer?");
}
+else {
+ console.log("worked? child is: " + r + " parent is:" + process.pid);
-console.log("worked? parent " + r + ":" + process.pid);
-
-var timer;
-
-timer = setInterval(function () {
- console.log('entering timer r: ' + r);
- var got = mp.waitpid(r, mp.WNOHANG);
+ var timer;
+ timer = setInterval(function () {
+ console.log('entering timer r: ' + r);
+ var got = mp.waitpid(-1, mp.WNOHANG);
- console.log("a: " + got[0] + " b:" + got[1]);
- if (got[0] == process.pid) {
- clearInterval(timer);
- if (mp.WIFSIGNALED(got[1])) {
- console.log("a: " + (-mp.WTERMSIG(got[1])));
- console.log('finished');
+ console.log("a: " + got[0] + " b:" + got[1]);
+ if (got[0] == 0) {
+ clearInterval(timer);
+ if (mp.WIFSIGNALED(got[1])) {
+ console.log("a: " + (-mp.WTERMSIG(got[1])));
+ console.log('finished');
+ }
+ else if (mp.WIFEXITED(got[1])) {
+ console.log("b: " + os.WEXITSTATUS(got[1]));
+ }
}
- else {
- console.log("b: " + os.WEXITSTATUS(got[1]));
- }
- }
- console.log('exiting timer');
-}, 2000);
+ console.log('exiting timer');
+ }, 2000);
+}
+
+console.log("Finished if");

0 comments on commit 780a124

Please sign in to comment.