Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CarbonV2 gRPC streaming render #476

Merged
merged 12 commits into from
Jul 18, 2022
Merged
3 changes: 2 additions & 1 deletion api/sample/cache-query/cache-query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/go-graphite/go-carbon/helper/carbonpb"
)
Expand All @@ -20,7 +21,7 @@ func main() {
flag.Parse()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
conn, err := grpc.DialContext(ctx, *server, grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.DialContext(ctx, *server, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
Expand Down
6 changes: 6 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,12 @@ func (app *App) Start(version string) (err error) {
return
}

if conf.Carbonserver.Grpc.Enabled {
if err = carbonserver.ListenGRPC(conf.Carbonserver.Grpc.Listen); err != nil {
return
}
}

app.Carbonserver = carbonserver
}
/* CARBONSERVER end */
Expand Down
39 changes: 20 additions & 19 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,26 @@ type tagsConfig struct {
}

type carbonserverConfig struct {
Listen string `toml:"listen"`
Enabled bool `toml:"enabled"`
ReadTimeout *Duration `toml:"read-timeout"`
IdleTimeout *Duration `toml:"idle-timeout"`
WriteTimeout *Duration `toml:"write-timeout"`
RequestTimeout *Duration `toml:"request-timeout"`
ScanFrequency *Duration `toml:"scan-frequency"`
QueryCacheEnabled bool `toml:"query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
FindCacheEnabled bool `toml:"find-cache-enabled"`
Buckets int `toml:"buckets"`
MaxGlobs int `toml:"max-globs"`
FailOnMaxGlobs bool `toml:"fail-on-max-globs"`
EmptyResultOk bool `toml:"empty-result-ok"`
MetricsAsCounters bool `toml:"metrics-as-counters"`
TrigramIndex bool `toml:"trigram-index"`
InternalStatsDir string `toml:"internal-stats-dir"`
Percentiles []int `toml:"stats-percentiles"`
CacheScan bool `toml:"cache-scan"`
Listen string `toml:"listen"`
Enabled bool `toml:"enabled"`
Grpc grpcConfig `toml:"grpc"`
ReadTimeout *Duration `toml:"read-timeout"`
IdleTimeout *Duration `toml:"idle-timeout"`
WriteTimeout *Duration `toml:"write-timeout"`
RequestTimeout *Duration `toml:"request-timeout"`
ScanFrequency *Duration `toml:"scan-frequency"`
QueryCacheEnabled bool `toml:"query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
FindCacheEnabled bool `toml:"find-cache-enabled"`
Buckets int `toml:"buckets"`
MaxGlobs int `toml:"max-globs"`
FailOnMaxGlobs bool `toml:"fail-on-max-globs"`
EmptyResultOk bool `toml:"empty-result-ok"`
MetricsAsCounters bool `toml:"metrics-as-counters"`
TrigramIndex bool `toml:"trigram-index"`
InternalStatsDir string `toml:"internal-stats-dir"`
Percentiles []int `toml:"stats-percentiles"`
CacheScan bool `toml:"cache-scan"`

MaxMetricsGlobbed int `toml:"max-metrics-globbed"`
MaxMetricsRendered int `toml:"max-metrics-rendered"`
Expand Down
26 changes: 26 additions & 0 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ import (
"github.com/go-graphite/go-carbon/helper"
"github.com/go-graphite/go-carbon/helper/stat"
"github.com/go-graphite/go-carbon/points"
grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
protov3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/zapwriter"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"google.golang.org/grpc"
)

type metricStruct struct {
Expand Down Expand Up @@ -208,6 +210,7 @@ func (q *queryCache) getQueryItem(k string, size uint64, expire int32) *QueryIte
}

type CarbonserverListener struct {
grpcv2.UnimplementedCarbonV2Server
helper.Stoppable
cacheGet func(key string) []points.Point
readTimeout time.Duration
Expand All @@ -224,6 +227,7 @@ type CarbonserverListener struct {
forceScanChan chan struct{}
metricsAsCounters bool
tcpListener *net.TCPListener
grpcListener *net.TCPListener
logger *zap.Logger
accessLogger *zap.Logger
internalStatsDir string
Expand Down Expand Up @@ -1566,6 +1570,7 @@ func (listener *CarbonserverListener) Stop() error {
listener.db.Close()
}
listener.tcpListener.Close()
listener.grpcListener.Close()
return nil
}

Expand Down Expand Up @@ -1976,3 +1981,24 @@ func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) *
timeout: timeout,
}
}

func (listener *CarbonserverListener) ListenGRPC(listen string) error {
var err error
var grpcAddr *net.TCPAddr
grpcAddr, err = net.ResolveTCPAddr("tcp", listen)
if err != nil {
return err
}

listener.grpcListener, err = net.ListenTCP("tcp", grpcAddr)
if err != nil {
return err
}

var opts []grpc.ServerOption

grpcServer := grpc.NewServer(opts...)
grpcv2.RegisterCarbonV2Server(grpcServer, listener)
go grpcServer.Serve(listener.grpcListener)
return nil
}