Skip to content

Commit

Permalink
Demo checker running
Browse files Browse the repository at this point in the history
  • Loading branch information
pomo-mondreganto committed Sep 20, 2023
1 parent b349844 commit d0a4e6b
Show file tree
Hide file tree
Showing 13 changed files with 1,833 additions and 13 deletions.
39 changes: 39 additions & 0 deletions cmd/checkers/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"log"

"github.com/c4t-but-s4d/fastad/internal/checkers"
"github.com/c4t-but-s4d/fastad/internal/logging"
"github.com/sirupsen/logrus"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
logging.Init()

temporalClient, err := client.Dial(client.Options{
HostPort: "localhost:7233",
Logger: logging.NewTemporalAdapter(
logrus.WithFields(logrus.Fields{
"component": "checkers_worker",
}),
),
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer temporalClient.Close()

checkersWorker := worker.New(temporalClient, "checkers", worker.Options{})
checkersWorker.RegisterWorkflow(checkers.WorkflowDefinition)

checkersWorker.RegisterActivity(checkers.CheckActivityDefinition)
checkersWorker.RegisterActivity(checkers.PutActivityDefinition)
checkersWorker.RegisterActivity(checkers.GetActivityDefinition)

if err := checkersWorker.Run(worker.InterruptCh()); err != nil {
logrus.Fatalf("Unable to start workers: %v", err)
}
}
12 changes: 2 additions & 10 deletions cmd/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"
"time"

"github.com/c4t-but-s4d/fastad/internal/logging"
"github.com/c4t-but-s4d/fastad/internal/multiproto"
"github.com/c4t-but-s4d/fastad/internal/pinger"
"github.com/c4t-but-s4d/fastad/internal/receiver"
Expand All @@ -21,7 +22,7 @@ import (
)

func main() {
initLogger()
logging.Init()

logrus.Info("Starting flag receiver")

Expand Down Expand Up @@ -53,12 +54,3 @@ func main() {
logrus.Fatalf("Shutting down http server: %v", err)
}
}

func initLogger() {
mainFormatter := &logrus.TextFormatter{}
mainFormatter.FullTimestamp = true
mainFormatter.ForceColors = true
mainFormatter.PadLevelText = true
mainFormatter.TimestampFormat = "2006-01-02 15:04:05"
logrus.SetFormatter(mainFormatter)
}
37 changes: 37 additions & 0 deletions cmd/ticker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"context"
"os/signal"
"syscall"
"time"

"github.com/c4t-but-s4d/fastad/internal/logging"
"github.com/c4t-but-s4d/fastad/internal/ticker"
"github.com/sirupsen/logrus"
"go.temporal.io/sdk/client"
)

func main() {
logging.Init()

temporalClient, err := client.Dial(client.Options{
HostPort: "localhost:7233",
Logger: logging.NewTemporalAdapter(
logrus.WithFields(logrus.Fields{
"component": "ticker",
}),
),
})
if err != nil {
logrus.Fatalf("dialing temporal: %v", err)
}
defer temporalClient.Close()

t := ticker.NewTicker(time.Second*10, temporalClient)

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

t.Run(ctx)
}
20 changes: 20 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,39 @@ go 1.21
require (
github.com/improbable-eng/grpc-web v0.15.0
github.com/sirupsen/logrus v1.9.3
go.temporal.io/sdk v1.24.0
golang.org/x/net v0.15.0
google.golang.org/grpc v1.58.1
google.golang.org/protobuf v1.31.0
logur.dev/adapter/logrus v0.5.0
logur.dev/logur v0.17.0
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.1 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/rs/cors v1.10.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.3 // indirect
go.temporal.io/api v1.21.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230913181813-007df8e322eb // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
)
1,400 changes: 1,397 additions & 3 deletions go.sum

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions internal/checkers/activity_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package checkers

import (
"context"
"fmt"

"github.com/sirupsen/logrus"
)

type CheckActivityParameters struct {
Team string
Service string
}

type CheckActivityResult struct {
Success bool
}

func CheckActivityDefinition(ctx context.Context, params CheckActivityParameters) (*CheckActivityResult, error) {

Check warning on line 19 in internal/checkers/activity_check.go

View workflow job for this annotation

GitHub Actions / lint-go

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 19 in internal/checkers/activity_check.go

View workflow job for this annotation

GitHub Actions / lint-go

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
logrus.Infof("running check %s/%s", params.Team, params.Service)
if params.Team == "team1" && params.Service == "service1" {
return &CheckActivityResult{Success: true}, nil
}
if params.Team == "team2" && params.Service == "service1" {
return &CheckActivityResult{Success: true}, nil
}
return nil, fmt.Errorf("unknown check %s/%s", params.Team, params.Service)

Check failure on line 27 in internal/checkers/activity_check.go

View workflow job for this annotation

GitHub Actions / lint-go

error returned from external package is unwrapped: sig: func fmt.Errorf(format string, a ...any) error (wrapcheck)

Check failure on line 27 in internal/checkers/activity_check.go

View workflow job for this annotation

GitHub Actions / lint-go

error returned from external package is unwrapped: sig: func fmt.Errorf(format string, a ...any) error (wrapcheck)
}
21 changes: 21 additions & 0 deletions internal/checkers/activity_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package checkers

import (
"context"

"github.com/sirupsen/logrus"
)

type GetActivityParameters struct {
Team string
Service string
}

type GetActivityResult struct {
Success bool
}

func GetActivityDefinition(ctx context.Context, params GetActivityParameters) (*GetActivityResult, error) {

Check warning on line 18 in internal/checkers/activity_get.go

View workflow job for this annotation

GitHub Actions / lint-go

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 18 in internal/checkers/activity_get.go

View workflow job for this annotation

GitHub Actions / lint-go

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
logrus.Infof("running get %s/%s", params.Team, params.Service)
return nil, nil
}
24 changes: 24 additions & 0 deletions internal/checkers/activity_last.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package checkers

import (
"context"

"github.com/sirupsen/logrus"
)

type LastActivityParameters struct {
Team string
Service string

CheckResult *CheckActivityResult
PutResults []*PutActivityResult
GetResults []*GetActivityResult
}

func LastActivityDefinition(ctx context.Context, params LastActivityParameters) error {

Check warning on line 18 in internal/checkers/activity_last.go

View workflow job for this annotation

GitHub Actions / lint-go

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 18 in internal/checkers/activity_last.go

View workflow job for this annotation

GitHub Actions / lint-go

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
logrus.Infof("running last %s/%s", params.Team, params.Service)
logrus.Infof("received check result: %v", params.CheckResult)
logrus.Infof("received put results: %v", params.PutResults)
logrus.Infof("received get results: %v", params.GetResults)
return nil
}
21 changes: 21 additions & 0 deletions internal/checkers/activity_put.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package checkers

import (
"context"

"github.com/sirupsen/logrus"
)

type PutActivityParameters struct {
Team string
Service string
}

type PutActivityResult struct {
Success bool
}

func PutActivityDefinition(ctx context.Context, params PutActivityParameters) (*PutActivityResult, error) {

Check warning on line 18 in internal/checkers/activity_put.go

View workflow job for this annotation

GitHub Actions / lint-go

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 18 in internal/checkers/activity_put.go

View workflow job for this annotation

GitHub Actions / lint-go

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
logrus.Infof("running put %s/%s", params.Team, params.Service)
return nil, nil
}
138 changes: 138 additions & 0 deletions internal/checkers/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package checkers

import (
"time"

"go.temporal.io/sdk/workflow"
)

type WorkflowParameters struct {
Teams []string
Services []string
}

func WorkflowDefinition(ctx workflow.Context, params WorkflowParameters) error {
logger := workflow.GetLogger(ctx)
logger.Info("starting workflow")

wg := workflow.NewWaitGroup(ctx)
wg.Add(len(params.Teams) * len(params.Services))
for _, team := range params.Teams {
for _, service := range params.Services {
team := team
service := service
workflow.Go(ctx, func(ctx workflow.Context) {
defer wg.Done()
runCheckers(ctx, team, service)
})
}
}
wg.Wait(ctx)

return nil
}

func runCheckers(ctx workflow.Context, team string, service string) {
logger := workflow.GetLogger(ctx)

checkActivityOptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Second,
}
checkCtx := workflow.WithActivityOptions(ctx, checkActivityOptions)
var checkResult *CheckActivityResult
if err := workflow.ExecuteActivity(checkCtx, CheckActivityDefinition, CheckActivityParameters{
Team: team,
Service: service,
}).Get(ctx, &checkResult); err != nil {
logger.Error("error in check", "team", team, "service", service, "error", err)
return
}

if !checkResult.Success {
logger.Info("check failed", "team", team, "service", service)
return
}

putResultsChan := workflow.NewBufferedChannel(ctx, 3)

putActivityOptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Second,
}
putCtx := workflow.WithActivityOptions(ctx, putActivityOptions)
for i := 0; i < 3; i++ {
workflow.Go(putCtx, func(ctx workflow.Context) {
var putResult *PutActivityResult
if err := workflow.ExecuteActivity(
putCtx,
PutActivityDefinition,
PutActivityParameters{
Team: team,
Service: service,
},
); err != nil {
logger.Error("error in put", "team", team, "service", service, "error", err)
putResult = &PutActivityResult{Success: false}
}
putResultsChan.Send(ctx, putResult)
})
}

getResultsChan := workflow.NewBufferedChannel(ctx, 3)

getActivityOptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Second,
}
getCtx := workflow.WithActivityOptions(ctx, getActivityOptions)
for i := 0; i < 2; i++ {
workflow.Go(getCtx, func(ctx workflow.Context) {
var getResult *GetActivityResult
if err := workflow.ExecuteActivity(
getCtx,
GetActivityDefinition,
GetActivityParameters{
Team: team,
Service: service,
},
); err != nil {
logger.Error("error in get", "team", team, "service", service, "error", err)
getResult = &GetActivityResult{Success: false}
}
getResultsChan.Send(ctx, getResult)
})
}

putResults := make([]*PutActivityResult, 0, 3)
for i := 0; i < 3; i++ {
var putResult *PutActivityResult
putResultsChan.Receive(ctx, &putResult)
putResults = append(putResults, putResult)
}

getResults := make([]*GetActivityResult, 0, 2)
for i := 0; i < 2; i++ {
var getResult *GetActivityResult
getResultsChan.Receive(ctx, &getResult)
getResults = append(getResults, getResult)
}

lastActivityOptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Second,
}
lastCtx := workflow.WithActivityOptions(ctx, lastActivityOptions)
if err := workflow.ExecuteActivity(
lastCtx,
LastActivityDefinition,
LastActivityParameters{
Team: team,
Service: service,

CheckResult: checkResult,
PutResults: putResults,
GetResults: getResults,
},
); err != nil {
logger.Error("error in last", "team", team, "service", service, "error", err)
}

logger.Info("iteration finished", "team", team, "service", service)
}
Loading

0 comments on commit d0a4e6b

Please sign in to comment.