Skip to content

Commit

Permalink
Merge branch 'fixing-issue-48' into singularity
Browse files Browse the repository at this point in the history
  • Loading branch information
LennoxLiu committed Apr 7, 2024
2 parents 1af04ca + 8af4d31 commit 955221c
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 23 deletions.
71 changes: 67 additions & 4 deletions hpc/LoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,28 @@ std::string getCommandOutput(const std::string command)
return output;
}

// wait until file is created
bool waitForFile(const std::string &filename)
bool waitForFile(const std::string &filename, const std::string &job_id)
{
const std::string command = "hq job info " + job_id + " | grep State | awk '{print $4}'";
std::string job_status;

// Check if the file exists
while (!std::filesystem::exists(filename)) {
// If the file doesn't exist, wait for a certain period
std::this_thread::sleep_for(std::chrono::seconds(1));

job_status = getCommandOutput(command);

// Delete the line break
if (!job_status.empty())
job_status.pop_back();

// Don't wait if there is an error or the job is ended
if (job_status == "FINISHED" || job_status == "FAILED" || job_status == "CANCELED")
{
std::cerr << "Wait for file failed. Beacuse job "<< job_id <<" status is : " << job_status << std::endl;
return false;
}
}

return true;
Expand Down Expand Up @@ -65,8 +80,56 @@ std::string readUrl(const std::string &filename)
return url;
}

// Global mutex. Presumably serialises HyperQueue job submission — its users are
// outside the visible part of this file, TODO confirm.
std::mutex job_submission_mutex;
// Delay in milliseconds, initialised to 0. Presumably applied around job
// submission — TODO confirm against the code that reads it.
int hq_submit_delay_ms = 0;
// Poll `hq job info` once per second until job `job_id` reports `state`.
//
// job_id: HyperQueue job id, inserted verbatim into the `hq job info` command.
// state:  state to wait for, e.g. "WAITING", "RUNNING", "FINISHED", "CANCELED".
//
// Returns true as soon as the reported state equals `state`.
// Returns false (after logging to stderr) when the state cannot be read (empty
// command output) or when the job has ended ("FINISHED", "FAILED", "CANCELED")
// in a state other than the requested one.
//
// NOTE(review): the default "COMPLETED" is not one of the states listed above,
// so a call relying on the default can only end by returning false once the
// job has ended. The default is kept for compatibility — confirm the intent.
bool waitForHQJobState(const std::string &job_id, const std::string &state = "COMPLETED")
{
    const std::string command = "hq job info " + job_id + " | grep State | awk '{print $4}'";

    while (true)
    {
        std::string job_status = getCommandOutput(command);

        // Strip the trailing line break, but only if there really is one.
        if (!job_status.empty() && job_status.back() == '\n')
            job_status.pop_back();

        // An empty status means the job could not be queried at all.
        const bool unreadable = job_status.empty();

        // Test for the requested state before the "job ended" check, so that
        // waiting for a terminal state (e.g. "CANCELED") can succeed, and so
        // that no extra second is spent sleeping once the state is reached.
        if (!unreadable && job_status == state)
            return true;

        // Don't wait if there is an error or the job is ended
        if (unreadable || job_status == "FINISHED" || job_status == "FAILED" || job_status == "CANCELED")
        {
            std::cerr << "Wait for job status failure, status : " << job_status << std::endl;
            return false;
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

// Submit hq_scripts/job.sh to HyperQueue and block until the job has written
// its url file.
//
// Steps: run `hq submit` and take its output as the job id, wait for the job
// to report RUNNING, then wait for ./urls/url-<job_id>.txt to exist.
//
// Returns the job id (empty if `hq submit` produced no output). Failures are
// reported on stderr; the id is returned in every case, as before.
std::string submitHQJob()
{
    const std::string hq_command = "hq submit --output-mode=quiet hq_scripts/job.sh";

    std::string job_id = getCommandOutput(hq_command);

    // Strip the trailing line break, but only if there really is one.
    if (!job_id.empty() && job_id.back() == '\n')
        job_id.pop_back();

    // Without a job id there is nothing to wait for. Going on would make the
    // file wait below spin forever: the status query yields an empty string,
    // which waitForFile does not treat as a reason to stop.
    if (job_id.empty())
    {
        std::cerr << "Failed to submit HQ job: '" << hq_command << "' returned no job id." << std::endl;
        return job_id;
    }

    std::cout << "Waiting for job " << job_id << " to start." << std::endl;

    // Wait for the HQ Job to start. A failure here is logged by the callee;
    // the url file below is what finally decides whether the job is usable.
    waitForHQJobState(job_id, "RUNNING");

    // Also wait until job is running and url file is written
    const bool url_written = waitForFile("./urls/url-" + job_id + ".txt", job_id);

    if (url_written)
        std::cout << "Job " << job_id << " started." << std::endl;
    else
        std::cerr << "Job " << job_id << " did not start: url file was not written." << std::endl;

    return job_id;
}

class HyperQueueJob
{
Expand Down
4 changes: 2 additions & 2 deletions hpc/hq_scripts/job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@

function get_avaliable_port {
# Define the range of ports to select from
MIN_PORT=1024
MIN_PORT=60000
MAX_PORT=65535

# Generate a random port number
port=$(shuf -i $MIN_PORT-$MAX_PORT -n 1)

# Check if the port is in use
while lsof -Pi :$port -sTCP:LISTEN -t >/dev/null; do
while lsof -Pi :$port -t; do
# If the port is in use, generate a new random port number
port=$(shuf -i $MIN_PORT-$MAX_PORT -n 1)
done
Expand Down
12 changes: 12 additions & 0 deletions hpc/test/MultiplyBy2/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Build the test model server and the load balancer, then start the load balancer.
all: build-server build-lb run-lb

# None of these targets creates a file with its own name; declaring them phony
# keeps them running even if such a file ever appears in this directory.
.PHONY: all build-server build-lb run-lb

load-balancer-files = ../../LoadBalancer.cpp ../../LoadBalancer.hpp ../../lib/httplib.h ../../lib/json.hpp ../../lib/umbridge.h

# Compile the test model server into ./server.
build-server:
	g++ -O3 -w -std=c++11 minimal-server.cpp -o server -lssl -lcrypto -pthread

# Compile the load balancer sources into ./load-balancer.
build-lb:
	g++ -O3 -Wno-unused-result -std=c++17 $(load-balancer-files) -o load-balancer -pthread

# Start the load balancer binary.
run-lb:
	./load-balancer
Empty file modified hpc/test/MultiplyBy2/client.py
100644 → 100755
Empty file.
25 changes: 25 additions & 0 deletions hpc/test/MultiplyBy2/client.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

# Fire 300 concurrent Evaluate requests at the load balancer on localhost:4242
# and print "y" for every response that matches the expected output, or the
# offending response followed by "n" otherwise.

# export TEST_DELAY=1e4

echo "Sending requests..."

for i in {1..300}
do
    # Expected output: {"output":[[200.0]]}
    # Each request runs in its own background job. The response is captured
    # once, so that the text printed on a mismatch is the response that was
    # actually compared, not the result of a second request.
    {
        response=$(curl -s http://localhost:4242/Evaluate -X POST -d '{"name": "forward", "input": [[100.0]]}')

        if [ "$response" == '{"output":[[200.0]]}' ]; then
            echo -n "y"
        else
            echo "$response"
            echo -n "n"
            #echo "Error: curl output does not equal expected output"
        fi
    } &

done

echo "Requests sent. Waiting for responses..."

# Block until every background request has finished.
wait
11 changes: 11 additions & 0 deletions hpc/test/MultiplyBy2/hq_scripts/allocation_queue.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#! /bin/bash

# Add a SLURM-backed allocation queue to HyperQueue (`hq alloc add slurm`).
# The options below are passed to hq; everything after the bare `--` is passed
# on to sbatch (here: partition "devel").

hq alloc add slurm --time-limit 10m \
--idle-timeout 3m \
--backlog 1 \
--workers-per-alloc 1 \
--max-worker-count 5 \
--resource "model=range(1-1)" \
--cpus=1 \
-- -p "devel" # Add any necessary SLURM arguments
# Any parameters after -- will be passed directly to sbatch (e.g. credentials, partition, mem, etc.)
52 changes: 52 additions & 0 deletions hpc/test/MultiplyBy2/hq_scripts/job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#! /bin/bash

#HQ --resource model=1
#HQ --time-request=1m
#HQ --time-limit=2m

# HyperQueue job script: launch the model server, send back the server URL
# and wait to ensure that HQ won't schedule any more jobs to this allocation.

# Base directory under which the ports/ and urls/ directories are created.
load_balancer_dir="./"
# Create the directory for the port files, which are named after the HQ job ID.
mkdir -p "$load_balancer_dir/ports"

# Pick a random port in [MIN_PORT, MAX_PORT] that lsof does not report as in
# use, record it in ports/<HQ job id>-1.txt and print it on stdout.
# Intended to be called as port=$(get_avaliable_port), so the port number must
# be the ONLY thing this function writes to stdout.
function get_avaliable_port {
    # Define the range of ports to select from
    MIN_PORT=49152
    MAX_PORT=65535

    # Generate a random port number
    port=$(shuf -i $MIN_PORT-$MAX_PORT -n 1)

    # Check if the port is in use. Only lsof's exit status matters; its output
    # (the PIDs using the port) is discarded, otherwise it would be captured by
    # the caller's $(...) together with the port number.
    while lsof -Pi :"$port" -t >/dev/null; do
        # If the port is in use, generate a new random port number
        port=$(shuf -i $MIN_PORT-$MAX_PORT -n 1)
    done

    echo "$port" > "$load_balancer_dir/ports/$HQ_JOB_ID-1.txt"
    echo "$port"
}


# Pick the port for the model server; the function prints it on stdout.
port=$(get_avaliable_port)
# sleep 1 # Wait for the port to be correctly assigned, otherwise it will sometimes get strange value in $port

# Record the port value as seen here, in a second file (suffix -2) named after the HQ job ID.
echo "$port" > "$load_balancer_dir/ports/$HQ_JOB_ID-2.txt"

# Start the model server in the background with PORT exported to it.
export PORT=$port && ./server & # Assume that server sets the port according to the environment variable 'PORT'.


# First address printed by `hostname -I`; used to build the server URL.
host=$(hostname -I | awk '{print $1}')

# Wait for model server to start: poll its /Info endpoint once per second until curl succeeds.
while ! curl -s "http://$host:$port/Info" > /dev/null; do
sleep 1
done

# Write server URL to file identified by HQ job ID.
# This is done only after the server answered, so the file's existence means the URL is usable.
mkdir -p "$load_balancer_dir/urls"
echo "http://$host:$port" > "$load_balancer_dir/urls/url-$HQ_JOB_ID.txt"

sleep infinity # keep the job occupied
39 changes: 22 additions & 17 deletions hpc/test/MultiplyBy2/minimal-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@

#include "../../../lib/umbridge.h"

// Write one log line to stdout, prefixed with the current local time and a
// "server:" tag:
//   [YYYY-MM-DD HH:MM:SS] server: <message>
// The line is terminated with std::endl, so the stream is flushed.
void logMessage(const std::string& message) {
    // Current wall-clock time as a time_t.
    const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    // Broken-down calendar time in the local time zone.
    const std::tm* const local_now = std::localtime(&now);

    std::cout << '[' << std::put_time(local_now, "%Y-%m-%d %H:%M:%S") << ']'
              << " server: " << message << std::endl;
}

class ExampleModel : public umbridge::Model
{
public:
Expand Down Expand Up @@ -51,6 +65,7 @@ std::string getCommandOutput(const std::string command)
if (!pipe)
{
std::cerr << "Failed to execute the command: " + command << std::endl;
logMessage("Failed to execute the command: " + command);
return "";
}

Expand All @@ -73,11 +88,12 @@ int main(int argc, char *argv[])
int port = 0;
if (port_cstr == NULL)
{
std::cout << "Environment variable PORT not set! Using port 4242 as default." << std::endl;
logMessage("Environment variable PORT not set! Using port 4242 as default.");
port = 4242;
}
else
{
logMessage("Environment variable PORT set to " + std::string(port_cstr) + ".");
port = atoi(port_cstr);
}

Expand All @@ -87,7 +103,7 @@ int main(int argc, char *argv[])
{
test_delay = atoi(delay_cstr);
}
std::cout << "Evaluation delay set to " << test_delay << " ms." << std::endl;
logMessage("Evaluation delay set to " + std::to_string(test_delay) + " ms.");

// Set up and serve model
ExampleModel model(test_delay);
Expand All @@ -96,22 +112,11 @@ int main(int argc, char *argv[])
ExampleModel model4(5, "outward");

std::string hostname = "0.0.0.0";
/*
if (argc == 2)
{
hostname = argv[1];
}
else
{
hostname = getCommandOutput("hostname"); // get the hostname of node
// delete the line break
if (!hostname.empty())
hostname.pop_back();
}
*/
std::cout << "Hosting server at : "
<< "http://" << hostname << ":" << port << std::endl;

logMessage("Hosting server at : http://" + hostname + ":" + std::to_string(port));

umbridge::serveModels({&model,&model2,&model3,&model4}, hostname, port); // start server at the hostname

logMessage("Server exit: http://" + hostname + ":" + std::to_string(port));
return 0;
}

0 comments on commit 955221c

Please sign in to comment.