Permalink
Browse files

Rcalls and lookups are happening successfully in threads, but overlap…

…ping R commands still cause hang
  • Loading branch information...
1 parent 0671f1e commit 6d1e0a71739c09aec2b236375efe34c5aa981fcc @gmbecker committed Jul 5, 2012
Showing with 125 additions and 20 deletions.
  1. +4 −0 inst/NPAPI/BasicPlugin.cpp
  2. +2 −1 inst/NPAPI/Makefile.in
  3. +14 −7 inst/NPAPI/WebR.h
  4. +105 −12 inst/NPAPI/WebRMutex.cpp
@@ -50,8 +50,12 @@ int initR( const char **args, int nargs)
for(int i = 0 ; i < nargs; i++)
rargs[i] = strdup(args[i]);
fprintf(stderr, "Attempting to start embedded R.\n");fflush(stderr);
+R_SignalHandlers = 0;
Rf_initEmbeddedR(nargs, rargs);
fprintf(stderr, "R initialization done.\n"); fflush(stderr);
+
+ R_CStackLimit = (uintptr_t)-1;
+ R_SignalHandlers = 0;
int error=0;
SEXP call;
PROTECT(call = allocVector(LANGSXP, 2));
View
@@ -44,11 +44,12 @@ IS_MAC=@IS_MAC@
CPPFLAGS = -Wall -O0 -DXP_UNIX=1 -DMOZ_X11=1 -fPIC -g -I ../npapi-sdk-headers -I$ /usr/share/R/include
R_LD_FLAGS=-L$(R_HOME)/lib -lR
+LD_FLAGS=-lpthreads
#OBJ_FILES = WebRPlugin.o WebREngine.o NPConversions.o WebRObject.o
OBJ_FILES = BasicPlugin.o WebREngine.o NPConversions.o WebRObject.o WebRFunction.o WebRMutex.o
npbasicplugin : $(OBJ_FILES)
ifeq ($(IS_MAC), 0)
- cc $(CPPFLAGS) $(R_LD_FLAGS) -shared $(OBJ_FILES) -o RBrowserPlugin.so
+ cc $(CPPFLAGS) $(R_LD_FLAGS) $(LD_FLAGS) -shared $(OBJ_FILES) -o RBrowserPlugin.so
else
#we need to make sure to clean every time because some stuff seems to be getting cached when it shouldn't
xcodebuild clean -configuration Debug -sdk macosx10.5 ARCHS="i386"
View
@@ -44,15 +44,20 @@
#include "npfunctions.h"
#include <npruntime.h>
#include <stdio.h>
+#include "pthread.h"
+
+#define CSTACK_DEFNS 1
#ifndef XP_MACOSX
#include <R.h>
#include <Rdefines.h>
#include <Rembedded.h>
+#include <Rinterface.h>
#else
#include <R/R.h>
-#include </Rdefines.h>
+#include <R/Rdefines.h>
#include <R/Rembedded.h>
+#include <R/Rinterface.h>
#endif
@@ -271,14 +276,15 @@ class RCallQueue
public:
SEXP requestRCall(SEXP toEval, SEXP env, int *err, NPP inst);
SEXP requestRLookup(const char *name);
-
- private:
- uint64_t enterQueue();
- void lock();
- void unlock();
void waitInQueue(uint64_t spot);
void advanceQueue(uint64_t spot);
+ uint64_t enterQueue();
+private:
+
+ void lock();
+ void unlock();
+
private:
int isLocked;
uint64_t serving;
@@ -290,6 +296,7 @@ class RCallQueue
SEXP innerGetVar(const char * varName, NPP inst);
SEXP doGetVar(NPIdentifier name, NPP inst);
-
+ void* doRCall(void *in);
+ void* doRLookup(void *in);
#endif // WebR.h
View
@@ -1,5 +1,20 @@
#include "WebR.h"
+typedef struct rCall {
+ RCallQueue *queue;
+ SEXP toeval;
+ SEXP env;
+ int *err;
+ NPP inst;
+ SEXP _ret;
+} rcall_t;
+
+typedef struct rLookup {
+ RCallQueue *queue;
+ const char *name;
+ SEXP _ret;
+} rlookup_t;
+
void RCallQueue::lock()
{
this->isLocked = 1;
@@ -10,13 +25,61 @@ void RCallQueue::unlock()
this->isLocked = 0;
}
-SEXP
-RCallQueue::requestRCall(SEXP toeval, SEXP env, int *err, NPP inst)
+SEXP RCallQueue::requestRCall(SEXP toeval, SEXP env, int *err, NPP inst)
{
- uint64_t spot = 0;
- spot = this->enterQueue();
+ pthread_t thr;
+ rcall_t *argsin = (rcall_t *) malloc(sizeof(rcall_t));
+ argsin->queue = this;
+ argsin->toeval = toeval;
+ argsin->env = env;
+ argsin->err = err;
+ argsin->inst = inst;
+ /*
+void * argsin = malloc(sizeof(SEXP)*2 + sizeof(int*) + sizeof(NPP));
+ int *curpos = (int *) argsin;
+ *(SEXP *)curpos = toeval;
+ curpos = curpos + sizeof(SEXP);
+ *(SEXP *)curpos = env;
+ curpos = curpos + sizeof(SEXP);
+ *(int **) curpos = err;
+ curpos = curpos + sizeof(int*);
+ *(NPP *) curpos = inst;
+ curpos = curpos + sizeof(NPP);
+ *(RCallQueue**) curpos = this;
+ */
+ (SEXP) pthread_create(&thr, NULL, &doRCall, (void*)argsin);
+
+//XXX pretty sure this is going to cause the same problem as not having threads...
+ pthread_join(thr, NULL);
+ SEXP ans = argsin->_ret;
+ free(argsin);
+ return ans;
+}
+
+void* doRCall(void * in)
+{
+ /*
+ long long int curpos = (long long int) in;
+ SEXP toeval = (SEXP) curpos;
+ curpos = curpos + sizeof(SEXP);
+ SEXP env = (SEXP) curpos;
+ curpos = curpos + sizeof(SEXP);
+ int *err = (int *) curpos;
+ curpos = curpos + sizeof(int*);
+ NPP inst = (NPP) curpos;
+ curpos = curpos + sizeof(NPP);
+ RCallQueue *queue = *(RCallQueue **) curpos;
+ */
+ rcall_t *callin = reinterpret_cast<rcall_t*>(in);
- this->waitInQueue(spot);
+ RCallQueue *queue = reinterpret_cast<RCallQueue*>(callin->queue);
+ SEXP toeval = reinterpret_cast<SEXP>(callin->toeval);
+ SEXP env = reinterpret_cast<SEXP>(callin->env);
+ int *err = reinterpret_cast<int*>(callin->err);
+ NPP inst = reinterpret_cast<NPP>(callin->inst);
+ uint64_t spot = 0;
+ spot = queue->enterQueue();
+ queue->waitInQueue(spot);
if(inst)
{
@@ -28,10 +91,10 @@ RCallQueue::requestRCall(SEXP toeval, SEXP env, int *err, NPP inst)
PROTECT(call = toeval);
Rf_PrintValue(call);
- Rf_PrintValue(env);
-
+
PROTECT( ans = R_tryEval(toeval, env, err));
if(err)
+
ans = R_NilValue;
else
{
@@ -40,26 +103,56 @@ RCallQueue::requestRCall(SEXP toeval, SEXP env, int *err, NPP inst)
//Unlock R and reset queue if this was the last request.
}
- this->advanceQueue(spot);
+ queue->advanceQueue(spot);
+ callin->_ret = ans;
UNPROTECT(1);
return ans;
}
SEXP RCallQueue::requestRLookup(const char *name)
+{
+ pthread_t thr;
+
+ rlookup_t *argsin = (rlookup_t *) malloc(sizeof(rlookup_t));
+ /*
+ *(RCallQueue **)argsin = this;
+ *(const char **) (argsin + sizeof(RCallQueue)) = name;
+ */
+ argsin->queue = this;
+ argsin->name = name;
+
+ (SEXP) pthread_create(&thr, NULL, &doRLookup, (void*) argsin);
+
+ //XXX pretty sure this is going to cause the same problem as not having threads...
+ pthread_join(thr, NULL);
+ SEXP ans = argsin->_ret;
+ free(argsin);
+return ans;
+}
+
+
+void* doRLookup(void *in)
{
uint64_t spot = 0;
- spot = this->enterQueue();
+ /* RCallQueue *queue = *(RCallQueue **)in;
+
+ const char *name = (const char*) (in + sizeof(RCallQueue *));*/
+ rlookup_t *argsin = reinterpret_cast<rlookup_t*>(in);
+ RCallQueue *queue = reinterpret_cast<RCallQueue *>(argsin->queue);
+ const char *name = reinterpret_cast<const char *>(argsin->name);
+ spot = queue->enterQueue();
- this->waitInQueue(spot);
+ queue->waitInQueue(spot);
SEXP ans;
int err = 0;
ans = Rf_findVar( Rf_install(name), R_GlobalEnv);
if(TYPEOF(ans) == PROMSXP)
ans = R_tryEval(ans, R_GlobalEnv, &err);
- this->advanceQueue(spot);
-
+ queue->advanceQueue(spot);
+ //R_PreserveObject(ans); //XXX This is never getting released!!!!!
+ argsin->_ret = ans;
return ans;
}

0 comments on commit 6d1e0a7

Please sign in to comment.