Skip to content

Commit

Permalink
cmd/relui: create pubsub topic on start
Browse files Browse the repository at this point in the history
This will create and connect to a pubsub topic for communicating with
relui workers on application start. If the topic already exists, it will
just get a reference to the topic.

For golang/go#40279

Co-authored-by: Carlos Amedee <carlos@golang.org>
Change-Id: Ic173212cd15562b9d1a1cc601d307d5ee1a4e811
Reviewed-on: https://go-review.googlesource.com/c/build/+/257237
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Alexander Rakoczy <alex@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
  • Loading branch information
toothrot and cagedmantis committed Sep 25, 2020
1 parent 3b519a6 commit fac8f1e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
29 changes: 28 additions & 1 deletion cmd/relui/main.go
Expand Up @@ -6,19 +6,25 @@
package main

import (
"context"
"flag"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"

"cloud.google.com/go/pubsub"
"github.com/golang/protobuf/proto"
reluipb "golang.org/x/build/cmd/relui/protos"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
devDataDir = flag.String("dev-data-directory", defaultDevDataDir(), "Development-only directory to use for storage of application state.")
projectID = flag.String("project-id", os.Getenv("PUBSUB_PROJECT_ID"), "Pubsub project ID for communicating with workers. Uses PUBSUB_PROJECT_ID if unset.")
topicID = flag.String("topic-id", "relui-development", "Pubsub topic ID for communicating with workers.")
)

func main() {
Expand All @@ -27,7 +33,12 @@ func main() {
if err := fs.load(); err != nil {
log.Fatalf("Error loading state from %q: %v", *devDataDir, err)
}
s := &server{store: fs, configs: loadWorkflowConfig("./workflows")}
ctx := context.Background()
s := &server{
configs: loadWorkflowConfig("./workflows"),
store: fs,
topic: getTopic(ctx),
}
http.Handle("/workflows/create", http.HandlerFunc(s.createWorkflowHandler))
http.Handle("/workflows/new", http.HandlerFunc(s.newWorkflowHandler))
http.Handle("/", fileServerHandler(relativeFile("./static"), http.HandlerFunc(s.homeHandler)))
Expand All @@ -39,6 +50,22 @@ func main() {
log.Fatal(http.ListenAndServe(":"+port, http.DefaultServeMux))
}

// getTopic creates and returns a pubsub topic from the project specified in projectId, which is to be used for
// communicating with relui workers.
//
// It is safe to call if a topic already exists. A reference to the topic will be returned.
func getTopic(ctx context.Context) *pubsub.Topic {
client, err := pubsub.NewClient(ctx, *projectID)
if err != nil {
log.Fatalf("pubsub.NewClient(_, %q) = %v, wanted no error", *projectID, err)
}
_, err = client.CreateTopic(ctx, *topicID)
if err != nil && status.Code(err) != codes.AlreadyExists {
log.Fatalf("client.CreateTopic(_, %q) = %v, wanted no error", *topicID, err)
}
return client.Topic(*topicID)
}

// loadWorkflowConfig loads Workflow configuration files from dir. It expects all files to be in textproto format.
func loadWorkflowConfig(dir string) []*reluipb.Workflow {
fs, err := filepath.Glob(filepath.Join(relativeFile(dir), "*.textpb"))
Expand Down
4 changes: 4 additions & 0 deletions cmd/relui/web.go
Expand Up @@ -15,6 +15,7 @@ import (
"path"
"path/filepath"

"cloud.google.com/go/pubsub"
"github.com/golang/protobuf/proto"
reluipb "golang.org/x/build/cmd/relui/protos"
)
Expand Down Expand Up @@ -55,6 +56,9 @@ type server struct {

// store is for persisting application state.
store store

// topic is for communicating with relui workers.
topic *pubsub.Topic
}

type homeResponse struct {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -6,6 +6,7 @@ require (
cloud.google.com/go v0.54.0
cloud.google.com/go/bigquery v1.4.0
cloud.google.com/go/datastore v1.1.0
cloud.google.com/go/pubsub v1.2.0
cloud.google.com/go/storage v1.6.0
github.com/NYTimes/gziphandler v1.1.1
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
Expand Down

0 comments on commit fac8f1e

Please sign in to comment.