forked from jaegertracing/jaeger
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
646 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package dependencystore | ||
|
||
var ( | ||
errNotImplemented = errors.New("not implemented") | ||
) | ||
|
||
type Reader interface { | ||
GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) | ||
} | ||
|
||
// DependencyStore handles all queries and insertions to Clickhouse dependencies | ||
type DependencyStore struct { | ||
reader spanstore.Reader | ||
} | ||
|
||
// NewDependencyStore returns a DependencyStore | ||
func NewDependencyStore(store spanstore.Reader) *DependencyStore { | ||
return &DependencyStore{ | ||
reader: store, | ||
} | ||
} | ||
|
||
// GetDependencies returns all interservice dependencies, implements DependencyReader | ||
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { | ||
return nil, errNotImplemented | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package clickhouse | ||
|
||
import ( | ||
"database/sql" | ||
"flag" | ||
"fmt" | ||
|
||
_ "github.com/ClickHouse/clickhouse-go" | ||
|
||
"github.com/spf13/viper" | ||
"github.com/uber/jaeger-lib/metrics" | ||
"go.uber.org/zap" | ||
|
||
dependencyStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore" | ||
store "github.com/jaegertracing/jaeger/plugin/storage/clickhouse/spanstore" | ||
"github.com/jaegertracing/jaeger/storage/dependencystore" | ||
"github.com/jaegertracing/jaeger/storage/spanstore" | ||
) | ||
|
||
// Factory implements storage.Factory for Clickhouse backend. | ||
type Factory struct { | ||
Options *Options | ||
db *sql.DB | ||
logger *zap.Logger | ||
} | ||
|
||
// NewFactory creates a new Factory. | ||
func NewFactory() *Factory { | ||
return &Factory{ | ||
Options: NewOptions("clickhouse"), | ||
} | ||
} | ||
|
||
// Initialize implements storage.Factory | ||
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { | ||
f.logger = logger | ||
|
||
cfg := f.Options.GetPrimary() | ||
|
||
if cfg.Encoding != store.EncodingJSON && cfg.Encoding != store.EncodingProto { | ||
return fmt.Errorf("unknown encoding %q, supported: %q, %q", store.EncodingJSON, store.EncodingProto) | ||
} | ||
|
||
db, err := sql.Open("clickhouse", cfg.Datasource) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err := db.Ping(); err != nil { | ||
return err | ||
} | ||
|
||
f.db = db | ||
|
||
return nil | ||
} | ||
|
||
// AddFlags implements plugin.Configurable | ||
func (f *Factory) AddFlags(flagSet *flag.FlagSet) { | ||
f.Options.AddFlags(flagSet) | ||
} | ||
|
||
// InitFromViper implements plugin.Configurable | ||
func (f *Factory) InitFromViper(v *viper.Viper) { | ||
f.Options.InitFromViper(v) | ||
} | ||
|
||
// CreateSpanReader implements storage.Factory | ||
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { | ||
return store.NewTraceReader(f.db), nil | ||
} | ||
|
||
// CreateSpanWriter implements storage.Factory | ||
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { | ||
cfg := f.Options.GetPrimary() | ||
return store.NewSpanWriter(f.db, cfg.Encoding, cfg.WriteBatchDelay, cfg.WriteBatchSize), nil | ||
} | ||
|
||
// CreateDependencyReader implements storage.Factory | ||
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { | ||
spanReader, _ := f.CreateSpanReader() // err is always nil | ||
return dependencyStore.NewDependencyStore(spanReader), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package clickhouse | ||
|
||
import ( | ||
"flag" | ||
"time" | ||
|
||
"github.com/spf13/viper" | ||
|
||
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse/spanstore" | ||
) | ||
|
||
// Options store storage plugin related configs | ||
type Options struct { | ||
primary *NamespaceConfig | ||
// This storage plugin does not support additional namespaces | ||
} | ||
|
||
// NamespaceConfig is Clickhouse's internal configuration data | ||
type NamespaceConfig struct { | ||
namespace string | ||
Datasource string | ||
WriteBatchDelay time.Duration | ||
WriteBatchSize int | ||
Encoding spanstore.Encoding | ||
} | ||
|
||
const ( | ||
defaultDatasource string = "tcp://localhost:9000" | ||
defaultWriteBatchDelay time.Duration = 5 * time.Second | ||
defaultWriteBatchSize int = 1000 | ||
defaultEncoding spanstore.Encoding = spanstore.EncodingProto | ||
) | ||
|
||
const ( | ||
suffixDatasource = ".datasource" | ||
suffixWriteBatchDelay = ".write-batch-delay" | ||
suffixWriteBatchSize = ".write-batch-size" | ||
suffixEncoding = ".encoding" | ||
) | ||
|
||
// NewOptions creates a new Options struct. | ||
func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { | ||
options := &Options{ | ||
primary: &NamespaceConfig{ | ||
namespace: primaryNamespace, | ||
Datasource: defaultDatasource, | ||
WriteBatchDelay: defaultWriteBatchDelay, | ||
WriteBatchSize: defaultWriteBatchSize, | ||
Encoding: defaultEncoding, | ||
}, | ||
} | ||
|
||
return options | ||
} | ||
|
||
// AddFlags adds flags for Options | ||
func (opt *Options) AddFlags(flagSet *flag.FlagSet) { | ||
nsConfig := opt.primary | ||
|
||
flagSet.String( | ||
nsConfig.namespace+suffixDatasource, | ||
nsConfig.Datasource, | ||
"Clickhouse datasource string.", | ||
) | ||
|
||
flagSet.Duration( | ||
nsConfig.namespace+suffixWriteBatchDelay, | ||
nsConfig.WriteBatchDelay, | ||
"A duration after which spans are flushed to Clickhouse", | ||
) | ||
|
||
flagSet.Int( | ||
nsConfig.namespace+suffixWriteBatchSize, | ||
nsConfig.WriteBatchSize, | ||
"A number of spans buffered before they are flushed to Clickhouse", | ||
) | ||
|
||
flagSet.String( | ||
nsConfig.namespace+suffixEncoding, | ||
string(nsConfig.Encoding), | ||
"Encoding to store spans (json allows out of band queries, protobuf is more compact)", | ||
) | ||
} | ||
|
||
// InitFromViper initializes Options with properties from viper | ||
func (opt *Options) InitFromViper(v *viper.Viper) { | ||
cfg := opt.primary | ||
|
||
cfg.Datasource = v.GetString(cfg.namespace + suffixDatasource) | ||
cfg.WriteBatchDelay = v.GetDuration(cfg.namespace + suffixWriteBatchDelay) | ||
cfg.WriteBatchSize = v.GetInt(cfg.namespace + suffixWriteBatchSize) | ||
cfg.Encoding = spanstore.Encoding(v.GetString(cfg.namespace + suffixEncoding)) | ||
} | ||
|
||
// GetPrimary returns the primary namespace configuration | ||
func (opt *Options) GetPrimary() *NamespaceConfig { | ||
return opt.primary | ||
} |
Oops, something went wrong.