Skip to content

Commit

Permalink
Resolved conflict with upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
basilkohler committed May 5, 2014
2 parents 381436e + 2c8c601 commit f7db708
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 101 deletions.
84 changes: 41 additions & 43 deletions ccnl-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ ccnl_nfn_continue_computation(struct ccnl_relay_s *ccnl, int configid);
static struct ccnl_interest_s*
ccnl_interest_remove(struct ccnl_relay_s *ccnl, struct ccnl_interest_s *i);

static struct ccnl_interest_s*
ccnl_interest_remove_continue_computations(struct ccnl_relay_s *ccnl, struct ccnl_interest_s *i);

// ----------------------------------------------------------------------
// datastructure support functions

Expand Down Expand Up @@ -368,7 +371,7 @@ ccnl_face_remove(struct ccnl_relay_s *ccnl, struct ccnl_face_s *f)
if (pit->pending)
pit = pit->next;
else
pit = ccnl_interest_remove(ccnl, pit);
pit = ccnl_interest_remove_continue_computations(ccnl, pit);
}
for (ppfwd = &ccnl->fib; *ppfwd;) {
if ((*ppfwd)->face == f) {
Expand Down Expand Up @@ -659,15 +662,15 @@ ccnl_interest_propagate(struct ccnl_relay_s *ccnl, struct ccnl_interest_s *i)
// suppress forwarding to origin of interest, except wireless
if (!i->from || fwd->face != i->from ||
(i->from->flags & CCNL_FACE_FLAGS_REFLECT)){
ccnl_face_enqueue(ccnl, fwd->face, buf_dup(i->pkt));
#ifdef CCNL_NFN_MONITOR
char monitorpacket[CCNL_MAX_PACKET_SIZE];
int l = create_packet_log(inet_ntoa(fwd->face->peer.ip4.sin_addr),
ntohs(fwd->face->peer.ip4.sin_port),
i->prefix, NULL, 0, monitorpacket);
sendtomonitor(ccnl, monitorpacket, l);
#endif
}
ccnl_face_enqueue(ccnl, fwd->face, buf_dup(i->pkt));
}

}
return;
Expand Down Expand Up @@ -696,6 +699,20 @@ ccnl_interest_remove(struct ccnl_relay_s *ccnl, struct ccnl_interest_s *i)
return i2;
}

struct ccnl_interest_s*
ccnl_interest_remove_continue_computations(struct ccnl_relay_s *ccnl,
struct ccnl_interest_s *i){
DEBUGMSG(99, "ccnl_interest_remove_continue_computations()\n");
int faceid = i->from->faceid;
ccnl_interest_remove(ccnl, i);
#ifdef CCNL_NFN
if(faceid < 0){
ccnl_nfn_continue_computation(ccnl, -i->from->faceid);
}
#endif

}

// ----------------------------------------------------------------------
// handling of content messages

Expand Down Expand Up @@ -818,14 +835,14 @@ ccnl_content_serve_pending(struct ccnl_relay_s *ccnl, struct ccnl_content_s *c)
ccnl_print_stats(ccnl, STAT_SND_C); //log sent c

DEBUGMSG("--- Serve to: %d", pi->face->faceid);
ccnl_face_enqueue(ccnl, pi->face, buf_dup(c->pkt));
#ifdef CCNL_NFN_MONITOR
char monitorpacket[CCNL_MAX_PACKET_SIZE];
int l = create_packet_log(inet_ntoa(pi->face->peer.ip4.sin_addr),
ntohs(pi->face->peer.ip4.sin_port),
c->name, c->content, c->contentlen, monitorpacket);
sendtomonitor(ccnl, monitorpacket, l);
#endif
ccnl_face_enqueue(ccnl, pi->face, buf_dup(c->pkt));
} else // upcall to deliver content to local client
ccnl_app_RX(ccnl, c);
c->served_cnt++;
Expand Down Expand Up @@ -856,8 +873,9 @@ ccnl_do_ageing(void *ptr, void *dummy)
while (i) { // CONFORM: "Entries in the PIT MUST timeout rather
// than being held indefinitely."
if ((i->last_used + CCNL_INTEREST_TIMEOUT) <= t ||
i->retries > CCNL_MAX_INTEREST_RETRANSMIT)
i = ccnl_interest_remove(relay, i);
i->retries > CCNL_MAX_INTEREST_RETRANSMIT){
i = ccnl_interest_remove_continue_computations(relay, i);
}
else {
// CONFORM: "A node MUST retransmit Interest Messages
// periodically for pending PIT entries."
Expand Down Expand Up @@ -939,16 +957,17 @@ ccnl_core_RX_i_or_c(struct ccnl_relay_s *ccnl, struct ccnl_face_s *from,
DEBUGMSG(7, " matching content for interest, content %p\n", (void *) c);
ccnl_print_stats(ccnl, STAT_SND_C); //log sent_c
if (from->ifndx >= 0){
ccnl_face_enqueue(ccnl, from, buf_dup(c->pkt));

#ifdef CCNL_NFN_MONITOR
char monitorpacket[CCNL_MAX_PACKET_SIZE];
int l = create_packet_log(inet_ntoa(from->peer.ip4.sin_addr),
ntohs(from->peer.ip4.sin_port),
ntohs(from->peer.ip4.sin_port),
c->name, c->content, c->contentlen, monitorpacket);
sendtomonitor(ccnl, monitorpacket, l);
#endif
#endif
ccnl_face_enqueue(ccnl, from, buf_dup(c->pkt));
}

else
ccnl_app_RX(ccnl, c);
goto Skip;
Expand All @@ -964,41 +983,19 @@ ccnl_core_RX_i_or_c(struct ccnl_relay_s *ccnl, struct ccnl_face_s *from,
if (!i) { // this is a new/unknown I request: create and propagate
//NFN PLUGIN CALL
#ifdef CCNL_NFN
//if routable content local available, allow computation
int local_content = 0;
if(!memcmp(p->comp[p->compcnt-1], "NFN", 3)){
struct ccnl_prefix_s *prefix_nfn = ccnl_malloc(sizeof(struct ccnl_prefix_s));
prefix_nfn->comp = ccnl_malloc(p->compcnt-1);
prefix_nfn->comp[p->compcnt-2] = NULL;
prefix_nfn->complen = ccnl_malloc(p->compcnt-1);
prefix_nfn->compcnt = p->compcnt-2;

for(int it = 0; it < p->compcnt-2; ++it){
prefix_nfn->comp[it] = strdup(p->comp[it]);
prefix_nfn->complen[it] = p->complen[it];
}

for(struct ccnl_content_s *c = ccnl->contents; c; c=c->next){
if(!ccnl_prefix_cmp(prefix_nfn, 0, p, CMP_EXACT)){
local_content = 1;
break;
}
}
}
DEBUGMSG(99, "Local computation: %d", local_content);
if((numOfRunningComputations < NFN_MAX_RUNNING_COMPUTATIONS || local_content) //full, do not compute but propagate
if((numOfRunningComputations < NFN_MAX_RUNNING_COMPUTATIONS) //full, do not compute but propagate
&& !memcmp(p->comp[p->compcnt-1], "NFN", 3)){
struct ccnl_buf_s *buf2 = buf;
struct ccnl_prefix_s *p2 = p;

i = ccnl_interest_new(ccnl, from, &buf, &p, minsfx, maxsfx, &ppkd);
i->propagate = 0; //do not forward interests for running computations
ccnl_interest_append_pending(i, from);
if(!i->propagate)ccnl_nfn(ccnl, buf2, p2, from, NULL);
goto Done;
}
#endif /*CCNL_NFN*/

i = ccnl_interest_new(ccnl, from, &buf, &p, minsfx, maxsfx, &ppkd);
if (i) { // CONFORM: Step 3 (and 4)
DEBUGMSG(7, " created new interest entry %p\n", (void *) i);
Expand Down Expand Up @@ -1051,20 +1048,21 @@ ccnl_core_RX_i_or_c(struct ccnl_relay_s *ccnl, struct ccnl_face_s *from,
if(!memcmp(c->name->comp[c->name->compcnt-1], "NFN", 3)){
struct ccnl_interest_s *i_it = NULL;
int found = 0;
for(i_it = ccnl->pit; i_it; i_it = i_it->next){
int md = i_it->prefix->compcnt - c->name->compcnt == 1 ? compute_ccnx_digest(c->pkt) : NULL;
for(i_it = ccnl->pit; i_it;/* i_it = i_it->next*/){
//Check if prefix match and it is a nfn request
int cmp = ccnl_prefix_cmp(c->name, md, i_it->prefix, CMP_MATCH);
DEBUGMSG(99, "CMP: %d, faceid: %d \n", cmp, i_it->from->faceid);
if( ccnl_prefix_cmp(c->name, md, i_it->prefix, CMP_MATCH)
int cmp = ccnl_prefix_cmp(c->name, NULL, i_it->prefix, CMP_EXACT);
DEBUGMSG(99, "CMP: %d (match if zero), faceid: %d \n", cmp, i_it->from->faceid);
if( !ccnl_prefix_cmp(c->name, NULL, i_it->prefix, CMP_EXACT)
&& i_it->from->faceid < 0){
ccnl_content_add2cache(ccnl, c);
int configid = -i_it->from->faceid;
i_it = ccnl_interest_remove(ccnl, i_it);
i_it = ccnl_interest_remove_continue_computations(ccnl, i_it);
DEBUGMSG(49, "Continue configuration for configid: %d\n", configid);
ccnl_nfn_continue_computation(ccnl, configid);
++found;
goto Done;
//goto Done;
}
else{
i_it = i_it->next;
}
}
if(found) goto Done;
Expand Down
21 changes: 16 additions & 5 deletions ccnl-ext-nfn.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,17 @@ ccnl_nfn_remove_thunk_from_prefix(struct ccnl_prefix_s *prefix){
void
ccnl_nfn_continue_computation(struct ccnl_relay_s *ccnl, int configid){
DEBUGMSG(49, "ccnl_nfn_continue_computation()\n");
ccnl_nfn(ccnl, NULL, NULL, NULL, configuration_list[configid]);
struct configuration_s *config = find_configuration(configuration_list, -configid);

if(config->thunk && CCNL_NOW() > config->endtime){
DEBUGMSG(49, "NFN: Exit computation: timeout when resolving thunk\n");
DBL_LINKED_LIST_REMOVE(configuration_list, config);
//Reply error!
//config->thunk = 0;
return;
}

ccnl_nfn(ccnl, NULL, NULL, NULL, config);
}

int
Expand All @@ -63,7 +73,7 @@ ccnl_nfn_thunk_already_computing(struct ccnl_prefix_s *prefix)
int i = 0;
for(i = 0; i < -configid; ++i){
struct ccnl_prefix_s *copy;
struct configuration_s *config = configuration_list[i];
struct configuration_s *config = find_configuration(configuration_list, -i);
if(!config) continue;
ccnl_nfn_copy_prefix(config->prefix ,&copy);
ccnl_nfn_remove_thunk_from_prefix(copy);
Expand Down Expand Up @@ -123,17 +133,18 @@ ccnl_nfn(struct ccnl_relay_s *ccnl, struct ccnl_buf_s *orig,

//stores result if computed
if(res){
--numOfRunningComputations;

DEBUGMSG(2,"Computation finshed: %s, running computations: %d\n", res, numOfRunningComputations);
if(config && config->fox_state->thunk_request){
ccnl_nfn_remove_thunk_from_prefix(config->prefix);
}
struct ccnl_content_s *c = create_content_object(ccnl, config->prefix, res, strlen(res));

c->flags = CCNL_CONTENT_FLAGS_STATIC;
if(!config->fox_state->thunk_request)ccnl_content_serve_pending(ccnl,c);
ccnl_content_serve_pending(ccnl,c);
ccnl_content_add2cache(ccnl, c);

--numOfRunningComputations;
DBL_LINKED_LIST_REMOVE(configuration_list, config);
}

//TODO: check if really necessary
Expand Down
66 changes: 42 additions & 24 deletions krivine-common.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,42 @@



struct configuration_s{
int configid;
char *prog;
struct stack_s *result_stack;
struct stack_s *argument_stack;
struct environment_s *env;
struct environment_s *global_dict;
struct fox_machine_state_s *fox_state;
struct ccnl_prefix_s *prefix;
};

struct thunk_s{
struct thunk_s *next, *prev;
char thunkid[10];
struct ccnl_prefix_s *prefix;
};

struct thunk_s *thunk_list;
int thunkid = 0;

struct configuration_s *configuration_list[NFN_MAX_RUNNING_COMPUTATIONS*100+1];
int configid = -1;
struct fox_machine_state_s *
new_machine_state(int thunk_request, int num_of_required_thunks){
struct fox_machine_state_s *ret = malloc(sizeof(struct fox_machine_state_s));
ret->thunk_request = thunk_request;
ret->num_of_required_thunks = num_of_required_thunks;

return ret;
}

struct configuration_s *
new_config(char *prog, struct environment_s *global_dict, int thunk_request,
int num_of_required_thunks, struct ccnl_prefix_s *prefix, int configid){
struct configuration_s *ret = malloc(sizeof(struct configuration_s));
ret->prog = prog;
ret->result_stack = NULL;
ret->argument_stack = NULL;
ret->env = NULL;
ret->global_dict = global_dict;
ret->fox_state = new_machine_state(thunk_request, num_of_required_thunks);
ret->configid = configid;
ret->prefix = prefix;
ret->thunk = 0;
ret->thunk_time = NFN_DEFAULT_WAITING_TIME;
return ret;
}

struct configuration_s*
find_configuration(struct configuration_s *config_list, int configid){
struct configuration_s *config;
for(config = config_list; config; config = config->next){
if(config->configid == configid){
return config;
}
}
return NULL;
}

int
hex2int(char c)
Expand Down Expand Up @@ -499,9 +512,14 @@ ccnl_nfn_remove_thunk(char* thunkid){
}

int
ccnl_nfn_reply_thunk(struct ccnl_relay_s *ccnl, struct ccnl_prefix *original_prefix){
ccnl_nfn_reply_thunk(struct ccnl_relay_s *ccnl, struct configuration_s *config){
DEBUGMSG(2, "ccnl_nfn_reply_thunk()\n");
struct ccnl_content_s *c = create_content_object(ccnl, original_prefix, "THUNK", strlen("THUNK"));
struct ccnl_prefix_s *original_prefix = config->prefix;
char reply_content[100];
memset(reply_content, 0, 100);
int thunk_time = (int)config->thunk_time;
sprintf(reply_content, "THUNK%d", thunk_time);
struct ccnl_content_s *c = create_content_object(ccnl, original_prefix, reply_content, strlen(reply_content));
ccnl_content_add2cache(ccnl, c);
ccnl_content_serve_pending(ccnl,c);
return 0;
Expand Down
33 changes: 33 additions & 0 deletions krivine-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@


#define NFN_MAX_RUNNING_COMPUTATIONS 10
#define NFN_DEFAULT_WAITING_TIME 10
int numOfRunningComputations = 0;

struct fox_machine_state_s{
Expand All @@ -23,4 +24,36 @@ struct fox_machine_state_s{
char *thunk;
};

struct configuration_s{
int configid;
char *prog;
struct stack_s *result_stack;
struct stack_s *argument_stack;
struct environment_s *env;
struct environment_s *global_dict;
struct fox_machine_state_s *fox_state;
struct ccnl_prefix_s *prefix;

struct configuration_s *next;
struct configuration_s *prev;

double starttime;
double endtime;
int thunk;
double thunk_time;
};

struct thunk_s{
struct thunk_s *next, *prev;
char thunkid[10];
struct ccnl_prefix_s *prefix;
};

struct thunk_s *thunk_list;
int thunkid = 0;

struct configuration_s *configuration_list;

int configid = -1;

#endif //KRIVINE_COMMON_H
Loading

0 comments on commit f7db708

Please sign in to comment.