Skip to content

Commit

Permalink
Update LoadBalancer.hpp from current version
Browse files Browse the repository at this point in the history
  • Loading branch information
LennoxLiu committed Apr 7, 2024
1 parent 775373c commit dd28563
Showing 1 changed file with 117 additions and 75 deletions.
192 changes: 117 additions & 75 deletions hpc/singularity/LoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <memory>
#include <filesystem>
#include "../lib/umbridge.h"
#include <unistd.h>

// run and get the result of command
std::string getCommandOutput(const std::string command)
Expand All @@ -30,18 +31,6 @@ std::string getCommandOutput(const std::string command)
return output;
}

// Block the calling thread until `filename` exists on disk.
// The path is probed once per second; the function only ever returns true,
// and it does so as soon as the file is found.
bool waitForFile(const std::string &filename)
{
    for (;;)
    {
        if (std::filesystem::exists(filename))
        {
            return true;
        }

        // Not there yet: back off for one second before probing again.
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

std::string readUrl(const std::string &filename)
{
std::ifstream file(filename);
Expand All @@ -65,69 +54,23 @@ std::string readUrl(const std::string &filename)
return url;
}

// Poll HyperQueue once per second until job `job_id` reports the state `state`.
//
// Job states referenced in this file: "WAITING", "RUNNING", "FINISHED", "FAILED", "CANCELED".
//
// Returns true as soon as the reported state equals `state`.
// Returns false (after logging to std::cerr) when the status query yields an
// empty result, when the job is FAILED or CANCELED, or when the job is
// FINISHED while a different state was requested.
//
// NOTE(review): the default "COMPLETED" is not one of the states listed above,
// so a call that relies on the default can only ever return false. It is kept
// unchanged for interface compatibility; confirm whether "FINISHED" was meant.
bool waitForHQJobState(const std::string &job_id, const std::string &state = "COMPLETED")
{
    const std::string command = "hq job info " + job_id + " | grep State | awk '{print $4}'";

    while (true)
    {
        std::string job_status = getCommandOutput(command);

        // Strip trailing line breaks only, so a status that arrives without
        // one is not truncated (which would make it impossible to match).
        while (!job_status.empty() && (job_status.back() == '\n' || job_status.back() == '\r'))
        {
            job_status.pop_back();
        }

        // Don't wait if there is an error or the job has ended.
        if (job_status.empty() || (state != "FINISHED" && job_status == "FINISHED") || job_status == "FAILED" || job_status == "CANCELED")
        {
            std::cerr << "Wait for job status failure, status : " << job_status << std::endl;
            return false;
        }

        // Target reached: return immediately instead of sleeping once more.
        if (job_status == state)
        {
            return true;
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

// Submit hq_scripts/job.sh to HyperQueue and block until the job is up.
// Returns the text printed by `hq submit` with its last character removed
// (presumably the line break that follows the job ID).
std::string submitHQJob()
{
    const std::string submit_cmd = "hq submit --output-mode=quiet hq_scripts/job.sh";
    std::string id = getCommandOutput(submit_cmd);

    // Drop the trailing character of the command output.
    if (!id.empty())
    {
        id.pop_back();
    }

    std::cout << "Waiting for job " << id << " to start." << std::endl;

    // First wait for the RUNNING state, then for the job's URL file to appear.
    // The results of both waits are not checked.
    waitForHQJobState(id, "RUNNING");
    const std::string url_file = "./urls/url-" + id + ".txt";
    waitForFile(url_file);

    std::cout << "Job " << id << " started." << std::endl;
    return id;
}
// Guards the optional pre-submission delay so that concurrent submissions
// are delayed one after another rather than all at once.
// Declared `inline` (C++17) because this is a header: a plain definition here
// would be repeated in every translation unit that includes the file.
inline std::mutex job_submission_mutex;

// Optional delay, in milliseconds, applied before each job submission.
// A value of 0 disables the delay.
inline int hq_submit_delay_ms = 0;

class HyperQueueJob
{
public:
HyperQueueJob(std::string model_name, bool start_client=true)
static std::atomic<int32_t> job_count;
HyperQueueJob(std::string model_name, bool start_client = true,
bool force_default_submission_script = false)
{
job_id = submitHQJob();
job_id = submitHQJob(model_name, force_default_submission_script);

// Get the server URL
server_url = readUrl("./urls/url-" + job_id + ".txt");

// Start a client, using unique pointer
if(start_client)
if (start_client)
{
client_ptr = std::make_unique<umbridge::HTTPModel>(server_url, model_name);
}
Expand All @@ -146,22 +89,121 @@ class HyperQueueJob
std::unique_ptr<umbridge::HTTPModel> client_ptr;

private:
// Submit a HyperQueue job for `model_name` and block until it is up.
//
// Script selection: ./hq_scripts/job_<model_name>.sh is used when it exists
// and `force_default_submission_script` is false; otherwise ./hq_scripts/job.sh.
//
// Returns the job ID printed by `hq submit` (trailing line break removed).
// Throws std::runtime_error when neither submission script exists.
std::string submitHQJob(const std::string &model_name, bool force_default_submission_script = false)
{
    // Add optional delay to job submissions to prevent issues in some cases.
    // The lock is held while sleeping so concurrent submissions are delayed
    // one after another.
    if (hq_submit_delay_ms)
    {
        std::lock_guard<std::mutex> lock(job_submission_mutex);
        std::this_thread::sleep_for(std::chrono::milliseconds(hq_submit_delay_ms));
    }

    // Use model specific job script if available, default otherwise.
    const std::filesystem::path submission_script_dir("./hq_scripts");
    const std::filesystem::path generic_script = submission_script_dir / "job.sh";
    const std::filesystem::path model_specific_script = submission_script_dir / ("job_" + model_name + ".sh");

    std::filesystem::path submission_script;
    if (!force_default_submission_script && std::filesystem::exists(model_specific_script))
    {
        submission_script = model_specific_script;
    }
    else if (std::filesystem::exists(generic_script))
    {
        submission_script = generic_script;
    }
    else
    {
        throw std::runtime_error("Job submission script not found: Check that file 'hq_scripts/job.sh' exists.");
    }

    // Read this job's priority and decrement the shared counter in a single
    // atomic step, so two concurrent submissions never get the same priority.
    const int32_t priority = job_count--;

    std::string hq_command = "hq submit --output-mode=quiet ";
    hq_command += "--priority=" + std::to_string(priority) + " ";
    hq_command += submission_script.string();

    // Submit the HQ job and retrieve the HQ job ID.
    std::string job_id = getCommandOutput(hq_command);

    // Strip trailing line breaks only, so an ID that arrives without one is
    // not truncated.
    while (!job_id.empty() && (job_id.back() == '\n' || job_id.back() == '\r'))
    {
        job_id.pop_back();
    }

    std::cout << "Waiting for job " << job_id << " to start." << std::endl;

    // Wait for the HQ job to start.
    // NOTE(review): the results of both waits are ignored, so "started" is
    // printed and the ID returned even if a wait reported failure.
    waitForHQJobState(job_id, "RUNNING");

    // Also wait until the URL file is written.
    waitForFile("./urls/url-" + job_id + ".txt", job_id);

    std::cout << "Job " << job_id << " started." << std::endl;

    return job_id;
}

// Poll once per second until `filename` exists, giving up if job `job_id`
// ends first.
// Returns true once the file exists. Returns false (after logging to
// std::cerr) if the job is reported as FINISHED, FAILED or CANCELED while the
// file is still missing.
bool waitForFile(const std::string &filename, const std::string &job_id)
{
    const std::string status_query = "hq job info " + job_id + " | grep State | awk '{print $4}'";

    for (;;)
    {
        if (std::filesystem::exists(filename))
        {
            return true;
        }

        // File not there yet: wait a second, then look at the job state.
        std::this_thread::sleep_for(std::chrono::seconds(1));

        std::string current = getCommandOutput(status_query);

        // Drop the trailing character of the command output (the line break).
        if (!current.empty())
        {
            current.pop_back();
        }

        // A job that has already ended will not produce the file any more.
        const bool job_ended = current == "FINISHED" || current == "FAILED" || current == "CANCELED";
        if (job_ended)
        {
            std::cerr << "Wait for file failed. Beacuse job " << job_id << " status is : " << current << std::endl;
            return false;
        }
    }
}

// Poll HyperQueue once per second until job `job_id` reports the state `state`.
//
// Job states referenced in this file: "WAITING", "RUNNING", "FINISHED", "FAILED", "CANCELED".
//
// Returns true as soon as the reported state equals `state`.
// Returns false (after logging to std::cerr) when the status query yields an
// empty result, when the job is FAILED or CANCELED, or when the job is
// FINISHED while a different state was requested.
bool waitForHQJobState(const std::string &job_id, const std::string &state)
{
    const std::string command = "hq job info " + job_id + " | grep State | awk '{print $4}'";

    while (true)
    {
        std::string job_status = getCommandOutput(command);

        // Strip trailing line breaks only, so a status that arrives without
        // one is not truncated (which would make it impossible to match).
        while (!job_status.empty() && (job_status.back() == '\n' || job_status.back() == '\r'))
        {
            job_status.pop_back();
        }

        // Don't wait if there is an error or the job has ended.
        if (job_status.empty() || (state != "FINISHED" && job_status == "FINISHED") || job_status == "FAILED" || job_status == "CANCELED")
        {
            std::cerr << "Wait for job status failure, status : " << job_status << std::endl;
            return false;
        }

        // Target reached: return immediately instead of sleeping once more.
        if (job_status == state)
        {
            return true;
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

std::string job_id;
};


class LoadBalancer : public umbridge::Model
{
public:
LoadBalancer(std::string name) : umbridge::Model(name)
{
// Setup HyperQueue server
std::system("hq server start &");
sleep(1); // Workaround: give the HQ server enough time to start.

// Create allocation queue
std::system("hq_scripts/allocation_queue.sh");
}
LoadBalancer(std::string name) : umbridge::Model(name) {}

std::vector<std::size_t> GetInputSizes(const json &config_json = json::parse("{}")) const override
{
Expand Down

0 comments on commit dd28563

Please sign in to comment.