Skip to content

Commit

Permalink
Move ack to mqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewmarklloyd committed Apr 26, 2023
1 parent 7964808 commit 97d5109
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 32 deletions.
35 changes: 6 additions & 29 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
Expand All @@ -26,8 +25,8 @@ var messageClient mqtt.MqttClient
var awsClient aws.Client

const (
clientID = "math-visual-proofs-server"
clonePath = "/tmp/working"
clientID = "math-visual-proofs-server"
baseClonePath = "/tmp"
)

func main() {
Expand Down Expand Up @@ -60,20 +59,7 @@ func main() {
logger.Fatalf("connecting to mqtt: %s", err)
}

messageClient.Subscribe(mqtt.RenderStartTopic, func(message string) {
renderMessage := mqtt.RenderMessage{}
err := json.Unmarshal([]byte(message), &renderMessage)
if err != nil {
handleError(fmt.Errorf("unmarshalling render message: %w", err), mqtt.UnknownRepoURL, mqtt.UnknownGithubSHA)
return
}

logger.Info("received request to render: ", renderMessage)

if os.Getenv("MOCK_MODE") != "" {
return
}

messageClient.Subscribe(mqtt.RenderStartTopic, logger, func(renderMessage mqtt.RenderMessage) {
err = subscribeHandler(renderMessage)
if err != nil {
handleError(err, renderMessage.RepoURL, renderMessage.GithubSHA)
Expand All @@ -99,6 +85,7 @@ func main() {
}

func subscribeHandler(renderMessage mqtt.RenderMessage) error {
clonePath := fmt.Sprintf("%s/%s", baseClonePath, renderMessage.GithubSHA)
defer os.RemoveAll(clonePath)

err := os.RemoveAll(clonePath)
Expand All @@ -111,22 +98,12 @@ func subscribeHandler(renderMessage mqtt.RenderMessage) error {
return fmt.Errorf("cloning repository %s: %s", renderMessage.RepoURL, err.Error())
}

err = messageClient.PublishRenderFeedbackMessage(mqtt.RenderAckTopic, mqtt.RenderFeedbackMessage{
Status: mqtt.StatusSucceess,
RepoURL: renderMessage.RepoURL,
GithubSHA: renderMessage.GithubSHA,
Message: "successfully cloned repo and started render",
})
if err != nil {
return fmt.Errorf("publishing ack message: %w", err)
}

for _, f := range renderMessage.FileNames {
if _, err := os.Stat(fmt.Sprintf("%s/%s", clonePath, f)); errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("file %s not found, cannot render", renderMessage.FileNames)
}

err = render(f)
err = render(f, clonePath)
if err != nil {
return fmt.Errorf("error rendering: %s", err.Error())
}
Expand All @@ -142,7 +119,7 @@ func subscribeHandler(renderMessage mqtt.RenderMessage) error {
return nil
}

func render(fileName string) error {
func render(fileName, clonePath string) error {
c := fmt.Sprintf(`docker run --rm --user="$(id -u):$(id -g)" -v "%s":/manim manimcommunity/manim:stable manim %s -qm --progress_bar none`, clonePath, fileName)
cmd := exec.Command("bash", "-c", c)
out, err := cmd.CombinedOutput()
Expand Down
40 changes: 37 additions & 3 deletions pkg/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package mqtt

import (
"encoding/json"
"fmt"
"os"

mqtt "github.com/eclipse/paho.mqtt.golang"
"go.uber.org/zap"
)

type fn func(string)
type fn func(RenderMessage)

type MqttClient struct {
client mqtt.Client
Expand Down Expand Up @@ -38,9 +41,40 @@ func (c MqttClient) Cleanup() {
c.client.Disconnect(250)
}

func (c MqttClient) Subscribe(topic string, subscribeHandler fn) error {
func (c MqttClient) Subscribe(topic string, logger *zap.SugaredLogger, subscribeHandler fn) error {
if token := c.client.Subscribe(topic, 1, func(client mqtt.Client, msg mqtt.Message) {
subscribeHandler(string(msg.Payload()))
renderMessage := RenderMessage{}
err := json.Unmarshal(msg.Payload(), &renderMessage)
if err != nil {
pubErr := c.PublishRenderFeedbackMessage(RenderErrTopic, RenderFeedbackMessage{
Status: StatusSucceess,
RepoURL: UnknownRepoURL,
GithubSHA: UnknownGithubSHA,
Message: fmt.Sprintf("error during render: %s", err.Error()),
})
if pubErr != nil {
logger.Errorf("error publishing to renderErrTopic: %s", pubErr)
}
return
}

logger.Info("received request to render: ", renderMessage)

if os.Getenv("MOCK_MODE") != "" {
return
}

err = c.PublishRenderFeedbackMessage(RenderAckTopic, RenderFeedbackMessage{
Status: StatusSucceess,
RepoURL: renderMessage.RepoURL,
GithubSHA: renderMessage.GithubSHA,
Message: "successfully cloned repo and started render",
})
if err != nil {
logger.Errorf("error publishing to renderErrTopic: %s", err)
}
// wait group here?
subscribeHandler(renderMessage)
}); token.Wait() && token.Error() != nil {
return token.Error()
}
Expand Down

0 comments on commit 97d5109

Please sign in to comment.