From 16f255796d124eed984d7763ba906db9108fa7a3 Mon Sep 17 00:00:00 2001 From: Lifosmin Simon Date: Mon, 23 Oct 2023 20:54:38 +0700 Subject: [PATCH] feat: filter for listDLQ --- .../server/v1/dlq/fixtures/list_dlq_jobs.json | 120 +++++++++++++ internal/server/v1/dlq/handler.go | 25 ++- internal/server/v1/dlq/handler_test.go | 159 ++++++++---------- internal/server/v1/dlq/mapper.go | 1 + internal/server/v1/dlq/service.go | 5 +- internal/server/v1/dlq/service_test.go | 87 ++++++---- 6 files changed, 264 insertions(+), 133 deletions(-) create mode 100644 internal/server/v1/dlq/fixtures/list_dlq_jobs.json diff --git a/internal/server/v1/dlq/fixtures/list_dlq_jobs.json b/internal/server/v1/dlq/fixtures/list_dlq_jobs.json new file mode 100644 index 0000000..6274919 --- /dev/null +++ b/internal/server/v1/dlq/fixtures/list_dlq_jobs.json @@ -0,0 +1,120 @@ +{ + "dlq_jobs": [ + { + "batch_size": 1, + "resource_id": "test-resource-id", + "resource_type": "test-resource-type", + "topic": "test-topic", + "date": "2022-10-21", + "name": "test1-firehose-test-topic-2022-10-21", + "num_threads": 1, + "error_types": "DESERIALIZATION_ERROR", + "container_image": "test-image", + "dlq_gcs_credential_path": "/etc/secret/gcp/token", + "env_vars": { + "DLQ_BATCH_SIZE": "1", + "DLQ_NUM_THREADS": "1", + "DLQ_ERROR_TYPES": "DESERIALIZATION_ERROR", + "DLQ_INPUT_DATE": "2022-10-21", + "DLQ_TOPIC_NAME": "test-topic", + "METRIC_STATSD_TAGS": "a=b", + "SINK_TYPE": "bigquery", + "DLQ_PREFIX_DIR": "test-firehose", + "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", + "DLQ_GCS_BUCKET_NAME": "g-pilotdata-gl-dlq", + "DLQ_GCS_CREDENTIAL_PATH": "/etc/secret/gcp/token", + "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration", + "JAVA_TOOL_OPTIONS": "-javaagent:jolokia-jvm-agent.jar=port=8778,host=localhost", + "_JAVA_OPTIONS": "-Xmx1800m -Xms1800m", + "INPUT_SCHEMA_PROTO_CLASS": "gojek.esb.booking.GoFoodBookingLogMessage", + "SCHEMA_REGISTRY_STENCIL_ENABLE": "true", + "SCHEMA_REGISTRY_STENCIL_URLS": "http://p-godata-systems-stencil-v1beta1-ingress.golabs.io/v1beta1/namespaces/gojek/schemas/esb-log-entities", + "SINK_BIGQUERY_ADD_METADATA_ENABLED": "true", + "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": "-1", + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": "-1", + "SINK_BIGQUERY_CREDENTIAL_PATH": "/etc/secret/gcp/token", + "SINK_BIGQUERY_DATASET_LABELS": "shangchi=legend,lord=voldemort", + "SINK_BIGQUERY_DATASET_LOCATION": "US", + "SINK_BIGQUERY_DATASET_NAME": "bq_test", + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration", + "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": "false", + "SINK_BIGQUERY_STORAGE_API_ENABLE": "true", + "SINK_BIGQUERY_TABLE_LABELS": "hello=world,john=doe", + "SINK_BIGQUERY_TABLE_NAME": "bq_dlq_test1", + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": "2629800000", + "SINK_BIGQUERY_TABLE_PARTITION_KEY": "event_timestamp", + "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": "true" + }, + "group": "test-group", + "kube_cluster": "test-kube-cluster", + "namespace": "test-namespace", + "project": "test-project-1", + "prometheus_host": "http://sample-prom-host", + "replicas": 1, + "urn": "test-urn-1", + "status": "STATUS_UNSPECIFIED", + "created_at": "2022-12-10T00:00:00.000Z", + "created_by": "user@test.com", + "updated_at": "2023-12-10T02:00:00.000Z", + "updated_by": "user@test.com" + }, + { + "batch_size": 1, + "resource_id": "test-resource-id", + "resource_type": "test-resource-type", + "topic": "test-topic", + "date": "2022-10-21", + "name": "test2-firehose-test-topic-2022-10-21", + "num_threads": 1, + "error_types": "DESERIALIZATION_ERROR", + "container_image": "test-image", + "dlq_gcs_credential_path": "/etc/secret/gcp/token", + "env_vars": { + "DLQ_BATCH_SIZE": "1", + "DLQ_NUM_THREADS": "1", + "DLQ_ERROR_TYPES": "DESERIALIZATION_ERROR", + "DLQ_INPUT_DATE": "2022-10-21", + "DLQ_TOPIC_NAME": "test-topic", + "METRIC_STATSD_TAGS": "a=b", + "SINK_TYPE": "bigquery", + "DLQ_PREFIX_DIR": "test-firehose", + "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", + "DLQ_GCS_BUCKET_NAME": "g-pilotdata-gl-dlq", + "DLQ_GCS_CREDENTIAL_PATH": "/etc/secret/gcp/token", + "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration", + "JAVA_TOOL_OPTIONS": "-javaagent:jolokia-jvm-agent.jar=port=8778,host=localhost", + "_JAVA_OPTIONS": "-Xmx1800m -Xms1800m", + "INPUT_SCHEMA_PROTO_CLASS": "gojek.esb.booking.GoFoodBookingLogMessage", + "SCHEMA_REGISTRY_STENCIL_ENABLE": "true", + "SCHEMA_REGISTRY_STENCIL_URLS": "http://p-godata-systems-stencil-v1beta1-ingress.golabs.io/v1beta1/namespaces/gojek/schemas/esb-log-entities", + "SINK_BIGQUERY_ADD_METADATA_ENABLED": "true", + "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": "-1", + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": "-1", + "SINK_BIGQUERY_CREDENTIAL_PATH": "/etc/secret/gcp/token", + "SINK_BIGQUERY_DATASET_LABELS": "shangchi=legend,lord=voldemort", + "SINK_BIGQUERY_DATASET_LOCATION": "US", + "SINK_BIGQUERY_DATASET_NAME": "bq_test", + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration", + "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": "false", + "SINK_BIGQUERY_STORAGE_API_ENABLE": "true", + "SINK_BIGQUERY_TABLE_LABELS": "hello=world,john=doe", + "SINK_BIGQUERY_TABLE_NAME": "bq_dlq_test1", + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": "2629800000", + "SINK_BIGQUERY_TABLE_PARTITION_KEY": "event_timestamp", + "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": "true" + }, + "group": "test-group", + "kube_cluster": "test-kube-cluster", + "namespace": "test-namespace", + "project": "test-project-1", + "prometheus_host": "http://sample-prom-host", + "replicas": 1, + "urn": "test-urn-2", + "status": "STATUS_UNSPECIFIED", + "created_at": "2012-10-10T04:00:00.000Z", + "created_by": "user@test.com", + "updated_at": "2013-02-12T02:04:00.000Z", + "updated_by": "user@test.com" + } + ] +} diff --git a/internal/server/v1/dlq/handler.go b/internal/server/v1/dlq/handler.go index b2c31cf..fa12117 100644 --- a/internal/server/v1/dlq/handler.go +++ b/internal/server/v1/dlq/handler.go @@ -61,16 +61,31 @@ func (h *Handler) ListFirehoseDLQ(w http.ResponseWriter, r *http.Request) { func (h *Handler) listDlqJobs(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // firehoseUrn := chi.URLParam(r, "firehoseURN") - // fetch py resource (kind = job) - // mapToDlqJob(entropyResource) -> DqlJob - dlqJob, err := h.service.ListDlqJob(ctx) + labelFilter := map[string]string{} + if resourceID := r.URL.Query().Get("resource_id"); resourceID != "" { + labelFilter["resource_id"] = resourceID + } + if resourceType := r.URL.Query().Get("resource_type"); resourceType != "" { + labelFilter["resource_type"] = resourceType + } + if date := r.URL.Query().Get("date"); date != "" { + labelFilter["date"] = date + } + + dlqJob, err := h.service.ListDlqJob(ctx, labelFilter) if err != nil { + if errors.Is(err, ErrFirehoseNotFound) { + utils.WriteErrMsg(w, http.StatusNotFound, err.Error()) + return + } utils.WriteErr(w, err) return } - utils.WriteJSON(w, http.StatusOK, dlqJob) + utils.WriteJSON(w, http.StatusOK, map[string]interface{}{ + "dlq_jobs": dlqJob, + }, + ) } func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) { diff --git a/internal/server/v1/dlq/handler_test.go b/internal/server/v1/dlq/handler_test.go index 5bda249..e184a90 100644 --- a/internal/server/v1/dlq/handler_test.go +++ b/internal/server/v1/dlq/handler_test.go @@ -3,11 +3,13 @@ package dlq_test import ( "bytes" "context" + _ "embed" "encoding/json" "fmt" "net/http" "net/http/httptest" "testing" + "time" entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1" "github.com/go-chi/chi/v5" @@ -18,6 +20,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/goto/dex/entropy" "github.com/goto/dex/generated/models" @@ -29,6 +32,9 @@ import ( "github.com/goto/dex/mocks" ) +//go:embed fixtures/list_dlq_jobs.json +var listDlqJobsFixtureJSON []byte + const ( emailHeaderKey = "X-Auth-Email" ) @@ -201,9 +207,7 @@ func TestErrorFromFirehoseResource(t *testing.T) { func TestListDlqJob(t *testing.T) { var ( - userEmail = "user@test.com" method = http.MethodGet - path = "/jobs" project = "test-project-1" namespace = "test-namespace" resourceID = "test-resource-id" @@ -214,10 +218,58 @@ func TestListDlqJob(t *testing.T) { batchSize = 1 numThreads = 1 topic = "test-topic" - group = "" + group = "test-group" ) + t.Run("Should return error firehose not found because labels", func(t *testing.T) { + // initt input + path := fmt.Sprintf("/jobs?resource_id=%s&resource_type=%s&date=%s", "test-resource-id2", resourceType, date) + expectedLabels := map[string]string{ + "resource_id": "test-resource-id2", + "resource_type": "test-resource-type", + "date": "2022-10-21", + } + expectedErr := status.Error(codes.NotFound, "Not found") + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "ListResources", mock.Anything, &entropyv1beta1.ListResourcesRequest{ + Kind: entropy.ResourceKindJob, Labels: expectedLabels, + }, + ).Return(nil, expectedErr) + defer entropyClient.AssertExpectations(t) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, nil) + router := getRouter() + dlq.Routes(entropyClient, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusNotFound, response.Code) + }) + + t.Run("Should return error in firehose mapping", func(t *testing.T) { + // initt input + path := fmt.Sprintf("/jobs?resource_id=") + expectedErr := status.Error(codes.Internal, "Not found") + expectedLabels := map[string]string{} + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "ListResources", mock.Anything, &entropyv1beta1.ListResourcesRequest{ + Kind: entropy.ResourceKindJob, Labels: expectedLabels, + }, + ).Return(nil, expectedErr) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, nil) + router := getRouter() + dlq.Routes(entropyClient, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusInternalServerError, response.Code) + }) t.Run("Should return list of dlqJobs", func(t *testing.T) { + path := fmt.Sprintf("/jobs?resource_id=%s&resource_type=%s&date=%s", resourceID, resourceType, date) + config := dlq.DlqJobConfig{ PrometheusHost: "http://sample-prom-host", DlqJobImage: "test-image", @@ -392,6 +444,8 @@ func TestListDlqJob(t *testing.T) { }, CreatedBy: "user@test.com", UpdatedBy: "user@test.com", + CreatedAt: timestamppb.New(time.Date(2022, time.December, 10, 0, 0, 0, 0, time.UTC)), + UpdatedAt: timestamppb.New(time.Date(2023, time.December, 10, 2, 0, 0, 0, time.UTC)), Spec: &entropyv1beta1.ResourceSpec{ Configs: jobConfig, Dependencies: []*entropyv1beta1.ResourceDependency{ @@ -424,6 +478,8 @@ func TestListDlqJob(t *testing.T) { }, CreatedBy: "user@test.com", UpdatedBy: "user@test.com", + CreatedAt: timestamppb.New(time.Date(2012, time.October, 10, 4, 0, 0, 0, time.UTC)), + UpdatedAt: timestamppb.New(time.Date(2013, time.February, 12, 2, 4, 0, 0, time.UTC)), Spec: &entropyv1beta1.ResourceSpec{ Configs: jobConfig, Dependencies: []*entropyv1beta1.ResourceDependency{ @@ -436,110 +492,35 @@ func TestListDlqJob(t *testing.T) { }, } - expectedDlqJob := []models.DlqJob{ - { - // from input - BatchSize: int64(batchSize), - ResourceID: resourceID, - ResourceType: resourceType, - Topic: topic, - Name: fmt.Sprintf( - "%s-%s-%s-%s", - "test1", // firehose title - "firehose", // firehose / dagger - topic, // - date, // - ), - - NumThreads: int64(numThreads), - Date: date, - ErrorTypes: errorTypes, - - // firehose resource - ContainerImage: config.DlqJobImage, - DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"], - EnvVars: expectedEnvVars, - Group: "", // - KubeCluster: kubeCluster, - Namespace: namespace, - Project: project, - PrometheusHost: config.PrometheusHost, - - // hardcoded - Replicas: 1, - - // job resource - Urn: "test-urn-1", - Status: dummyEntropyResources[0].GetState().GetStatus().String(), - CreatedAt: strfmt.DateTime(dummyEntropyResources[0].CreatedAt.AsTime()), - CreatedBy: "user@test.com", - UpdatedAt: strfmt.DateTime(dummyEntropyResources[0].UpdatedAt.AsTime()), - UpdatedBy: "user@test.com", - }, - { - // from input - BatchSize: int64(batchSize), - ResourceID: resourceID, - ResourceType: resourceType, - Topic: topic, - Name: fmt.Sprintf( - "%s-%s-%s-%s", - "test2", // firehose title - "firehose", // firehose / dagger - topic, // - date, // - ), - - NumThreads: int64(numThreads), - Date: date, - ErrorTypes: errorTypes, - - // firehose resource - ContainerImage: config.DlqJobImage, - DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"], - EnvVars: expectedEnvVars, - Group: "", // - KubeCluster: kubeCluster, - Namespace: namespace, - Project: project, - PrometheusHost: config.PrometheusHost, - - // hardcoded - Replicas: 1, - - // job resource - Urn: "test-urn-2", - Status: dummyEntropyResources[0].GetState().GetStatus().String(), - CreatedAt: strfmt.DateTime(dummyEntropyResources[0].CreatedAt.AsTime()), - CreatedBy: "user@test.com", - UpdatedAt: strfmt.DateTime(dummyEntropyResources[0].UpdatedAt.AsTime()), - UpdatedBy: "user@test.com", - }, - } - expectedRPCResp := &entropyv1beta1.ListResourcesResponse{ Resources: dummyEntropyResources, } + expectedLabels := map[string]string{ + "resource_id": "test-resource-id", + "resource_type": "test-resource-type", + "date": "2022-10-21", + } + entropyClient := new(mocks.ResourceServiceClient) entropyClient.On( "ListResources", mock.Anything, &entropyv1beta1.ListResourcesRequest{ - Kind: entropy.ResourceKindJob, + Kind: entropy.ResourceKindJob, Labels: expectedLabels, }, ).Return(expectedRPCResp, nil) defer entropyClient.AssertExpectations(t) response := httptest.NewRecorder() request := httptest.NewRequest(method, path, nil) - request.Header.Set(emailHeaderKey, userEmail) router := getRouter() dlq.Routes(entropyClient, nil, config)(router) router.ServeHTTP(response, request) + assert.Equal(t, http.StatusOK, response.Code) resultJSON := response.Body.Bytes() - expectedJSON, err := json.Marshal(expectedDlqJob) - require.NoError(t, err) - assert.JSONEq(t, string(expectedJSON), string(resultJSON)) + + expectedPayload := string(listDlqJobsFixtureJSON) + assert.JSONEq(t, expectedPayload, string(resultJSON)) }) } diff --git a/internal/server/v1/dlq/mapper.go b/internal/server/v1/dlq/mapper.go index 66b7c39..1654e9d 100644 --- a/internal/server/v1/dlq/mapper.go +++ b/internal/server/v1/dlq/mapper.go @@ -233,6 +233,7 @@ func MapToDlqJob(r *entropyv1beta1.Resource) (*models.DlqJob, error) { Date: labels["date"], Topic: labels["topic"], PrometheusHost: labels["prometheus_host"], + Group: labels["group"], Namespace: modConf.Namespace, ContainerImage: modConf.Containers[0].Image, ErrorTypes: errorTypes, diff --git a/internal/server/v1/dlq/service.go b/internal/server/v1/dlq/service.go index 0205bca..fe46b50 100644 --- a/internal/server/v1/dlq/service.go +++ b/internal/server/v1/dlq/service.go @@ -68,11 +68,12 @@ func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob *mo return nil } -func (s *Service) ListDlqJob(ctx context.Context) ([]models.DlqJob, error) { +func (s *Service) ListDlqJob(ctx context.Context, labelFilter map[string]string) ([]models.DlqJob, error) { dlqJob := []models.DlqJob{} rpcReq := &entropyv1beta1.ListResourcesRequest{ - Kind: entropy.ResourceKindJob, + Kind: entropy.ResourceKindJob, + Labels: labelFilter, } rpcResp, err := s.client.ListResources(ctx, rpcReq) diff --git a/internal/server/v1/dlq/service_test.go b/internal/server/v1/dlq/service_test.go index ebfad5d..2640c64 100644 --- a/internal/server/v1/dlq/service_test.go +++ b/internal/server/v1/dlq/service_test.go @@ -33,7 +33,7 @@ func TestServiceListDLQJob(t *testing.T) { batchSize = 1 numThreads = 1 topic = "test-topic" - group = "" + group = "test-group" config = dlq.DlqJobConfig{ PrometheusHost: "http://sample-prom-host", DlqJobImage: "test-image", @@ -256,27 +256,20 @@ func TestServiceListDLQJob(t *testing.T) { expectedDlqJob := []models.DlqJob{ { // from input - BatchSize: int64(batchSize), - ResourceID: resourceID, - ResourceType: resourceType, - Topic: topic, - Name: fmt.Sprintf( - "%s-%s-%s-%s", - "test1", // firehose title - "firehose", // firehose / dagger - topic, // - date, // - ), - - NumThreads: int64(numThreads), - Date: date, - ErrorTypes: errorTypes, + BatchSize: 1, + ResourceID: "test-resource-id", + ResourceType: "test-resource-type", + Topic: "test-topic", + Name: "test1-firehose-test-topic-2022-10-21", + NumThreads: 1, + Date: "2022-10-21", + ErrorTypes: "DESERIALIZATION_ERROR", // firehose resource ContainerImage: config.DlqJobImage, DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"], EnvVars: expectedEnvVars, - Group: "", // + Group: "test-group", // KubeCluster: kubeCluster, Namespace: namespace, Project: project, @@ -287,7 +280,7 @@ func TestServiceListDLQJob(t *testing.T) { // job resource Urn: "test-urn-1", - Status: dummyEntropyResources[0].GetState().GetStatus().String(), + Status: "STATUS_UNSPECIFIED", CreatedAt: strfmt.DateTime(dummyEntropyResources[0].CreatedAt.AsTime()), CreatedBy: "user@test.com", UpdatedAt: strfmt.DateTime(dummyEntropyResources[0].UpdatedAt.AsTime()), @@ -295,27 +288,20 @@ func TestServiceListDLQJob(t *testing.T) { }, { // from input - BatchSize: int64(batchSize), - ResourceID: resourceID, - ResourceType: resourceType, - Topic: topic, - Name: fmt.Sprintf( - "%s-%s-%s-%s", - "test2", // firehose title - "firehose", // firehose / dagger - topic, // - date, // - ), - - NumThreads: int64(numThreads), - Date: date, - ErrorTypes: errorTypes, + BatchSize: 1, + ResourceID: "test-resource-id", + ResourceType: "test-resource-type", + Topic: "test-topic", + Name: "test2-firehose-test-topic-2022-10-21", + NumThreads: 1, + Date: "2022-10-21", + ErrorTypes: "DESERIALIZATION_ERROR", // firehose resource ContainerImage: config.DlqJobImage, DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"], EnvVars: expectedEnvVars, - Group: "", // + Group: "test-group", // KubeCluster: kubeCluster, Namespace: namespace, Project: project, @@ -326,7 +312,7 @@ func TestServiceListDLQJob(t *testing.T) { // job resource Urn: "test-urn-2", - Status: dummyEntropyResources[0].GetState().GetStatus().String(), + Status: "STATUS_UNSPECIFIED", CreatedAt: strfmt.DateTime(dummyEntropyResources[0].CreatedAt.AsTime()), CreatedBy: "user@test.com", UpdatedAt: strfmt.DateTime(dummyEntropyResources[0].UpdatedAt.AsTime()), @@ -334,6 +320,28 @@ func TestServiceListDLQJob(t *testing.T) { }, } + t.Run("Should return error firehose not found because labels", func(t *testing.T) { + ctx := context.TODO() + + labelFilter := map[string]string{ + "resource_id": "test-resource-id2", + "resource_type": "test-resource-type", + "date": "2022-10-21", + } + expectedErr := status.Error(codes.NotFound, "Not found") + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "ListResources", ctx, &entropyv1beta1.ListResourcesRequest{ + Kind: entropy.ResourceKindJob, Labels: labelFilter, + }, + ).Return(nil, expectedErr) + defer entropyClient.AssertExpectations(t) + service := dlq.NewService(entropyClient, nil, config) + + _, err := service.ListDlqJob(ctx, labelFilter) + assert.ErrorIs(t, err, dlq.ErrFirehoseNotFound) + }) + t.Run("should return dlqjob list", func(t *testing.T) { ctx := context.TODO() @@ -341,17 +349,22 @@ func TestServiceListDLQJob(t *testing.T) { Resources: dummyEntropyResources, } + labelFilter := map[string]string{ + "resource_id": resourceID, + "resource_type": resourceType, + "date": date, + } entropyClient := new(mocks.ResourceServiceClient) entropyClient.On( "ListResources", ctx, &entropyv1beta1.ListResourcesRequest{ - Kind: entropy.ResourceKindJob, + Kind: entropy.ResourceKindJob, Labels: labelFilter, }, ).Return(expectedRPCResp, nil) defer entropyClient.AssertExpectations(t) service := dlq.NewService(entropyClient, nil, config) - dlqJob, err := service.ListDlqJob(ctx) + dlqJob, err := service.ListDlqJob(ctx, labelFilter) assert.NoError(t, err) assert.Equal(t, expectedDlqJob, dlqJob) })