diff --git a/job.go b/job.go index c667685..2f2cc2f 100644 --- a/job.go +++ b/job.go @@ -12,13 +12,7 @@ import ( func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker) { log.WithField("job", job).Info("Handling job") - err := q.getQueueForJob(ctx, job) - if err != nil { - log.WithError(err).Fatal("Failed to get status queue") - return - } - - err = q.setjobReceived(ctx, job) + err := q.setjobReceived(ctx, job) if err != nil { q.setjobFailed(ctx, job) return @@ -35,27 +29,16 @@ func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker) defer vm.shutDown() var reqJSON []byte - switch job.Type { - case "command": - reqJSON, err = json.Marshal(agentExecReq{ - ID: job.ID, - Command: job.Command, - }) - if err != nil { - log.WithError(err).Error("Failed to marshal JSON request") - q.setjobFailed(ctx, job) - return - } - case "code": - reqJSON, err = json.Marshal(agentRunReq{ - ID: job.ID, - Code: job.Code, - }) - if err != nil { - log.WithError(err).Error("Failed to marshal JSON request") - q.setjobFailed(ctx, job) - return - } + + reqJSON, err = json.Marshal(agentRunReq{ + ID: job.ID, + Variant: job.Variant, + Code: job.Code, + }) + if err != nil { + log.WithError(err).Error("Failed to marshal JSON request") + q.setjobFailed(ctx, job) + return } err = q.setjobRunning(ctx, job) @@ -65,44 +48,26 @@ func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker) } var httpRes *http.Response - var res agentExecRes - - switch job.Type { - case "command": - httpRes, err = http.Post("http://"+vm.ip.String()+":8080/exec", "application/json", bytes.NewBuffer(reqJSON)) - if err != nil || httpRes.StatusCode != 200 { - log.WithError(err).Error("Failed to request execution to agent") - q.setjobFailed(ctx, job) - return - } - json.NewDecoder(httpRes.Body).Decode(&res) - log.WithField("result", res).Info("Job execution finished") - - err = q.setjobResult(ctx, job, res) - if err != nil { - q.setjobFailed(ctx, job) - } + var agentRes agentExecRes - case "code": - httpRes, err = http.Post("http://"+vm.ip.String()+":8080/run/c", "application/json", bytes.NewBuffer(reqJSON)) - if err != nil { - log.WithError(err).Error("Failed to request execution to agent") - q.setjobFailed(ctx, job) - return - } - json.NewDecoder(httpRes.Body).Decode(&res) - log.WithField("result", res).Info("Job execution finished") - - if httpRes.StatusCode != 200 { - log.WithField("res", res).Error("Failed to compile and run code") - q.setjobFailed(ctx, job) - return - } + // FIXME + httpRes, err = http.Post("http://"+vm.ip.String()+":8080/run/python", "application/json", bytes.NewBuffer(reqJSON)) + if err != nil { + log.WithError(err).Error("Failed to request execution to agent") + q.setjobFailed(ctx, job) + return + } + json.NewDecoder(httpRes.Body).Decode(&agentRes) + log.WithField("result", agentRes).Info("Job execution finished") + if httpRes.StatusCode != 200 { + log.WithField("res", agentRes).Error("Failed to compile and run code") + q.setjobFailed(ctx, job) + return + } - err = q.setjobResult(ctx, job, res) - if err != nil { - q.setjobFailed(ctx, job) - } + err = q.setjobResult(ctx, job, agentRes) + if err != nil { + q.setjobFailed(ctx, job) } } diff --git a/job_queue_rabbitmq.go b/job_queue_rabbitmq.go index 6877a24..3ef7437 100644 --- a/job_queue_rabbitmq.go +++ b/job_queue_rabbitmq.go @@ -16,11 +16,10 @@ type jobQueue struct { } type jobStatus struct { - ID string `json:"id"` - Status string `json:"status"` - Command string `json:"command"` - StdErr string `json:"stderr"` - StdOut string `json:"stdout"` + ID string `json:"id"` + Status string `json:"status"` + StdErr string `json:"stderr"` + StdOut string `json:"stdout"` } func newJobQueue(endpoint string) jobQueue { @@ -34,18 +33,40 @@ func newJobQueue(endpoint string) jobQueue { log.WithError(err).Fatal("Failed to open a channel") } + err = ch.ExchangeDeclare( + "jobs_ex", // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + log.WithError(err).Fatal("Failed to declare an exchange") + } + jobsQ, err := ch.QueueDeclare( - "jobs", // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + "jobs_q", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments ) if err != nil { log.WithError(err).Fatal("Failed to declare a queue") } + err = ch.QueueBind( + jobsQ.Name, // queue name + "jobs_rk", // routing key + "jobs_ex", // exchange + false, + nil) + if err != nil { + log.WithError(err).Fatal("Failed to bind a queue") + } jobs, err := ch.Consume( jobsQ.Name, // queue "", // consumer @@ -67,36 +88,35 @@ func newJobQueue(endpoint string) jobQueue { } } -func (q jobQueue) getQueueForJob(ctx context.Context, job benchJob) error { +func (q jobQueue) getQueueForJob(ctx context.Context) error { return q.ch.ExchangeDeclare( - "job_status", // name - "direct", // type - false, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments + "jobs_status_ex", // name + "direct", // type + false, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) } func (q jobQueue) setjobStatus(ctx context.Context, job benchJob, status string) error { log.WithField("status", status).Info("Set job status") jobStatus := &jobStatus{ - ID: job.ID, - Status: status, - Command: job.Command, - StdErr: "", - StdOut: "", + ID: job.ID, + Status: status, + StdErr: "", + StdOut: "", } b, err := json.Marshal(jobStatus) if err != nil { return err } err = q.ch.Publish( - "job_status", // exchange - job.ID, // routing key - false, // mandatory - false, // immediate + "jobs_status_ex", // exchange + "jobs_status_rk", // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: b, @@ -117,11 +137,10 @@ func (q jobQueue) setjobFailed(ctx context.Context, job benchJob) error { } func (q jobQueue) setjobResult(ctx context.Context, job benchJob, res agentExecRes) error { jobStatus := &jobStatus{ - ID: job.ID, - Status: "done", - Command: job.Command, - StdErr: res.StdErr, - StdOut: res.StdOut, + ID: job.ID, + Status: "done", + StdErr: res.StdErr, + StdOut: res.StdOut, } log.WithField("jobStatus", jobStatus).Info("Set job result") @@ -130,10 +149,10 @@ func (q jobQueue) setjobResult(ctx context.Context, job benchJob, res agentExecR return err } err = q.ch.Publish( - "job_status", // exchange - job.ID, // routing key - false, // mandatory - false, // immediate + "jobs_status_ex", // exchange + "jobs_status_rk", // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: b, diff --git a/main.go b/main.go index 9f0bf12..48425f2 100644 --- a/main.go +++ b/main.go @@ -13,13 +13,13 @@ import ( "syscall" firecracker "github.com/firecracker-microvm/firecracker-go-sdk" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) type benchJob struct { ID string `json:"id"` - Type string `json:"type"` - Command string `json:"command"` + Variant string `json:"variant"` Code string `json:"code"` } @@ -29,8 +29,9 @@ type agentExecReq struct { } type agentRunReq struct { - ID string `json:"id"` - Code string `json:"code"` + ID string `json:"id"` + Variant string `json:"variant"` + Code string `json:"code"` } type agentExecRes struct { @@ -60,10 +61,20 @@ func main() { installSignalHandlers() log.SetReportCaller(true) - q = newJobQueue("amqp://admin:admin@localhost:5672/") + rabbitMQURL := os.Getenv("RABBITMQ_URL") + if len(rabbitMQURL) == 0 { + logrus.Fatal("Missing RABBITMQ_URL env variable") + } + q = newJobQueue(rabbitMQURL) defer q.ch.Close() defer q.conn.Close() + err := q.getQueueForJob(ctx) + if err != nil { + log.WithError(err).Fatal("Failed to get status queue") + return + } + log.Info("Waiting for RabbitMQ jobs...") for d := range q.jobs { log.Printf("Received a message: %s", d.Body)