Skip to content

Commit

Permalink
Merge pull request #476 from go-graphite/emadolsky/grpc-streaming-render
Browse files Browse the repository at this point in the history
CarbonV2 gRPC streaming render
  • Loading branch information
emadolsky committed Jul 18, 2022
2 parents db912df + 3f53995 commit cc8eeda
Show file tree
Hide file tree
Showing 133 changed files with 10,952 additions and 2,742 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,12 @@ stats-percentiles = [99, 98, 95, 75, 50]
# path = "/metrics/list_query/"
# max-inflight-requests = 3

# carbonserver.grpc is the configuration for listening for grpc clients.
# Note: currently, only CarbonV2 Render rpc is implemented.
# [carbonserver.grpc]
# enabled = true
# listen = ":7004"

[dump]
# Enable dump/restore function on USR2 signal
enabled = false
Expand Down
3 changes: 2 additions & 1 deletion api/sample/cache-query/cache-query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/go-graphite/go-carbon/helper/carbonpb"
)
Expand All @@ -20,7 +21,7 @@ func main() {
flag.Parse()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
conn, err := grpc.DialContext(ctx, *server, grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.DialContext(ctx, *server, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
Expand Down
6 changes: 6 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,12 @@ func (app *App) Start(version string) (err error) {
return
}

if conf.Carbonserver.Grpc.Enabled {
if err = carbonserver.ListenGRPC(conf.Carbonserver.Grpc.Listen); err != nil {
return
}
}

app.Carbonserver = carbonserver
}
/* CARBONSERVER end */
Expand Down
39 changes: 20 additions & 19 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,26 @@ type tagsConfig struct {
}

type carbonserverConfig struct {
Listen string `toml:"listen"`
Enabled bool `toml:"enabled"`
ReadTimeout *Duration `toml:"read-timeout"`
IdleTimeout *Duration `toml:"idle-timeout"`
WriteTimeout *Duration `toml:"write-timeout"`
RequestTimeout *Duration `toml:"request-timeout"`
ScanFrequency *Duration `toml:"scan-frequency"`
QueryCacheEnabled bool `toml:"query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
FindCacheEnabled bool `toml:"find-cache-enabled"`
Buckets int `toml:"buckets"`
MaxGlobs int `toml:"max-globs"`
FailOnMaxGlobs bool `toml:"fail-on-max-globs"`
EmptyResultOk bool `toml:"empty-result-ok"`
MetricsAsCounters bool `toml:"metrics-as-counters"`
TrigramIndex bool `toml:"trigram-index"`
InternalStatsDir string `toml:"internal-stats-dir"`
Percentiles []int `toml:"stats-percentiles"`
CacheScan bool `toml:"cache-scan"`
Listen string `toml:"listen"`
Enabled bool `toml:"enabled"`
Grpc grpcConfig `toml:"grpc"`
ReadTimeout *Duration `toml:"read-timeout"`
IdleTimeout *Duration `toml:"idle-timeout"`
WriteTimeout *Duration `toml:"write-timeout"`
RequestTimeout *Duration `toml:"request-timeout"`
ScanFrequency *Duration `toml:"scan-frequency"`
QueryCacheEnabled bool `toml:"query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
FindCacheEnabled bool `toml:"find-cache-enabled"`
Buckets int `toml:"buckets"`
MaxGlobs int `toml:"max-globs"`
FailOnMaxGlobs bool `toml:"fail-on-max-globs"`
EmptyResultOk bool `toml:"empty-result-ok"`
MetricsAsCounters bool `toml:"metrics-as-counters"`
TrigramIndex bool `toml:"trigram-index"`
InternalStatsDir string `toml:"internal-stats-dir"`
Percentiles []int `toml:"stats-percentiles"`
CacheScan bool `toml:"cache-scan"`

MaxMetricsGlobbed int `toml:"max-metrics-globbed"`
MaxMetricsRendered int `toml:"max-metrics-rendered"`
Expand Down
26 changes: 26 additions & 0 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ import (
"github.com/go-graphite/go-carbon/helper"
"github.com/go-graphite/go-carbon/helper/stat"
"github.com/go-graphite/go-carbon/points"
grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
protov3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/zapwriter"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"google.golang.org/grpc"
)

type metricStruct struct {
Expand Down Expand Up @@ -208,6 +210,7 @@ func (q *queryCache) getQueryItem(k string, size uint64, expire int32) *QueryIte
}

type CarbonserverListener struct {
grpcv2.UnimplementedCarbonV2Server
helper.Stoppable
cacheGet func(key string) []points.Point
readTimeout time.Duration
Expand All @@ -224,6 +227,7 @@ type CarbonserverListener struct {
forceScanChan chan struct{}
metricsAsCounters bool
tcpListener *net.TCPListener
grpcListener *net.TCPListener
logger *zap.Logger
accessLogger *zap.Logger
internalStatsDir string
Expand Down Expand Up @@ -1566,6 +1570,7 @@ func (listener *CarbonserverListener) Stop() error {
listener.db.Close()
}
listener.tcpListener.Close()
listener.grpcListener.Close()
return nil
}

Expand Down Expand Up @@ -1976,3 +1981,24 @@ func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) *
timeout: timeout,
}
}

func (listener *CarbonserverListener) ListenGRPC(listen string) error {
var err error
var grpcAddr *net.TCPAddr
grpcAddr, err = net.ResolveTCPAddr("tcp", listen)
if err != nil {
return err
}

listener.grpcListener, err = net.ListenTCP("tcp", grpcAddr)
if err != nil {
return err
}

var opts []grpc.ServerOption

grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902
grpcv2.RegisterCarbonV2Server(grpcServer, listener)
go grpcServer.Serve(listener.grpcListener)
return nil
}
13 changes: 8 additions & 5 deletions carbonserver/carbonserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,12 @@ func generalFetchSingleMetricRemove(testData *FetchTest) {
os.Remove(filepath.Join(testData.path, testData.name+".wsp"))
}

func generalFetchSingleMetricHelper(testData *FetchTest, cache *cache.Cache, carbonserver *CarbonserverListener) (*pb.FetchResponse, error) {
data, err := carbonserver.fetchSingleMetricV2(testData.name, int32(testData.from), int32(testData.until))
return data, err
func generalFetchSingleMetricHelper(testData *FetchTest, carbonserver *CarbonserverListener) (*pb.FetchResponse, error) {
data, err := carbonserver.fetchSingleMetric(testData.name, "", int32(testData.from), int32(testData.until))
if err != nil {
return nil, err
}
return data.proto2(), nil
}

func testFetchSingleMetricHelper(testData *FetchTest, cache *cache.Cache, carbonserver *CarbonserverListener) (*pb.FetchResponse, error) {
Expand All @@ -182,7 +185,7 @@ func testFetchSingleMetricHelper(testData *FetchTest, cache *cache.Cache, carbon
return nil, err
}
defer generalFetchSingleMetricRemove(testData)
data, err := generalFetchSingleMetricHelper(testData, cache, carbonserver)
data, err := generalFetchSingleMetricHelper(testData, carbonserver)
return data, err
}

Expand Down Expand Up @@ -514,7 +517,7 @@ func benchmarkFetchSingleMetricCommon(b *testing.B, test *FetchTest) {

b.ResetTimer()
for runs := 0; runs < b.N; runs++ {
data, err := generalFetchSingleMetricHelper(test, cache, carbonserver)
data, err := generalFetchSingleMetricHelper(test, carbonserver)
if err != nil {
b.Errorf("Unexpected error: %v", err)
return
Expand Down
18 changes: 0 additions & 18 deletions carbonserver/fetchsinglemetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,3 @@ func (listener *CarbonserverListener) fetchSingleMetric(metric string, pathExpre
return response{}, err
}
}

func (listener *CarbonserverListener) fetchSingleMetricV2(metric string, fromTime, untilTime int32) (*protov2.FetchResponse, error) {
resp, err := listener.fetchSingleMetric(metric, "", fromTime, untilTime)
if err != nil {
return nil, err
}

return resp.proto2(), nil
}

func (listener *CarbonserverListener) fetchSingleMetricV3(metric string, pathExpression string, fromTime, untilTime int32) (*protov3.FetchResponse, error) {
resp, err := listener.fetchSingleMetric(metric, pathExpression, fromTime, untilTime)
if err != nil {
return nil, err
}

return resp.proto3(), nil
}

0 comments on commit cc8eeda

Please sign in to comment.