Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
feat: filter for listDLQ
Browse files Browse the repository at this point in the history
  • Loading branch information
Lifosmin Simon authored and Lifosmin Simon committed Oct 23, 2023
1 parent 24a9a34 commit 16f2557
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 133 deletions.
120 changes: 120 additions & 0 deletions internal/server/v1/dlq/fixtures/list_dlq_jobs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
{
"dlq_jobs": [
{
"batch_size": 1,
"resource_id": "test-resource-id",
"resource_type": "test-resource-type",
"topic": "test-topic",
"date": "2022-10-21",
"name": "test1-firehose-test-topic-2022-10-21",
"num_threads": 1,
"error_types": "DESERIALIZATION_ERROR",
"container_image": "test-image",
"dlq_gcs_credential_path": "/etc/secret/gcp/token",
"env_vars": {
"DLQ_BATCH_SIZE": "1",
"DLQ_NUM_THREADS": "1",
"DLQ_ERROR_TYPES": "DESERIALIZATION_ERROR",
"DLQ_INPUT_DATE": "2022-10-21",
"DLQ_TOPIC_NAME": "test-topic",
"METRIC_STATSD_TAGS": "a=b",
"SINK_TYPE": "bigquery",
"DLQ_PREFIX_DIR": "test-firehose",
"DLQ_FINISHED_STATUS_FILE": "/shared/job-finished",
"DLQ_GCS_BUCKET_NAME": "g-pilotdata-gl-dlq",
"DLQ_GCS_CREDENTIAL_PATH": "/etc/secret/gcp/token",
"DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration",
"JAVA_TOOL_OPTIONS": "-javaagent:jolokia-jvm-agent.jar=port=8778,host=localhost",
"_JAVA_OPTIONS": "-Xmx1800m -Xms1800m",
"INPUT_SCHEMA_PROTO_CLASS": "gojek.esb.booking.GoFoodBookingLogMessage",
"SCHEMA_REGISTRY_STENCIL_ENABLE": "true",
"SCHEMA_REGISTRY_STENCIL_URLS": "http://p-godata-systems-stencil-v1beta1-ingress.golabs.io/v1beta1/namespaces/gojek/schemas/esb-log-entities",
"SINK_BIGQUERY_ADD_METADATA_ENABLED": "true",
"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": "-1",
"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": "-1",
"SINK_BIGQUERY_CREDENTIAL_PATH": "/etc/secret/gcp/token",
"SINK_BIGQUERY_DATASET_LABELS": "shangchi=legend,lord=voldemort",
"SINK_BIGQUERY_DATASET_LOCATION": "US",
"SINK_BIGQUERY_DATASET_NAME": "bq_test",
"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration",
"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": "false",
"SINK_BIGQUERY_STORAGE_API_ENABLE": "true",
"SINK_BIGQUERY_TABLE_LABELS": "hello=world,john=doe",
"SINK_BIGQUERY_TABLE_NAME": "bq_dlq_test1",
"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": "2629800000",
"SINK_BIGQUERY_TABLE_PARTITION_KEY": "event_timestamp",
"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": "true"
},
"group": "test-group",
"kube_cluster": "test-kube-cluster",
"namespace": "test-namespace",
"project": "test-project-1",
"prometheus_host": "http://sample-prom-host",
"replicas": 1,
"urn": "test-urn-1",
"status": "STATUS_UNSPECIFIED",
"created_at": "2022-12-10T00:00:00.000Z",
"created_by": "user@test.com",
"updated_at": "2023-12-10T02:00:00.000Z",
"updated_by": "user@test.com"
},
{
"batch_size": 1,
"resource_id": "test-resource-id",
"resource_type": "test-resource-type",
"topic": "test-topic",
"date": "2022-10-21",
"name": "test2-firehose-test-topic-2022-10-21",
"num_threads": 1,
"error_types": "DESERIALIZATION_ERROR",
"container_image": "test-image",
"dlq_gcs_credential_path": "/etc/secret/gcp/token",
"env_vars": {
"DLQ_BATCH_SIZE": "1",
"DLQ_NUM_THREADS": "1",
"DLQ_ERROR_TYPES": "DESERIALIZATION_ERROR",
"DLQ_INPUT_DATE": "2022-10-21",
"DLQ_TOPIC_NAME": "test-topic",
"METRIC_STATSD_TAGS": "a=b",
"SINK_TYPE": "bigquery",
"DLQ_PREFIX_DIR": "test-firehose",
"DLQ_FINISHED_STATUS_FILE": "/shared/job-finished",
"DLQ_GCS_BUCKET_NAME": "g-pilotdata-gl-dlq",
"DLQ_GCS_CREDENTIAL_PATH": "/etc/secret/gcp/token",
"DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration",
"JAVA_TOOL_OPTIONS": "-javaagent:jolokia-jvm-agent.jar=port=8778,host=localhost",
"_JAVA_OPTIONS": "-Xmx1800m -Xms1800m",
"INPUT_SCHEMA_PROTO_CLASS": "gojek.esb.booking.GoFoodBookingLogMessage",
"SCHEMA_REGISTRY_STENCIL_ENABLE": "true",
"SCHEMA_REGISTRY_STENCIL_URLS": "http://p-godata-systems-stencil-v1beta1-ingress.golabs.io/v1beta1/namespaces/gojek/schemas/esb-log-entities",
"SINK_BIGQUERY_ADD_METADATA_ENABLED": "true",
"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": "-1",
"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": "-1",
"SINK_BIGQUERY_CREDENTIAL_PATH": "/etc/secret/gcp/token",
"SINK_BIGQUERY_DATASET_LABELS": "shangchi=legend,lord=voldemort",
"SINK_BIGQUERY_DATASET_LOCATION": "US",
"SINK_BIGQUERY_DATASET_NAME": "bq_test",
"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration",
"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": "false",
"SINK_BIGQUERY_STORAGE_API_ENABLE": "true",
"SINK_BIGQUERY_TABLE_LABELS": "hello=world,john=doe",
"SINK_BIGQUERY_TABLE_NAME": "bq_dlq_test1",
"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": "2629800000",
"SINK_BIGQUERY_TABLE_PARTITION_KEY": "event_timestamp",
"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": "true"
},
"group": "test-group",
"kube_cluster": "test-kube-cluster",
"namespace": "test-namespace",
"project": "test-project-1",
"prometheus_host": "http://sample-prom-host",
"replicas": 1,
"urn": "test-urn-2",
"status": "STATUS_UNSPECIFIED",
"created_at": "2012-10-10T04:00:00.000Z",
"created_by": "user@test.com",
"updated_at": "2013-02-12T02:04:00.000Z",
"updated_by": "user@test.com"
}
]
}
25 changes: 20 additions & 5 deletions internal/server/v1/dlq/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,31 @@ func (h *Handler) ListFirehoseDLQ(w http.ResponseWriter, r *http.Request) {
func (h *Handler) listDlqJobs(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// firehoseUrn := chi.URLParam(r, "firehoseURN")
// fetch py resource (kind = job)
// mapToDlqJob(entropyResource) -> DqlJob
dlqJob, err := h.service.ListDlqJob(ctx)
labelFilter := map[string]string{}
if resourceID := r.URL.Query().Get("resource_id"); resourceID != "" {
labelFilter["resource_id"] = resourceID
}
if resourceType := r.URL.Query().Get("resource_type"); resourceType != "" {
labelFilter["resource_type"] = resourceType
}
if date := r.URL.Query().Get("date"); date != "" {
labelFilter["date"] = date
}

dlqJob, err := h.service.ListDlqJob(ctx, labelFilter)
if err != nil {
if errors.Is(err, ErrFirehoseNotFound) {
utils.WriteErrMsg(w, http.StatusNotFound, err.Error())
return
}
utils.WriteErr(w, err)
return
}

utils.WriteJSON(w, http.StatusOK, dlqJob)
utils.WriteJSON(w, http.StatusOK, map[string]interface{}{
"dlq_jobs": dlqJob,
},
)
}

func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
Expand Down
159 changes: 70 additions & 89 deletions internal/server/v1/dlq/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package dlq_test
import (
"bytes"
"context"
_ "embed"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1"
"github.com/go-chi/chi/v5"
Expand All @@ -18,6 +20,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/goto/dex/entropy"
"github.com/goto/dex/generated/models"
Expand All @@ -29,6 +32,9 @@ import (
"github.com/goto/dex/mocks"
)

//go:embed fixtures/list_dlq_jobs.json
var listDlqJobsFixtureJSON []byte

const (
emailHeaderKey = "X-Auth-Email"
)
Expand Down Expand Up @@ -201,9 +207,7 @@ func TestErrorFromFirehoseResource(t *testing.T) {

func TestListDlqJob(t *testing.T) {
var (
userEmail = "user@test.com"
method = http.MethodGet
path = "/jobs"
project = "test-project-1"
namespace = "test-namespace"
resourceID = "test-resource-id"
Expand All @@ -214,10 +218,58 @@ func TestListDlqJob(t *testing.T) {
batchSize = 1
numThreads = 1
topic = "test-topic"
group = ""
group = "test-group"
)
t.Run("Should return error firehose not found because labels", func(t *testing.T) {
// initt input
path := fmt.Sprintf("/jobs?resource_id=%s&resource_type=%s&date=%s", "test-resource-id2", resourceType, date)
expectedLabels := map[string]string{
"resource_id": "test-resource-id2",
"resource_type": "test-resource-type",
"date": "2022-10-21",
}
expectedErr := status.Error(codes.NotFound, "Not found")
entropyClient := new(mocks.ResourceServiceClient)
entropyClient.On(
"ListResources", mock.Anything, &entropyv1beta1.ListResourcesRequest{
Kind: entropy.ResourceKindJob, Labels: expectedLabels,
},
).Return(nil, expectedErr)
defer entropyClient.AssertExpectations(t)

response := httptest.NewRecorder()
request := httptest.NewRequest(method, path, nil)
router := getRouter()
dlq.Routes(entropyClient, nil, dlq.DlqJobConfig{})(router)
router.ServeHTTP(response, request)

assert.Equal(t, http.StatusNotFound, response.Code)
})

t.Run("Should return error in firehose mapping", func(t *testing.T) {
// initt input
path := fmt.Sprintf("/jobs?resource_id=")
expectedErr := status.Error(codes.Internal, "Not found")
expectedLabels := map[string]string{}
entropyClient := new(mocks.ResourceServiceClient)
entropyClient.On(
"ListResources", mock.Anything, &entropyv1beta1.ListResourcesRequest{
Kind: entropy.ResourceKindJob, Labels: expectedLabels,
},
).Return(nil, expectedErr)

response := httptest.NewRecorder()
request := httptest.NewRequest(method, path, nil)
router := getRouter()
dlq.Routes(entropyClient, nil, dlq.DlqJobConfig{})(router)
router.ServeHTTP(response, request)

assert.Equal(t, http.StatusInternalServerError, response.Code)
})

t.Run("Should return list of dlqJobs", func(t *testing.T) {
path := fmt.Sprintf("/jobs?resource_id=%s&resource_type=%s&date=%s", resourceID, resourceType, date)

config := dlq.DlqJobConfig{
PrometheusHost: "http://sample-prom-host",
DlqJobImage: "test-image",
Expand Down Expand Up @@ -392,6 +444,8 @@ func TestListDlqJob(t *testing.T) {
},
CreatedBy: "user@test.com",
UpdatedBy: "user@test.com",
CreatedAt: timestamppb.New(time.Date(2022, time.December, 10, 0, 0, 0, 0, time.UTC)),
UpdatedAt: timestamppb.New(time.Date(2023, time.December, 10, 2, 0, 0, 0, time.UTC)),
Spec: &entropyv1beta1.ResourceSpec{
Configs: jobConfig,
Dependencies: []*entropyv1beta1.ResourceDependency{
Expand Down Expand Up @@ -424,6 +478,8 @@ func TestListDlqJob(t *testing.T) {
},
CreatedBy: "user@test.com",
UpdatedBy: "user@test.com",
CreatedAt: timestamppb.New(time.Date(2012, time.October, 10, 4, 0, 0, 0, time.UTC)),
UpdatedAt: timestamppb.New(time.Date(2013, time.February, 12, 2, 4, 0, 0, time.UTC)),
Spec: &entropyv1beta1.ResourceSpec{
Configs: jobConfig,
Dependencies: []*entropyv1beta1.ResourceDependency{
Expand All @@ -436,110 +492,35 @@ func TestListDlqJob(t *testing.T) {
},
}

expectedDlqJob := []models.DlqJob{
{
// from input
BatchSize: int64(batchSize),
ResourceID: resourceID,
ResourceType: resourceType,
Topic: topic,
Name: fmt.Sprintf(
"%s-%s-%s-%s",
"test1", // firehose title
"firehose", // firehose / dagger
topic, //
date, //
),

NumThreads: int64(numThreads),
Date: date,
ErrorTypes: errorTypes,

// firehose resource
ContainerImage: config.DlqJobImage,
DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"],
EnvVars: expectedEnvVars,
Group: "", //
KubeCluster: kubeCluster,
Namespace: namespace,
Project: project,
PrometheusHost: config.PrometheusHost,

// hardcoded
Replicas: 1,

// job resource
Urn: "test-urn-1",
Status: dummyEntropyResources[0].GetState().GetStatus().String(),
CreatedAt: strfmt.DateTime(dummyEntropyResources[0].CreatedAt.AsTime()),
CreatedBy: "user@test.com",
UpdatedAt: strfmt.DateTime(dummyEntropyResources[0].UpdatedAt.AsTime()),
UpdatedBy: "user@test.com",
},
{
// from input
BatchSize: int64(batchSize),
ResourceID: resourceID,
ResourceType: resourceType,
Topic: topic,
Name: fmt.Sprintf(
"%s-%s-%s-%s",
"test2", // firehose title
"firehose", // firehose / dagger
topic, //
date, //
),

NumThreads: int64(numThreads),
Date: date,
ErrorTypes: errorTypes,

// firehose resource
ContainerImage: config.DlqJobImage,
DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"],
EnvVars: expectedEnvVars,
Group: "", //
KubeCluster: kubeCluster,
Namespace: namespace,
Project: project,
PrometheusHost: config.PrometheusHost,

// hardcoded
Replicas: 1,

// job resource
Urn: "test-urn-2",
Status: dummyEntropyResources[0].GetState().GetStatus().String(),
CreatedAt: strfmt.DateTime(dummyEntropyResources[0].CreatedAt.AsTime()),
CreatedBy: "user@test.com",
UpdatedAt: strfmt.DateTime(dummyEntropyResources[0].UpdatedAt.AsTime()),
UpdatedBy: "user@test.com",
},
}

expectedRPCResp := &entropyv1beta1.ListResourcesResponse{
Resources: dummyEntropyResources,
}

expectedLabels := map[string]string{
"resource_id": "test-resource-id",
"resource_type": "test-resource-type",
"date": "2022-10-21",
}

entropyClient := new(mocks.ResourceServiceClient)
entropyClient.On(
"ListResources", mock.Anything, &entropyv1beta1.ListResourcesRequest{
Kind: entropy.ResourceKindJob,
Kind: entropy.ResourceKindJob, Labels: expectedLabels,
},
).Return(expectedRPCResp, nil)
defer entropyClient.AssertExpectations(t)

response := httptest.NewRecorder()
request := httptest.NewRequest(method, path, nil)
request.Header.Set(emailHeaderKey, userEmail)
router := getRouter()
dlq.Routes(entropyClient, nil, config)(router)
router.ServeHTTP(response, request)

assert.Equal(t, http.StatusOK, response.Code)
resultJSON := response.Body.Bytes()
expectedJSON, err := json.Marshal(expectedDlqJob)
require.NoError(t, err)
assert.JSONEq(t, string(expectedJSON), string(resultJSON))

expectedPayload := string(listDlqJobsFixtureJSON)
assert.JSONEq(t, expectedPayload, string(resultJSON))
})
}

Expand Down
Loading

0 comments on commit 16f2557

Please sign in to comment.