Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 29 additions & 64 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,7 @@ import (
func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker) {
log.WithField("job", job).Info("Handling job")

err := q.getQueueForJob(ctx, job)
if err != nil {
log.WithError(err).Fatal("Failed to get status queue")
return
}

err = q.setjobReceived(ctx, job)
err := q.setjobReceived(ctx, job)
if err != nil {
q.setjobFailed(ctx, job)
return
Expand All @@ -35,27 +29,16 @@ func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker)
defer vm.shutDown()

var reqJSON []byte
switch job.Type {
case "command":
reqJSON, err = json.Marshal(agentExecReq{
ID: job.ID,
Command: job.Command,
})
if err != nil {
log.WithError(err).Error("Failed to marshal JSON request")
q.setjobFailed(ctx, job)
return
}
case "code":
reqJSON, err = json.Marshal(agentRunReq{
ID: job.ID,
Code: job.Code,
})
if err != nil {
log.WithError(err).Error("Failed to marshal JSON request")
q.setjobFailed(ctx, job)
return
}

reqJSON, err = json.Marshal(agentRunReq{
ID: job.ID,
Variant: job.Variant,
Code: job.Code,
})
if err != nil {
log.WithError(err).Error("Failed to marshal JSON request")
q.setjobFailed(ctx, job)
return
}

err = q.setjobRunning(ctx, job)
Expand All @@ -65,44 +48,26 @@ func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker)
}

var httpRes *http.Response
var res agentExecRes

switch job.Type {
case "command":
httpRes, err = http.Post("http://"+vm.ip.String()+":8080/exec", "application/json", bytes.NewBuffer(reqJSON))
if err != nil || httpRes.StatusCode != 200 {
log.WithError(err).Error("Failed to request execution to agent")
q.setjobFailed(ctx, job)
return
}
json.NewDecoder(httpRes.Body).Decode(&res)
log.WithField("result", res).Info("Job execution finished")

err = q.setjobResult(ctx, job, res)
if err != nil {
q.setjobFailed(ctx, job)
}
var agentRes agentExecRes

case "code":
httpRes, err = http.Post("http://"+vm.ip.String()+":8080/run/c", "application/json", bytes.NewBuffer(reqJSON))
if err != nil {
log.WithError(err).Error("Failed to request execution to agent")
q.setjobFailed(ctx, job)
return
}
json.NewDecoder(httpRes.Body).Decode(&res)
log.WithField("result", res).Info("Job execution finished")

if httpRes.StatusCode != 200 {
log.WithField("res", res).Error("Failed to compile and run code")
q.setjobFailed(ctx, job)
return
}
// FIXME
httpRes, err = http.Post("http://"+vm.ip.String()+":8080/run/python", "application/json", bytes.NewBuffer(reqJSON))
if err != nil {
log.WithError(err).Error("Failed to request execution to agent")
q.setjobFailed(ctx, job)
return
}
json.NewDecoder(httpRes.Body).Decode(&agentRes)
log.WithField("result", agentRes).Info("Job execution finished")
if httpRes.StatusCode != 200 {
log.WithField("res", agentRes).Error("Failed to compile and run code")
q.setjobFailed(ctx, job)
return
}

err = q.setjobResult(ctx, job, res)
if err != nil {
q.setjobFailed(ctx, job)
}
err = q.setjobResult(ctx, job, agentRes)
if err != nil {
q.setjobFailed(ctx, job)
}

}
93 changes: 56 additions & 37 deletions job_queue_rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ type jobQueue struct {
}

type jobStatus struct {
ID string `json:"id"`
Status string `json:"status"`
Command string `json:"command"`
StdErr string `json:"stderr"`
StdOut string `json:"stdout"`
ID string `json:"id"`
Status string `json:"status"`
StdErr string `json:"stderr"`
StdOut string `json:"stdout"`
}

func newJobQueue(endpoint string) jobQueue {
Expand All @@ -34,18 +33,40 @@ func newJobQueue(endpoint string) jobQueue {
log.WithError(err).Fatal("Failed to open a channel")
}

err = ch.ExchangeDeclare(
"jobs_ex", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.WithError(err).Fatal("Failed to declare an exchange")
}

jobsQ, err := ch.QueueDeclare(
"jobs", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
"jobs_q", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.WithError(err).Fatal("Failed to declare a queue")
}

err = ch.QueueBind(
jobsQ.Name, // queue name
"jobs_rk", // routing key
"jobs_ex", // exchange
false,
nil)
if err != nil {
log.WithError(err).Fatal("Failed to bind a queue")
}
jobs, err := ch.Consume(
jobsQ.Name, // queue
"", // consumer
Expand All @@ -67,36 +88,35 @@ func newJobQueue(endpoint string) jobQueue {
}
}

func (q jobQueue) getQueueForJob(ctx context.Context, job benchJob) error {
func (q jobQueue) getQueueForJob(ctx context.Context) error {
return q.ch.ExchangeDeclare(
"job_status", // name
"direct", // type
false, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
"jobs_status_ex", // name
"direct", // type
false, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
}

func (q jobQueue) setjobStatus(ctx context.Context, job benchJob, status string) error {
log.WithField("status", status).Info("Set job status")
jobStatus := &jobStatus{
ID: job.ID,
Status: status,
Command: job.Command,
StdErr: "",
StdOut: "",
ID: job.ID,
Status: status,
StdErr: "",
StdOut: "",
}
b, err := json.Marshal(jobStatus)
if err != nil {
return err
}
err = q.ch.Publish(
"job_status", // exchange
job.ID, // routing key
false, // mandatory
false, // immediate
"jobs_status_ex", // exchange
"jobs_status_rk", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: b,
Expand All @@ -117,11 +137,10 @@ func (q jobQueue) setjobFailed(ctx context.Context, job benchJob) error {
}
func (q jobQueue) setjobResult(ctx context.Context, job benchJob, res agentExecRes) error {
jobStatus := &jobStatus{
ID: job.ID,
Status: "done",
Command: job.Command,
StdErr: res.StdErr,
StdOut: res.StdOut,
ID: job.ID,
Status: "done",
StdErr: res.StdErr,
StdOut: res.StdOut,
}
log.WithField("jobStatus", jobStatus).Info("Set job result")

Expand All @@ -130,10 +149,10 @@ func (q jobQueue) setjobResult(ctx context.Context, job benchJob, res agentExecR
return err
}
err = q.ch.Publish(
"job_status", // exchange
job.ID, // routing key
false, // mandatory
false, // immediate
"jobs_status_ex", // exchange
"jobs_status_rk", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: b,
Expand Down
21 changes: 16 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
"syscall"

firecracker "github.com/firecracker-microvm/firecracker-go-sdk"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)

type benchJob struct {
ID string `json:"id"`
Type string `json:"type"`
Command string `json:"command"`
Variant string `json:"variant"`
Code string `json:"code"`
}

Expand All @@ -29,8 +29,9 @@ type agentExecReq struct {
}

type agentRunReq struct {
ID string `json:"id"`
Code string `json:"code"`
ID string `json:"id"`
Variant string `json:"variant"`
Code string `json:"code"`
}

type agentExecRes struct {
Expand Down Expand Up @@ -60,10 +61,20 @@ func main() {
installSignalHandlers()
log.SetReportCaller(true)

q = newJobQueue("amqp://admin:admin@localhost:5672/")
rabbitMQURL := os.Getenv("RABBITMQ_URL")
if len(rabbitMQURL) == 0 {
logrus.Fatal("Missing RABBITMQ_URL env variable")
}
q = newJobQueue(rabbitMQURL)
defer q.ch.Close()
defer q.conn.Close()

err := q.getQueueForJob(ctx)
if err != nil {
log.WithError(err).Fatal("Failed to get status queue")
return
}

log.Info("Waiting for RabbitMQ jobs...")
for d := range q.jobs {
log.Printf("Received a message: %s", d.Body)
Expand Down