Skip to content

Commit

Permalink
Introduce threads for local computation
Browse files Browse the repository at this point in the history
  • Loading branch information
blacksheeep committed Apr 5, 2014
1 parent 87f9a80 commit d3bba05
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -4,7 +4,7 @@ CC=gcc
MYCFLAGS=-O3 -Wall -pedantic -std=c99 -g \
-D_XOPEN_SOURCE=500 -D_XOPEN_SOURCE_EXTENDED -Dlinux -O0
#MYCFLAGS= -Wall -g -O0
EXTLIBS= -lcrypto
EXTLIBS= -lcrypto -lpthread
EXTMAKE=
EXTMAKECLEAN=

Expand Down
11 changes: 10 additions & 1 deletion ccnl-core.c
Expand Up @@ -24,6 +24,7 @@
#include "ccnl-core.h"



#define CCNL_VERSION "2013-07-27"


Expand Down Expand Up @@ -664,7 +665,7 @@ ccnl_interest_remove(struct ccnl_relay_s *ccnl, struct ccnl_interest_s *i)
}
i2 = i->next;
DBL_LINKED_LIST_REMOVE(ccnl->pit, i);
free_prefix(i->prefix);
//free_prefix(i->prefix); //TODO: //FIXME: IMPORTANT: memory leak
free_3ptr_list(i->ppkd, i->pkt, i);
return i2;
}
Expand Down Expand Up @@ -783,6 +784,14 @@ ccnl_content_serve_pending(struct ccnl_relay_s *ccnl, struct ccnl_content_s *c)
DEBUGMSG(6, " forwarding content <%s>\n",
ccnl_prefix_to_path(c->name));
ccnl_print_stats(ccnl, STAT_SND_C); //log sent c
#ifdef CCNL_NFN
if(i->from->faceid < 0){
int threadid = -i->from->faceid;
DEBUGMSG(49, "Send signal for threadid: %d", threadid);
ccnl_nfn_send_signal(threadid);
}
else
#endif
ccnl_face_enqueue(ccnl, pi->face, buf_dup(c->pkt));
} else // upcall to deliver content to local client
ccnl_app_RX(ccnl, c);
Expand Down
48 changes: 43 additions & 5 deletions ccnl-ext-nfn.c
Expand Up @@ -22,6 +22,7 @@

#include "ccnl-core.h"


#include "krivine.c"
#include "ccnl-includes.h"

Expand All @@ -38,10 +39,19 @@ ccnl_nfn_count_required_thunks(char *str)
return num;
}

int
ccnl_nfn(struct ccnl_relay_s *ccnl, struct ccnl_buf_s *orig,
struct ccnl_prefix_s *prefix, struct ccnl_face_s *from)


void *
ccnl_nfn_thread(void *arg)
{
struct thread_parameter_s *t = ((struct thread_parameter_s*)arg);

struct ccnl_relay_s *ccnl = t->ccnl;
struct ccnl_buf_s *orig = t->orig;
struct ccnl_prefix_s *prefix = t->prefix;
struct ccnl_face_s *from = t->from;
struct thread_s *thread = t->thread;

int thunk_request = 0;
int num_of_thunks = 0;
struct ccnl_prefix_s *original_prefix;
Expand Down Expand Up @@ -78,7 +88,8 @@ ccnl_nfn(struct ccnl_relay_s *ccnl, struct ccnl_buf_s *orig,
if(thunk_request){
num_of_thunks = ccnl_nfn_count_required_thunks(str);
}
char *res = Krivine_reduction(ccnl, str, thunk_request, &num_of_thunks, original_prefix);
char *res = Krivine_reduction(ccnl, str, thunk_request, &num_of_thunks,
original_prefix, thread);
//stores result if computed

if(res){
Expand All @@ -100,6 +111,33 @@ ccnl_nfn(struct ccnl_relay_s *ccnl, struct ccnl_buf_s *orig,
{
ccnl_nfn_delete_prefix(prefix);
}*/

pthread_exit ((void *) 0);
return 0;
}

void
ccnl_nfn_send_signal(int threadid){
struct thread_s *thread = threads[threadid];
pthread_cond_broadcast(&thread->cond);
}


int
ccnl_nfn(struct ccnl_relay_s *ccnl, struct ccnl_buf_s *orig,
struct ccnl_prefix_s *prefix, struct ccnl_face_s *from)
{
struct thread_s *thread = new_thread();
struct thread_parameter_s *arg = malloc(sizeof(struct thread_parameter_s *));
char *h = malloc(10);
arg->ccnl = ccnl;
arg->orig = orig;
arg->prefix = prefix;
arg->from = from;
arg->thread = thread;

int threadpos = -threadid;
threads[threadpos] = thread;
--threadid;

pthread_create(&thread->thread, NULL, ccnl_nfn_thread, (void *)arg);
}
39 changes: 34 additions & 5 deletions krivine-common.c
Expand Up @@ -10,14 +10,47 @@
#ifndef KRIVINE_COMMON_C
#define KRIVINE_COMMON_C

//#include "krivine-common.h"

#include "ccnl.h"
#include "ccnx.h"
#include "ccnl-core.h"

#include "ccnl-pdu.c"

#include <pthread.h>


#define NFN_FACE -1;

struct thread_s{
int id;
pthread_cond_t cond;
pthread_mutex_t mutex;
pthread_t thread;
};

struct threads_s *threads[1024];
int threadid = -1;

struct thread_parameter_s{
struct ccnl_relay_s *ccnl;
struct ccnl_buf_s *orig;
struct ccnl_prefix_s *prefix;
struct thread_s *thread;
struct ccnl_face_s *from;

};

struct thread_s *
new_thread(){
struct thread_s *t = malloc(sizeof(struct thread_s));
t->id = --threadid;
t->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
t->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
return t;
}

struct thunk_s{
struct thunk_s *next, *prev;
char thunkid[10];
Expand All @@ -27,9 +60,6 @@ struct thunk_s{
struct thunk_s *thunk_list;
int thunkid = 0;

static struct ccnl_interest_s* ccnl_interest_remove(struct ccnl_relay_s *ccnl,
struct ccnl_interest_s *i);

int
hex2int(char c)
{
Expand Down Expand Up @@ -351,7 +381,7 @@ ccnl_nfn_global_content_search(struct ccnl_relay_s *ccnl, struct ccnl_interest_s
}

//FIXME use global search or special face?
struct ccnl_content_s *
void
ccnl_nfn_content_computation(struct ccnl_relay_s *ccnl, struct ccnl_interest_s *i){
DEBUGMSG(2, "ccnl_nfn_content_computation()\n");

Expand Down Expand Up @@ -396,7 +426,6 @@ isLocalAvailable(struct ccnl_relay_s *ccnl, char **namecomp){
if((c = ccnl_nfn_local_content_search(ccnl, interest, CMP_MATCH)) != NULL){ //todo: exact match not only prefix
found = 1;
}
printf("c: %p", c);
//ccnl_interest_remove(ccnl, interest);
return found;
}
Expand Down
43 changes: 30 additions & 13 deletions krivine.c
Expand Up @@ -37,6 +37,7 @@ struct configuration_s{
struct stack_s *argument_stack;
struct environment_s *env;
struct environment_s *global_dict;
struct thread_s *thread;
};

struct closure_s *
Expand Down Expand Up @@ -272,8 +273,8 @@ int iscontent(char *cp){
}
//------------------------------------------------------------
struct ccnl_content_s *
ccnl_nfn_handle_local_computation(struct ccnl_relay_s *ccnl, char **params,
int num_params, char **namecomp, char *out, char *comp, int thunk_request){
ccnl_nfn_handle_local_computation(struct ccnl_relay_s *ccnl, struct configuration_s *config,
char **params, int num_params, char **namecomp, char *out, char *comp, int thunk_request){
int complen = sprintf(comp, "call %d ", num_params);
struct ccnl_interest_s * interest;
struct ccnl_content_s *c;
Expand All @@ -290,13 +291,26 @@ ccnl_nfn_handle_local_computation(struct ccnl_relay_s *ccnl, char **params,
len = mkInterest(namecomp, 0, out);
interest = ccnl_nfn_create_interest_object(ccnl, out, len, namecomp[0]);

if((c = ccnl_nfn_content_computation(ccnl, interest)) != NULL){
/*if((c = ccnl_nfn_content_computation(ccnl, interest)) != NULL){
DEBUGMSG(49, "Content can be computed");
return c;
}else{
DEBUGMSG(49, "Got no thunk, continue with null result just for debugging\n");
return 0;
}*/
interest->from->faceid = config->thread->id;
printf("From face id %d\n", interest->from->faceid);
ccnl_interest_propagate(ccnl, i);
printf("WAITING on Signal; Threadid : %d \n", config->thread->id);
pthread_cond_wait(&config->thread->cond, &config->thread->mutex);
//local search. look if content is now available!
printf("Got signal CONTINUE\n");
if((c = ccnl_nfn_local_content_search(ccnl, interest, CMP_MATCH)) != NULL){
DEBUGMSG(49, "Content locally found\n");
return c;
}
return 0;
}

struct ccnl_content_s *
Expand Down Expand Up @@ -333,10 +347,10 @@ ccnl_nfn_handle_network_search(struct ccnl_relay_s *ccnl, int current_param, cha
}

struct ccnl_content_s *
ccnl_nfn_handle_routable_content(struct ccnl_relay_s *ccnl, int current_param,
char **params, int num_params, int thunk_request){
char *out = malloc(sizeof(char) * CCNL_MAX_PACKET_SIZE);
char *comp = malloc(sizeof(char) * CCNL_MAX_PACKET_SIZE);
ccnl_nfn_handle_routable_content(struct ccnl_relay_s *ccnl, struct configuration_s *config,
int current_param, char **params, int num_params, int thunk_request){
char *out = ccnl_malloc(sizeof(char) * CCNL_MAX_PACKET_SIZE);
char *comp = ccnl_malloc(sizeof(char) * CCNL_MAX_PACKET_SIZE);
char *namecomp[CCNL_MAX_NAME_COMP];
char *param;
struct ccnl_content_s *c;
Expand All @@ -350,12 +364,13 @@ ccnl_nfn_handle_routable_content(struct ccnl_relay_s *ccnl, int current_param,
if(isLocalAvailable(ccnl, namecomp)){
DEBUGMSG(49, "Routable content %s is local availabe --> start computation\n",
params[current_param]);
c = ccnl_nfn_handle_local_computation(ccnl, params, num_params,
c = ccnl_nfn_handle_local_computation(ccnl, config, params, num_params,
namecomp, out, comp, thunk_request);
return c;
}else{
c = ccnl_nfn_handle_network_search(ccnl, current_param, params, num_params,
namecomp, out, comp, param, thunk_request);
//search for content in the network
//FIXME: enable c = ccnl_nfn_handle_network_search(ccnl, current_param, params, num_params,
//namecomp, out, comp, param, thunk_request);
}
return c;
}
Expand Down Expand Up @@ -673,8 +688,8 @@ ZAM_term(struct ccnl_relay_s *ccnl, struct configuration_s *configuration,
//as long as there is a routable parameter: try to find a result
for(i = num_params - 1; i >= 0; --i){
if(iscontent(params[i])){
struct ccnl_content_s *c = ccnl_nfn_handle_routable_content(ccnl, i, params,
num_params, thunk_request);
struct ccnl_content_s *c = ccnl_nfn_handle_routable_content(ccnl,
configuration, i, params, num_params, thunk_request);
if(c){
push_to_stack(&configuration->result_stack, c->content);
goto tail;
Expand Down Expand Up @@ -740,7 +755,8 @@ setup_global_environment(struct environment_s **env){

char *
Krivine_reduction(struct ccnl_relay_s *ccnl, char *expression, int thunk_request,
int *num_of_required_thunks, struct ccnl_prefix_s *original_prefix){
int *num_of_required_thunks, struct ccnl_prefix_s *original_prefix,
struct thread_s *thread){
int steps = 0;
int halt = 0;
int len = strlen("CLOSURE(halt);RESOLVENAME()") + strlen(expression);
Expand All @@ -752,6 +768,7 @@ Krivine_reduction(struct ccnl_relay_s *ccnl, char *expression, int thunk_request
prog = malloc(len*sizeof(char));
sprintf(prog, "CLOSURE(halt);RESOLVENAME(%s)", expression);
struct configuration_s *config = new_config(prog, global_dict);
config->thread = thread;

while (config->prog && !halt && config->prog != 1){
steps++;
Expand Down

0 comments on commit d3bba05

Please sign in to comment.