Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Various bugfixes for cluster support

  • Loading branch information...
commit 427c583435bf2d7fbf47fa059fa36f6ac5cb5dee 1 parent b4b8454
John authored
25 cache.cc
View
@@ -30,23 +30,26 @@ int read_cached_features(parser* p, void* ec)
return 0;
char* c;
- size_t num_indices = 0;
- if (buf_read(p->input, c, int_size) < int_size)
+ unsigned char num_indices = 0;
+ if (buf_read(p->input, c, sizeof(num_indices)) < sizeof(num_indices))
return 0;
- c = run_len_decode(c, num_indices);
+ num_indices = *(unsigned char*)c;
+ c += sizeof(num_indices);
+
p->input.set(c);
for (;num_indices > 0; num_indices--)
{
size_t temp;
- if((temp = buf_read(p->input,c,int_size + sizeof(size_t))) < char_size + sizeof(size_t)) {
+ unsigned char index = 0;
+ if((temp = buf_read(p->input,c,sizeof(index) + sizeof(size_t))) < sizeof(index) + sizeof(size_t)) {
cerr << "truncated example! " << temp << " " << char_size +sizeof(size_t) << endl;
return 0;
}
- size_t index = 0;
- c = run_len_decode(c, index);
- push(ae->indices, index);
+ index = *(unsigned char*)c;
+ c+= sizeof(index);
+ push(ae->indices, (size_t)index);
v_array<feature>* ours = ae->atomics+index;
size_t storage = *(size_t *)c;
c += sizeof(size_t);
@@ -104,7 +107,7 @@ void output_int(io_buf& cache, size_t s)
cache.set(c);
}
-void output_features(io_buf& cache, size_t index, feature* begin, feature* end)
+void output_features(io_buf& cache, unsigned char index, feature* begin, feature* end)
{
char* c;
@@ -113,8 +116,10 @@ void output_features(io_buf& cache, size_t index, feature* begin, feature* end)
if (i->x != 1. && i->x != -1.)
storage+=sizeof(float);
- buf_write(cache, c, int_size + storage + sizeof(size_t));
- c = run_len_encode(c, index);
+ buf_write(cache, c, sizeof(index) + storage + sizeof(size_t));
+ *(unsigned char*)c = index;
+ c += sizeof(index);
+
char *storage_size_loc = c;
c += sizeof(size_t);
2  cache.h
View
@@ -18,6 +18,6 @@ char* run_len_encode(char *p, size_t i);
int read_cached_features(parser* p, void* ec);
void cache_features(io_buf& cache, example* ae);
void output_int(io_buf& cache, size_t s);
-void output_features(io_buf& cache, size_t index, feature* begin, feature* end);
+void output_features(io_buf& cache, unsigned char index, feature* begin, feature* end);
#endif
5 gd.cc
View
@@ -48,10 +48,7 @@ void* gd_thread(void *in)
else if (thread_done(thread_num))
{
if (global.local_prediction > 0)
- {
- cout << "shutting down local_prediction" << endl;
- shutdown(global.local_prediction, SHUT_WR);
- }
+ shutdown(global.local_prediction, SHUT_WR);
return NULL;
}
else
25 multisource.cc
View
@@ -5,12 +5,6 @@
#include <sys/socket.h>
#include <errno.h>
-bool get_prediction(int sock, prediction &p)
-{
- bool ret = (recv(sock, &p, sizeof(p), MSG_DONTWAIT) == sizeof(p));
- return ret;
-}
-
bool blocking_get_prediction(int sock, prediction &p)
{
int count = read(sock, &p, sizeof(p));
@@ -34,8 +28,6 @@ void reset(partial_example &ex)
ex.features.erase();
}
-int receive_count = 0;
-
int receive_features(parser* p, void* ex)
{
example* ae = (example*)ex;
@@ -64,20 +56,28 @@ int receive_features(parser* p, void* ex)
close(sock);
memmove(p->input.files.begin+index,
p->input.files.begin+index+1,
- p->input.files.index() - index-1);
+ (p->input.files.index() - index-1)*sizeof(int));
+ p->input.files.pop();
memmove(p->ids.begin+index,
p->ids.begin+index+1,
- p->ids.index() - index-1);
- p->input.files.pop();
+ (p->ids.index() - index-1)*sizeof(size_t));
p->ids.pop();
+ memmove(p->counts.begin+index,
+ p->counts.begin+index+1,
+ (p->counts.index() - index-1)*sizeof(size_t));
+ p->counts.pop();
index--;
}
else
{
+ if (pre.example_number != ++ (p->counts[index]))
+ cout << "count is off! " << pre.example_number << " != " << p->counts[index] << endl;
+ if (pre.example_number == p->finished_count + ring_size -1)
+ FD_CLR(sock,&fds);//this one's too far ahead, let the buffer fill for a while.
size_t ring_index = pre.example_number % p->pes.index();
if (p->pes[ring_index].features.index() == 0)
p->pes[ring_index].example_number = pre.example_number;
- if (p->pes[ring_index].example_number != pre.example_number)
+ if (p->pes[ring_index].example_number != (int)pre.example_number)
cerr << "Error, example " << p->pes[ring_index].example_number << " != " << pre.example_number << endl;
feature f = {pre.p, p->ids[index]};
push(p->pes[ring_index].features, f);
@@ -96,6 +96,7 @@ int receive_features(parser* p, void* ex)
label_data* ld = (label_data*)ae->ld;
*ld = p->pes[ring_index].ld;
reset(p->pes[ring_index]);
+ p->finished_count++;
return ae->atomics[multindex].index();
}
}
3  multisource.h
View
@@ -6,14 +6,13 @@
struct prediction {
float p;
- int example_number;
+ size_t example_number;
};
const size_t multindex = 5;
int receive_features(parser* p, void* ex);
void send_prediction(int sock, prediction pred);
-bool get_prediction(int sock, prediction &p);
bool blocking_get_prediction(int sock, prediction &p);
#endif
1  parse_args.cc
View
@@ -41,6 +41,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
("min_prediction", po::value<float>(&vars.min_prediction)->default_value(0), "Smallest prediction to output")
("max_prediction", po::value<float>(&vars.max_prediction)->default_value(1), "Largest prediction to output")
("multisource", po::value<size_t>(), "multiple sources for daemon input")
+ ("noop","do no learning")
("of", po::value<size_t>(&of)->default_value(1), "keep k of <n> features")
("port", po::value<size_t>(),"port to listen on")
("power_t", po::value<float>(&vars.power_t)->default_value(0.), "t power value")
6 parse_regressor.cc
View
@@ -121,10 +121,10 @@ void parse_regressor_args(po::variables_map& vm, regressor& r, string& final_reg
regressor.close();
}
if (!initialized)
- if(final_regressor_name != string(""))
- initialize_regressor(r);
- else
+ if(vm.count("noop") || vm.count("sendto"))
r.weight_vectors = NULL;
+ else
+ initialize_regressor(r);
}
void free_regressor(regressor &r)
9 parser.cc
View
@@ -231,6 +231,9 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa
cout << "reserving " << ring_size << endl;
calloc_reserve(par->pes,ring_size);
par->pes.end = par->pes.begin+ring_size;
+ calloc_reserve(par->counts,ring_size);
+ par->counts.end = par->counts.begin+ring_size;
+ par->finished_count = 0;
}
else
par->reader = read_cached_features;
@@ -468,7 +471,9 @@ void setup_example(example* ae)
size_t current = expert_size;
while (current <= length)
{
- feature* ret = search(f, current, ae->atomics[*i].end);
+ feature* ret = f;
+ if (ae->atomics[*i].end > f)
+ ret = search(f, current, ae->atomics[*i].end);
push(ae->subsets[*i],ret);
f = ret;
current += expert_size;
@@ -627,5 +632,7 @@ void end_parser(parser* pf)
}
if (pf->ids.begin != NULL)
free(pf->ids.begin);
+ if (pf->counts.begin != NULL)
+ free(pf->counts.begin);
}
2  parser.h
View
@@ -36,6 +36,8 @@ struct parser {
v_array<partial_example> pes;//partial examples
v_array<size_t> ids; //unique ids for sources
+ v_array<size_t> counts; //partial examples received from sources
+ size_t finished_count;//the number of finished examples;
int label_sock;
int max_fd;
};
13 simple_label.cc
View
@@ -16,13 +16,14 @@ size_t read_cached_simple_label(void* v, io_buf& cache)
{
label_data* ld = (label_data*) v;
char *c;
- size_t total = sizeof(ld->label)+sizeof(ld->weight)+int_size;
size_t tag_size = 0;
+ size_t total = sizeof(ld->label)+sizeof(ld->weight)+sizeof(tag_size);
if (buf_read(cache, c, total) < total)
return 0;
c = bufread_simple_label(ld,c);
-
- c = run_len_decode(c, tag_size);
+
+ tag_size = *(size_t*)c;
+ c += sizeof(tag_size);
cache.set(c);
if (buf_read(cache, c, tag_size) < tag_size)
@@ -47,10 +48,12 @@ void cache_simple_label(void* v, io_buf& cache)
{
char *c;
label_data* ld = (label_data*) v;
- buf_write(cache, c, sizeof(ld->label)+sizeof(ld->weight)+int_size+ld->tag.index());
+ buf_write(cache, c, sizeof(ld->label)+sizeof(ld->weight)+sizeof(ld->tag.index())+ld->tag.index());
c = bufcache_simple_label(ld,c);
- c = run_len_encode(c, ld->tag.index());
+ *(size_t*)c = ld->tag.index();
+ c += sizeof(ld->tag.index());
+
memcpy(c,ld->tag.begin,ld->tag.index());
c += ld->tag.index();
2  vw.cc
View
@@ -63,7 +63,7 @@ gd_vars* vw(int argc, char *argv[])
setup_send();
destroy_send();
}
- else if (final_regressor_name == string("") && !vm.count("initial_regressor"))
+ else if (vm.count("noop"))
{
start_noop();
end_noop();
Please sign in to comment.
Something went wrong with that request. Please try again.