From 9e4de5b3de75c617de1d77460573330f7b7fb286 Mon Sep 17 00:00:00 2001 From: Abhishek Verma Date: Tue, 24 Oct 2023 16:28:06 +0530 Subject: [PATCH] feat(dlq): get dlq job details api (#77) * feat(dlq): get dlq job details api * chore: resolve comments * feat(test): add tests for get dlq job details service * feat(test): add test for get dlq job handler * chore: fix linting issues * feat: fix issues on create APIs --------- Co-authored-by: Stewart Jingga --- .golangci.yml | 1 + cli/server/configs.go | 3 +- cli/server/server.go | 3 +- generated/models/dlq_job.go | 3 + internal/server/v1/dlq/errors.go | 1 + internal/server/v1/dlq/handler.go | 18 +++-- internal/server/v1/dlq/handler_test.go | 88 ++---------------------- internal/server/v1/dlq/mapper.go | 93 +++++++++++++++----------- internal/server/v1/dlq/service.go | 31 +++++++-- internal/server/v1/dlq/service_test.go | 15 ++--- swagger.yml | 2 + 11 files changed, 115 insertions(+), 143 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 11ae540..95c7aad 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -10,6 +10,7 @@ output: linters: enable-all: true disable: + - goerr113 - gomnd - depguard - godot diff --git a/cli/server/configs.go b/cli/server/configs.go index ef9d335..bb09a3c 100644 --- a/cli/server/configs.go +++ b/cli/server/configs.go @@ -56,8 +56,7 @@ type optimusConfig struct { } type dlqConfig struct { - DlqJobImage string `mapstructure:"dlq_job_image"` - PrometheusHost string `mapstructure:"prometheus_host"` + JobImage string `mapstructure:"job_image"` } type serveConfig struct { diff --git a/cli/server/server.go b/cli/server/server.go index 080f02f..bda84f8 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -105,8 +105,7 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap wardenClient := warden.NewClient(cfg.Warden.Addr) dlqConfig := dlq.DlqJobConfig{ - DlqJobImage: cfg.Dlq.DlqJobImage, - PrometheusHost: cfg.Dlq.PrometheusHost, + JobImage: cfg.Dlq.JobImage, } return server.Serve(ctx, cfg.Service.Addr(), nrApp, zapLog, diff --git a/generated/models/dlq_job.go b/generated/models/dlq_job.go index ebe72fe..7be57f8 100644 --- a/generated/models/dlq_job.go +++ b/generated/models/dlq_job.go @@ -81,6 +81,9 @@ type DlqJob struct { // Enum: [pending error running stopped] Status string `json:"status,omitempty"` + // team + Team string `json:"team,omitempty"` + // topic Topic string `json:"topic,omitempty"` diff --git a/internal/server/v1/dlq/errors.go b/internal/server/v1/dlq/errors.go index b217c8b..236a7a9 100644 --- a/internal/server/v1/dlq/errors.go +++ b/internal/server/v1/dlq/errors.go @@ -8,4 +8,5 @@ var ( ErrFirehoseNotFound = errors.New("firehose not found") ErrEmptyConfigImage = errors.New("empty dlq job image") ErrEmptyConfigPrometheusHost = errors.New("empty prometheus host") + ErrJobNotFound = errors.New("no dlq job is found for this URN") ) diff --git a/internal/server/v1/dlq/handler.go b/internal/server/v1/dlq/handler.go index 4f0d9e5..e747f7e 100644 --- a/internal/server/v1/dlq/handler.go +++ b/internal/server/v1/dlq/handler.go @@ -132,11 +132,21 @@ func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) { } func (h *Handler) getDlqJob(w http.ResponseWriter, r *http.Request) { - // sample to get job urn from route params - _ = h.jobURN(r) + ctx := r.Context() + jobURN := h.jobURN(r) - utils.WriteJSON(w, http.StatusOK, map[string]interface{}{ - "dlq_job": nil, + dlqJob, err := h.service.GetDlqJob(ctx, jobURN) + if err != nil { + if errors.Is(err, ErrJobNotFound) { + utils.WriteErrMsg(w, http.StatusNotFound, ErrJobNotFound.Error()) + return + } + utils.WriteErr(w, err) + return + } + + utils.WriteJSON(w, http.StatusOK, map[string]any{ + "dlq_job": dlqJob, }) } diff --git a/internal/server/v1/dlq/handler_test.go b/internal/server/v1/dlq/handler_test.go index eab56ff..69b5302 100644 --- a/internal/server/v1/dlq/handler_test.go +++ b/internal/server/v1/dlq/handler_test.go @@ -131,81 +131,6 @@ func TestListTopicDates(t *testing.T) { assert.Equal(t, topicDates, expectedMap["dlq_list"]) } -func TestErrorFromGCSClient(t *testing.T) { - eService := &mocks.ResourceServiceClient{} - gClient := &mocks.BlobStorageClient{} - handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{})) - httpWriter := &testHTTPWriter{} - httpRequest := &http.Request{} - config := &entropy.FirehoseConfig{ - Stopped: false, - StopTime: nil, - Replicas: 0, - Namespace: "", - DeploymentID: "", - EnvVariables: map[string]string{ - firehose.ConfigDLQBucket: "test-bucket", - firehose.ConfigDLQDirectoryPrefix: "test-prefix", - }, - ResetOffset: "", - Limits: entropy.UsageSpec{}, - Requests: entropy.UsageSpec{}, - Telegraf: nil, - ChartValues: nil, - InitContainer: entropy.FirehoseInitContainer{}, - } - configProto, _ := utils.GoValToProtoStruct(config) - eService.On( - "GetResource", - context.Background(), - &entropyv1beta1.GetResourceRequest{Urn: ""}).Return( - &entropyv1beta1.GetResourceResponse{ - Resource: &entropyv1beta1.Resource{ - Urn: "", - Kind: "", - Name: "", - Project: "", - Labels: nil, - CreatedAt: nil, - UpdatedAt: nil, - Spec: &entropyv1beta1.ResourceSpec{ - Configs: configProto, - Dependencies: nil, - }, - State: nil, - CreatedBy: "", - UpdatedBy: "", - }, - }, nil) - gClient.On("ListDlqMetadata", gcs.BucketInfo{ - BucketName: "test-bucket", - Prefix: "test-prefix", - Delim: "", - }).Return(nil, fmt.Errorf("test-error")) - handler.ListFirehoseDLQ(httpWriter, httpRequest) - expectedMap := make(map[string]interface{}) - err := json.Unmarshal([]byte(httpWriter.messages[0]), &expectedMap) - require.NoError(t, err) - assert.Equal(t, "test-error", expectedMap["cause"]) -} - -func TestErrorFromFirehoseResource(t *testing.T) { - eService := &mocks.ResourceServiceClient{} - gClient := &mocks.BlobStorageClient{} - handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{})) - httpWriter := &testHTTPWriter{} - httpRequest := &http.Request{} - eService.On( - "GetResource", - context.Background(), - mock.Anything).Return(nil, fmt.Errorf("test-error")) - handler.ListFirehoseDLQ(httpWriter, httpRequest) - expectedMap := make(map[string]interface{}) - err := json.Unmarshal([]byte(httpWriter.messages[0]), &expectedMap) - require.NoError(t, err) - assert.Equal(t, "test-error", expectedMap["cause"]) -} - func TestListDlqJob(t *testing.T) { var ( method = http.MethodGet @@ -472,8 +397,7 @@ func skipTestCreateDlqJob(t *testing.T) { kubeCluster := "test-kube-cluster" userEmail := "test@example.com" config := dlq.DlqJobConfig{ - PrometheusHost: "http://sample-prom-host", - DlqJobImage: "test-image", + JobImage: "test-image", } envVars := map[string]string{ "SINK_TYPE": "bigquery", @@ -583,7 +507,7 @@ func skipTestCreateDlqJob(t *testing.T) { Containers: []entropy.JobContainer{ { Name: "dlq-job", - Image: config.DlqJobImage, + Image: config.JobImage, ImagePullPolicy: "Always", SecretsVolumes: []entropy.JobSecret{ { @@ -613,7 +537,7 @@ func skipTestCreateDlqJob(t *testing.T) { EnvVariables: map[string]string{ // To be updated by streaming "APP_NAME": "", // TBA - "PROMETHEUS_HOST": config.PrometheusHost, + "PROMETHEUS_HOST": "", "DEPLOYMENT_NAME": "deployment-name", "TEAM": "", "TOPIC": topic, @@ -674,7 +598,7 @@ func skipTestCreateDlqJob(t *testing.T) { "topic": topic, "job_type": "dlq", "group": group, - "prometheus_host": config.PrometheusHost, + "prometheus_host": "", }, CreatedBy: jobResource.CreatedBy, UpdatedBy: jobResource.UpdatedBy, @@ -732,14 +656,14 @@ func skipTestCreateDlqJob(t *testing.T) { ErrorTypes: errorTypes, // firehose resource - ContainerImage: config.DlqJobImage, + ContainerImage: config.JobImage, DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"], EnvVars: expectedEnvVars, Group: "", // KubeCluster: kubeCluster, Namespace: namespace, Project: firehoseResource.Project, - PrometheusHost: config.PrometheusHost, + PrometheusHost: "", // hardcoded Replicas: 0, diff --git a/internal/server/v1/dlq/mapper.go b/internal/server/v1/dlq/mapper.go index 9db8c0d..f285790 100644 --- a/internal/server/v1/dlq/mapper.go +++ b/internal/server/v1/dlq/mapper.go @@ -1,9 +1,10 @@ package dlq import ( + "errors" "fmt" "strconv" - "time" + "strings" entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1" "github.com/go-openapi/strfmt" @@ -21,7 +22,7 @@ const ( dlqTelegrafConfigName = "dlq-processor-telegraf" ) -func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobConfig) error { +func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource) error { var kubeCluster string for _, dep := range res.Spec.GetDependencies() { if dep.GetKey() == kubeClusterDependenciesKey { @@ -42,34 +43,68 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo if !ok { return ErrFirehoseNamespaceInvalid } - status := res.GetState().GetStatus().String() + + // TODO: populate prometheus host using firehose's + promRemoteWriteAny, exists := modConf.Telegraf.Config.Output["prometheus_remote_write"] + if !exists { + return errors.New("missing prometheus_remote_write") + } + promRemoteWriteMap, ok := promRemoteWriteAny.(map[string]any) + if !ok { + return errors.New("invalid prometheus_remote_write") + } + promURLAny, exists := promRemoteWriteMap["url"] + if !exists { + return errors.New("missing prometheus_remote_write.url") + } + promURLString, ok := promURLAny.(string) + if !ok { + return errors.New("invalid prometheus_remote_write.url") + } envs := modConf.EnvVariables - job.Name = buildEntropyResourceName(res.Name, "firehose", job.Topic, job.Date) + firehoseLabels := res.Labels + sinkType := envs["SINK_TYPE"] + dataType := envs["INPUT_SCHEMA_PROTO_CLASS"] + groupID := firehoseLabels["group"] + groupSlug := firehoseLabels["team"] + dlqPrefixDirectory := strings.Replace(envs["DLQ_GCS_DIRECTORY_PREFIX"], "{{ .name }}", res.Name, 1) + metricStatsDTag := fmt.Sprintf( + "namespace=%s,app=%s,sink=%s,team=%s,proto=%s,firehose=%s", + namespace, + job.Name, + sinkType, + groupSlug, + dataType, + res.Urn, + ) + + job.PrometheusHost = promURLString job.Namespace = namespace - job.Status = status - job.CreatedAt = strfmt.DateTime(res.CreatedAt.AsTime()) - job.UpdatedAt = strfmt.DateTime(res.UpdatedAt.AsTime()) job.Project = res.Project job.KubeCluster = kubeCluster - job.ContainerImage = cfg.DlqJobImage - job.PrometheusHost = cfg.PrometheusHost + job.Group = groupID + job.Team = groupSlug + job.DlqGcsCredentialPath = envs["DLQ_GCS_CREDENTIAL_PATH"] + if job.DlqGcsCredentialPath == "" { + return errors.New("missing DLQ_GCS_CREDENTIAL_PATH") + } job.EnvVars = map[string]string{ "DLQ_BATCH_SIZE": fmt.Sprintf("%d", job.BatchSize), "DLQ_NUM_THREADS": fmt.Sprintf("%d", job.NumThreads), "DLQ_ERROR_TYPES": job.ErrorTypes, "DLQ_INPUT_DATE": job.Date, "DLQ_TOPIC_NAME": job.Topic, - "METRIC_STATSD_TAGS": "a=b", // TBA - "SINK_TYPE": envs["SINK_TYPE"], - "DLQ_PREFIX_DIR": "test-firehose", + "METRIC_STATSD_TAGS": metricStatsDTag, + "SINK_TYPE": sinkType, + "DLQ_PREFIX_DIR": dlqPrefixDirectory, "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", "DLQ_GCS_BUCKET_NAME": envs["DLQ_GCS_BUCKET_NAME"], - "DLQ_GCS_CREDENTIAL_PATH": envs["DLQ_GCS_CREDENTIAL_PATH"], + "DLQ_GCS_CREDENTIAL_PATH": job.DlqGcsCredentialPath, "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": envs["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"], "JAVA_TOOL_OPTIONS": envs["JAVA_TOOL_OPTIONS"], "_JAVA_OPTIONS": envs["_JAVA_OPTIONS"], - "INPUT_SCHEMA_PROTO_CLASS": envs["INPUT_SCHEMA_PROTO_CLASS"], + "INPUT_SCHEMA_PROTO_CLASS": dataType, "SCHEMA_REGISTRY_STENCIL_ENABLE": envs["SCHEMA_REGISTRY_STENCIL_ENABLE"], "SCHEMA_REGISTRY_STENCIL_URLS": envs["SCHEMA_REGISTRY_STENCIL_URLS"], "SINK_BIGQUERY_ADD_METADATA_ENABLED": envs["SINK_BIGQUERY_ADD_METADATA_ENABLED"], @@ -88,7 +123,6 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo "SINK_BIGQUERY_TABLE_PARTITION_KEY": envs["SINK_BIGQUERY_TABLE_PARTITION_KEY"], "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envs["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"], } - job.DlqGcsCredentialPath = modConf.EnvVariables["DLQ_GCS_CREDENTIAL_PATH"] return nil } @@ -113,7 +147,7 @@ func mapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) { Dependencies: []*entropyv1beta1.ResourceDependency{ { Key: "kube_cluster", - Value: job.KubeCluster, // from firehose configs.kube_cluster + Value: job.KubeCluster, }, }, }, @@ -155,14 +189,11 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) { }, }, EnvVariables: map[string]string{ - // To be updated by streaming - "APP_NAME": "", // TBA + "APP_NAME": job.ResourceID, "PROMETHEUS_HOST": job.PrometheusHost, - "DEPLOYMENT_NAME": "deployment-name", - "TEAM": job.Group, + "TEAM": job.Team, "TOPIC": job.Topic, - "environment": "production", // TBA - "organization": "de", // TBA + "environment": "production", "projectID": job.Project, }, Command: []string{ @@ -183,9 +214,8 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) { }, }, JobLabels: map[string]string{ - "firehose": job.ResourceID, - "topic": job.Topic, - "date": job.Date, + "topic": job.Topic, + "date": job.Date, }, Volumes: []entropy.JobVolume{ { @@ -284,16 +314,3 @@ func buildResourceLabels(job models.DlqJob) map[string]string { "namespace": job.Namespace, } } - -func buildEntropyResourceName(resourceTitle, resourceType, topic, date string) string { - timestamp := time.Now().Unix() - - return fmt.Sprintf( - "%s-%s-%s-%s-%d", - resourceTitle, // firehose title - resourceType, // firehose / dagger - topic, // - date, // - timestamp, - ) -} diff --git a/internal/server/v1/dlq/service.go b/internal/server/v1/dlq/service.go index b51bbc2..5ef4df8 100644 --- a/internal/server/v1/dlq/service.go +++ b/internal/server/v1/dlq/service.go @@ -3,6 +3,7 @@ package dlq import ( "context" "fmt" + "time" entropyv1beta1rpc "buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/entropy/v1beta1/entropyv1beta1grpc" entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1" @@ -16,8 +17,7 @@ import ( ) type DlqJobConfig struct { - DlqJobImage string `mapstructure:"dlq_job_image"` - PrometheusHost string `mapstructure:"prometheus_host"` + JobImage string } type Service struct { @@ -36,14 +36,14 @@ func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.Bl // TODO: replace *DlqJob with a generated models.DlqJob func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob models.DlqJob) (models.DlqJob, error) { - if s.cfg.DlqJobImage == "" { + if s.cfg.JobImage == "" { return models.DlqJob{}, ErrEmptyConfigImage } - if s.cfg.PrometheusHost == "" { - return models.DlqJob{}, ErrEmptyConfigPrometheusHost - } + timestamp := time.Now().Unix() + dlqJob.Name = fmt.Sprintf("dlq-%s-%d", dlqJob.Date, timestamp) dlqJob.Replicas = 1 + dlqJob.ContainerImage = s.cfg.JobImage def, err := s.client.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID}) if err != nil { @@ -54,7 +54,7 @@ func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob mod return models.DlqJob{}, fmt.Errorf("error getting firehose resource: %w", err) } // enrich DlqJob with firehose details - if err := enrichDlqJob(&dlqJob, def.GetResource(), s.cfg); err != nil { + if err := enrichDlqJob(&dlqJob, def.GetResource()); err != nil { return models.DlqJob{}, fmt.Errorf("error enriching dlq job: %w", err) } @@ -101,3 +101,20 @@ func (s *Service) ListDlqJob(ctx context.Context, labelFilter map[string]string) return dlqJob, nil } + +func (s *Service) GetDlqJob(ctx context.Context, jobURN string) (models.DlqJob, error) { + res, err := s.client.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: jobURN}) + if err != nil { + st := status.Convert(err) + if st.Code() == codes.NotFound { + return models.DlqJob{}, ErrJobNotFound + } + return models.DlqJob{}, fmt.Errorf("error getting entropy resource: %w", err) + } + + dlqJob, err := mapToDlqJob(res.GetResource()) + if err != nil { + return models.DlqJob{}, fmt.Errorf("error mapping resource to dlq job: %w", err) + } + return dlqJob, nil +} diff --git a/internal/server/v1/dlq/service_test.go b/internal/server/v1/dlq/service_test.go index 130317f..d3fb0db 100644 --- a/internal/server/v1/dlq/service_test.go +++ b/internal/server/v1/dlq/service_test.go @@ -258,8 +258,7 @@ func skipTestServiceCreateDLQJob(t *testing.T) { kubeCluster := "test-kube-cluster" userEmail := "test@example.com" config := dlq.DlqJobConfig{ - PrometheusHost: "http://sample-prom-host", - DlqJobImage: "test-image", + JobImage: "test-image", } dlqJob := models.DlqJob{ @@ -366,7 +365,7 @@ func skipTestServiceCreateDLQJob(t *testing.T) { Containers: []entropy.JobContainer{ { Name: "dlq-job", - Image: config.DlqJobImage, + Image: config.JobImage, ImagePullPolicy: "Always", SecretsVolumes: []entropy.JobSecret{ { @@ -396,7 +395,7 @@ func skipTestServiceCreateDLQJob(t *testing.T) { EnvVariables: map[string]string{ // To be updated by streaming "APP_NAME": "", // TBA - "PROMETHEUS_HOST": config.PrometheusHost, + "PROMETHEUS_HOST": "", "DEPLOYMENT_NAME": "deployment-name", "TEAM": dlqJob.Group, "TOPIC": dlqJob.Topic, @@ -449,12 +448,12 @@ func skipTestServiceCreateDLQJob(t *testing.T) { "topic": dlqJob.Topic, "job_type": "dlq", "group": dlqJob.Group, - "prometheus_host": config.PrometheusHost, + "prometheus_host": "", "batch_size": "5", "num_threads": "2", "error_types": "DESERILIAZATION_ERROR", "dlq_gcs_credential_path": updatedEnvVars["DLQ_GCS_CREDENTIAL_PATH"], - "container_image": config.DlqJobImage, + "container_image": config.JobImage, "namespace": namespace, "replicas": "1", }, @@ -511,13 +510,13 @@ func skipTestServiceCreateDLQJob(t *testing.T) { ResourceID: dlqJob.ResourceID, ResourceType: dlqJob.ResourceType, Topic: dlqJob.Topic, - ContainerImage: config.DlqJobImage, + ContainerImage: config.JobImage, DlqGcsCredentialPath: updatedEnvVars["DLQ_GCS_CREDENTIAL_PATH"], EnvVars: updatedEnvVars, KubeCluster: kubeCluster, Namespace: namespace, Project: firehoseResource.Project, - PrometheusHost: config.PrometheusHost, + PrometheusHost: "", CreatedAt: strfmt.DateTime(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), UpdatedAt: strfmt.DateTime(time.Date(2022, 2, 2, 1, 1, 1, 1, time.UTC)), CreatedBy: "test-created-by", diff --git a/swagger.yml b/swagger.yml index c226d18..54e6623 100644 --- a/swagger.yml +++ b/swagger.yml @@ -1337,6 +1337,8 @@ definitions: type: string group: type: string + team: + type: string kube_cluster: type: string namespace: