From 2e83d3265eeee364af1ac36c28482fa56efa9d6b Mon Sep 17 00:00:00 2001 From: Ivan Babrou Date: Sun, 10 May 2020 13:41:13 -0700 Subject: [PATCH] Clickhouse storage support --- go.mod | 1 + go.sum | 12 + .../clickhouse/dependencystore/storage.go | 26 ++ plugin/storage/clickhouse/factory.go | 83 ++++++ plugin/storage/clickhouse/options.go | 98 ++++++++ plugin/storage/clickhouse/spanstore/reader.go | 236 ++++++++++++++++++ plugin/storage/clickhouse/spanstore/writer.go | 185 ++++++++++++++ plugin/storage/factory.go | 6 +- 8 files changed, 646 insertions(+), 1 deletion(-) create mode 100644 plugin/storage/clickhouse/dependencystore/storage.go create mode 100644 plugin/storage/clickhouse/factory.go create mode 100644 plugin/storage/clickhouse/options.go create mode 100644 plugin/storage/clickhouse/spanstore/reader.go create mode 100644 plugin/storage/clickhouse/spanstore/writer.go diff --git a/go.mod b/go.mod index e2b440cdf6d..48c6407e4a4 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect + github.com/ClickHouse/clickhouse-go v1.4.1-0.20200504172624-7b0f96ec3e5c github.com/DataDog/zstd v1.4.4 // indirect github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f github.com/VividCortex/gohistogram v1.0.0 // indirect diff --git a/go.sum b/go.sum index e19fb3e8f0e..8b66857ccb5 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,10 @@ github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIo github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/ClickHouse/clickhouse-go v1.4.0 h1:cC1DEZ1TL74QviZY4svlwow84X5r7/BGd78kf18swhI= +github.com/ClickHouse/clickhouse-go v1.4.0/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= +github.com/ClickHouse/clickhouse-go v1.4.1-0.20200504172624-7b0f96ec3e5c h1:KeN7vuwwFTb7ozi+TMd5h4uVGrkWimj4krhSEKf2KSk= +github.com/ClickHouse/clickhouse-go v1.4.1-0.20200504172624-7b0f96ec3e5c/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/DataDog/zstd v1.4.4 h1:+IawcoXhCBylN7ccwdwf8LOH2jKq7NavGpEPanrlTzE= github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= @@ -38,6 +42,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= @@ -47,6 +52,8 @@ github.com/bsm/sarama-cluster v2.1.13+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= +github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -154,6 +161,7 @@ github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2K github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo= github.com/go-openapi/validate v0.19.6 h1:WsKw9J1WzYBVxWRYwLqEk3325RL6G0SSWksuamkk6q0= github.com/go-openapi/validate v0.19.6/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4= +github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= @@ -249,6 +257,7 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -274,6 +283,7 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -286,6 +296,7 @@ github.com/mailru/easyjson v0.7.1 h1:mdxE1MF9o53iCb2Ghj1VfWvh7ZOwHpnVG/xwXrV90U8 github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= +github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= @@ -326,6 +337,7 @@ github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUr github.com/pelletier/go-toml v1.6.0 h1:aetoXYr0Tv7xRU/V4B4IZJ2QcbtMUFoNb3ORp7TzIK4= github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/plugin/storage/clickhouse/dependencystore/storage.go b/plugin/storage/clickhouse/dependencystore/storage.go new file mode 100644 index 00000000000..8b24d09d884 --- /dev/null +++ b/plugin/storage/clickhouse/dependencystore/storage.go @@ -0,0 +1,26 @@ +package dependencystore + +var ( + errNotImplemented = errors.New("not implemented") +) + +type Reader interface { + GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) +} + +// DependencyStore handles all queries and insertions to Clickhouse dependencies +type DependencyStore struct { + reader spanstore.Reader +} + +// NewDependencyStore returns a DependencyStore +func NewDependencyStore(store spanstore.Reader) *DependencyStore { + return &DependencyStore{ + reader: store, + } +} + +// GetDependencies returns all interservice dependencies, implements DependencyReader +func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + return nil, errNotImplemented +} diff --git a/plugin/storage/clickhouse/factory.go b/plugin/storage/clickhouse/factory.go new file mode 100644 index 00000000000..f161259fdf6 --- /dev/null +++ b/plugin/storage/clickhouse/factory.go @@ -0,0 +1,83 @@ +package clickhouse + +import ( + "database/sql" + "flag" + "fmt" + + _ "github.com/ClickHouse/clickhouse-go" + + "github.com/spf13/viper" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + dependencyStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore" + store "github.com/jaegertracing/jaeger/plugin/storage/clickhouse/spanstore" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +// Factory implements storage.Factory for Clickhouse backend. +type Factory struct { + Options *Options + db *sql.DB + logger *zap.Logger +} + +// NewFactory creates a new Factory. +func NewFactory() *Factory { + return &Factory{ + Options: NewOptions("clickhouse"), + } +} + +// Initialize implements storage.Factory +func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { + f.logger = logger + + cfg := f.Options.GetPrimary() + + if cfg.Encoding != store.EncodingJSON && cfg.Encoding != store.EncodingProto { + return fmt.Errorf("unknown encoding %q, supported: %q, %q", store.EncodingJSON, store.EncodingProto) + } + + db, err := sql.Open("clickhouse", cfg.Datasource) + if err != nil { + return err + } + + if err := db.Ping(); err != nil { + return err + } + + f.db = db + + return nil +} + +// AddFlags implements plugin.Configurable +func (f *Factory) AddFlags(flagSet *flag.FlagSet) { + f.Options.AddFlags(flagSet) +} + +// InitFromViper implements plugin.Configurable +func (f *Factory) InitFromViper(v *viper.Viper) { + f.Options.InitFromViper(v) +} + +// CreateSpanReader implements storage.Factory +func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { + return store.NewTraceReader(f.db), nil +} + +// CreateSpanWriter implements storage.Factory +func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { + cfg := f.Options.GetPrimary() + return store.NewSpanWriter(f.db, cfg.Encoding, cfg.WriteBatchDelay, cfg.WriteBatchSize), nil +} + +// CreateDependencyReader implements storage.Factory +func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { + spanReader, _ := f.CreateSpanReader() // err is always nil + return dependencyStore.NewDependencyStore(spanReader), nil +} diff --git a/plugin/storage/clickhouse/options.go b/plugin/storage/clickhouse/options.go new file mode 100644 index 00000000000..fdb8d1a82ec --- /dev/null +++ b/plugin/storage/clickhouse/options.go @@ -0,0 +1,98 @@ +package clickhouse + +import ( + "flag" + "time" + + "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/plugin/storage/clickhouse/spanstore" +) + +// Options store storage plugin related configs +type Options struct { + primary *NamespaceConfig + // This storage plugin does not support additional namespaces +} + +// NamespaceConfig is Clickhouse's internal configuration data +type NamespaceConfig struct { + namespace string + Datasource string + WriteBatchDelay time.Duration + WriteBatchSize int + Encoding spanstore.Encoding +} + +const ( + defaultDatasource string = "tcp://localhost:9000" + defaultWriteBatchDelay time.Duration = 5 * time.Second + defaultWriteBatchSize int = 1000 + defaultEncoding spanstore.Encoding = spanstore.EncodingProto +) + +const ( + suffixDatasource = ".datasource" + suffixWriteBatchDelay = ".write-batch-delay" + suffixWriteBatchSize = ".write-batch-size" + suffixEncoding = ".encoding" +) + +// NewOptions creates a new Options struct. +func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { + options := &Options{ + primary: &NamespaceConfig{ + namespace: primaryNamespace, + Datasource: defaultDatasource, + WriteBatchDelay: defaultWriteBatchDelay, + WriteBatchSize: defaultWriteBatchSize, + Encoding: defaultEncoding, + }, + } + + return options +} + +// AddFlags adds flags for Options +func (opt *Options) AddFlags(flagSet *flag.FlagSet) { + nsConfig := opt.primary + + flagSet.String( + nsConfig.namespace+suffixDatasource, + nsConfig.Datasource, + "Clickhouse datasource string.", + ) + + flagSet.Duration( + nsConfig.namespace+suffixWriteBatchDelay, + nsConfig.WriteBatchDelay, + "A duration after which spans are flushed to Clickhouse", + ) + + flagSet.Int( + nsConfig.namespace+suffixWriteBatchSize, + nsConfig.WriteBatchSize, + "A number of spans buffered before they are flushed to Clickhouse", + ) + + flagSet.String( + nsConfig.namespace+suffixEncoding, + string(nsConfig.Encoding), + "Encoding to store spans (json allows out of band queries, protobuf is more compact)", + ) +} + +// InitFromViper initializes Options with properties from viper +func (opt *Options) InitFromViper(v *viper.Viper) { + cfg := opt.primary + + cfg.Datasource = v.GetString(cfg.namespace + suffixDatasource) + cfg.WriteBatchDelay = v.GetDuration(cfg.namespace + suffixWriteBatchDelay) + cfg.WriteBatchSize = v.GetInt(cfg.namespace + suffixWriteBatchSize) + cfg.Encoding = spanstore.Encoding(v.GetString(cfg.namespace + suffixEncoding)) +} + +// GetPrimary returns the primary namespace configuration +func (opt *Options) GetPrimary() *NamespaceConfig { + return opt.primary +} diff --git a/plugin/storage/clickhouse/spanstore/reader.go b/plugin/storage/clickhouse/spanstore/reader.go new file mode 100644 index 00000000000..8e19c4f446d --- /dev/null +++ b/plugin/storage/clickhouse/spanstore/reader.go @@ -0,0 +1,236 @@ +package spanstore + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + + "github.com/gogo/protobuf/proto" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" +) + +// SpanWriter for reading spans from ClickHouse +type TraceReader struct { + db *sql.DB +} + +// NewTraceReader returns a TraceReader for the database +func NewTraceReader(db *sql.DB) *TraceReader { + return &TraceReader{db} +} + +func (r *TraceReader) getTraces(ctx context.Context, traceIDs []model.TraceID) ([]*model.Trace, error) { + returning := make([]*model.Trace, 0, len(traceIDs)) + + if len(traceIDs) == 0 { + return returning, nil + } + + span, _ := opentracing.StartSpanFromContext(ctx, "getTraces") + span.LogFields(otlog.Object("trace_ids", traceIDs)) + span.SetTag("num_trace_ids", len(traceIDs)) + span.SetTag("weird", true) + span.SetTag("π", 3.14) + defer span.Finish() + + values := make([]interface{}, len(traceIDs)) + for i, traceID := range traceIDs { + values[i] = traceID.String() + } + + rows, err := r.db.QueryContext(ctx, "SELECT model FROM jaeger_spans WHERE traceID IN (?"+strings.Repeat(",?", len(values)-1)+")", values...) + if err != nil { + return nil, err + } + + defer rows.Close() + + traces := map[model.TraceID]*model.Trace{} + + for rows.Next() { + var serialized string + + if err := rows.Scan(&serialized); err != nil { + return nil, err + } + + span := model.Span{} + + if serialized[0] == '{' { + err = json.Unmarshal([]byte(serialized), &span) + } else { + err = proto.Unmarshal([]byte(serialized), &span) + } + + if err != nil { + return nil, err + } + + if _, ok := traces[span.TraceID]; !ok { + traces[span.TraceID] = &model.Trace{} + } + + traces[span.TraceID].Spans = append(traces[span.TraceID].Spans, &span) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + for _, traceID := range traceIDs { + if trace, ok := traces[traceID]; ok { + returning = append(returning, trace) + } + } + + return returning, nil +} + +// GetTrace takes a traceID and returns a Trace associated with that traceID +func (r *TraceReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace") + defer span.Finish() + + traces, err := r.getTraces(ctx, []model.TraceID{traceID}) + if err != nil { + return nil, err + } + + if len(traces) == 0 { + return nil, spanstore.ErrTraceNotFound + } + + return traces[0], nil +} + +func (r *TraceReader) getStrings(ctx context.Context, sq string, args ...interface{}) ([]string, error) { + rows, err := r.db.QueryContext(ctx, sq, args...) + if err != nil { + return nil, err + } + + defer rows.Close() + + values := []string{} + + for rows.Next() { + var value string + if err := rows.Scan(&value); err != nil { + return nil, err + } + values = append(values, value) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return values, nil +} + +// GetServices fetches the sorted service list that have not expired +func (r *TraceReader) GetServices(ctx context.Context) ([]string, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") + defer span.Finish() + + return r.getStrings(ctx, "SELECT service FROM jaeger_index_v7 GROUP BY service") +} + +// GetOperations fetches operations in the service and empty slice if service does not exists +func (r *TraceReader) GetOperations( + ctx context.Context, + query spanstore.OperationQueryParameters, +) ([]spanstore.Operation, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") + defer span.Finish() + + names, err := r.getStrings(ctx, "SELECT operation FROM jaeger_index_v7 WHERE service = ? GROUP BY operation", query.ServiceName) + if err != nil { + return nil, err + } + + operations := make([]spanstore.Operation, len(names)) + for i, name := range names { + operations[i].Name = name + } + + return operations, nil +} + +// FindTraces retrieves traces that match the traceQuery +func (r *TraceReader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces") + defer span.Finish() + + traceIDs, err := r.FindTraceIDs(ctx, query) + if err != nil { + return nil, err + } + + return r.getTraces(ctx, traceIDs) +} + +// FindTraceIDs retrieves only the TraceIDs that match the traceQuery, but not the trace data +func (r *TraceReader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs") + defer span.Finish() + + sql := "SELECT DISTINCT traceID FROM jaeger_index_v7 WHERE service = ?" + args := []interface{}{query.ServiceName} + + if query.OperationName != "" { + sql = sql + " AND operation = ?" + args = append(args, query.OperationName) + } + + if !query.StartTimeMin.IsZero() { + sql = sql + " AND timestamp >= toDateTime64(?, 6, 'UTC')" + args = append(args, query.StartTimeMin.UTC().Format("2006-01-02T15:04:05")) + } + + if !query.StartTimeMax.IsZero() { + sql = sql + " AND timestamp <= toDateTime64(?, 6, 'UTC')" + args = append(args, query.StartTimeMax.UTC().Format("2006-01-02T15:04:05")) + } + + if query.DurationMin != 0 { + sql = sql + " AND durationUs >= ?" + args = append(args, query.DurationMin.Microseconds()) + } + + if query.DurationMax != 0 { + sql = sql + " AND durationUs <= ?" + args = append(args, query.DurationMax.Microseconds()) + } + + for key, value := range query.Tags { + sql = sql + " AND has(tags, ?)" + args = append(args, fmt.Sprintf("%s=%s", key, value)) + } + + // Sorting by service is required for early termination of primary key scan: + // * https://github.com/ClickHouse/ClickHouse/issues/7102 + sql = sql + " ORDER BY service DESC,timestamp DESC LIMIT ?" + args = append(args, query.NumTraces) + + traceIDStrings, err := r.getStrings(ctx, sql, args...) + if err != nil { + return nil, err + } + + traceIDs := make([]model.TraceID, len(traceIDStrings)) + for i, traceIDString := range traceIDStrings { + traceID, err := model.TraceIDFromString(traceIDString) + if err != nil { + return nil, err + } + traceIDs[i] = traceID + } + + return traceIDs, nil +} diff --git a/plugin/storage/clickhouse/spanstore/writer.go b/plugin/storage/clickhouse/spanstore/writer.go new file mode 100644 index 00000000000..bcbcb3fd220 --- /dev/null +++ b/plugin/storage/clickhouse/spanstore/writer.go @@ -0,0 +1,185 @@ +package spanstore + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/jaegertracing/jaeger/model" +) + +type Encoding string + +const ( + // EncodingJSON is used for spans encoded as JSON. + EncodingJSON Encoding = "json" + // EncodingProto is used for spans encoded as Protobuf. + EncodingProto Encoding = "protobuf" +) + +// SpanWriter for writing spans to ClickHouse +type SpanWriter struct { + db *sql.DB + encoding Encoding + delay time.Duration + size int + spans chan *model.Span +} + +// NewSpanWriter returns a SpanWriter for the database +func NewSpanWriter(db *sql.DB, encoding Encoding, delay time.Duration, size int) *SpanWriter { + writer := &SpanWriter{ + db: db, + encoding: encoding, + delay: delay, + size: size, + spans: make(chan *model.Span, 1000), + } + + go writer.backgroundWriter() + + return writer +} + +func (w *SpanWriter) backgroundWriter() { + batch := make([]*model.Span, 0, w.size) + + timer := time.After(w.delay) + + for { + flush := false + + select { + case span := <-w.spans: + batch = append(batch, span) + flush = len(batch) == cap(batch) + case <-timer: + timer = time.After(w.delay) + flush = len(batch) > 0 + } + + if flush { + if err := w.writeBatch(batch); err != nil { + log.Fatalf("Error writing batch: %s", err) + } + + batch = make([]*model.Span, 0, w.size) + } + } +} + +func (w *SpanWriter) writeBatch(batch []*model.Span) error { + if err := w.writeModelBatch(batch); err != nil { + return err + } + + if err := w.writeIndexBatch(batch); err != nil { + return err + } + + return nil +} + +func (w *SpanWriter) writeModelBatch(batch []*model.Span) error { + tx, err := w.db.Begin() + if err != nil { + return err + } + + commited := false + + defer func() { + if !commited { + tx.Rollback() + } + }() + + statement, err := tx.Prepare("INSERT INTO jaeger_spans (timestamp, traceID, model) VALUES (?, ?, ?)") + if err != nil { + return nil + } + + defer statement.Close() + + for _, span := range batch { + var serialized []byte + + if w.encoding == EncodingJSON { + serialized, err = json.Marshal(span) + } else { + serialized, err = proto.Marshal(span) + } + + if err != nil { + return err + } + + _, err = statement.Exec(span.StartTime, span.TraceID.String()[0:16], serialized) + if err != nil { + return err + } + } + + commited = true + + return tx.Commit() +} + +func (w *SpanWriter) writeIndexBatch(batch []*model.Span) error { + tx, err := w.db.Begin() + if err != nil { + return err + } + + commited := false + + defer func() { + if !commited { + tx.Rollback() + } + }() + + statement, err := tx.Prepare("INSERT INTO jaeger_index_v7 (timestamp, traceID, service, operation, durationUs, tags) VALUES (?, ?, ?, ?, ?, ?)") + if err != nil { + return err + } + + defer statement.Close() + + for _, span := range batch { + tags := make([]string, 0, len(span.Tags)+len(span.Process.Tags)) + + for _, kv := range span.Tags { + tags = append(tags, fmt.Sprintf("%s=%s", kv.Key, kv.AsString())) + } + + for _, kv := range span.Process.Tags { + tags = append(tags, fmt.Sprintf("%s=%s", kv.Key, kv.AsString())) + } + + _, err = statement.Exec( + span.StartTime, + span.TraceID.String()[0:16], + span.Process.ServiceName, + span.OperationName, + span.Duration.Microseconds(), + tags, + ) + if err != nil { + return err + } + } + + commited = true + + return tx.Commit() +} + +// WriteSpan writes the encoded span +func (w *SpanWriter) WriteSpan(span *model.Span) error { + w.spans <- span + return nil +} diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index 68d30f7a9f2..394cba0178c 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/plugin" "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" + "github.com/jaegertracing/jaeger/plugin/storage/clickhouse" "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/plugin/storage/grpc" "github.com/jaegertracing/jaeger/plugin/storage/kafka" @@ -42,6 +43,7 @@ const ( kafkaStorageType = "kafka" grpcPluginStorageType = "grpc-plugin" badgerStorageType = "badger" + clickhouseStorageType = "clickhouse" downsamplingRatio = "downsampling.ratio" downsamplingHashSalt = "downsampling.hashsalt" @@ -52,7 +54,7 @@ const ( ) // AllStorageTypes defines all available storage backends -var AllStorageTypes = []string{cassandraStorageType, elasticsearchStorageType, memoryStorageType, kafkaStorageType, badgerStorageType, grpcPluginStorageType} +var AllStorageTypes = []string{cassandraStorageType, elasticsearchStorageType, memoryStorageType, kafkaStorageType, badgerStorageType, clickhouseStorageType, grpcPluginStorageType} // Factory implements storage.Factory interface as a meta-factory for storage components. type Factory struct { @@ -94,6 +96,8 @@ func (f *Factory) getFactoryOfType(factoryType string) (storage.Factory, error) return kafka.NewFactory(), nil case badgerStorageType: return badger.NewFactory(), nil + case clickhouseStorageType: + return clickhouse.NewFactory(), nil case grpcPluginStorageType: return grpc.NewFactory(), nil default: