Skip to content

Commit

Permalink
Update jaeger std trf (#417)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->

**What this PR does / why we need it**:
<!-- Explain here the context and why you're making the change. What is
the problem you're trying to solve. --->

The current implementation of Jaeger tracing in the UPI standard transformer is
broken. The existing implementation has several issues:
* Standard transformer can't propagate the parent context from the
upstream service
* The UPI standard transformer does not propagate the metadata to the predictor

Below is the result screenshot:

<img width="1787" alt="Screen Shot 2023-07-03 at 11 03 20"
src="https://github.com/caraml-dev/merlin/assets/2369255/99f67d91-4efd-47e1-a022-e02a97378c46">

**Which issue(s) this PR fixes**:
<!--
*Automatically closes linked issue when PR is merged.
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
-->

Fixes #

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```

**Checklist**

- [ ] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Updated Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes
  • Loading branch information
tiopramayudi committed Jul 7, 2023
1 parent 84c4383 commit 9c2d806
Show file tree
Hide file tree
Showing 29 changed files with 236 additions and 205 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/merlin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ jobs:
LOCAL_REGISTRY_PORT: 12345
LOCAL_REGISTRY: "dev.localhost"
INGRESS_HOST: "127.0.0.1.nip.io"
MERLIN_CHART_VERSION: 0.11.1
MERLIN_CHART_VERSION: 0.11.4
E2E_PYTHON_VERSION: "3.10.6"
steps:
- uses: actions/checkout@v2
Expand Down
4 changes: 1 addition & 3 deletions api/cluster/resource/templater.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,8 @@ func (t *InferenceServiceTemplater) enrichStandardTransformerEnvVars(modelServic

jaegerCfg := t.standardTransformerConfig.Jaeger
jaegerEnvVars := []models.EnvVar{
{Name: transformerpkg.JaegerAgentHost, Value: jaegerCfg.AgentHost},
{Name: transformerpkg.JaegerAgentPort, Value: jaegerCfg.AgentPort},
{Name: transformerpkg.JaegerCollectorURL, Value: jaegerCfg.CollectorURL},
{Name: transformerpkg.JaegerSamplerParam, Value: jaegerCfg.SamplerParam},
{Name: transformerpkg.JaegerSamplerType, Value: jaegerCfg.SamplerType},
{Name: transformerpkg.JaegerDisabled, Value: jaegerCfg.Disabled},
}

Expand Down
22 changes: 6 additions & 16 deletions api/cluster/resource/templater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ var (
ImageName: "merlin-standard-transformer",
FeastCoreURL: "core.feast.dev:8081",
Jaeger: config.JaegerConfig{
AgentHost: "localhost",
AgentPort: "6831",
SamplerType: "const",
CollectorURL: "http://jaeger-tracing-collector.infrastructure:14268/api/traces",
SamplerParam: "1",
Disabled: "false",
},
Expand Down Expand Up @@ -1982,10 +1980,8 @@ func TestCreateInferenceServiceSpecWithTransformer(t *testing.T) {
{Name: transformerpkg.FeastServingKeepAliveTime, Value: "30s"},
{Name: transformerpkg.FeastServingKeepAliveTimeout, Value: "1s"},
{Name: transformerpkg.FeastGRPCConnCount, Value: "5"},
{Name: transformerpkg.JaegerAgentHost, Value: standardTransformerConfig.Jaeger.AgentHost},
{Name: transformerpkg.JaegerAgentPort, Value: standardTransformerConfig.Jaeger.AgentPort},
{Name: transformerpkg.JaegerCollectorURL, Value: standardTransformerConfig.Jaeger.CollectorURL},
{Name: transformerpkg.JaegerSamplerParam, Value: standardTransformerConfig.Jaeger.SamplerParam},
{Name: transformerpkg.JaegerSamplerType, Value: standardTransformerConfig.Jaeger.SamplerType},
{Name: transformerpkg.JaegerDisabled, Value: standardTransformerConfig.Jaeger.Disabled},
{Name: transformerpkg.StandardTransformerConfigEnvName, Value: `{"standard_transformer":null}`},
}, createDefaultTransformerEnvVars(modelSvc)).ToKubernetesEnvVars(),
Expand Down Expand Up @@ -2094,10 +2090,8 @@ func TestCreateInferenceServiceSpecWithTransformer(t *testing.T) {
{Name: transformerpkg.KafkaConnectTimeoutMS, Value: fmt.Sprintf("%v", standardTransformerConfig.Kafka.ConnectTimeoutMS)},
{Name: transformerpkg.KafkaSerialization, Value: string(standardTransformerConfig.Kafka.SerializationFmt)},
{Name: transformerpkg.ModelServerConnCount, Value: "3"},
{Name: transformerpkg.JaegerAgentHost, Value: standardTransformerConfig.Jaeger.AgentHost},
{Name: transformerpkg.JaegerAgentPort, Value: standardTransformerConfig.Jaeger.AgentPort},
{Name: transformerpkg.JaegerCollectorURL, Value: standardTransformerConfig.Jaeger.CollectorURL},
{Name: transformerpkg.JaegerSamplerParam, Value: standardTransformerConfig.Jaeger.SamplerParam},
{Name: transformerpkg.JaegerSamplerType, Value: standardTransformerConfig.Jaeger.SamplerType},
{Name: transformerpkg.JaegerDisabled, Value: standardTransformerConfig.Jaeger.Disabled},
{Name: transformerpkg.StandardTransformerConfigEnvName, Value: `{"standard_transformer":null}`},
}, createDefaultTransformerEnvVars(modelSvcGRPC)).ToKubernetesEnvVars(),
Expand Down Expand Up @@ -2215,10 +2209,8 @@ func TestCreateInferenceServiceSpecWithTransformer(t *testing.T) {
{Name: transformerpkg.KafkaConnectTimeoutMS, Value: fmt.Sprintf("%v", standardTransformerConfig.Kafka.ConnectTimeoutMS)},
{Name: transformerpkg.KafkaSerialization, Value: string(standardTransformerConfig.Kafka.SerializationFmt)},
{Name: transformerpkg.ModelServerConnCount, Value: "3"},
{Name: transformerpkg.JaegerAgentHost, Value: standardTransformerConfig.Jaeger.AgentHost},
{Name: transformerpkg.JaegerAgentPort, Value: standardTransformerConfig.Jaeger.AgentPort},
{Name: transformerpkg.JaegerCollectorURL, Value: standardTransformerConfig.Jaeger.CollectorURL},
{Name: transformerpkg.JaegerSamplerParam, Value: standardTransformerConfig.Jaeger.SamplerParam},
{Name: transformerpkg.JaegerSamplerType, Value: standardTransformerConfig.Jaeger.SamplerType},
{Name: transformerpkg.JaegerDisabled, Value: standardTransformerConfig.Jaeger.Disabled},
{Name: transformerpkg.StandardTransformerConfigEnvName, Value: `{"standard_transformer":null}`},
}, createDefaultTransformerEnvVars(modelSvcGRPC)).ToKubernetesEnvVars(),
Expand Down Expand Up @@ -5670,7 +5662,7 @@ func TestCreateTransformerSpec(t *testing.T) {
MemoryRequest: memoryRequest,
},
EnvVars: models.EnvVars{
{Name: transformerpkg.JaegerAgentHost, Value: "NEW_HOST"}, // test user overwrite
{Name: transformerpkg.JaegerCollectorURL, Value: "NEW_HOST"}, // test user overwrite
},
},
&config.DeploymentConfig{},
Expand All @@ -5693,10 +5685,8 @@ func TestCreateTransformerSpec(t *testing.T) {
{Name: transformerpkg.FeastServingKeepAliveTime, Value: "30s"},
{Name: transformerpkg.FeastServingKeepAliveTimeout, Value: "1s"},
{Name: transformerpkg.FeastGRPCConnCount, Value: "5"},
{Name: transformerpkg.JaegerAgentHost, Value: "NEW_HOST"},
{Name: transformerpkg.JaegerAgentPort, Value: standardTransformerConfig.Jaeger.AgentPort},
{Name: transformerpkg.JaegerCollectorURL, Value: "NEW_HOST"},
{Name: transformerpkg.JaegerSamplerParam, Value: standardTransformerConfig.Jaeger.SamplerParam},
{Name: transformerpkg.JaegerSamplerType, Value: standardTransformerConfig.Jaeger.SamplerType},
{Name: transformerpkg.JaegerDisabled, Value: standardTransformerConfig.Jaeger.Disabled},
}, createDefaultTransformerEnvVars(modelSvc)).ToKubernetesEnvVars(),
Resources: corev1.ResourceRequirements{
Expand Down
70 changes: 44 additions & 26 deletions api/cmd/transformer/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package main

import (
"context"
"encoding/json"
"io"
"log"

metricCollector "github.com/afex/hystrix-go/hystrix/metric_collector"
"github.com/gorilla/mux"
"github.com/kelseyhightower/envconfig"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
jcfg "github.com/uber/jaeger-client-go/config"
jprom "github.com/uber/jaeger-lib/metrics/prometheus"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -60,6 +63,19 @@ type AppConfig struct {

// By default the value is 0, users should configure this value below the memory requested
InitHeapSizeInMB int `envconfig:"INIT_HEAP_SIZE_IN_MB" default:"0"`

// Tracing config
Tracing JaegerTracing
}

// JaegerTracing holds the Jaeger tracing configuration of the transformer.
// Each field carries an envconfig tag naming the environment variable it is
// read from.
type JaegerTracing struct {
// Disabled turns tracing off. When true, initTracing returns a tracer
// provider configured with a never-sample sampler and creates no Jaeger
// exporter. Defaults to true.
Disabled bool `envconfig:"JAEGER_DISABLED" default:"true"`
// CollectorURL is the Jaeger collector endpoint that the exporter is
// created with. It has no default.
CollectorURL string `envconfig:"JAEGER_COLLECTOR_URL"`
// SamplerProbability is the ratio handed to the trace-ID-ratio-based
// sampler, i.e. the fraction of traces to sample. Defaults to 0.1.
SamplerProbability float64 `envconfig:"JAEGER_SAMPLER_PARAM" default:"0.1"`
}

// Trick GC frequency based on this https://blog.twitch.tv/en/2019/04/10/go-memory-ballast-how-i-learnt-to-stop-worrying-and-love-the-heap-26c2462549a2/
Expand All @@ -83,13 +99,19 @@ func main() {

logger.Info("configuration loaded", zap.Any("appConfig", appConfig))

closer, err := initTracing(appConfig.Server.ModelFullName + "-transformer")
tracingProvider, err := initTracing(appConfig.Server.ModelFullName+"-transformer", appConfig.Tracing)
if err != nil {
logger.Error("Unable to initialize tracing", zap.Error(err))
}
if closer != nil {
defer closer.Close() //nolint:errcheck
}
otel.SetTracerProvider(tracingProvider)
propagator := b3.New(b3.WithInjectEncoding(b3.B3MultipleHeader | b3.B3SingleHeader))
otel.SetTextMapPropagator(propagator)

defer func() {
if err := tracingProvider.Shutdown(context.Background()); err != nil {
logger.Error("Error shutting down tracer provider", zap.Error(err))
}
}()

transformerConfig := &spec.StandardTransformerConfig{}
if err := protojson.Unmarshal([]byte(appConfig.StandardTransformerConfigJSON), transformerConfig); err != nil {
Expand Down Expand Up @@ -253,26 +275,22 @@ func runGrpcServer(opts *serverConf.Options, handler *pipeline.Handler, instrume
s.Run()
}

func initTracing(serviceName string) (io.Closer, error) {
// Set tracing configuration defaults.
cfg := &jcfg.Configuration{
ServiceName: serviceName,
func initTracing(serviceName string, tracingCfg JaegerTracing) (*tracesdk.TracerProvider, error) {
if tracingCfg.Disabled {
return tracesdk.NewTracerProvider(tracesdk.WithSampler(tracesdk.NeverSample())), nil
}

// Available options can be seen here:
// https://github.com/jaegertracing/jaeger-client-go#environment-variables
cfg, err := cfg.FromEnv()
// Create the Jaeger exporter
exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(tracingCfg.CollectorURL)))
if err != nil {
return nil, errors.Wrap(err, "Unable to get tracing config from environment")
return nil, err
}

tracer, closer, err := cfg.NewTracer(
jcfg.Metrics(jprom.New()),
tp := tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exporter),
tracesdk.WithSampler(tracesdk.TraceIDRatioBased(tracingCfg.SamplerProbability)),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(serviceName),
)),
)
if err != nil {
return nil, errors.Wrap(err, "Unable to initialize tracer")
}

opentracing.SetGlobalTracer(tracer)
return closer, nil
return tp, nil
}
4 changes: 1 addition & 3 deletions api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,7 @@ func (u *FeastServingURLs) URLs() []string {
}

type JaegerConfig struct {
AgentHost string
AgentPort string
SamplerType string `validate:"required" default:"probabilistic"`
CollectorURL string `validate:"required"`
SamplerParam string `validate:"required" default:"0.01"`
Disabled string `validate:"required" default:"true"`
}
Expand Down
27 changes: 16 additions & 11 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20220214044918-55732a6a392c
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/antihax/optional v1.0.0
github.com/antonmedv/expr v1.8.9
github.com/antonmedv/expr v1.12.5
github.com/bboughton/gcp-helpers v0.1.0
github.com/buger/jsonparser v1.1.1
github.com/caraml-dev/merlin-pyspark-app v0.0.3
Expand Down Expand Up @@ -37,6 +37,7 @@ require (
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/schema v1.1.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
github.com/jinzhu/copier v0.3.5
github.com/jinzhu/gorm v1.9.11
Expand All @@ -48,7 +49,6 @@ require (
github.com/mmcloughlin/geohash v0.10.0
github.com/newrelic/newrelic-client-go/v2 v2.17.0
github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
github.com/ory/viper v1.7.5
github.com/patrickmn/go-cache v2.1.0+incompatible
Expand All @@ -62,17 +62,21 @@ require (
github.com/rs/cors v1.8.2
github.com/soheilhy/cmux v0.1.4
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.8.1
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/stretchr/testify v1.8.3
github.com/xanzy/go-gitlab v0.32.0
go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
go.opentelemetry.io/contrib/propagators/b3 v1.17.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/jaeger v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/automaxprocs v1.4.0
go.uber.org/zap v1.21.0
golang.org/x/oauth2 v0.6.0
golang.org/x/text v0.9.0
google.golang.org/api v0.114.0
google.golang.org/grpc v1.54.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -130,7 +134,8 @@ require (
github.com/go-kit/kit v0.9.0 // indirect
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.19.5 // indirect
github.com/go-openapi/errors v0.19.4 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
Expand Down Expand Up @@ -209,13 +214,14 @@ require (
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
github.com/xanzy/ssh-agent v0.3.0 // indirect
go.mongodb.org/mongo-driver v1.1.2 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
gonum.org/v1/gonum v0.9.1 // indirect
Expand All @@ -230,7 +236,6 @@ require (
)

require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/caraml-dev/universal-prediction-interface v0.3.6
github.com/go-openapi/spec v0.19.9 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
Expand Down
Loading

0 comments on commit 9c2d806

Please sign in to comment.