Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CarbonV2 gRPC streaming render #476

Merged
merged 12 commits into from
Jul 18, 2022
Merged
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,12 @@ stats-percentiles = [99, 98, 95, 75, 50]
# path = "/metrics/list_query/"
# max-inflight-requests = 3

# carbonserver.grpc is the configuration for listening for grpc clients.
# Note: currently, only CarbonV2 Render rpc is implemented.
# [carbonserver.grpc]
# enabled = true
# listen = ":7004"

[dump]
# Enable dump/restore function on USR2 signal
enabled = false
Expand Down
3 changes: 2 additions & 1 deletion api/sample/cache-query/cache-query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/go-graphite/go-carbon/helper/carbonpb"
)
Expand All @@ -20,7 +21,7 @@ func main() {
flag.Parse()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
conn, err := grpc.DialContext(ctx, *server, grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.DialContext(ctx, *server, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
Expand Down
6 changes: 6 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,12 @@ func (app *App) Start(version string) (err error) {
return
}

if conf.Carbonserver.Grpc.Enabled {
if err = carbonserver.ListenGRPC(conf.Carbonserver.Grpc.Listen); err != nil {
return
}
}

app.Carbonserver = carbonserver
}
/* CARBONSERVER end */
Expand Down
39 changes: 20 additions & 19 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,26 @@ type tagsConfig struct {
}

type carbonserverConfig struct {
Listen string `toml:"listen"`
Enabled bool `toml:"enabled"`
ReadTimeout *Duration `toml:"read-timeout"`
IdleTimeout *Duration `toml:"idle-timeout"`
WriteTimeout *Duration `toml:"write-timeout"`
RequestTimeout *Duration `toml:"request-timeout"`
ScanFrequency *Duration `toml:"scan-frequency"`
QueryCacheEnabled bool `toml:"query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
FindCacheEnabled bool `toml:"find-cache-enabled"`
Buckets int `toml:"buckets"`
MaxGlobs int `toml:"max-globs"`
FailOnMaxGlobs bool `toml:"fail-on-max-globs"`
EmptyResultOk bool `toml:"empty-result-ok"`
MetricsAsCounters bool `toml:"metrics-as-counters"`
TrigramIndex bool `toml:"trigram-index"`
InternalStatsDir string `toml:"internal-stats-dir"`
Percentiles []int `toml:"stats-percentiles"`
CacheScan bool `toml:"cache-scan"`
Listen string `toml:"listen"`
Enabled bool `toml:"enabled"`
Grpc grpcConfig `toml:"grpc"`
ReadTimeout *Duration `toml:"read-timeout"`
IdleTimeout *Duration `toml:"idle-timeout"`
WriteTimeout *Duration `toml:"write-timeout"`
RequestTimeout *Duration `toml:"request-timeout"`
ScanFrequency *Duration `toml:"scan-frequency"`
QueryCacheEnabled bool `toml:"query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
FindCacheEnabled bool `toml:"find-cache-enabled"`
Buckets int `toml:"buckets"`
MaxGlobs int `toml:"max-globs"`
FailOnMaxGlobs bool `toml:"fail-on-max-globs"`
EmptyResultOk bool `toml:"empty-result-ok"`
MetricsAsCounters bool `toml:"metrics-as-counters"`
TrigramIndex bool `toml:"trigram-index"`
InternalStatsDir string `toml:"internal-stats-dir"`
Percentiles []int `toml:"stats-percentiles"`
CacheScan bool `toml:"cache-scan"`

MaxMetricsGlobbed int `toml:"max-metrics-globbed"`
MaxMetricsRendered int `toml:"max-metrics-rendered"`
Expand Down
26 changes: 26 additions & 0 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ import (
"github.com/go-graphite/go-carbon/helper"
"github.com/go-graphite/go-carbon/helper/stat"
"github.com/go-graphite/go-carbon/points"
grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
protov3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/zapwriter"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"google.golang.org/grpc"
)

type metricStruct struct {
Expand Down Expand Up @@ -208,6 +210,7 @@ func (q *queryCache) getQueryItem(k string, size uint64, expire int32) *QueryIte
}

type CarbonserverListener struct {
grpcv2.UnimplementedCarbonV2Server
helper.Stoppable
cacheGet func(key string) []points.Point
readTimeout time.Duration
Expand All @@ -224,6 +227,7 @@ type CarbonserverListener struct {
forceScanChan chan struct{}
metricsAsCounters bool
tcpListener *net.TCPListener
grpcListener *net.TCPListener
logger *zap.Logger
accessLogger *zap.Logger
internalStatsDir string
Expand Down Expand Up @@ -1566,6 +1570,7 @@ func (listener *CarbonserverListener) Stop() error {
listener.db.Close()
}
listener.tcpListener.Close()
listener.grpcListener.Close()
return nil
}

Expand Down Expand Up @@ -1976,3 +1981,24 @@ func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) *
timeout: timeout,
}
}

func (listener *CarbonserverListener) ListenGRPC(listen string) error {
var err error
var grpcAddr *net.TCPAddr
grpcAddr, err = net.ResolveTCPAddr("tcp", listen)
if err != nil {
return err
}

listener.grpcListener, err = net.ListenTCP("tcp", grpcAddr)
if err != nil {
return err
}

var opts []grpc.ServerOption

grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902
grpcv2.RegisterCarbonV2Server(grpcServer, listener)
go grpcServer.Serve(listener.grpcListener)
return nil
}
13 changes: 8 additions & 5 deletions carbonserver/carbonserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,12 @@ func generalFetchSingleMetricRemove(testData *FetchTest) {
os.Remove(filepath.Join(testData.path, testData.name+".wsp"))
}

func generalFetchSingleMetricHelper(testData *FetchTest, cache *cache.Cache, carbonserver *CarbonserverListener) (*pb.FetchResponse, error) {
data, err := carbonserver.fetchSingleMetricV2(testData.name, int32(testData.from), int32(testData.until))
return data, err
func generalFetchSingleMetricHelper(testData *FetchTest, carbonserver *CarbonserverListener) (*pb.FetchResponse, error) {
data, err := carbonserver.fetchSingleMetric(testData.name, "", int32(testData.from), int32(testData.until))
if err != nil {
return nil, err
}
return data.proto2(), nil
}

func testFetchSingleMetricHelper(testData *FetchTest, cache *cache.Cache, carbonserver *CarbonserverListener) (*pb.FetchResponse, error) {
Expand All @@ -182,7 +185,7 @@ func testFetchSingleMetricHelper(testData *FetchTest, cache *cache.Cache, carbon
return nil, err
}
defer generalFetchSingleMetricRemove(testData)
data, err := generalFetchSingleMetricHelper(testData, cache, carbonserver)
data, err := generalFetchSingleMetricHelper(testData, carbonserver)
return data, err
}

Expand Down Expand Up @@ -514,7 +517,7 @@ func benchmarkFetchSingleMetricCommon(b *testing.B, test *FetchTest) {

b.ResetTimer()
for runs := 0; runs < b.N; runs++ {
data, err := generalFetchSingleMetricHelper(test, cache, carbonserver)
data, err := generalFetchSingleMetricHelper(test, carbonserver)
if err != nil {
b.Errorf("Unexpected error: %v", err)
return
Expand Down
18 changes: 0 additions & 18 deletions carbonserver/fetchsinglemetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,3 @@ func (listener *CarbonserverListener) fetchSingleMetric(metric string, pathExpre
return response{}, err
}
}

func (listener *CarbonserverListener) fetchSingleMetricV2(metric string, fromTime, untilTime int32) (*protov2.FetchResponse, error) {
resp, err := listener.fetchSingleMetric(metric, "", fromTime, untilTime)
if err != nil {
return nil, err
}

return resp.proto2(), nil
}

func (listener *CarbonserverListener) fetchSingleMetricV3(metric string, pathExpression string, fromTime, untilTime int32) (*protov3.FetchResponse, error) {
resp, err := listener.fetchSingleMetric(metric, pathExpression, fromTime, untilTime)
if err != nil {
return nil, err
}

return resp.proto3(), nil
}