Skip to content

Commit

Permalink
finished implementing backprop, debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
John Langford committed Sep 4, 2010
1 parent b7ba3f3 commit f7388c0
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 150 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ BOOST_LIBRARY = /usr/local/boost/lib
ARCH = -march=nocona

# for normal fast execution.
FLAGS = -Wall $(ARCH) -ffast-math -fno-strict-aliasing -D_FILE_OFFSET_BITS=64 -I $(BOOST_INCLUDE) -O3
#FLAGS = -Wall $(ARCH) -ffast-math -fno-strict-aliasing -D_FILE_OFFSET_BITS=64 -I $(BOOST_INCLUDE) -O3

# for parallelization
#FLAGS = -Wall $(ARCH) -ffast-math -Wno-strict-aliasing -D_FILE_OFFSET_BITS=64 -I $(BOOST_INCLUDE) -O3 -fopenmp
Expand All @@ -16,7 +16,7 @@ FLAGS = -Wall $(ARCH) -ffast-math -fno-strict-aliasing -D_FILE_OFFSET_BITS=64 -I
#FLAGS = -Wall $(ARCH) -ffast-math -D_FILE_OFFSET_BITS=64 -I $(BOOST_INCLUDE) -pg -g

# for valgrind
#FLAGS = -Wall $(ARCH) -ffast-math -D_FILE_OFFSET_BITS=64 -I $(BOOST_INCLUDE) -g -O0
FLAGS = -Wall $(ARCH) -ffast-math -D_FILE_OFFSET_BITS=64 -I $(BOOST_INCLUDE) -g -O0

BINARIES = vw
MANPAGES = vw.1
Expand Down
93 changes: 67 additions & 26 deletions delay_ring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ v_array<size_t> delay_indicies;//thread specific state.

v_array<example*> delay_ring;//delay_ring state & mutexes
v_array<size_t> threads_to_use;
size_t master_index;
size_t local_index;
size_t global_index;
pthread_mutex_t delay = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t delay_empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t delay_nonempty = PTHREAD_COND_INITIALIZER;
Expand All @@ -24,14 +25,17 @@ void initialize_delay_ring()
if (global.local_prediction > 0 && (global.unique_id == 0 || global.backprop))
mesg = 1;
size_t nt = global.num_threads()+mesg;
if (global.backprop)
nt += global.num_threads();
reserve(delay_indicies, nt);
for (size_t i = 0; i < nt; i++)
delay_indicies[i] = 0;
reserve(delay_ring, ring_size);
for (size_t i = 0; i < ring_size; i++)
delay_ring[i] = NULL;
reserve(threads_to_use, ring_size);
master_index = 0;
local_index = 0;
global_index = 0;
}

void destroy_delay_ring()
Expand All @@ -48,33 +52,43 @@ bool thread_done(size_t thread)
{
bool ret;
pthread_mutex_lock(&delay);
ret = (delay_indicies[thread] == master_index);
ret = (delay_indicies[thread] == local_index)
&& (!global.backprop || delay_indicies[thread+1+global.num_threads()] == global_index);
pthread_mutex_unlock(&delay);
return ret;
}

example* return_example(size_t thread)
{
size_t index = delay_indicies[thread] % ring_size;
example* ret = delay_ring[index];

pthread_mutex_lock(&delay);
delay_indicies[thread]++;
pthread_mutex_unlock(&delay);

pthread_mutex_lock(&ret->lock);
if (--threads_to_use[index] == 0)
{
pthread_mutex_lock(&delay);
delay_ring[index] = NULL;
pthread_cond_broadcast(&delay_empty);
pthread_mutex_unlock(&delay);
}
pthread_mutex_unlock(&ret->lock);
return ret;
}

example* get_delay_example(size_t thread)
{//nonblocking
if (delay_indicies[thread] == master_index)
return NULL;
if (global.backprop &&
(delay_indicies[thread+1+global.num_threads()]
!= global_index))
return return_example(thread+1+global.num_threads());
else if (delay_indicies[thread] != local_index)
return return_example(thread);
else
{
size_t index = delay_indicies[thread] % ring_size;
example* ret = delay_ring[index];

delay_indicies[thread]++;

pthread_mutex_lock(&ret->lock);
if (--threads_to_use[index] == 0)
{
pthread_mutex_lock(&delay);
delay_ring[index] = NULL;
pthread_cond_broadcast(&delay_empty);
pthread_mutex_unlock(&delay);
}
pthread_mutex_unlock(&ret->lock);
return ret;
}
return NULL;
}

example* blocking_get_delay_example(size_t thread)
Expand All @@ -84,13 +98,14 @@ example* blocking_get_delay_example(size_t thread)
while(delay_ring[index] == NULL)
pthread_cond_wait(&delay_nonempty, &delay);
pthread_mutex_unlock(&delay);

return get_delay_example(thread);
return return_example(thread);
}

void delay_example(example* ex, size_t count)
{
size_t delay_count = count+mesg;
if (global.backprop)
delay_count = 2*count+mesg;

if (delay_count == 0)
{
Expand All @@ -100,7 +115,7 @@ void delay_example(example* ex, size_t count)
}
else
{
size_t index = master_index % ring_size;
size_t index = local_index % ring_size;

pthread_mutex_lock(&delay);
while (delay_ring[index] != NULL)
Expand All @@ -110,7 +125,33 @@ void delay_example(example* ex, size_t count)
threads_to_use[index] = delay_count;
ex->threads_to_finish = delay_count;

master_index++;
local_index++;
if (count == 0)
for (size_t i = 0; i < global.num_threads();i++)
delay_indicies[i]++;

pthread_cond_broadcast(&delay_nonempty);
pthread_mutex_unlock(&delay);
}
}

void delay_global_example(example* ex, size_t count)
{//only called by delayed backprop when there is training to do.
if (count == 0)
{
ex->threads_to_finish = 0;
output_and_account_example(ex);
free_example(ex);
}
else
{
size_t index = global_index % ring_size;

pthread_mutex_lock(&delay);
while (delay_ring[index] != NULL)
pthread_cond_wait(&delay_empty, &delay);

global_index++;
pthread_cond_broadcast(&delay_nonempty);
pthread_mutex_unlock(&delay);
}
Expand Down
1 change: 1 addition & 0 deletions delay_ring.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ void destroy_delay_ring();
example* get_delay_example(size_t thread);
example* blocking_get_delay_example(size_t thread);
void delay_example(example* ex, size_t count);
void delay_global_example(example* ex, size_t count);
bool thread_done(size_t thread);

#endif
2 changes: 1 addition & 1 deletion example.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
struct label_data {
double label;
float weight;
bool undo;
};

struct feature {
Expand Down Expand Up @@ -44,6 +43,7 @@ struct example // core example datatype.
float final_prediction;
float loss;
float eta_round;
float global_weight;

pthread_mutex_t lock; //thread coordination devices
size_t threads_to_finish;
Expand Down
14 changes: 8 additions & 6 deletions gd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,13 @@ void output_and_account_example(example* ec)
{
int f = global.final_prediction_sink[i].fd;
float w;
if (global.reg.weight_vectors != NULL) {
w = global.reg.weight_vectors[0][global.final_prediction_sink[i].id];
if (global.reg->weight_vectors != NULL) {
w = global.reg->weight_vectors[0][global.final_prediction_sink[i].id];
} else {
w = 0.;
}
global.print(f, ec->final_prediction, w, ec->tag);
cout << "printing prediction = " << ec->final_prediction << "\t" << w << "\t" << ec->global_weight << endl;
global.print(f, ec->final_prediction, w*ec->global_weight, ec->tag);
}

print_update(ec);
Expand Down Expand Up @@ -315,13 +316,14 @@ void local_predict(example* ec, size_t num_threads, gd_vars& vars, regressor& re
vars.t += ld->weight;

ec->eta_round = reg.loss->getUpdate(ec->final_prediction, ld->label, vars.eta/pow(vars.t,vars.power_t), ec->total_sum_feat_sq, ld->weight);
if (ld->undo)
ec->eta_round = -ec->eta_round;
}

if (global.local_prediction > 0)
{
prediction pred = {ec->final_prediction+ ec->eta_round * ec->total_sum_feat_sq, ec->example_counter};
prediction pred={0};
pred.p = ec->final_prediction+ ec->eta_round * ec->total_sum_feat_sq;
pred.example_number = ec->example_counter;
cout << pred.p << "\t" << pred.example_number << endl;
send_prediction(global.local_prediction, pred);
if (global.unique_id == 0)
{
Expand Down
2 changes: 1 addition & 1 deletion global_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct global_data {
double sum_loss_since_last_dump;
float dump_interval;// when should I update for the user.

regressor reg;
regressor* reg;
};
extern pthread_mutex_t io;
extern global_data global;
Expand Down
Loading

0 comments on commit f7388c0

Please sign in to comment.