Skip to content

Commit

Permalink
important fix to the message latency fix. the timing is now done in a
Browse files Browse the repository at this point in the history
separate thread so messages can't be delayed by compute tasks.

wrap finalizer calls in try/catch
  • Loading branch information
JeffBezanson committed Jul 22, 2011
1 parent 577814c commit f9637a2
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 123 deletions.
137 changes: 39 additions & 98 deletions j/multi.j
Original file line number Diff line number Diff line change
Expand Up @@ -39,45 +39,6 @@
##
## @bcast expr - run expr everywhere. useful for load().

## message i/o ##

function send_msg(s::IOStream, buf::IOStream, kind, args)
serialize(buf, kind)
for arg=args
serialize(buf, arg)
end
ccall(:jl_enq_send_req, Void, (Ptr{Void}, Ptr{Void}),
s.ios, buf.ios)
#ccall(:ios_write_direct, PtrInt, (Ptr{Void}, Ptr{Void}),
# s.ios, buf.ios)
end

SENDBUF = ()
function send_msg_unknown(s::IOStream, kind, args)
# for sending to a socket not associated with a worker
global SENDBUF
if is(SENDBUF,())
SENDBUF = memio()
end
send_msg(s, SENDBUF::IOStream, kind, args)
end

function send_msg(s::IOStream, kind, args...)
id = worker_id_from_socket(s)
if id > -1
return send_msg(worker_from_id(id), kind, args...)
end
send_msg_unknown(s, kind, args)
end

function send_msg_now(s::IOStream, kind, args...)
id = worker_id_from_socket(s)
if id > -1
return send_msg_now(worker_from_id(id), kind, args...)
end
send_msg_unknown(s, kind, args)
end

# todo:
# - more indexing
# - take() to empty a Ref (full/empty variables)
Expand All @@ -101,7 +62,27 @@ end
# * add readline to event loop
# * GOs/darrays on a subset of nodes

## process group creation ##
## workers and message i/o ##

function send_msg_unknown(s::IOStream, kind, args)
error("attempt to send to unknown socket")
end

function send_msg(s::IOStream, kind, args...)
id = worker_id_from_socket(s)
if id > -1
return send_msg(worker_from_id(id), kind, args...)
end
send_msg_unknown(s, kind, args)
end

function send_msg_now(s::IOStream, kind, args...)
id = worker_id_from_socket(s)
if id > -1
return send_msg_now(worker_from_id(id), kind, args...)
end
send_msg_unknown(s, kind, args)
end

type Worker
host::String
Expand All @@ -111,8 +92,6 @@ type Worker
sendbuf::IOStream
id::Int32
del_msgs::Array{Any,1}
lastmsg::Float64
dirty::Bool

function Worker(host, port)
fd = ccall(:connect_to_host, Int32,
Expand All @@ -123,70 +102,39 @@ type Worker
Worker(host, port, fd, fdio(fd))
end

Worker(host,port,fd,sock,id) =
new(host, port, fd, sock, memio(), id, {}, 0.0, false)
Worker(host,port,fd,sock,id) = new(host, port, fd, sock, memio(), id, {})
Worker(host,port,fd,sock) = Worker(host,port,fd,sock,0)
end

function flush_worker(w::Worker)
if !isempty(w.del_msgs)
#print("sending delete of $(w.del_msgs)\n")
remote_do(w, del_clients, w.del_msgs...)
del_all(w.del_msgs)
end
ccall(:jl_enq_send_req, Void, (Ptr{Void}, Ptr{Void}),
w.socket.ios, w.sendbuf.ios)
w.dirty = false
end

function send_msg_now(w::Worker, kind, args...)
send_msg(w.socket, w.sendbuf, kind, args)
w.dirty = false
send_msg_(w, kind, args, true)
end

function send_msg(w::Worker, kind, args...)
send_msg_(w, kind, args, false)
end

function send_msg_(w::Worker, kind, args, now::Bool)
buf = w.sendbuf
ccall(:jl_buf_mutex_lock, Void, ())
serialize(buf, kind)
for arg=args
serialize(buf, arg)
end
if !w.dirty
w.lastmsg = clock()
end
w.dirty = true
end
ccall(:jl_buf_mutex_unlock, Void, ())

function flush_workers()
global PGRP
now = 0.0
for w = PGRP.workers
if isa(w,Worker) && (w::Worker).dirty
k = w::Worker
if now==0.0
now = clock()
end
if now-k.lastmsg >= 0.0002
flush_worker(k)
end
end
if !now && !isempty(w.del_msgs)
msgs = w.del_msgs
w.del_msgs = {}
#print("sending delete of $msgs\n")
remote_do(w, del_clients, msgs...)
else
ccall(:jl_enq_send_req, Void, (Ptr{Void}, Ptr{Void}, Int32),
w.socket.ios, w.sendbuf.ios, now ? int32(1) : int32(0))
end
end

# determine the maximum time we can sleep in select
function max_sleep_time()
global PGRP
mt = 10.0
now = 0.0
for w = PGRP.workers
if isa(w,Worker) && (w::Worker).dirty
if now==0.0
now = clock()
end
mt = min(mt,(w::Worker).lastmsg+0.0002-now)
end
end
max(mt,0.0)
end
## process group creation ##

type LocalProcess
end
Expand Down Expand Up @@ -407,10 +355,6 @@ function send_del_client(rr::RemoteRef)
else
w = worker_from_id(rr.where)
push(w.del_msgs, (rr2id(rr), myid()))
if !w.dirty
w.lastmsg = clock()
end
w.dirty = true
end
end

Expand Down Expand Up @@ -1466,9 +1410,7 @@ function event_loop(isclient)
add(fdset, fd)
end

nselect = select_read(fdset,
isempty(Workqueue) ? max_sleep_time() :
0.0)
nselect = select_read(fdset, isempty(Workqueue) ? 10.0 : 0.0)
if nselect == 0
if !isempty(Workqueue)
perform_work()
Expand All @@ -1481,7 +1423,6 @@ function event_loop(isclient)
end
end
end
flush_workers()
end
catch e
if isa(e,DisconnectException)
Expand Down
6 changes: 5 additions & 1 deletion src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ static void run_finalizers()
while (jl_is_tuple(ff)) {
f = (jl_function_t*)jl_t0(ff);
assert(jl_is_function(f));
jl_apply(f, (jl_value_t**)&o, 1);
JL_TRY {
jl_apply(f, (jl_value_t**)&o, 1);
}
JL_CATCH {
}
ff = jl_t1(ff);
}
f = (jl_function_t*)ff;
Expand Down
2 changes: 2 additions & 0 deletions src/julia.expmap
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@
jl_eval_user_input;
jl_toplevel_eval;
jl_enq_send_req;
jl_buf_mutex_lock;
jl_buf_mutex_unlock;
jl_start_io_thread;
jl_save_system_image;
jl_restore_system_image;
Expand Down
2 changes: 2 additions & 0 deletions src/support/ios.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ typedef struct {
// use julia-compatible buffer allocator
unsigned char julia_alloc:1;

int64_t userdata;

// todo: mutex
char local[IOS_INLSIZE];
} ios_t;
Expand Down
115 changes: 91 additions & 24 deletions src/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,61 +228,127 @@ int jl_stderr() { return STDERR_FILENO; }
static pthread_t io_thread;
static pthread_mutex_t q_mut;
static pthread_mutex_t wake_mut;
static pthread_mutex_t buf_mut;
static pthread_cond_t wake_cond;

typedef struct _sendreq_t {
int fd;
void *buf;
size_t n;
ios_t *buf;
int now;
struct _sendreq_t *next;
} sendreq_t;

static sendreq_t *ioq = NULL;
static sendreq_t *ioq_freelist = NULL;

int _os_write_all(long fd, void *buf, size_t n, size_t *nwritten);

static void *run_io_thr(void *arg)
{
while (1) {
pthread_mutex_lock(&wake_mut);
pthread_cond_wait(&wake_cond, &wake_mut);
pthread_mutex_unlock(&wake_mut);
if (ioq == NULL) {
pthread_mutex_lock(&wake_mut);
pthread_cond_wait(&wake_cond, &wake_mut);
pthread_mutex_unlock(&wake_mut);
}
assert(ioq != NULL);

pthread_mutex_lock(&q_mut);
while (ioq != NULL) {
sendreq_t *r = ioq;
ioq = ioq->next;
pthread_mutex_unlock(&q_mut);
size_t nw;
_os_write_all(r->fd, r->buf, r->n, &nw);
julia_free(r->buf);
free(r);
pthread_mutex_lock(&q_mut);
sendreq_t *r = ioq;
ioq = ioq->next;
pthread_mutex_unlock(&q_mut);

if (!r->now) {
int64_t now = (int64_t)(clock_now()*1e6);
int64_t waittime = r->buf->userdata+200-now;
if (waittime > 0) {
struct timespec wt;
wt.tv_sec = 0;
wt.tv_nsec = waittime * 1000;
nanosleep(&wt, NULL);
}
}

pthread_mutex_lock(&buf_mut);
size_t sz;
size_t n = r->buf->size;
char *buf = ios_takebuf(r->buf, &sz);
pthread_mutex_unlock(&buf_mut);

size_t nw;
_os_write_all(r->fd, buf, n, &nw);
julia_free(buf);
//free(r);

pthread_mutex_lock(&q_mut);
r->next = ioq_freelist;
ioq_freelist = r;
pthread_mutex_unlock(&q_mut);
}
return NULL;
}

DLLEXPORT
void jl_enq_send_req(ios_t *dest, ios_t *buf)
void jl_buf_mutex_lock()
{
pthread_mutex_lock(&buf_mut);
}

DLLEXPORT
void jl_buf_mutex_unlock()
{
sendreq_t *req = (sendreq_t*)malloc(sizeof(sendreq_t));
pthread_mutex_unlock(&buf_mut);
}

DLLEXPORT
void jl_enq_send_req(ios_t *dest, ios_t *buf, int now)
{
pthread_mutex_lock(&q_mut);
sendreq_t *req = ioq;
sendreq_t **pr = &ioq;
while (req != NULL) {
if (req->fd == dest->fd) {
if (now && !req->now) {
// increase priority
*pr = req->next;
req->next = ioq;
ioq = req;
req->now = 1;
}
pthread_mutex_unlock(&q_mut);
return;
}
pr = &req->next;
req = req->next;
}

if (ioq_freelist != NULL) {
req = ioq_freelist;
ioq_freelist = ioq_freelist->next;
}
else {
req = (sendreq_t*)malloc(sizeof(sendreq_t));
}
req->fd = dest->fd;
size_t sz;
req->n = buf->size;
req->buf = ios_takebuf(buf, &sz);
req->buf = buf;
req->now = now;
req->next = NULL;
pthread_mutex_lock(&q_mut);
buf->userdata = (int64_t)(clock_now()*1e6);
if (ioq == NULL) {
ioq = req;
}
else {
sendreq_t *r = ioq;
while (r->next != NULL) {
r = r->next;
if (now) {
req->next = ioq;
ioq = req;
}
else {
sendreq_t *r = ioq;
while (r->next != NULL) {
r = r->next;
}
r->next = req;
}
r->next = req;
}
pthread_mutex_unlock(&q_mut);
pthread_cond_signal(&wake_cond);
Expand All @@ -293,6 +359,7 @@ void jl_start_io_thread()
{
pthread_mutex_init(&q_mut, NULL);
pthread_mutex_init(&wake_mut, NULL);
pthread_mutex_init(&buf_mut, NULL);
pthread_cond_init(&wake_cond, NULL);
pthread_create(&io_thread, NULL, run_io_thr, NULL);
}

0 comments on commit f9637a2

Please sign in to comment.