Skip to content

Commit

Permalink
feat: added foundation for distributor
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Dec 27, 2020
1 parent 33d1969 commit 4773ffc
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 2 deletions.
13 changes: 11 additions & 2 deletions README.md
@@ -1,4 +1,5 @@
# delta
# Delta

![build](https://github.com/AndrewNeudegg/delta/workflows/build/badge.svg) [![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2FAndrewNeudegg%2Fdelta.svg?type=shield)](https://app.fossa.com/projects/git%2Bgithub.com%2FAndrewNeudegg%2Fdelta?ref=badge_shield) [![codecov](https://codecov.io/gh/AndrewNeudegg/delta/branch/main/graph/badge.svg?token=PZNGIZGN2V)](https://codecov.io/gh/AndrewNeudegg/delta) [![Go Report Card](https://goreportcard.com/badge/github.com/andrewneudegg/delta)](https://goreportcard.com/report/github.com/andrewneudegg/delta) ![coverage](https://github.com/AndrewNeudegg/delta/workflows/coverage/badge.svg)


Expand All @@ -11,12 +12,13 @@ will be a combination of the avalibale ingredients with some additions and some

## Table of Contents

- [delta](#delta)
- [Delta](#delta)
- [Overview](#overview)
- [Table of Contents](#table-of-contents)
- [Components](#components)
- [Sink](#sink)
- [Bridge](#bridge)
- [Distributor](#distributor)
- [License](#license)

## Components
Expand All @@ -33,6 +35,13 @@ To bridge events you must have an external source of event data that can be cons

Please see the [bridge](cmd/bridge/README.md) specific documentation for more information.

### Distributor

Once you have a source of events, either a sink or a bridge, you can distribute the events to your target applications.

Please see the [distributor](cmd/distributor/README.md) specific documentation for more information.


## License

[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2FAndrewNeudegg%2Fdelta.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2FAndrewNeudegg%2Fdelta?ref=badge_large)
4 changes: 4 additions & 0 deletions cmd/distributor/README.md
@@ -0,0 +1,4 @@
# Distributor

The distributor is the mechanism for transporting messages that have been bridged or sunk from their relative
inputs to target applications.
23 changes: 23 additions & 0 deletions cmd/distributor/apputil/state.go
@@ -0,0 +1,23 @@
package apputil

import (
"context"
"sync"

"github.com/andrewneudegg/delta/pkg/probes"
"github.com/andrewneudegg/delta/pkg/telemetry"
log "github.com/sirupsen/logrus"
)

// AppState contains the information for the app to run.
type AppState struct {
Probes *probes.ProbeServer
Logger *log.Logger
TelemetryServer *telemetry.PrometheusServer
wg sync.WaitGroup
}

// Block the application indefinitely on the given context.
func (a *AppState) Block(ctx context.Context) {
<-ctx.Done()
}
93 changes: 93 additions & 0 deletions cmd/distributor/main.go
@@ -0,0 +1,93 @@
package main

import (
"context"
"os"

"github.com/andrewneudegg/delta/cmd/distributor/apputil"
"github.com/andrewneudegg/delta/cmd/distributor/subcmd/naive"
"github.com/andrewneudegg/delta/pkg/probes"
"github.com/andrewneudegg/delta/pkg/telemetry"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

func main() {
app()
}

func configureLogger(verbose bool) *log.Logger {
logger := log.New()

logger.SetFormatter(&log.TextFormatter{
DisableColors: false,
FullTimestamp: true,
})
// Output to stdout instead of the default stderr
// Can be any io.Writer, see below for File example
logger.SetOutput(os.Stdout)

// Only log the warning severity or above.
if verbose {
logger.SetLevel(log.DebugLevel)
} else {
logger.SetLevel(log.InfoLevel)
}

return logger
}

func configureProbes(probesEnabled bool) *probes.ProbeServer {
probes := probes.ProbeServer{
ListenAddr: ":8082",
}

if probesEnabled {
go probes.StartProbeServer()
}

probes.AliveNow()
return &probes
}

func configureTelemetryServer(telemetryEnabled bool) *telemetry.PrometheusServer {
prometheusServer := telemetry.PrometheusServer{
ListenAddr: ":8081",
Route: "/metrics",
}

if telemetryEnabled {
go prometheusServer.Serve(context.TODO())
}

return &prometheusServer
}

func app() error {
var verboseMode bool
var telemetryEnabled bool
var probesEnabled bool

appState := apputil.AppState{
Probes: nil,
Logger: nil,
TelemetryServer: nil,
}

var rootCmd = &cobra.Command{
Use: "distributor",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
appState.Logger = configureLogger(verboseMode)
appState.Probes = configureProbes(probesEnabled)
appState.TelemetryServer = configureTelemetryServer(telemetryEnabled)
return nil
},
}

rootCmd.PersistentFlags().BoolVarP(&verboseMode, "verbose", "v", false, "verbose output")
rootCmd.PersistentFlags().BoolVarP(&telemetryEnabled, "telemetry", "", false, "telemetry server")
rootCmd.PersistentFlags().BoolVarP(&probesEnabled, "probes", "", false, "liveness / readiness probes")

rootCmd.AddCommand(naive.Cmd(&appState))
return rootCmd.Execute()
}
23 changes: 23 additions & 0 deletions cmd/distributor/subcmd/naive/distributor.go
@@ -0,0 +1,23 @@
package naive

import (
"context"

"github.com/spf13/cobra"

"github.com/andrewneudegg/delta/cmd/distributor/apputil"
)

// Cmd demonstrates how to configure a new subcommand.
func Cmd(appState *apputil.AppState) *cobra.Command {
return &cobra.Command{
Use: "naive",
Short: "naively distribute events to a specific target/s",
Long: `naively distribute events to a specific target/s without any leader election`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
appState.Logger.Debug("starting example cmd")
appState.Block(context.TODO())
},
}
}

0 comments on commit 4773ffc

Please sign in to comment.