Skip to content

Commit

Permalink
Merge pull request #115 from MelLain/MelLain_issues_1
Browse files Browse the repository at this point in the history
Mel lain issues 1
  • Loading branch information
bigartm committed Feb 14, 2015
2 parents a40682d + 0da5152 commit 76f2901
Show file tree
Hide file tree
Showing 32 changed files with 2,181 additions and 285 deletions.
27 changes: 18 additions & 9 deletions src/artm/c_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,34 @@ int ArtmSaveBatch(const char* disk_path, int length, const char* batch) {
} CATCH_EXCEPTIONS;
}

int ArtmAddBatch(int master_id, int length, const char* batch) {
int ArtmAddBatch(int master_id, int length, const char* add_batch_args) {
try {
artm::Batch batch_object;
ParseFromArray(batch, length, &batch_object);
master_component(master_id)->AddBatch(batch_object);
return ARTM_SUCCESS;
artm::AddBatchArgs add_batch_args_object;
ParseFromArray(add_batch_args, length, &add_batch_args_object);
bool result = master_component(master_id)->AddBatch(add_batch_args_object);
if (result) {
return ARTM_SUCCESS;
} else {
set_last_error("Artm's processor queue is full. Call ArtmAddBatch() later.");
return ARTM_STILL_WORKING;
}
} CATCH_EXCEPTIONS;
}

int ArtmInvokeIteration(int master_id, int iterations_count) {
int ArtmInvokeIteration(int master_id, int length, const char* invoke_iteration_args) {
try {
master_component(master_id)->InvokeIteration(iterations_count);
artm::InvokeIterationArgs invoke_iteration_args_object;
ParseFromArray(invoke_iteration_args, length, &invoke_iteration_args_object);
master_component(master_id)->InvokeIteration(invoke_iteration_args_object);
return ARTM_SUCCESS;
} CATCH_EXCEPTIONS;
}

int ArtmWaitIdle(int master_id, int timeout_milliseconds) {
int ArtmWaitIdle(int master_id, int length, const char* wait_idle_args) {
try {
bool result = master_component(master_id)->WaitIdle(timeout_milliseconds);
artm::WaitIdleArgs wait_idle_args_object;
ParseFromArray(wait_idle_args, length, &wait_idle_args_object);
bool result = master_component(master_id)->WaitIdle(wait_idle_args_object);

if (result) {
return ARTM_SUCCESS;
Expand Down
6 changes: 3 additions & 3 deletions src/artm/c_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ extern "C" {
DLL_PUBLIC int ArtmReconfigureDictionary(int master_id, int length, const char* dictionary_config);
DLL_PUBLIC int ArtmDisposeDictionary(int master_id, const char* dictionary_name);

DLL_PUBLIC int ArtmAddBatch(int master_id, int length, const char* batch);
DLL_PUBLIC int ArtmInvokeIteration(int master_id, int iterations_count);
DLL_PUBLIC int ArtmWaitIdle(int master_id, int timeout_milliseconds);
DLL_PUBLIC int ArtmAddBatch(int master_id, int length, const char* add_batch_args);
DLL_PUBLIC int ArtmInvokeIteration(int master_id, int length, const char* invoke_iteration_args);
DLL_PUBLIC int ArtmWaitIdle(int master_id, int length, const char* wait_idle_args);
DLL_PUBLIC int ArtmSynchronizeModel(int master_id, int length, const char* sync_model_args);

DLL_PUBLIC int ArtmOverwriteTopicModel(int master_id, int length, const char* topic_model);
Expand Down
16 changes: 16 additions & 0 deletions src/artm/core/batch_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ void BatchManager::DisposeModel(const ModelName& model_name) {
in_progress_.erase(model_name);
}

void BatchManager::AddAndNext(const BatchManagerTask& task) {
boost::lock_guard<boost::mutex> guard(lock_);

std::vector<ModelName> models = schema_->get()->GetModelNames();
for (auto &model_name : models) {
auto model_iter = in_progress_.find(model_name);
if (model_iter == in_progress_.end()) {
in_progress_.insert(std::make_pair(
model_name, std::make_shared<std::map<boost::uuids::uuid, std::string>>()));
model_iter = in_progress_.find(model_name);
}

model_iter->second->insert(std::make_pair(task.uuid, task.file_path));
}
}

BatchManagerTask BatchManager::Next() {
boost::lock_guard<boost::mutex> guard(lock_);
for (auto iter = tasks_.begin(); iter != tasks_.end(); ++iter) {
Expand Down
8 changes: 7 additions & 1 deletion src/artm/core/batch_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ class BatchManager : boost::noncopyable, public Notifiable {
explicit BatchManager(ThreadSafeHolder<InstanceSchema>* schema);
virtual ~BatchManager() {}

// Add batch to the task queue.
// Add() adds a passive task to the queue
// Next() finds some passive task and moves it to the active queue (for each model)
// AddAndNext() puts the task directly into the active queue (for each model)
// Done() marks an active task as "done" (for a given model)

// OK to add the same uuid multiple times.
void Add(const BatchManagerTask& task);
void Add(boost::uuids::uuid uuid, std::string file_path) { Add(BatchManagerTask(uuid, file_path)); }
Expand All @@ -46,6 +50,8 @@ class BatchManager : boost::noncopyable, public Notifiable {
// until it is marked as processed by Done().
BatchManagerTask Next();

void AddAndNext(const BatchManagerTask& task);

// Eliminates uuid from "in progress" set.
void Done(const boost::uuids::uuid& id, const ModelName& model_name);

Expand Down
41 changes: 34 additions & 7 deletions src/artm/core/data_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ LocalDataLoader::~LocalDataLoader() {
}
}

void LocalDataLoader::AddBatch(const Batch& batch, bool invoke) {
bool LocalDataLoader::AddBatch(const AddBatchArgs& args) {
auto& batch = args.batch();
int timeout = args.timeout_milliseconds();
MasterComponentConfig config = instance()->schema()->config();

std::shared_ptr<Batch> modified_batch;
if (config.compact_batches()) {
modified_batch = std::make_shared<Batch>(); // constructor
Expand All @@ -114,18 +117,36 @@ void LocalDataLoader::AddBatch(const Batch& batch, bool invoke) {
BatchHelpers::PopulateClassId(modified_batch.get());
}

boost::uuids::uuid uuid = generation_->AddBatch(modified_batch);
if (instance()->schema()->config().online_batch_processing()) {
auto time_start = boost::posix_time::microsec_clock::local_time();
for (;;) {
if (instance_->processor_queue()->size() < config.processor_queue_max_size()) break;

if (invoke)
instance_->batch_manager()->Add(uuid, std::string());
}
boost::this_thread::sleep(boost::posix_time::milliseconds(kIdleLoopFrequency));

if (timeout >= 0) {
auto time_end = boost::posix_time::microsec_clock::local_time();
if ((time_end - time_start).total_milliseconds() >= timeout) return false;
}
}
auto pi = std::make_shared<ProcessorInput>();
pi->mutable_batch()->CopyFrom(*modified_batch);
boost::uuids::uuid uuid = boost::uuids::random_generator()();
pi->set_batch_uuid(boost::lexical_cast<std::string>(uuid));
instance_->batch_manager()->AddAndNext(BatchManagerTask(uuid, std::string()));
instance_->processor_queue()->push(pi);
} else {
generation_->AddBatch(modified_batch);
}
return true;
}

int LocalDataLoader::GetTotalItemsCount() const {
return generation_->GetTotalItemsCount();
}

void LocalDataLoader::InvokeIteration(int iterations_count) {
void LocalDataLoader::InvokeIteration(const InvokeIterationArgs& args) {
int iterations_count = args.iterations_count();
if (iterations_count <= 0) {
LOG(WARNING) << "DataLoader::InvokeIteration() was called with argument '"
<< iterations_count << "'. Call is ignored.";
Expand All @@ -147,7 +168,8 @@ void LocalDataLoader::InvokeIteration(int iterations_count) {
}
}

bool LocalDataLoader::WaitIdle(int timeout) {
bool LocalDataLoader::WaitIdle(const WaitIdleArgs& args) {
int timeout = args.timeout_milliseconds();
auto time_start = boost::posix_time::microsec_clock::local_time();
for (;;) {
if (instance_->batch_manager()->IsEverythingProcessed())
Expand Down Expand Up @@ -342,6 +364,11 @@ void RemoteDataLoader::ThreadFunction() {
break;
}

if (instance()->schema()->config().online_batch_processing()) {
boost::this_thread::sleep(boost::posix_time::milliseconds(kIdleLoopFrequency));
continue;
}

MasterComponentConfig config = instance()->schema()->config();
int processor_queue_size = instance()->processor_queue()->size();
int max_queue_size = config.processor_queue_max_size();
Expand Down
6 changes: 3 additions & 3 deletions src/artm/core/data_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ class LocalDataLoader : public DataLoader {
virtual ~LocalDataLoader();

int GetTotalItemsCount() const;
void AddBatch(const Batch& batch, bool invoke);
bool AddBatch(const AddBatchArgs& args);
virtual void Callback(ModelIncrement* model_increment);

void InvokeIteration(int iterations_count);
void InvokeIteration(const InvokeIterationArgs& args);

// Returns false if BigARTM is still processing the collection, otherwise true.
bool WaitIdle(int timeout = -1);
bool WaitIdle(const WaitIdleArgs& args);
void DisposeModel(ModelName model_name);
bool RequestThetaMatrix(const GetThetaMatrixArgs& get_theta_args,
::artm::ThetaMatrix* theta_matrix);
Expand Down
19 changes: 10 additions & 9 deletions src/artm/core/internals.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions src/artm/core/internals.proto
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ service NodeControllerService {
rpc RequestThetaMatrix(GetThetaMatrixArgs) returns (ThetaMatrix);
rpc RequestScore(GetScoreValueArgs) returns (ScoreData);

rpc AddBatch(Batch) returns (Void);
rpc InvokeIteration(Void) returns (Void);
rpc WaitIdle(Void) returns (Int);
rpc AddBatch(AddBatchArgs) returns (Int);
rpc InvokeIteration(InvokeIterationArgs) returns (Void);
rpc WaitIdle(WaitIdleArgs) returns (Int);
rpc SynchronizeModel(SynchronizeModelArgs) returns (Void);
rpc InitializeModel(InitializeModelArgs) returns (Void);
}

0 comments on commit 76f2901

Please sign in to comment.