Skip to content

Commit

Permalink
reusage node prog code, pass remove debug printfs
Browse files Browse the repository at this point in the history
  • Loading branch information
dubey committed May 9, 2016
1 parent 378c1ce commit e147414
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 105 deletions.
3 changes: 0 additions & 3 deletions common/transaction.h
Expand Up @@ -19,7 +19,6 @@
#include <unordered_set>

#include "common/vclock.h"
#include "node_prog/node_prog_type.h"

namespace transaction
{
Expand All @@ -43,8 +42,6 @@ namespace transaction
std::unique_ptr<std::string> key, value;
};

using done_req_t = std::vector<std::pair<uint64_t, node_prog::prog_type>>;

struct nop_data
{
vc::vclock_t max_done_clk;
Expand Down
10 changes: 2 additions & 8 deletions coordinator/timestamper.cc
Expand Up @@ -38,7 +38,6 @@ DECLARE_CONFIG_CONSTANTS;

using coordinator::current_prog;
using coordinator::blocked_prog;
using transaction::done_req_t;
using node_prog::Node_Parameters_Base;
static coordinator::timestamper *vts;
static uint64_t vt_id;
Expand Down Expand Up @@ -224,7 +223,6 @@ nop_function()
}

if (kronos_call) {
WDEBUG << "Kronos call" << std::endl;
int64_t id;
chronos_returncode call_code, wait_code;

Expand All @@ -237,6 +235,7 @@ nop_function()
<< ", wait code " << chronos_returncode_to_string(wait_code) << std::endl;
}

/*
// for debugging
ssize_t ret;
chronos_stats stats;
Expand All @@ -252,6 +251,7 @@ nop_function()
} else {
WDEBUG << "Kronos get_stats: call code " << call_code << ", wait code " << wait_code << std::endl;
}
*/
}
}
}
Expand Down Expand Up @@ -322,7 +322,6 @@ unpack_and_forward_node_prog(std::unique_ptr<message::message> msg,
uint64_t clientID,
coordinator::hyper_stub *hstub)
{
WDEBUG << "got node prog" << std::endl;
vts->restore_mtx.lock();
if (vts->restore_status > 0) {
vts->prog_queue->emplace_back(blocked_prog(clientID, std::move(msg)));
Expand All @@ -334,7 +333,6 @@ unpack_and_forward_node_prog(std::unique_ptr<message::message> msg,

std::string prog_type;
msg->unpack_partial_message(message::CLIENT_NODE_PROG_REQ, prog_type);
WDEBUG << "node prog type=" << prog_type << std::endl;

void *prog_handle = nullptr;
vts->m_dyn_prog_mtx.lock();
Expand Down Expand Up @@ -734,10 +732,6 @@ server_loop(void *args)
case message::NODE_PROG_RETURN: {
uint64_t req_id, cp_int, client;
msg->unpack_partial_message(message::NODE_PROG_RETURN, prog_type, req_id, cp_int); // don't unpack rest
WDEBUG << "prog_type=" << prog_type
<< " req_id=" << req_id
<< " cp_int=" << cp_int
<< std::endl;
current_prog *cp = (current_prog*)cp_int;
client = cp->client;

Expand Down
1 change: 0 additions & 1 deletion db/async_nodeprog_state.h
Expand Up @@ -13,7 +13,6 @@
#ifndef weaver_db_async_nodeprog_state_h_
#define weaver_db_async_nodeprog_state_h_

#include "node_prog/node_prog_type.h"
#include "common/vclock.h"
#include "db/node.h"

Expand Down
7 changes: 0 additions & 7 deletions db/shard.cc
Expand Up @@ -37,7 +37,6 @@
#include "db/remote_node.h"
#include "db/node_prog_running_state.h"
#include "node_prog/node.h"
#include "node_prog/node_prog_type.h"
#include "node_prog/base_classes.h"

#define XML_CHUNK_SZ 10000
Expand Down Expand Up @@ -1949,7 +1948,6 @@ inline void node_prog_loop(uint64_t tid,
// call node program
std::pair<node_prog::search_type, std::vector<std::pair<db::remote_node, np_param_ptr_t>>> next_node_params;
next_node_params = prog_ptr(*node, this_node, params, node_state_getter);
WDEBUG << "done node program at node=" << node_handle << std::endl;

node->base.view_time = nullptr;
node->base.time_oracle = nullptr;
Expand All @@ -1970,11 +1968,6 @@ inline void node_prog_loop(uint64_t tid,
done_request = true;
// signal to send back to vector timestamper that issued request
std::unique_ptr<message::message> m(new message::message());
WDEBUG << "prog_handle=" << prog_handle
<< " np.m_type=" << np.m_type
<< " np.req_id=" << np.req_id
<< " np.vt_prog_ptr=" << np.vt_prog_ptr
<< std::endl;
m->prepare_message(message::NODE_PROG_RETURN, prog_handle, np.m_type, np.req_id, np.vt_prog_ptr, res.second);
S->comm.send(np.vt_id, m->buf);
break; // can only send one message back
Expand Down
104 changes: 104 additions & 0 deletions node_prog/boilerplate.h
@@ -0,0 +1,104 @@
/*
* ===============================================================
* Description: Boilerplate declarations and definitions for
* node programs
*
* Author: Ayush Dubey, dubey@cs.cornell.edu
*
* Copyright (C) 2015, Cornell University, see the LICENSE file
* for licensing agreement
* ===============================================================
*/

#ifndef weaver_node_prog_boilerplate_h_
#define weaver_node_prog_boilerplate_h_

#include <memory>
#include <vector>
#include <deque>

#include "common/message.h"
#include "node_prog/base_classes.h"
#include "node_prog/node.h"
#include "db/remote_node.h"

#define PROG_FUNC_DECLARE \
std::shared_ptr<Node_Parameters_Base> param_ctor(); \
std::shared_ptr<Node_State_Base> state_ctor(); \
\
uint64_t param_size(const Node_Parameters_Base&, void*); \
void param_pack(const Node_Parameters_Base&, e::packer&, void*); \
void param_unpack(Node_Parameters_Base&, e::unpacker&, void*); \
\
uint64_t state_size(const Node_State_Base&, void*); \
void state_pack(const Node_State_Base&, e::packer&, void*); \
void state_unpack(Node_State_Base&, e::unpacker&, void*); \
\
std::pair<search_type, std::vector<std::pair<db::remote_node, std::shared_ptr<Node_Parameters_Base>>>> \
node_program(node &n, \
db::remote_node &rn, \
std::shared_ptr<Node_Parameters_Base> param_ptr, \
std::function<Node_State_Base&()> state_getter);

#define CAST_ARG_REF(type, arg) \
type &tp = dynamic_cast<type&>(p);

#define PROG_FUNC_DEFINE(PREFIX) \
std::shared_ptr<Node_Parameters_Base> \
param_ctor() \
{ \
auto new_params = std::make_shared<PREFIX##_params>(); \
return std::dynamic_pointer_cast<Node_Parameters_Base>(new_params); \
} \
\
std::shared_ptr<Node_State_Base> \
state_ctor() \
{ \
auto new_state = std::make_shared<PREFIX##_state>(); \
return std::dynamic_pointer_cast<Node_State_Base>(new_state); \
} \
\
uint64_t \
param_size(const Node_Parameters_Base &p, void *aux_args) \
{ \
CAST_ARG_REF(const PREFIX##_params, p); \
return tp.size(aux_args); \
} \
\
void \
param_pack(const Node_Parameters_Base &p, e::packer &packer, void *aux_args) \
{ \
CAST_ARG_REF(const PREFIX##_params, p); \
tp.pack(packer, aux_args); \
} \
\
void \
param_unpack(Node_Parameters_Base &p, e::unpacker &unpacker, void *aux_args) \
{ \
CAST_ARG_REF(PREFIX##_params, p); \
tp.unpack(unpacker, aux_args); \
} \
\
uint64_t \
state_size(const Node_State_Base &p, void *aux_args) \
{ \
CAST_ARG_REF(const PREFIX##_state, p); \
return tp.size(aux_args); \
} \
\
void \
state_pack(const Node_State_Base &p, e::packer &packer, void *aux_args) \
{ \
CAST_ARG_REF(const PREFIX##_state, p); \
tp.pack(packer, aux_args); \
} \
\
void \
state_unpack(Node_State_Base &p, e::unpacker &unpacker, void *aux_args) \
{ \
CAST_ARG_REF(PREFIX##_state, p); \
tp.unpack(unpacker, aux_args); \
}


#endif
65 changes: 2 additions & 63 deletions node_prog/traverse_with_props.cc
Expand Up @@ -78,7 +78,7 @@ traverse_props_params :: unpack(e::unpacker &unpacker, void *aux_args)
traverse_props_state :: traverse_props_state()
: visited(false)
, out_count(0)
{ WDEBUG << "traverse props state ctor" << std::endl; }
{ }

uint64_t
traverse_props_state :: size(void *aux_args) const
Expand Down Expand Up @@ -124,66 +124,7 @@ check_aliases(const node_prog::node &n, const std::vector<std::string> &aliases)

extern "C" {

std::shared_ptr<Node_Parameters_Base>
param_ctor()
{
auto new_params = std::make_shared<traverse_props_params>();
return std::dynamic_pointer_cast<Node_Parameters_Base>(new_params);
}

std::shared_ptr<Node_State_Base>
state_ctor()
{
auto new_state = std::make_shared<traverse_props_state>();
return std::dynamic_pointer_cast<Node_State_Base>(new_state);
}

#define CAST_ARG_REF(type, arg) \
type &tp = dynamic_cast<type&>(p);

uint64_t
param_size(const Node_Parameters_Base &p, void *aux_args)
{
CAST_ARG_REF(const traverse_props_params, p);
return tp.size(aux_args);
}

void
param_pack(const Node_Parameters_Base &p, e::packer &packer, void *aux_args)
{
CAST_ARG_REF(const traverse_props_params, p);
tp.pack(packer, aux_args);
}

void
param_unpack(Node_Parameters_Base &p, e::unpacker &unpacker, void *aux_args)
{
CAST_ARG_REF(traverse_props_params, p);
tp.unpack(unpacker, aux_args);
}

uint64_t
state_size(const Node_State_Base &p, void *aux_args)
{
CAST_ARG_REF(const traverse_props_state, p);
return tp.size(aux_args);
}

void
state_pack(const Node_State_Base &p, e::packer &packer, void *aux_args)
{
CAST_ARG_REF(const Node_State_Base, p);
tp.pack(packer, aux_args);
}

void
state_unpack(Node_State_Base &p, e::unpacker &unpacker, void *aux_args)
{
CAST_ARG_REF(Node_State_Base, p);
tp.unpack(unpacker, aux_args);
}

#undef CAST_ARG_REF
PROG_FUNC_DEFINE(traverse_props);

std::pair<search_type, std::vector<std::pair<db::remote_node, std::shared_ptr<Node_Parameters_Base>>>>
node_prog :: node_program(node &n,
Expand Down Expand Up @@ -251,7 +192,6 @@ node_prog :: node_program(node &n,
// return now
params.returning = true;
next.emplace_back(std::make_pair(state.prev_node, std::make_shared<traverse_props_params>(params)));
WDEBUG << "prev node=" << params.prev_node.handle << " @ " << params.prev_node.loc << std::endl;
}
}

Expand All @@ -271,7 +211,6 @@ node_prog :: node_program(node &n,
params.return_nodes = std::move(state.return_nodes);
params.return_edges = std::move(state.return_edges);
next.emplace_back(std::make_pair(state.prev_node, std::make_shared<traverse_props_params>(params)));
WDEBUG << "prev node=" << params.prev_node.handle << " @ " << params.prev_node.loc << std::endl;
}
}

Expand Down
22 changes: 2 additions & 20 deletions node_prog/traverse_with_props.h
Expand Up @@ -21,10 +21,7 @@
#include <vector>
#include <deque>

#include "db/remote_node.h"
#include "node_prog/node_prog_type.h"
#include "node_prog/node.h"
#include "node_prog/base_classes.h"
#include "node_prog/boilerplate.h"
#include "node_prog/cache_response.h"

namespace node_prog
Expand Down Expand Up @@ -68,22 +65,7 @@ namespace node_prog
};

extern "C" {
std::shared_ptr<Node_Parameters_Base> param_ctor();
std::shared_ptr<Node_State_Base> state_ctor();

uint64_t param_size(const Node_Parameters_Base&, void*);
void param_pack(const Node_Parameters_Base&, e::packer&, void*);
void param_unpack(Node_Parameters_Base&, e::unpacker&, void*);

uint64_t state_size(const Node_State_Base&, void*);
void state_pack(const Node_State_Base&, e::packer&, void*);
void state_unpack(Node_State_Base&, e::unpacker&, void*);

std::pair<search_type, std::vector<std::pair<db::remote_node, std::shared_ptr<Node_Parameters_Base>>>>
node_program(node &n,
db::remote_node &rn,
std::shared_ptr<Node_Parameters_Base> param_ptr,
std::function<Node_State_Base&()> state_getter);
PROG_FUNC_DECLARE;
}
}

Expand Down
6 changes: 3 additions & 3 deletions startup_scripts/start_weaver.sh
Expand Up @@ -14,15 +14,15 @@ fi

if [ -z "$WEAVER_BUILDDIR" ]
then
if [ -e /home/dubey/installs/lib/libweaverservermanager.so ]
if [ -e /usr/local/lib/libweaverservermanager.so ]
then
weaver_libdir=/home/dubey/installs/lib
weaver_libdir=/usr/local/lib
else
if [ -e /usr/lib/libweaverservermanager.so ]
then
weaver_libdir=/usr/lib
else
echo 'Did not find libweaverservermanager.so at /usr/lib and /home/dubey/installs/lib. Also, WEAVER_BUILDDIR not set. Exiting now.'
echo 'Did not find libweaverservermanager.so at /usr/lib and /usr/local/lib. Also, WEAVER_BUILDDIR not set. Exiting now.'
exit 1
fi
fi
Expand Down

0 comments on commit e147414

Please sign in to comment.