Skip to content

Commit

Permalink
add draft parallelization of parse frames creation
Browse files Browse the repository at this point in the history
  • Loading branch information
jmellorcrummey committed Jan 24, 2018
1 parent 77af45e commit 53a61df
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 56 deletions.
106 changes: 60 additions & 46 deletions parseAPI/src/Parser.C
Expand Up @@ -215,7 +215,7 @@ Parser::parse_at(
{
Function *f;
ParseFrame *pf;
vector<ParseFrame *> work;
LockFreeQueue<ParseFrame *> work;

parsing_printf("[%s:%d] entered parse_at([%lx,%lx),%lx)\n",
FILE__,__LINE__,region->low(),region->high(),target);
Expand Down Expand Up @@ -248,7 +248,7 @@ Parser::parse_at(
_parse_data->record_frame(pf);
}

work.push_back(pf);
work.insert(pf);
parse_frames(work,recursive);

// downgrade state if necessary
Expand Down Expand Up @@ -286,7 +286,7 @@ void
Parser::parse_vanilla()
{
ParseFrame *pf;
vector<ParseFrame *> work;
LockFreeQueue<ParseFrame *> work;
vector<Function *>::iterator fit;

parsing_printf("[%s:%d] entered parse_vanilla()\n",FILE__,__LINE__);
Expand All @@ -299,8 +299,9 @@ Parser::parse_vanilla()
_parse_state);

/* Initialize parse frames from hints */
for(fit=hint_funcs.begin();fit!=hint_funcs.end();++fit) {
Function * hf = *fit;
#pragma omp parallel for
for(unsigned int i = 0; i < hint_funcs.size(); i++) {
Function * hf = hint_funcs[i];
ParseFrame::Status test = frame_status(hf->region(),hf->addr());
if(test != ParseFrame::BAD_LOOKUP &&
test != ParseFrame::UNPARSED)
Expand All @@ -313,7 +314,7 @@ Parser::parse_vanilla()
pf = new ParseFrame(hf,_parse_data);
init_frame(*pf);
frames.insert(pf);
work.push_back(pf);
work.insert(pf);
_parse_data->record_frame(pf);
}

Expand All @@ -328,7 +329,7 @@ Parser::parse_edges( vector< ParseWorkElem * > & work_elems )

// build up set of needed parse frames and load them with work elements
set<ParseFrame*> frameset; // for dup checking
vector<ParseFrame*> frames;
LockFreeQueue<ParseFrame*> frames;

for (unsigned idx=0; idx < work_elems.size(); idx++) {

Expand Down Expand Up @@ -412,7 +413,7 @@ Parser::parse_edges( vector< ParseWorkElem * > & work_elems )

if (frameset.end() == frameset.find(frame)) {
frameset.insert(frame);
frames.push_back(frame);
frames.insert(frame);
}
}
ScopeLock<Mutex<true> > L(parse_mutex);
Expand All @@ -432,8 +433,9 @@ Parser::parse_edges( vector< ParseWorkElem * > & work_elems )
}


vector<ParseFrame*> Parser::ProcessOneFrame(ParseFrame* pf, bool recursive) {
vector<ParseFrame*> more_frames;
LockFreeQueueItem<ParseFrame*> *
Parser::ProcessOneFrame(ParseFrame* pf, bool recursive) {
LockFreeQueueItem<ParseFrame*> *frame_list = 0;
if (pf->func && !pf->swap_busy(true)) {
#ifdef ADD_PARSE_FRAME_TIMERS
boost::timer::cpu_timer t;
Expand All @@ -451,7 +453,7 @@ vector<ParseFrame*> Parser::ProcessOneFrame(ParseFrame* pf, bool recursive) {
}
race_detector_fake_lock_release(race_detector_fake_lock(time_histogram));
#endif
more_frames = postProcessFrame(pf, recursive);
frame_list = postProcessFrame(pf, recursive);

// exclusive access to each ParseFrame is mediated by marking a frame busy.
// we clear evidence of our access to the ParseFrame here because a concurrent
Expand All @@ -462,11 +464,11 @@ vector<ParseFrame*> Parser::ProcessOneFrame(ParseFrame* pf, bool recursive) {

pf->swap_busy(false);
}
return more_frames;
return frame_list;
}

vector<ParseFrame *> Parser::postProcessFrame(ParseFrame *pf, bool recursive) {
vector<ParseFrame*> work;
LockFreeQueueItem<ParseFrame *> *Parser::postProcessFrame(ParseFrame *pf, bool recursive) {
LockFreeQueue<ParseFrame*> work;
boost::lock_guard<ParseFrame> g(*pf);
switch(pf->status()) {
case ParseFrame::CALL_BLOCKED: {
Expand All @@ -477,7 +479,7 @@ vector<ParseFrame *> Parser::postProcessFrame(ParseFrame *pf, bool recursive) {
assert(pf->call_target);

parsing_printf(" call target %lx\n",pf->call_target->addr());
work.push_back(pf);
work.insert(pf);

CodeRegion * cr = pf->call_target->region();
Address targ = pf->call_target->addr();
Expand All @@ -494,7 +496,7 @@ vector<ParseFrame *> Parser::postProcessFrame(ParseFrame *pf, bool recursive) {
_parse_data->record_frame(tf);
}
if(likely(recursive))
work.push_back(tf);
work.insert(tf);
else {
assert(0);
// XXX should never get here
Expand Down Expand Up @@ -574,24 +576,19 @@ vector<ParseFrame *> Parser::postProcessFrame(ParseFrame *pf, bool recursive) {
default:
assert(0 && "invalid parse frame status");
}
return work;

return work.steal();
}


static void
InsertFrames
(
vector<ParseFrame *> *frames,
LockFreeQueueItem<ParseFrame *> *frames,
LockFreeQueue<ParseFrame *> *q
)
{
if (frames->size() > 0) {
LockFreeQueue<ParseFrame *> myq;
for (size_t j = 0; j < frames->size(); ++j) {
ParseFrame *f = (*frames)[j];
if (f) myq.insert(f);
}
if (frames) {
LockFreeQueue<ParseFrame *> myq(frames);
q->splice(myq);
}
}
Expand All @@ -606,46 +603,64 @@ Parser::SpawnProcessFrame
std::atomic<int> *in_progress
)
{
vector<ParseFrame*> new_frames = ProcessOneFrame(pf, recursive);
InsertFrames(&new_frames, work_queue);
LockFreeQueueItem<ParseFrame*> *new_frames = ProcessOneFrame(pf, recursive);
InsertFrames(new_frames, work_queue);
in_progress->fetch_add(-1);
}
void print_work_queue(LockFreeQueue<ParseFrame *> *work_queue)
{
LockFreeQueueItem<ParseFrame *> *current = work_queue->peek();

std::cout << "Work Queue" << std::endl;
while (current) {
std::cout << " parse frame " << std::hex << current->value()
<< std::dec << std::endl;
current = current->next();
}
}


void
Parser::ProcessFrames
(
vector<ParseFrame *> *work,
LockFreeQueue<ParseFrame *> *work_queue,
bool recursive
)
{

// print_work_queue(work_queue);
#if 0
LockFreeQueue<ParseFrame *> work_queue;
InsertFrames(work, &work_queue);
#endif
std::atomic<int> in_progress(0);
#if USE_OPENMP
#pragma omp parallel shared(work_queue, in_progress)
{
int nthreads = omp_get_num_threads();
#pragma omp master
{
// print_work_queue(work_queue);
for (;;) {
if (work_queue.peek() == 0) {
if (work_queue->peek() == 0) {
if (in_progress.load() == 0) break;
if (nthreads < 10) {
#pragma omp taskwait
}
} else {
LockFreeQueue<ParseFrame *> private_queue(work_queue.steal());
LockFreeQueue<ParseFrame *> private_queue(work_queue->steal());
for(;;) {
LockFreeQueueItem<ParseFrame *> *first = private_queue.pop();
if (first == 0) break;
ParseFrame *frame = first->value();
delete first;
// delete first;
in_progress.fetch_add(1);
#pragma omp task shared(work_queue, in_progress)
SpawnProcessFrame(frame, recursive, &work_queue, &in_progress);
#pragma omp task shared(work_queue, in_progress) firstprivate(frame)
SpawnProcessFrame(frame, recursive, work_queue, &in_progress);
}
}
}
}
}
#elif USE_CILK
#if 0
Expand Down Expand Up @@ -709,10 +724,12 @@ Parser::ProcessFrames


void
Parser::parse_frames(vector<ParseFrame *> & work, bool recursive)
Parser::parse_frames(LockFreeQueue<ParseFrame *> &work, bool recursive)
{
ProcessFrames(&work, recursive);
#if 0
work.clear();
#endif

bool done = false, cycle = false;
{
Expand All @@ -739,7 +756,7 @@ Parser::parse_frames(vector<ParseFrame *> & work, bool recursive)
cleanup_frames();
}

void Parser::processFixedPoint(vector<ParseFrame *> &work, bool recursive) {// We haven't yet reached a fixedpoint; let's recurse
void Parser::processFixedPoint(LockFreeQueue<ParseFrame *> &work, bool recursive) {// We haven't yet reached a fixedpoint; let's recurse
{
boost::lock_guard<DelayedFrames> g(delayed_frames);

Expand Down Expand Up @@ -769,13 +786,12 @@ void Parser::processFixedPoint(vector<ParseFrame *> &work, bool recursive) {// W
}
}
// Recurse through parse_frames
parsing_printf("[%s] Calling parse_frames again, work.size() = %d\n",
__FILE__,
work.size());
parsing_printf("[%s] Calling parse_frames again... \n", __FILE__);

parse_frames(work, recursive);
}

void Parser::processCycle(vector<ParseFrame *> &work, bool recursive) {// If we've reached a fixedpoint and have remaining frames, we must
void Parser::processCycle(LockFreeQueue<ParseFrame *> &work, bool recursive) {// If we've reached a fixedpoint and have remaining frames, we must
// have a cyclic dependency
vector<Function *> updated;
{
Expand Down Expand Up @@ -824,10 +840,8 @@ void Parser::processCycle(vector<ParseFrame *> &work, bool recursive) {// If we'
resumeFrames((*uIter), work);
}

if (work.size()) {
parsing_printf("[%s] Updated retstatus of delayed frames, trying again; work.size() = %d\n",
__FILE__,
work.size());
if (work.peek()) {
parsing_printf("[%s] Updated retstatus of delayed frames, trying again...\n", __FILE__);
parse_frames(work, recursive);
}
} else {
Expand All @@ -844,7 +858,7 @@ void Parser::cleanup_frames() {
std::copy(frames.begin(), frames.end(), std::back_inserter(pfv));
#if USE_OPENMP
#pragma omp parallel for schedule(auto)
for (int i = 0; i < pfv.size(); i++) {
for (unsigned int i = 0; i < pfv.size(); i++) {
ParseFrame *pf = pfv[i];
if (pf) {
_parse_data->remove_frame(pf);
Expand Down Expand Up @@ -2215,7 +2229,7 @@ void Parser::invalidateContainingFuncs(Function *owner, Block *b)
}

/* Add ParseFrames waiting on func back to the work queue */
void Parser::resumeFrames(Function * func, vector<ParseFrame *> & work)
void Parser::resumeFrames(Function * func, LockFreeQueue<ParseFrame *> & work)
{
// If we do not know the function's return status, don't put its waiters back on the worklist
if (func->retstatus() == UNSET) {
Expand Down Expand Up @@ -2245,7 +2259,7 @@ void Parser::resumeFrames(Function * func, vector<ParseFrame *> & work)
for (set<ParseFrame *>::iterator fIter = vec.begin();
fIter != vec.end();
++fIter) {
work.push_back(*fIter);
work.insert(*fIter);
}
// remove func from delayedFrames map
delayed_frames.frames.erase(func);
Expand Down
16 changes: 8 additions & 8 deletions parseAPI/src/Parser.h
Expand Up @@ -197,13 +197,13 @@ class Parser {
pair<Function*,Edge*> bind_call(
ParseFrame & frame, Address target,Block *cur,Edge *exist);

void parse_frames(std::vector<ParseFrame *> &, bool);
void parse_frames(LockFreeQueue<ParseFrame *> &, bool);
void parse_frame(ParseFrame & frame,bool);

void resumeFrames(Function * func, vector<ParseFrame *> & work);
void resumeFrames(Function * func, LockFreeQueue<ParseFrame *> & work);

// defensive parsing details
void tamper_post_processing(std::vector<ParseFrame *>&, ParseFrame *);
void tamper_post_processing(LockFreeQueue<ParseFrame *>&, ParseFrame *);
ParseFrame * getTamperAbsFrame(Function *tamperFunc);

/* implementation of the parsing loop */
Expand Down Expand Up @@ -243,22 +243,22 @@ class Parser {

struct NewFrames : public std::set<ParseFrame*>, public boost::lockable_adapter<boost::mutex> {};

vector<ParseFrame *> ProcessOneFrame(ParseFrame *pf, bool recursive);
LockFreeQueueItem<ParseFrame *> *ProcessOneFrame(ParseFrame *pf, bool recursive);

void SpawnProcessFrame
(ParseFrame *frame, bool recursive, LockFreeQueue<ParseFrame *> *work_queue,
std::atomic<int> *inprogress);

void ProcessFrames
(vector<ParseFrame *> *work, bool recursive);
(LockFreeQueue<ParseFrame *> *work_queue, bool recursive);

void cleanup_frames() ;

void processCycle(vector<ParseFrame *> &work, bool recursive);
void processCycle(LockFreeQueue<ParseFrame *> &work, bool recursive);

void processFixedPoint(vector<ParseFrame *> &work, bool recursive);
void processFixedPoint(LockFreeQueue<ParseFrame *> &work, bool recursive);

vector<ParseFrame *> postProcessFrame(ParseFrame *pf, bool recursive);
LockFreeQueueItem<ParseFrame *> *postProcessFrame(ParseFrame *pf, bool recursive);

void updateBlockEnd(Block *b, Address addr, Address previnsn, region_data *rd) const;
};
Expand Down
6 changes: 4 additions & 2 deletions parseAPI/src/ParserDetails.C
Expand Up @@ -230,7 +230,9 @@ Parser::getTamperAbsFrame(Function *tamperFunc) {
// In the second case, add a new ParseFrame to the worklist or
// trigger parsing in the target object.
void
Parser::tamper_post_processing(vector<ParseFrame *> &work, ParseFrame *pf) {
Parser::tamper_post_processing(LockFreeQueue<ParseFrame *> &work_queue, ParseFrame *pf) {
vector<ParseFrame *> work;
std::copy(work_queue.begin(), work_queue.end(), std::back_inserter(work));
// tampers with stack by relative amount:
// adjust CALL_FT target edge if there is one
for (unsigned widx = 0;
Expand Down Expand Up @@ -284,7 +286,7 @@ Parser::tamper_post_processing(vector<ParseFrame *> &work, ParseFrame *pf) {
if (tf) {
mal_printf("adding TAMPER_ABS target %lx frame\n",
pf->func->_tamper_addr);
work.push_back(tf);
work_queue.insert(tf);
}
} else { // target is in another object, parse there
mal_printf("adding TAMPER_ABS target %lx "
Expand Down

0 comments on commit 53a61df

Please sign in to comment.