From e6da1362c031c45719de98d64c8ea334a85f4565 Mon Sep 17 00:00:00 2001
From: Stewart Jingga
Date: Tue, 24 Oct 2023 17:32:50 +0700
Subject: [PATCH] feat: fix issues on create APIs

---
 .golangci.yml                          |   1 +
 cli/server/configs.go                  |   3 +-
 cli/server/server.go                   |   3 +-
 generated/models/dlq_job.go            |   5 +-
 internal/server/v1/dlq/handler.go      |   6 +-
 internal/server/v1/dlq/handler_test.go |  13 +-
 internal/server/v1/dlq/mapper.go       |  93 +++++----
 internal/server/v1/dlq/routes.go       |   2 +-
 internal/server/v1/dlq/service.go      |  14 +-
 internal/server/v1/dlq/service_test.go | 257 +------------------------
 swagger.yml                            |   2 +
 11 files changed, 89 insertions(+), 310 deletions(-)

diff --git a/.golangci.yml b/.golangci.yml
index 11ae540..95c7aad 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -10,6 +10,7 @@ output:
 linters:
   enable-all: true
   disable:
+    - goerr113
     - gomnd
     - depguard
     - godot
diff --git a/cli/server/configs.go b/cli/server/configs.go
index ef9d335..bb09a3c 100644
--- a/cli/server/configs.go
+++ b/cli/server/configs.go
@@ -56,8 +56,7 @@ type optimusConfig struct {
 }
 
 type dlqConfig struct {
-	DlqJobImage    string `mapstructure:"dlq_job_image"`
-	PrometheusHost string `mapstructure:"prometheus_host"`
+	JobImage string `mapstructure:"job_image"`
 }
 
 type serveConfig struct {
diff --git a/cli/server/server.go b/cli/server/server.go
index 080f02f..bda84f8 100644
--- a/cli/server/server.go
+++ b/cli/server/server.go
@@ -105,8 +105,7 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap
 	wardenClient := warden.NewClient(cfg.Warden.Addr)
 
 	dlqConfig := dlq.DlqJobConfig{
-		DlqJobImage:    cfg.Dlq.DlqJobImage,
-		PrometheusHost: cfg.Dlq.PrometheusHost,
+		JobImage: cfg.Dlq.JobImage,
 	}
 
 	return server.Serve(ctx, cfg.Service.Addr(), nrApp, zapLog,
diff --git a/generated/models/dlq_job.go b/generated/models/dlq_job.go
index 2121a64..7be57f8 100644
--- a/generated/models/dlq_job.go
+++ b/generated/models/dlq_job.go
@@ -54,7 +54,7 @@ type DlqJob struct {
 
 	// name
 	Name string `json:"name,omitempty"`
-	
+
 	// namespace
 	Namespace string `json:"namespace,omitempty"`
 
@@ -81,6 +81,9 @@ type DlqJob struct {
 	// Enum: [pending error running stopped]
 	Status string `json:"status,omitempty"`
 
+	// team
+	Team string `json:"team,omitempty"`
+
 	// topic
 	Topic string `json:"topic,omitempty"`
 
diff --git a/internal/server/v1/dlq/handler.go b/internal/server/v1/dlq/handler.go
index 2ca60f1..e747f7e 100644
--- a/internal/server/v1/dlq/handler.go
+++ b/internal/server/v1/dlq/handler.go
@@ -131,7 +131,7 @@ func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
 	})
 }
 
-func (h *Handler) GetDlqJob(w http.ResponseWriter, r *http.Request) {
+func (h *Handler) getDlqJob(w http.ResponseWriter, r *http.Request) {
 	ctx := r.Context()
 
 	jobURN := h.jobURN(r)
@@ -145,7 +145,9 @@ func (h *Handler) GetDlqJob(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	utils.WriteJSON(w, http.StatusOK, dlqJob)
+	utils.WriteJSON(w, http.StatusOK, map[string]any{
+		"dlq_job": dlqJob,
+	})
 }
 
 func (*Handler) firehoseURN(r *http.Request) string {
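As context for the handler change just above: the GET jobs-by-URN endpoint now wraps the job in a "dlq_job" envelope instead of returning the object directly. A minimal client-side decoding sketch, using a small stand-in struct rather than the generated models.DlqJob (the field subset and sample values are illustrative, not taken from the patch):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // dlqJobStub stands in for generated/models.DlqJob; only a few fields are shown.
    type dlqJobStub struct {
        Urn    string `json:"urn"`
        Name   string `json:"name"`
        Status string `json:"status"`
    }

    func main() {
        // Sample payload in the new envelope shape written by getDlqJob.
        body := []byte(`{"dlq_job":{"urn":"job-urn","name":"dlq-2023-10-22-1698140000","status":"running"}}`)

        var resp struct {
            DlqJob dlqJobStub `json:"dlq_job"`
        }
        if err := json.Unmarshal(body, &resp); err != nil {
            panic(err)
        }
        fmt.Println(resp.DlqJob.Name, resp.DlqJob.Status)
    }
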
"SINK_TYPE": "bigquery", @@ -508,7 +507,7 @@ func skipTestCreateDlqJob(t *testing.T) { Containers: []entropy.JobContainer{ { Name: "dlq-job", - Image: config.DlqJobImage, + Image: config.JobImage, ImagePullPolicy: "Always", SecretsVolumes: []entropy.JobSecret{ { @@ -538,7 +537,7 @@ func skipTestCreateDlqJob(t *testing.T) { EnvVariables: map[string]string{ // To be updated by streaming "APP_NAME": "", // TBA - "PROMETHEUS_HOST": config.PrometheusHost, + "PROMETHEUS_HOST": "", "DEPLOYMENT_NAME": "deployment-name", "TEAM": "", "TOPIC": topic, @@ -599,7 +598,7 @@ func skipTestCreateDlqJob(t *testing.T) { "topic": topic, "job_type": "dlq", "group": group, - "prometheus_host": config.PrometheusHost, + "prometheus_host": "", }, CreatedBy: jobResource.CreatedBy, UpdatedBy: jobResource.UpdatedBy, @@ -657,14 +656,14 @@ func skipTestCreateDlqJob(t *testing.T) { ErrorTypes: errorTypes, // firehose resource - ContainerImage: config.DlqJobImage, + ContainerImage: config.JobImage, DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"], EnvVars: expectedEnvVars, Group: "", // KubeCluster: kubeCluster, Namespace: namespace, Project: firehoseResource.Project, - PrometheusHost: config.PrometheusHost, + PrometheusHost: "", // hardcoded Replicas: 0, diff --git a/internal/server/v1/dlq/mapper.go b/internal/server/v1/dlq/mapper.go index 9db8c0d..f285790 100644 --- a/internal/server/v1/dlq/mapper.go +++ b/internal/server/v1/dlq/mapper.go @@ -1,9 +1,10 @@ package dlq import ( + "errors" "fmt" "strconv" - "time" + "strings" entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1" "github.com/go-openapi/strfmt" @@ -21,7 +22,7 @@ const ( dlqTelegrafConfigName = "dlq-processor-telegraf" ) -func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobConfig) error { +func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource) error { var kubeCluster string for _, dep := range res.Spec.GetDependencies() { if dep.GetKey() == kubeClusterDependenciesKey { @@ -42,34 +43,68 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo if !ok { return ErrFirehoseNamespaceInvalid } - status := res.GetState().GetStatus().String() + + // TODO: populate prometheus host using firehose's + promRemoteWriteAny, exists := modConf.Telegraf.Config.Output["prometheus_remote_write"] + if !exists { + return errors.New("missing prometheus_remote_write") + } + promRemoteWriteMap, ok := promRemoteWriteAny.(map[string]any) + if !ok { + return errors.New("invalid prometheus_remote_write") + } + promURLAny, exists := promRemoteWriteMap["url"] + if !exists { + return errors.New("missing prometheus_remote_write.url") + } + promURLString, ok := promURLAny.(string) + if !ok { + return errors.New("invalid prometheus_remote_write.url") + } envs := modConf.EnvVariables - job.Name = buildEntropyResourceName(res.Name, "firehose", job.Topic, job.Date) + firehoseLabels := res.Labels + sinkType := envs["SINK_TYPE"] + dataType := envs["INPUT_SCHEMA_PROTO_CLASS"] + groupID := firehoseLabels["group"] + groupSlug := firehoseLabels["team"] + dlqPrefixDirectory := strings.Replace(envs["DLQ_GCS_DIRECTORY_PREFIX"], "{{ .name }}", res.Name, 1) + metricStatsDTag := fmt.Sprintf( + "namespace=%s,app=%s,sink=%s,team=%s,proto=%s,firehose=%s", + namespace, + job.Name, + sinkType, + groupSlug, + dataType, + res.Urn, + ) + + job.PrometheusHost = promURLString job.Namespace = namespace - job.Status = status - job.CreatedAt = strfmt.DateTime(res.CreatedAt.AsTime()) - job.UpdatedAt = 
strfmt.DateTime(res.UpdatedAt.AsTime()) job.Project = res.Project job.KubeCluster = kubeCluster - job.ContainerImage = cfg.DlqJobImage - job.PrometheusHost = cfg.PrometheusHost + job.Group = groupID + job.Team = groupSlug + job.DlqGcsCredentialPath = envs["DLQ_GCS_CREDENTIAL_PATH"] + if job.DlqGcsCredentialPath == "" { + return errors.New("missing DLQ_GCS_CREDENTIAL_PATH") + } job.EnvVars = map[string]string{ "DLQ_BATCH_SIZE": fmt.Sprintf("%d", job.BatchSize), "DLQ_NUM_THREADS": fmt.Sprintf("%d", job.NumThreads), "DLQ_ERROR_TYPES": job.ErrorTypes, "DLQ_INPUT_DATE": job.Date, "DLQ_TOPIC_NAME": job.Topic, - "METRIC_STATSD_TAGS": "a=b", // TBA - "SINK_TYPE": envs["SINK_TYPE"], - "DLQ_PREFIX_DIR": "test-firehose", + "METRIC_STATSD_TAGS": metricStatsDTag, + "SINK_TYPE": sinkType, + "DLQ_PREFIX_DIR": dlqPrefixDirectory, "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", "DLQ_GCS_BUCKET_NAME": envs["DLQ_GCS_BUCKET_NAME"], - "DLQ_GCS_CREDENTIAL_PATH": envs["DLQ_GCS_CREDENTIAL_PATH"], + "DLQ_GCS_CREDENTIAL_PATH": job.DlqGcsCredentialPath, "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": envs["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"], "JAVA_TOOL_OPTIONS": envs["JAVA_TOOL_OPTIONS"], "_JAVA_OPTIONS": envs["_JAVA_OPTIONS"], - "INPUT_SCHEMA_PROTO_CLASS": envs["INPUT_SCHEMA_PROTO_CLASS"], + "INPUT_SCHEMA_PROTO_CLASS": dataType, "SCHEMA_REGISTRY_STENCIL_ENABLE": envs["SCHEMA_REGISTRY_STENCIL_ENABLE"], "SCHEMA_REGISTRY_STENCIL_URLS": envs["SCHEMA_REGISTRY_STENCIL_URLS"], "SINK_BIGQUERY_ADD_METADATA_ENABLED": envs["SINK_BIGQUERY_ADD_METADATA_ENABLED"], @@ -88,7 +123,6 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo "SINK_BIGQUERY_TABLE_PARTITION_KEY": envs["SINK_BIGQUERY_TABLE_PARTITION_KEY"], "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envs["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"], } - job.DlqGcsCredentialPath = modConf.EnvVariables["DLQ_GCS_CREDENTIAL_PATH"] return nil } @@ -113,7 +147,7 @@ func mapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) { Dependencies: []*entropyv1beta1.ResourceDependency{ { Key: "kube_cluster", - Value: job.KubeCluster, // from firehose configs.kube_cluster + Value: job.KubeCluster, }, }, }, @@ -155,14 +189,11 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) { }, }, EnvVariables: map[string]string{ - // To be updated by streaming - "APP_NAME": "", // TBA + "APP_NAME": job.ResourceID, "PROMETHEUS_HOST": job.PrometheusHost, - "DEPLOYMENT_NAME": "deployment-name", - "TEAM": job.Group, + "TEAM": job.Team, "TOPIC": job.Topic, - "environment": "production", // TBA - "organization": "de", // TBA + "environment": "production", "projectID": job.Project, }, Command: []string{ @@ -183,9 +214,8 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) { }, }, JobLabels: map[string]string{ - "firehose": job.ResourceID, - "topic": job.Topic, - "date": job.Date, + "topic": job.Topic, + "date": job.Date, }, Volumes: []entropy.JobVolume{ { @@ -284,16 +314,3 @@ func buildResourceLabels(job models.DlqJob) map[string]string { "namespace": job.Namespace, } } - -func buildEntropyResourceName(resourceTitle, resourceType, topic, date string) string { - timestamp := time.Now().Unix() - - return fmt.Sprintf( - "%s-%s-%s-%s-%d", - resourceTitle, // firehose title - resourceType, // firehose / dagger - topic, // - date, // - timestamp, - ) -} diff --git a/internal/server/v1/dlq/routes.go b/internal/server/v1/dlq/routes.go index f6073a5..328f138 100644 --- a/internal/server/v1/dlq/routes.go +++ b/internal/server/v1/dlq/routes.go @@ -18,7 
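The mapper change above stops taking the Prometheus host and container image from DlqJobConfig; instead it reads the URL out of the firehose's Telegraf prometheus_remote_write output and derives DLQ_PREFIX_DIR and METRIC_STATSD_TAGS from the firehose env vars and labels. A small sketch of those two string derivations; every value below is a made-up sample, not something taken from the patch:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Stand-ins for the firehose module config and labels.
        firehoseName := "orders-ingestion-firehose"
        dlqGcsDirectoryPrefix := "dlq-data/{{ .name }}" // envs["DLQ_GCS_DIRECTORY_PREFIX"]
        namespace, jobName := "firehose", "dlq-2023-10-22-1698140000"
        sinkType, team, proto := "bigquery", "data-eng", "com.example.OrderLog"
        firehoseURN := "orn:entropy:firehose:sample-project:orders-ingestion-firehose"

        // DLQ_PREFIX_DIR: the "{{ .name }}" placeholder is replaced with the firehose resource name.
        dlqPrefixDir := strings.Replace(dlqGcsDirectoryPrefix, "{{ .name }}", firehoseName, 1)

        // METRIC_STATSD_TAGS: the same tag layout enrichDlqJob builds with fmt.Sprintf.
        statsdTags := fmt.Sprintf(
            "namespace=%s,app=%s,sink=%s,team=%s,proto=%s,firehose=%s",
            namespace, jobName, sinkType, team, proto, firehoseURN,
        )

        fmt.Println(dlqPrefixDir)
        fmt.Println(statsdTags)
    }
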
diff --git a/internal/server/v1/dlq/routes.go b/internal/server/v1/dlq/routes.go
index f6073a5..328f138 100644
--- a/internal/server/v1/dlq/routes.go
+++ b/internal/server/v1/dlq/routes.go
@@ -18,7 +18,7 @@ func Routes(
 	return func(r chi.Router) {
 		r.Get("/firehose/{firehose_urn}", handler.ListFirehoseDLQ)
 		r.Get("/jobs", handler.listDlqJobs)
-		r.Get("/jobs/{job_urn}", handler.GetDlqJob)
+		r.Get("/jobs/{job_urn}", handler.getDlqJob)
 		r.Post("/jobs", handler.createDlqJob)
 	}
 }
diff --git a/internal/server/v1/dlq/service.go b/internal/server/v1/dlq/service.go
index adc26e9..5ef4df8 100644
--- a/internal/server/v1/dlq/service.go
+++ b/internal/server/v1/dlq/service.go
@@ -3,6 +3,7 @@ package dlq
 import (
 	"context"
 	"fmt"
+	"time"
 
 	entropyv1beta1rpc "buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/entropy/v1beta1/entropyv1beta1grpc"
 	entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1"
@@ -16,8 +17,7 @@ import (
 )
 
 type DlqJobConfig struct {
-	DlqJobImage    string `mapstructure:"dlq_job_image"`
-	PrometheusHost string `mapstructure:"prometheus_host"`
+	JobImage string
 }
 
 type Service struct {
@@ -36,14 +36,14 @@ func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.Bl
 }
 
 // TODO: replace *DlqJob with a generated models.DlqJob
 func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob models.DlqJob) (models.DlqJob, error) {
-	if s.cfg.DlqJobImage == "" {
+	if s.cfg.JobImage == "" {
 		return models.DlqJob{}, ErrEmptyConfigImage
 	}
-	if s.cfg.PrometheusHost == "" {
-		return models.DlqJob{}, ErrEmptyConfigPrometheusHost
-	}
+	timestamp := time.Now().Unix()
+	dlqJob.Name = fmt.Sprintf("dlq-%s-%d", dlqJob.Date, timestamp)
 	dlqJob.Replicas = 1
+	dlqJob.ContainerImage = s.cfg.JobImage
 
 	def, err := s.client.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID})
 	if err != nil {
@@ -54,7 +54,7 @@ func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob mod
 		return models.DlqJob{}, fmt.Errorf("error getting firehose resource: %w", err)
 	}
 
 	// enrich DlqJob with firehose details
-	if err := enrichDlqJob(&dlqJob, def.GetResource(), s.cfg); err != nil {
+	if err := enrichDlqJob(&dlqJob, def.GetResource()); err != nil {
 		return models.DlqJob{}, fmt.Errorf("error enriching dlq job: %w", err)
 	}
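With the service change above, CreateDLQJob now names the job itself (dlq-<date>-<unix-timestamp>) and stamps the container image from DlqJobConfig.JobImage before the firehose lookup and enrichment. A tiny sketch of that naming scheme, using a sample date:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        date := "2023-10-22" // sample DLQ job date (the value that also feeds DLQ_INPUT_DATE)
        name := fmt.Sprintf("dlq-%s-%d", date, time.Now().Unix())
        fmt.Println(name) // e.g. dlq-2023-10-22-1698140000
    }
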
"container_image": config.DlqJobImage, + "container_image": config.JobImage, "namespace": namespace, "replicas": "1", }, @@ -511,13 +510,13 @@ func skipTestServiceCreateDLQJob(t *testing.T) { ResourceID: dlqJob.ResourceID, ResourceType: dlqJob.ResourceType, Topic: dlqJob.Topic, - ContainerImage: config.DlqJobImage, + ContainerImage: config.JobImage, DlqGcsCredentialPath: updatedEnvVars["DLQ_GCS_CREDENTIAL_PATH"], EnvVars: updatedEnvVars, KubeCluster: kubeCluster, Namespace: namespace, Project: firehoseResource.Project, - PrometheusHost: config.PrometheusHost, + PrometheusHost: "", CreatedAt: strfmt.DateTime(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), UpdatedAt: strfmt.DateTime(time.Date(2022, 2, 2, 1, 1, 1, 1, time.UTC)), CreatedBy: "test-created-by", @@ -526,245 +525,3 @@ func skipTestServiceCreateDLQJob(t *testing.T) { assert.Equal(t, expectedDlqJob, result) }) } - -func TestServiceGetDlqJob(t *testing.T) { - sampleJobURN := "test-dlq-job-urn" - config := dlq.DlqJobConfig{ - PrometheusHost: "http://sample-prom-host", - DlqJobImage: "test-image", - } - t.Run("should return dlq job details", func(t *testing.T) { - entropyClient := new(mocks.ResourceServiceClient) - service := dlq.NewService(entropyClient, nil, config) - - envVars := map[string]string{ - "DLQ_BATCH_SIZE": "34", - "DLQ_ERROR_TYPES": "DEFAULT_ERROR", - "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", - "DLQ_GCS_BUCKET_NAME": "test_bucket_name", - "DLQ_GCS_CREDENTIAL_PATH": "/path/to/token", - "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": "pilot-project", - "DLQ_INPUT_DATE": "2023-04-10", - "DLQ_NUM_THREADS": "10", - "DLQ_PREFIX_DIR": "test-firehose", - "DLQ_TOPIC_NAME": "test-topic", - "INPUT_SCHEMA_PROTO_CLASS": "test-proto", - "JAVA_TOOL_OPTIONS": "test-tool-options", - "METRIC_STATSD_TAGS": "a=b", - "SCHEMA_REGISTRY_STENCIL_ENABLE": "true", - "SCHEMA_REGISTRY_STENCIL_URLS": "http://random-test-url", - "SINK_BIGQUERY_ADD_METADATA_ENABLED": "true", - "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": "-1", - "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": "-1", - "SINK_BIGQUERY_CREDENTIAL_PATH": "/path/to/token", - "SINK_BIGQUERY_DATASET_LABELS": "shangchi=legend,lord=voldemort", - "SINK_BIGQUERY_DATASET_LOCATION": "US", - "SINK_BIGQUERY_DATASET_NAME": "bq_test", - "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": "pilot-project", - "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": "false", - "SINK_BIGQUERY_STORAGE_API_ENABLE": "true", - "SINK_BIGQUERY_TABLE_LABELS": "hello=world,john=doe", - "SINK_BIGQUERY_TABLE_NAME": "bq_dlq_test1", - "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": "true", - "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": "2629800000", - "SINK_BIGQUERY_TABLE_PARTITION_KEY": "event_timestamp", - "SINK_TYPE": "bigquery", - "_JAVA_OPTIONS": "-Xmx1800m -Xms1800m", - } - - jobContainers := []entropy.JobContainer{ - { - Name: "dlq-job", - Image: "test/image/path", - ImagePullPolicy: "Always", - Limits: entropy.UsageSpec{ - CPU: "1", - Memory: "2000Mi", - }, - Requests: entropy.UsageSpec{ - CPU: "1", - Memory: "2000Mi", - }, - EnvVariables: envVars, - SecretsVolumes: []entropy.JobSecret{ - { - Name: "firehose-bigquery-sink-credential", - Mount: "/etc/secret/gcp", - }, - }, - }, - { - Args: []string{ - "-c", - "test args random args", - }, - Command: []string{ - "/bin/bash", - }, - ConfigMapsVolumes: []entropy.JobConfigMap{ - { - Name: "dlq-processor-telegraf", - Mount: "etc/telegraf", - }, - }, - EnvVariables: map[string]string{ - "APP_NAME": "testing", - "DEPLOYMENT_NAME": "deployment-name", - "PROMETHEUS_URL": "test/prometheus/url", - "TEAM": "test-team", - 
"TOPIC": "test-topic", - "environment": "production", - "organization": "test-team", "projectID": "projectID", - }, - Image: "telegraf:1.18.0-alpine", - Limits: entropy.UsageSpec{ - CPU: "100m", - Memory: "300Mi", - }, - Name: "telegraf", - Requests: entropy.UsageSpec{ - CPU: "100m", - Memory: "300Mi", - }, - }, - } - - jobConfig, err := utils.GoValToProtoStruct(entropy.JobConfig{ - Replicas: 1, - Namespace: "default", - Containers: jobContainers, - Volumes: []entropy.JobVolume{ - { - Name: "firehose-bigquery-sink-credential", - Kind: "secret", - }, - { - Name: "dlq-processor-telegraf", - Kind: "configMap", - }, - }, - }) - require.NoError(t, err) - - output, err := structpb.NewValue(map[string]any{ - "jobName": "test-dlq-job", - "namespace": "default", - "pods": nil, - }) - require.NoError(t, err) - - resourceResponse := &entropyv1beta1.GetResourceResponse{ - Resource: &entropyv1beta1.Resource{ - Urn: "test-dlq-job-urn", - Kind: "job", - Name: "gofood-test-dlq-4", - Project: "test-project", - Labels: map[string]string{ - "team": "test-team", - "prometheus_host": "http://sample-prom-host", - "topic": "test-topic", - "resource_type": "dlq", - "resource_id": "test_resource_id", - "date": "2023-10-22", - }, - CreatedAt: ×tamppb.Timestamp{Seconds: 1697176965, Nanos: 496343000}, - UpdatedAt: ×tamppb.Timestamp{Seconds: 1697215353, Nanos: 779372000}, - CreatedBy: "test@user.com", - UpdatedBy: "test@user.com", - Spec: &entropyv1beta1.ResourceSpec{ - Configs: jobConfig, - Dependencies: []*entropyv1beta1.ResourceDependency{ - { - Key: "kube_cluster", - Value: "test-kube-cluster", - }, - }, - }, - State: &entropyv1beta1.ResourceState{ - Status: entropyv1beta1.ResourceState_STATUS_COMPLETED, - Output: output, - }, - }, - } - - expectedDlqJob := &models.DlqJob{ - BatchSize: 34, - ContainerImage: "test/image/path", - CreatedAt: strfmt.DateTime(resourceResponse.Resource.CreatedAt.AsTime()), - UpdatedAt: strfmt.DateTime(resourceResponse.Resource.UpdatedAt.AsTime()), - CreatedBy: "test@user.com", - DlqGcsCredentialPath: "/path/to/token", - EnvVars: envVars, - ErrorTypes: "DEFAULT_ERROR", - KubeCluster: "test-kube-cluster", - Name: "gofood-test-dlq-4", - Namespace: "default", - NumThreads: 10, - Project: "test-project", - PrometheusHost: "http://sample-prom-host", - Replicas: 1, - ResourceType: "dlq", - Status: "\x04", - UpdatedBy: "test@user.com", - Urn: "test-dlq-job-urn", - ResourceID: "test_resource_id", - Date: "2023-10-22", - Topic: "test-topic", - } - - entropyClient.On("GetResource", context.TODO(), &entropyv1beta1.GetResourceRequest{Urn: sampleJobURN}).Return(&entropyv1beta1.GetResourceResponse{ - Resource: resourceResponse.GetResource(), - }, nil) - - dlqJob, err := service.GetDlqJob(context.TODO(), sampleJobURN) - require.NoError(t, err) - assert.Equal(t, expectedDlqJob, dlqJob) - }) - - t.Run("should return ErrJobNotFound if resource cannot be found in entropy", func(t *testing.T) { - ctx := context.TODO() - dlqJob := models.DlqJob{ - Urn: "test-job-urn", - ResourceType: "dlq", - } - config := dlq.DlqJobConfig{ - PrometheusHost: "http://sample-prom-host", - DlqJobImage: "test-image", - } - expectedErr := status.Error(codes.NotFound, "Not found") - - entropyClient := new(mocks.ResourceServiceClient) - entropyClient.On( - "GetResource", ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.Urn}, - ).Return(nil, expectedErr) - defer entropyClient.AssertExpectations(t) - service := dlq.NewService(entropyClient, nil, config) - - _, err := service.GetDlqJob(ctx, "test-job-urn") - assert.ErrorIs(t, err, 
dlq.ErrJobNotFound) - }) - - t.Run("should return error when there is an error getting job in entropy", func(t *testing.T) { - // inputs - ctx := context.TODO() - dlqJob := models.DlqJob{ - Urn: "test-job-urn", - ResourceType: "dlq", - } - config := dlq.DlqJobConfig{ - PrometheusHost: "http://sample-prom-host", - DlqJobImage: "test-image", - } - expectedErr := status.Error(codes.Internal, "Any Error") - - entropyClient := new(mocks.ResourceServiceClient) - entropyClient.On( - "GetResource", ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.Urn}, - ).Return(nil, expectedErr) - defer entropyClient.AssertExpectations(t) - service := dlq.NewService(entropyClient, nil, config) - - _, err := service.GetDlqJob(ctx, "test-job-urn") - assert.ErrorIs(t, err, expectedErr) - }) -} diff --git a/swagger.yml b/swagger.yml index c226d18..54e6623 100644 --- a/swagger.yml +++ b/swagger.yml @@ -1337,6 +1337,8 @@ definitions: type: string group: type: string + team: + type: string kube_cluster: type: string namespace:
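Finally, the swagger.yml and generated-model changes add a team property alongside group, which the mapper fills from the firehose labels. A stand-in marshaling sketch of what a job payload could carry with the new field; the struct below mirrors only a few fields of the regenerated models.DlqJob and the values are illustrative:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // dlqJobStub mirrors a few fields of the regenerated model, including the new Team field.
    type dlqJobStub struct {
        Name  string `json:"name,omitempty"`
        Group string `json:"group,omitempty"`
        Team  string `json:"team,omitempty"`
        Topic string `json:"topic,omitempty"`
    }

    func main() {
        job := dlqJobStub{
            Name:  "dlq-2023-10-22-1698140000",
            Group: "sample-group-id", // firehose label "group"
            Team:  "data-eng",        // firehose label "team"
            Topic: "orders-log",
        }
        out, _ := json.Marshal(job)
        fmt.Println(string(out)) // {"name":"dlq-2023-10-22-1698140000","group":"sample-group-id","team":"data-eng","topic":"orders-log"}
    }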