Skip to content

Commit

Permalink
Merge pull request #346 from gwos/GROUNDWORK-3074
Browse files Browse the repository at this point in the history
GROUNDWORK-3074 Improve Nats service operations
  • Loading branch information
vannu-25 committed Apr 30, 2024
2 parents d17e12a + 14a3d96 commit 83a4208
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 265 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# NOTE:
# https://stackoverflow.com/questions/36279253/go-compiled-binary-wont-run-in-an-alpine-docker-container-on-ubuntu-host
#
FROM golang:1.21-bullseye AS build-libtransit
FROM golang:1.22-bullseye AS build-libtransit
ARG TRAVIS_TAG=
ENV TRAVIS_TAG=${TRAVIS_TAG:-master}
WORKDIR /go/src/
Expand All @@ -21,7 +21,7 @@ RUN make clean && make \
FROM scratch AS export-libtransit
COPY --from=build-libtransit /go/src/build /

FROM golang:1.21-alpine AS build
FROM golang:1.22-alpine AS build
ARG TRAVIS_TAG=
ENV TRAVIS_TAG=${TRAVIS_TAG:-master}
WORKDIR /go/src/
Expand Down
100 changes: 58 additions & 42 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,45 @@ func (l LogLevel) String() string {
return [...]string{"Error", "Warn", "Info", "Debug"}[l]
}

type Nats struct {
// NatsAckWait is the time the NATS server will wait before resending a message
// Should be greater then the GWClient request duration
NatsAckWait time.Duration `yaml:"-"`
// designates the maximum number of outstanding acknowledgements
// (messages that have been delivered but not acknowledged)
// that NATS Streaming will allow for a given subscription.
// When this limit is reached, NATS Streaming will suspend delivery of messages
// to this subscription until the number of unacknowledged messages falls below the specified limit
NatsMaxInflight int `yaml:"-"`
// NatsMaxPubAcksInflight accepts number of unacknowledged messages
// that a publisher may have in-flight at any given time.
// When this maximum is reached, further async publish calls will block
// until the number of unacknowledged messages falls below the specified limit
NatsMaxPubAcksInflight int `yaml:"-"`
NatsMaxPayload int32 `yaml:"-"`
// NatsMonitorPort enables monitoring on http port useful for debug
// curl 'localhost:8222/streaming/channelsz?limit=0&offset=0&subs=1'
// More info: https://docs.nats.io/nats-streaming-concepts/monitoring
NatsMonitorPort int `yaml:"-"`
NatsStoreDir string `yaml:"natsFilestoreDir"`
// NatsStoreType accepts "FILE"|"MEMORY"
NatsStoreType string `yaml:"natsStoreType"`
// How long messages are kept
NatsStoreMaxAge time.Duration `yaml:"natsStoreMaxAge"`
// How many bytes are allowed per-channel
NatsStoreMaxBytes int64 `yaml:"natsStoreMaxBytes"`
// How many messages are allowed per-channel
NatsStoreMaxMsgs int64 `yaml:"natsStoreMaxMsgs"`
// NatsServerConfigFile is used to override yaml values for
// NATS server configuration (debug only).
NatsServerConfigFile string `yaml:"natsServerConfigFile"`
}

// Hashsum calculates FNV non-cryptographic hash suitable for checking the equality
func (c Nats) Hashsum() ([]byte, error) {
return Hashsum(c)
}

// Connector defines TCG Connector configuration
// see GetConfig() for defaults
type Connector struct {
Expand Down Expand Up @@ -94,37 +133,7 @@ type Connector struct {
LogColors bool `yaml:"logColors"`
LogTimeFormat string `yaml:"logTimeFormat"`

// NatsAckWait is the time the NATS server will wait before resending a message
// Should be greater then the GWClient request duration
NatsAckWait time.Duration `yaml:"-"`
// designates the maximum number of outstanding acknowledgements
// (messages that have been delivered but not acknowledged)
// that NATS Streaming will allow for a given subscription.
// When this limit is reached, NATS Streaming will suspend delivery of messages
// to this subscription until the number of unacknowledged messages falls below the specified limit
NatsMaxInflight int `yaml:"-"`
// NatsMaxPubAcksInflight accepts number of unacknowledged messages
// that a publisher may have in-flight at any given time.
// When this maximum is reached, further async publish calls will block
// until the number of unacknowledged messages falls below the specified limit
NatsMaxPubAcksInflight int `yaml:"-"`
NatsMaxPayload int32 `yaml:"-"`
// NatsMonitorPort enables monitoring on http port useful for debug
// curl 'localhost:8222/streaming/channelsz?limit=0&offset=0&subs=1'
// More info: https://docs.nats.io/nats-streaming-concepts/monitoring
NatsMonitorPort int `yaml:"-"`
NatsStoreDir string `yaml:"natsFilestoreDir"`
// NatsStoreType accepts "FILE"|"MEMORY"
NatsStoreType string `yaml:"natsStoreType"`
// How long messages are kept
NatsStoreMaxAge time.Duration `yaml:"natsStoreMaxAge"`
// How many bytes are allowed per-channel
NatsStoreMaxBytes int64 `yaml:"natsStoreMaxBytes"`
// How many messages are allowed per-channel
NatsStoreMaxMsgs int64 `yaml:"natsStoreMaxMsgs"`
// NatsServerConfigFile is used to override yaml values for
// NATS server configuration (debug only).
NatsServerConfigFile string `yaml:"natsServerConfigFile"`
Nats `yaml:",inline"`

TransportStartRndDelay int `yaml:"-"`
}
Expand Down Expand Up @@ -232,17 +241,19 @@ func defaults() Config {
LogLevel: 1,
LogColors: false,
LogTimeFormat: time.RFC3339,
NatsAckWait: time.Second * 30,
NatsMaxInflight: 4,
NatsMaxPubAcksInflight: 4,
NatsMaxPayload: 1024 * 1024 * 8, // 8MB github.com/nats-io/nats-server/releases/tag/v2.3.4
NatsMonitorPort: 0,
NatsStoreDir: "natsstore",
NatsStoreType: "FILE",
NatsStoreMaxAge: time.Hour * 24 * 10, // 10days
NatsStoreMaxBytes: 1024 * 1024 * 1024 * 20, // 20GB
NatsStoreMaxMsgs: 1_000_000, // 1 000 000
NatsServerConfigFile: "",
Nats: Nats{
NatsAckWait: time.Second * 30,
NatsMaxInflight: 4,
NatsMaxPubAcksInflight: 4,
NatsMaxPayload: 1024 * 1024 * 8, // 8MB github.com/nats-io/nats-server/releases/tag/v2.3.4
NatsMonitorPort: 0,
NatsStoreDir: "natsstore",
NatsStoreType: "FILE",
NatsStoreMaxAge: time.Hour * 24 * 10, // 10days
NatsStoreMaxBytes: 1024 * 1024 * 1024 * 20, // 20GB
NatsStoreMaxMsgs: 1_000_000, // 1 000 000
NatsServerConfigFile: "",
},
TransportStartRndDelay: 60,
},
// create disabled connections to support partial setting with struct-path
Expand Down Expand Up @@ -466,6 +477,11 @@ func (cfg Config) InitTracerProvider() (*tracesdk.TracerProvider, error) {
cfg.Connector.AppType, cfg.Connector.AppName, cfg.Connector.AgentID))
}

// Hashsum calculates FNV non-cryptographic hash suitable for checking the equality
func (cfg Config) Hashsum() ([]byte, error) {
return Hashsum(cfg)
}

func (cfg Config) initLogger() {
opts := []logzer.Option{
logzer.WithCondense(cfg.Connector.LogCondense),
Expand Down
26 changes: 26 additions & 0 deletions config/hashsum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package config

import (
"bytes"
"encoding/json"
"hash/fnv"
)

// Hashsum calculates FNV non-cryptographic hash suitable for checking the equality
func Hashsum(args ...interface{}) ([]byte, error) {
var b bytes.Buffer
for _, arg := range args {
s, err := json.Marshal(arg)
if err != nil {
return nil, err
}
if _, err := b.Write(s); err != nil {
return nil, err
}
}
h := fnv.New128()
if _, err := h.Write(b.Bytes()); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
29 changes: 5 additions & 24 deletions connectors/connectors.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package connectors

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math/rand"
"regexp"
"strconv"
"strings"
"time"

"github.com/gwos/tcg/config"
"github.com/gwos/tcg/sdk/transit"
"github.com/gwos/tcg/services"
"github.com/gwos/tcg/tracing"
Expand Down Expand Up @@ -82,11 +81,9 @@ func Start() error {
// delay transport on application startup
go func() {
td := services.GetTransitService().TransportStartRndDelay
if td > 0 {
upSince := services.GetTransitService().Stats().UpSince.Value()
if time.Since(time.UnixMilli(upSince)).Round(time.Second) < 8 {
time.Sleep(DefaultCheckInterval + time.Second*time.Duration(rand.Intn(td)))
}
upSince := services.GetTransitService().Stats().UpSince.Value()
if td > 0 && time.Since(time.UnixMilli(upSince)).Round(time.Second) < 8 {
time.Sleep(DefaultCheckInterval + time.Second*time.Duration(rand.Intn(td)))
}
_ = services.GetTransitService().StartTransport()
}()
Expand Down Expand Up @@ -586,23 +583,7 @@ func Name(defaultName string, customName string) string {
}

// Hashsum calculates FNV non-cryptographic hash suitable for checking the equality
func Hashsum(args ...interface{}) ([]byte, error) {
var b bytes.Buffer
for _, arg := range args {
s, err := json.Marshal(arg)
if err != nil {
return nil, err
}
if _, err := b.Write(s); err != nil {
return nil, err
}
}
h := fnv.New128()
if _, err := h.Write(b.Bytes()); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
var Hashsum = config.Hashsum

// MaxDuration returns maximum value
func MaxDuration(x time.Duration, rest ...time.Duration) time.Duration {
Expand Down
94 changes: 48 additions & 46 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,61 +1,64 @@
module github.com/gwos/tcg

go 1.21
go 1.22.0

toolchain go1.22.2

require (
github.com/PaesslerAG/gval v1.2.2
github.com/PaesslerAG/jsonpath v0.1.1
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/gin-contrib/cors v1.7.0
github.com/gin-contrib/sessions v0.0.5
github.com/gin-contrib/cors v1.7.1
github.com/gin-contrib/sessions v1.0.0
github.com/gin-gonic/gin v1.9.1
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/gosnmp/gosnmp v1.37.0
github.com/hashicorp/go-uuid v1.0.3
github.com/markel1974/gokuery v1.0.3
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.33.1
github.com/nats-io/nats-server/v2 v2.10.14
github.com/nats-io/nats.go v1.34.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.0
github.com/prometheus/common v0.50.0
github.com/prometheus/prometheus v0.50.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.53.0
github.com/prometheus/prometheus v0.51.2
github.com/robfig/cron/v3 v3.0.1
github.com/rs/zerolog v1.32.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/shirou/gopsutil/v3 v3.24.2
github.com/shirou/gopsutil/v3 v3.24.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.3
github.com/tubemogul/nscatools v0.0.0-20170420204428-ca8ef6bde11e
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.49.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
golang.org/x/crypto v0.21.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.51.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0
go.opentelemetry.io/otel/sdk v1.26.0
go.opentelemetry.io/otel/trace v1.26.0
golang.org/x/crypto v0.22.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.29.2
k8s.io/apimachinery v0.29.2
k8s.io/client-go v0.29.2
k8s.io/metrics v0.29.2
k8s.io/api v0.30.0
k8s.io/apimachinery v0.30.0
k8s.io/client-go v0.30.0
k8s.io/metrics v0.30.0
)

require (
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.11.3 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
Expand Down Expand Up @@ -83,7 +86,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -93,41 +96,40 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/jwt/v2 v2.5.5 // indirect
github.com/nats-io/jwt/v2 v2.5.6 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/prometheus/procfs v0.14.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c // indirect
github.com/shurcooL/vfsgen v0.0.0-20230704071429-0000e147ea92 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/grpc v1.62.1 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect
k8s.io/kube-openapi v0.0.0-20240423202451-8948a665c108 // indirect
k8s.io/utils v0.0.0-20240423183400-0849a56e8f22 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down

0 comments on commit 83a4208

Please sign in to comment.