diff --git a/README.md b/README.md index b9ce1e6..d025c4f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ # dex Data Experience + +## Setting up GCloud Credentials +1. login to cloud - `gcloud auth login` +2. setup ADC - `gcloud auth application-default login` \ No newline at end of file diff --git a/cli/server/server.go b/cli/server/server.go index 32fb113..080f02f 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -104,8 +104,7 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap } wardenClient := warden.NewClient(cfg.Warden.Addr) - dlqConfig := &dlq.DlqJobConfig{ - // TODO: map cfg.Dlq\ + dlqConfig := dlq.DlqJobConfig{ DlqJobImage: cfg.Dlq.DlqJobImage, PrometheusHost: cfg.Dlq.PrometheusHost, } diff --git a/dex b/dex deleted file mode 160000 index 7860372..0000000 --- a/dex +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 7860372df8965eee6a27d0bb0877d9e2ff08dace diff --git a/generated/models/dlq_job.go b/generated/models/dlq_job.go index 250efb5..ebe72fe 100644 --- a/generated/models/dlq_job.go +++ b/generated/models/dlq_job.go @@ -52,6 +52,9 @@ type DlqJob struct { // kube cluster KubeCluster string `json:"kube_cluster,omitempty"` + // name + Name string `json:"name,omitempty"` + // namespace Namespace string `json:"namespace,omitempty"` diff --git a/generated/models/dlq_job_form.go b/generated/models/dlq_job_form.go new file mode 100644 index 0000000..c62ba7a --- /dev/null +++ b/generated/models/dlq_job_form.go @@ -0,0 +1,208 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "context" + "encoding/json" + + "github.com/go-openapi/errors" + "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" + "github.com/go-openapi/validate" +) + +// DlqJobForm dlq job form +// +// swagger:model DlqJobForm +type DlqJobForm struct { + + // batch size + // Required: true + BatchSize *int64 `json:"batch_size"` + + // date + // Example: 2012-10-30 + // Required: true + // Min Length: 1 + Date *string `json:"date"` + + // List of firehose error types, comma separated + ErrorTypes string `json:"error_types,omitempty"` + + // num threads + // Required: true + NumThreads *int64 `json:"num_threads"` + + // resource id + // Required: true + // Min Length: 1 + ResourceID *string `json:"resource_id"` + + // resource type + // Required: true + // Enum: [firehose] + ResourceType *string `json:"resource_type"` + + // topic + // Required: true + // Min Length: 1 + Topic *string `json:"topic"` +} + +// Validate validates this dlq job form +func (m *DlqJobForm) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateBatchSize(formats); err != nil { + res = append(res, err) + } + + if err := m.validateDate(formats); err != nil { + res = append(res, err) + } + + if err := m.validateNumThreads(formats); err != nil { + res = append(res, err) + } + + if err := m.validateResourceID(formats); err != nil { + res = append(res, err) + } + + if err := m.validateResourceType(formats); err != nil { + res = append(res, err) + } + + if err := m.validateTopic(formats); err != nil { + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *DlqJobForm) validateBatchSize(formats strfmt.Registry) error { + + if err := validate.Required("batch_size", "body", m.BatchSize); err != nil { + return err + } + + return nil +} + +func (m *DlqJobForm) validateDate(formats strfmt.Registry) error { + + if err := validate.Required("date", "body", m.Date); err != nil { + return err + } + + if err := validate.MinLength("date", "body", *m.Date, 1); err != nil { + return err + } + + return nil +} + +func (m *DlqJobForm) validateNumThreads(formats strfmt.Registry) error { + + if err := validate.Required("num_threads", "body", m.NumThreads); err != nil { + return err + } + + return nil +} + +func (m *DlqJobForm) validateResourceID(formats strfmt.Registry) error { + + if err := validate.Required("resource_id", "body", m.ResourceID); err != nil { + return err + } + + if err := validate.MinLength("resource_id", "body", *m.ResourceID, 1); err != nil { + return err + } + + return nil +} + +var dlqJobFormTypeResourceTypePropEnum []interface{} + +func init() { + var res []string + if err := json.Unmarshal([]byte(`["firehose"]`), &res); err != nil { + panic(err) + } + for _, v := range res { + dlqJobFormTypeResourceTypePropEnum = append(dlqJobFormTypeResourceTypePropEnum, v) + } +} + +const ( + + // DlqJobFormResourceTypeFirehose captures enum value "firehose" + DlqJobFormResourceTypeFirehose string = "firehose" +) + +// prop value enum +func (m *DlqJobForm) validateResourceTypeEnum(path, location string, value string) error { + if err := validate.EnumCase(path, location, value, dlqJobFormTypeResourceTypePropEnum, true); err != nil { + return err + } + return nil +} + +func (m *DlqJobForm) validateResourceType(formats strfmt.Registry) error { + + if err := validate.Required("resource_type", "body", m.ResourceType); err != nil { + return err + } + + // value enum + if err := m.validateResourceTypeEnum("resource_type", "body", *m.ResourceType); err != nil { + return err + } + + return nil +} + +func (m *DlqJobForm) validateTopic(formats strfmt.Registry) error { + + if err := validate.Required("topic", "body", m.Topic); err != nil { + return err + } + + if err := validate.MinLength("topic", "body", *m.Topic, 1); err != nil { + return err + } + + return nil +} + +// ContextValidate validates this dlq job form based on context it is used +func (m *DlqJobForm) ContextValidate(ctx context.Context, formats strfmt.Registry) error { + return nil +} + +// MarshalBinary interface implementation +func (m *DlqJobForm) MarshalBinary() ([]byte, error) { + if m == nil { + return nil, nil + } + return swag.WriteJSON(m) +} + +// UnmarshalBinary interface implementation +func (m *DlqJobForm) UnmarshalBinary(b []byte) error { + var res DlqJobForm + if err := swag.ReadJSON(b, &res); err != nil { + return err + } + *m = res + return nil +} diff --git a/internal/server/server.go b/internal/server/server.go index d8276a7..c2b19db 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -40,7 +40,7 @@ func Serve(ctx context.Context, addr string, odinAddr string, stencilAddr string, wardenClient *warden.Client, - dlqConfig *dlqv1.DlqJobConfig, + dlqConfig dlqv1.DlqJobConfig, ) error { alertSvc := alertsv1.NewService(sirenClient) diff --git a/internal/server/v1/dlq/errors.go b/internal/server/v1/dlq/errors.go index 99b522a..b217c8b 100644 --- a/internal/server/v1/dlq/errors.go +++ b/internal/server/v1/dlq/errors.go @@ -5,4 +5,7 @@ import "errors" var ( ErrFirehoseNamespaceNotFound = errors.New("could not find firehose namespace from resource output") ErrFirehoseNamespaceInvalid = errors.New("invalid firehose namespace from resource output") + ErrFirehoseNotFound = errors.New("firehose not found") + ErrEmptyConfigImage = errors.New("empty dlq job image") + ErrEmptyConfigPrometheusHost = errors.New("empty prometheus host") ) diff --git a/internal/server/v1/dlq/fixtures/list_dlq_jobs.json b/internal/server/v1/dlq/fixtures/list_dlq_jobs.json new file mode 100644 index 0000000..cead5cb --- /dev/null +++ b/internal/server/v1/dlq/fixtures/list_dlq_jobs.json @@ -0,0 +1,38 @@ +{ + "dlq_jobs": [ + { + "resource_id": "test-resource-id", + "resource_type": "test-resource-type", + "topic": "test-topic", + "date": "2022-10-21", + "name": "test1-firehose-test-topic-2022-10-21", + "group": "test-group", + "kube_cluster": "test-kube-cluster", + "project": "test-project-1", + "prometheus_host": "prom_host", + "urn": "test-urn-1", + "status": "STATUS_UNSPECIFIED", + "created_at": "2022-12-10T00:00:00.000Z", + "created_by": "user@test.com", + "updated_at": "2023-12-10T02:00:00.000Z", + "updated_by": "user@test.com" + }, + { + "resource_id": "test-resource-id", + "resource_type": "test-resource-type", + "topic": "test-topic", + "date": "2022-10-21", + "name": "test2-firehose-test-topic-2022-10-21", + "group": "test-group", + "kube_cluster": "test-kube-cluster", + "project": "test-project-1", + "prometheus_host": "prom_host2", + "urn": "test-urn-2", + "status": "STATUS_UNSPECIFIED", + "created_at": "2012-10-10T04:00:00.000Z", + "created_by": "user@test.com", + "updated_at": "2013-02-12T02:04:00.000Z", + "updated_by": "user@test.com" + } + ] +} diff --git a/internal/server/v1/dlq/handler.go b/internal/server/v1/dlq/handler.go index bee8ace..4f0d9e5 100644 --- a/internal/server/v1/dlq/handler.go +++ b/internal/server/v1/dlq/handler.go @@ -1,6 +1,7 @@ package dlq import ( + "errors" "log" "net/http" @@ -8,7 +9,9 @@ import ( "github.com/go-chi/chi/v5" "github.com/goto/dex/entropy" + "github.com/goto/dex/generated/models" "github.com/goto/dex/internal/server/gcs" + "github.com/goto/dex/internal/server/reqctx" "github.com/goto/dex/internal/server/utils" "github.com/goto/dex/internal/server/v1/firehose" ) @@ -55,18 +58,76 @@ func (h *Handler) ListFirehoseDLQ(w http.ResponseWriter, r *http.Request) { }) } -func (*Handler) listDlqJobs(w http.ResponseWriter, _ *http.Request) { +func (h *Handler) listDlqJobs(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + labelFilter := map[string]string{} + if resourceID := r.URL.Query().Get("resource_id"); resourceID != "" { + labelFilter["resource_id"] = resourceID + } + if resourceType := r.URL.Query().Get("resource_type"); resourceType != "" { + labelFilter["resource_type"] = resourceType + } + if date := r.URL.Query().Get("date"); date != "" { + labelFilter["date"] = date + } + + dlqJob, err := h.service.ListDlqJob(ctx, labelFilter) + if err != nil { + if errors.Is(err, ErrFirehoseNotFound) { + utils.WriteErrMsg(w, http.StatusNotFound, err.Error()) + return + } + utils.WriteErr(w, err) + return + } + utils.WriteJSON(w, http.StatusOK, map[string]interface{}{ - "dlq_jobs": []interface{}{}, - }) + "dlq_jobs": dlqJob, + }, + ) } -func (*Handler) createDlqJob(w http.ResponseWriter, _ *http.Request) { - // transform request body into DlqJob (validation?) - // call service.CreateDLQJob +func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + reqCtx := reqctx.From(ctx) + if reqCtx.UserEmail == "" { + utils.WriteErrMsg(w, http.StatusUnauthorized, "user header is required") + return + } + + var body models.DlqJobForm + if err := utils.ReadJSON(r, &body); err != nil { + utils.WriteErr(w, err) + return + } + if err := body.Validate(nil); err != nil { + utils.WriteErrMsg(w, http.StatusBadRequest, err.Error()) + return + } + + dlqJob := models.DlqJob{ + BatchSize: *body.BatchSize, + Date: *body.Date, + ErrorTypes: body.ErrorTypes, + NumThreads: *body.NumThreads, + ResourceID: *body.ResourceID, + ResourceType: *body.ResourceType, + Topic: *body.Topic, + } + + updatedDlqJob, err := h.service.CreateDLQJob(ctx, reqCtx.UserEmail, dlqJob) + if err != nil { + if errors.Is(err, ErrFirehoseNotFound) { + utils.WriteErrMsg(w, http.StatusNotFound, err.Error()) + return + } + utils.WriteErr(w, err) + return + } utils.WriteJSON(w, http.StatusOK, map[string]interface{}{ - "dlq_job": nil, + "dlq_job": updatedDlqJob, }) } diff --git a/internal/server/v1/dlq/handler_test.go b/internal/server/v1/dlq/handler_test.go index ff77ae3..eab56ff 100644 --- a/internal/server/v1/dlq/handler_test.go +++ b/internal/server/v1/dlq/handler_test.go @@ -1,26 +1,45 @@ package dlq_test import ( + "bytes" "context" + _ "embed" "encoding/json" "fmt" "net/http" + "net/http/httptest" "testing" + "time" entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1" + "github.com/go-chi/chi/v5" + "github.com/go-openapi/strfmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/goto/dex/entropy" "github.com/goto/dex/generated/models" "github.com/goto/dex/internal/server/gcs" + "github.com/goto/dex/internal/server/reqctx" "github.com/goto/dex/internal/server/utils" "github.com/goto/dex/internal/server/v1/dlq" "github.com/goto/dex/internal/server/v1/firehose" "github.com/goto/dex/mocks" ) +//go:embed fixtures/list_dlq_jobs.json +var listDlqJobsFixtureJSON []byte + +// nolint +const ( + emailHeaderKey = "X-Auth-Email" +) + type testHTTPWriter struct { messages []string } @@ -40,7 +59,7 @@ func (*testHTTPWriter) WriteHeader(int) { func TestListTopicDates(t *testing.T) { eService := &mocks.ResourceServiceClient{} gClient := &mocks.BlobStorageClient{} - handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{})) + handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{})) httpWriter := &testHTTPWriter{} httpRequest := &http.Request{} config := &entropy.FirehoseConfig{ @@ -115,7 +134,7 @@ func TestListTopicDates(t *testing.T) { func TestErrorFromGCSClient(t *testing.T) { eService := &mocks.ResourceServiceClient{} gClient := &mocks.BlobStorageClient{} - handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{})) + handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{})) httpWriter := &testHTTPWriter{} httpRequest := &http.Request{} config := &entropy.FirehoseConfig{ @@ -173,7 +192,7 @@ func TestErrorFromGCSClient(t *testing.T) { func TestErrorFromFirehoseResource(t *testing.T) { eService := &mocks.ResourceServiceClient{} gClient := &mocks.BlobStorageClient{} - handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{})) + handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{})) httpWriter := &testHTTPWriter{} httpRequest := &http.Request{} eService.On( @@ -186,3 +205,566 @@ func TestErrorFromFirehoseResource(t *testing.T) { require.NoError(t, err) assert.Equal(t, "test-error", expectedMap["cause"]) } + +func TestListDlqJob(t *testing.T) { + var ( + method = http.MethodGet + project = "test-project-1" + resourceID = "test-resource-id" + resourceType = "test-resource-type" + kubeCluster = "test-kube-cluster" + date = "2022-10-21" + topic = "test-topic" + group = "test-group" + ) + + t.Run("Should return error in firehose mapping", func(t *testing.T) { + path := "/jobs?resource_id=" + expectedErr := status.Error(codes.Internal, "Not found") + expectedLabels := map[string]string{} + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "ListResources", mock.Anything, &entropyv1beta1.ListResourcesRequest{ + Kind: entropy.ResourceKindJob, Labels: expectedLabels, + }, + ).Return(nil, expectedErr) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, nil) + router := getRouter() + dlq.Routes(entropyClient, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusInternalServerError, response.Code) + }) + + t.Run("Should return list of dlqJobs", func(t *testing.T) { + path := fmt.Sprintf("/jobs?resource_id=%s&resource_type=%s&date=%s", resourceID, resourceType, date) + + dummyEntropyResources := []*entropyv1beta1.Resource{ + { + Urn: "test-urn-1", + Kind: entropy.ResourceKindJob, + Name: fmt.Sprintf( + "%s-%s-%s-%s", + "test1", // firehose title + "firehose", // firehose / dagger + topic, // + date, // + ), + Project: project, + Labels: map[string]string{ + "resource_id": resourceID, + "resource_type": resourceType, + "date": date, + "topic": topic, + "job_type": "dlq", + "group": group, + "prometheus_host": "prom_host", + }, + CreatedBy: "user@test.com", + UpdatedBy: "user@test.com", + CreatedAt: timestamppb.New(time.Date(2022, time.December, 10, 0, 0, 0, 0, time.UTC)), + UpdatedAt: timestamppb.New(time.Date(2023, time.December, 10, 2, 0, 0, 0, time.UTC)), + Spec: &entropyv1beta1.ResourceSpec{ + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, // from firehose configs.kube_cluster + }, + }, + }, + }, + { + Urn: "test-urn-2", + Kind: entropy.ResourceKindJob, + Name: fmt.Sprintf( + "%s-%s-%s-%s", + "test2", // firehose title + "firehose", // firehose / dagger + topic, // + date, // + ), + Project: project, + Labels: map[string]string{ + "resource_id": resourceID, + "resource_type": resourceType, + "date": date, + "topic": topic, + "job_type": "dlq", + "group": group, + "prometheus_host": "prom_host2", + }, + CreatedBy: "user@test.com", + UpdatedBy: "user@test.com", + CreatedAt: timestamppb.New(time.Date(2012, time.October, 10, 4, 0, 0, 0, time.UTC)), + UpdatedAt: timestamppb.New(time.Date(2013, time.February, 12, 2, 4, 0, 0, time.UTC)), + Spec: &entropyv1beta1.ResourceSpec{ + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, // from firehose configs.kube_cluster + }, + }, + }, + }, + } + + expectedRPCResp := &entropyv1beta1.ListResourcesResponse{ + Resources: dummyEntropyResources, + } + + expectedLabels := map[string]string{ + "resource_id": "test-resource-id", + "resource_type": "test-resource-type", + "date": "2022-10-21", + } + + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "ListResources", mock.Anything, &entropyv1beta1.ListResourcesRequest{ + Kind: entropy.ResourceKindJob, Labels: expectedLabels, + }, + ).Return(expectedRPCResp, nil) + defer entropyClient.AssertExpectations(t) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, nil) + router := getRouter() + dlq.Routes(entropyClient, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusOK, response.Code) + resultJSON := response.Body.Bytes() + + expectedPayload := string(listDlqJobsFixtureJSON) + assert.JSONEq(t, expectedPayload, string(resultJSON)) + }) +} + +// nolint +func skipTestCreateDlqJob(t *testing.T) { + var ( + method = http.MethodPost + path = "/jobs" + resourceID = "test-resource-id" + resourceType = "test-resource-type" + errorTypes = "DESERIALIZATION_ERROR" + date = "2022-10-2" + batchSize = 0 + numThreads = 0 + topic = "test-topic" + group = "" + jsonPayload = fmt.Sprintf(`{ + "resource_id": "%s", + "resource_type": "%s", + "error_types": "%s", + "batch_size": %d, + "num_threads": %d, + "topic": "%s", + "date": "%s", + "group": "%s" + }`, resourceID, resourceType, errorTypes, batchSize, numThreads, topic, date, group) + ) + + t.Run("Should return error user header is required", func(t *testing.T) { + requestBody := bytes.NewReader([]byte(jsonPayload)) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, requestBody) + router := getRouter() + dlq.Routes(nil, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusUnauthorized, response.Code) + }) + + t.Run("Should return error validation resource type not firehose", func(t *testing.T) { + jsonBody := `{ + "resource_id": "test-resource-id", + "resource_type": "dlq", + "error_types": "test-error", + "batch_size": 1, + "num_threads": 2, + "topic": "test-topic", + "date": "2022-10-21", + "group": "test-group" + }` + userEmail := "test@example.com" + + requestBody := bytes.NewReader([]byte(jsonBody)) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, requestBody) + request.Header.Set(emailHeaderKey, userEmail) + router := getRouter() + dlq.Routes(nil, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusBadRequest, response.Code) + }) + + t.Run("Should return error validation missing required fields", func(t *testing.T) { + jsonBody := `{ + "resource_id": "", + "resource_type": "firehose", + "error_types": "test-error", + "batch_size": 1, + "num_threads": 2, + "topic": "test-topic", + "date": "2022-10-21", + "group": "test-group" + }` + userEmail := "test@example.com" + + requestBody := bytes.NewReader([]byte(jsonBody)) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, requestBody) + request.Header.Set(emailHeaderKey, userEmail) + router := getRouter() + dlq.Routes(nil, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusBadRequest, response.Code) + }) + + t.Run("Should return error firehose not Found", func(t *testing.T) { + expectedErr := status.Error(codes.NotFound, "Not found") + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "GetResource", mock.Anything, &entropyv1beta1.GetResourceRequest{Urn: resourceID}, + ).Return(nil, expectedErr) + defer entropyClient.AssertExpectations(t) + + requestBody := bytes.NewReader([]byte(jsonPayload)) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, requestBody) + router := getRouter() + dlq.Routes(entropyClient, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusNotFound, response.Code) + }) + + t.Run("Should return error in firehose mapping", func(t *testing.T) { + expectedErr := status.Error(codes.Internal, "Not found") + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "GetResource", mock.Anything, &entropyv1beta1.GetResourceRequest{Urn: resourceID}, + ).Return(nil, expectedErr) + defer entropyClient.AssertExpectations(t) + + requestBody := bytes.NewReader([]byte(jsonPayload)) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, requestBody) + router := getRouter() + dlq.Routes(entropyClient, nil, dlq.DlqJobConfig{})(router) + router.ServeHTTP(response, request) + + assert.Equal(t, http.StatusInternalServerError, response.Code) + }) + + t.Run("Should return resource urn", func(t *testing.T) { + namespace := "test-namespace" + kubeCluster := "test-kube-cluster" + userEmail := "test@example.com" + config := dlq.DlqJobConfig{ + PrometheusHost: "http://sample-prom-host", + DlqJobImage: "test-image", + } + envVars := map[string]string{ + "SINK_TYPE": "bigquery", + "DLQ_BATCH_SIZE": "34", + "DLQ_NUM_THREADS": "10", + "DLQ_PREFIX_DIR": "test-firehose", + "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", + "DLQ_GCS_BUCKET_NAME": "g-pilotdata-gl-dlq", + "DLQ_ERROR_TYPES": "DEFAULT_ERROR", + "DLQ_GCS_CREDENTIAL_PATH": "/etc/secret/gcp/token", + "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration", + "DLQ_INPUT_DATE": "2023-04-10", + "JAVA_TOOL_OPTIONS": "-javaagent:jolokia-jvm-agent.jar=port=8778,host=localhost", + "_JAVA_OPTIONS": "-Xmx1800m -Xms1800m", + "DLQ_TOPIC_NAME": "gofood-booking-log", + "INPUT_SCHEMA_PROTO_CLASS": "gojek.esb.booking.GoFoodBookingLogMessage", + "METRIC_STATSD_TAGS": "a=b", + "SCHEMA_REGISTRY_STENCIL_ENABLE": "true", + "SCHEMA_REGISTRY_STENCIL_URLS": "http://p-godata-systems-stencil-v1beta1-ingress.golabs.io/v1beta1/namespaces/gojek/schemas/esb-log-entities", + "SINK_BIGQUERY_ADD_METADATA_ENABLED": "true", + "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": "-1", + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": "-1", + "SINK_BIGQUERY_CREDENTIAL_PATH": "/etc/secret/gcp/token", + "SINK_BIGQUERY_DATASET_LABELS": "shangchi=legend,lord=voldemort", + "SINK_BIGQUERY_DATASET_LOCATION": "US", + "SINK_BIGQUERY_DATASET_NAME": "bq_test", + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration", + "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": "false", + "SINK_BIGQUERY_STORAGE_API_ENABLE": "true", + "SINK_BIGQUERY_TABLE_LABELS": "hello=world,john=doe", + "SINK_BIGQUERY_TABLE_NAME": "bq_dlq_test1", + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": "2629800000", + "SINK_BIGQUERY_TABLE_PARTITION_KEY": "event_timestamp", + "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": "true", + } + + outputStruct, err := structpb.NewStruct(map[string]interface{}{ + "namespace": namespace, + }) + require.NoError(t, err) + + firehoseConfig, err := utils.GoValToProtoStruct(entropy.FirehoseConfig{ + EnvVariables: envVars, + }) + require.NoError(t, err) + + firehoseResource := &entropyv1beta1.Resource{ + Spec: &entropyv1beta1.ResourceSpec{ + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, + }, + }, + Configs: firehoseConfig, + }, + State: &entropyv1beta1.ResourceState{ + Output: structpb.NewStructValue(outputStruct), + }, + } + + jobResource := &entropyv1beta1.Resource{ + Urn: "test-urn", + State: &entropyv1beta1.ResourceState{ + Output: structpb.NewStructValue(outputStruct), + }, + } + + expectedEnvVars := map[string]string{ + "DLQ_BATCH_SIZE": fmt.Sprintf("%d", batchSize), + "DLQ_NUM_THREADS": fmt.Sprintf("%d", numThreads), + "DLQ_ERROR_TYPES": errorTypes, + "DLQ_INPUT_DATE": date, + "DLQ_TOPIC_NAME": topic, + "METRIC_STATSD_TAGS": "a=b", // TBA + "SINK_TYPE": envVars["SINK_TYPE"], + "DLQ_PREFIX_DIR": "test-firehose", + "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", + "DLQ_GCS_BUCKET_NAME": envVars["DLQ_GCS_BUCKET_NAME"], + "DLQ_GCS_CREDENTIAL_PATH": envVars["DLQ_GCS_CREDENTIAL_PATH"], + "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": envVars["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"], + "JAVA_TOOL_OPTIONS": envVars["JAVA_TOOL_OPTIONS"], + "_JAVA_OPTIONS": envVars["_JAVA_OPTIONS"], + "INPUT_SCHEMA_PROTO_CLASS": envVars["INPUT_SCHEMA_PROTO_CLASS"], + "SCHEMA_REGISTRY_STENCIL_ENABLE": envVars["SCHEMA_REGISTRY_STENCIL_ENABLE"], + "SCHEMA_REGISTRY_STENCIL_URLS": envVars["SCHEMA_REGISTRY_STENCIL_URLS"], + "SINK_BIGQUERY_ADD_METADATA_ENABLED": envVars["SINK_BIGQUERY_ADD_METADATA_ENABLED"], + "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": envVars["SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"], + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": envVars["SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"], + "SINK_BIGQUERY_CREDENTIAL_PATH": envVars["SINK_BIGQUERY_CREDENTIAL_PATH"], + "SINK_BIGQUERY_DATASET_LABELS": envVars["SINK_BIGQUERY_DATASET_LABELS"], + "SINK_BIGQUERY_DATASET_LOCATION": envVars["SINK_BIGQUERY_DATASET_LOCATION"], + "SINK_BIGQUERY_DATASET_NAME": envVars["SINK_BIGQUERY_DATASET_NAME"], + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": envVars["SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"], + "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": envVars["SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"], + "SINK_BIGQUERY_STORAGE_API_ENABLE": envVars["SINK_BIGQUERY_STORAGE_API_ENABLE"], + "SINK_BIGQUERY_TABLE_LABELS": envVars["SINK_BIGQUERY_TABLE_LABELS"], + "SINK_BIGQUERY_TABLE_NAME": envVars["SINK_BIGQUERY_TABLE_NAME"], + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": envVars["SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"], + "SINK_BIGQUERY_TABLE_PARTITION_KEY": envVars["SINK_BIGQUERY_TABLE_PARTITION_KEY"], + "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envVars["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"], + } + + jobConfig, err := utils.GoValToProtoStruct(entropy.JobConfig{ + Replicas: 1, + Namespace: namespace, + Containers: []entropy.JobContainer{ + { + Name: "dlq-job", + Image: config.DlqJobImage, + ImagePullPolicy: "Always", + SecretsVolumes: []entropy.JobSecret{ + { + Name: "firehose-bigquery-sink-credential", + Mount: envVars["DLQ_GCS_CREDENTIAL_PATH"], + }, + }, + Limits: entropy.UsageSpec{ + CPU: "0.5", // user + Memory: "2gb", // user + }, + Requests: entropy.UsageSpec{ + CPU: "0.5", // user + Memory: "2gb", // user + }, + EnvVariables: expectedEnvVars, + }, + { + Name: "telegraf", + Image: "telegraf:1.18.0-alpine", + ConfigMapsVolumes: []entropy.JobConfigMap{ + { + Name: "dlq-processor-telegraf", + Mount: "/etc/telegraf", + }, + }, + EnvVariables: map[string]string{ + // To be updated by streaming + "APP_NAME": "", // TBA + "PROMETHEUS_HOST": config.PrometheusHost, + "DEPLOYMENT_NAME": "deployment-name", + "TEAM": "", + "TOPIC": topic, + "environment": "production", // TBA + "organization": "de", // TBA + "projectID": "", + }, + Command: []string{ + "/bin/bash", + }, + Args: []string{ + "-c", + "telegraf & while [ ! -f /shared/job-finished ]; do sleep 5; done; sleep 20 && exit 0", + }, + Limits: entropy.UsageSpec{ + CPU: "100m", // user + Memory: "300Mi", // user + }, + Requests: entropy.UsageSpec{ + CPU: "100m", // user + Memory: "300Mi", // user + }, + }, + }, + JobLabels: map[string]string{ + "firehose": resourceID, + "topic": topic, + "date": date, + }, + Volumes: []entropy.JobVolume{ + { + Name: "firehose-bigquery-sink-credential", + Kind: "secret", + }, + { + Name: "dlq-processor-telegraf", + Kind: "configMap", + }, + }, + }) + require.NoError(t, err) + + newJobResourcePayload := &entropyv1beta1.Resource{ + Urn: "", + Kind: entropy.ResourceKindJob, + Name: fmt.Sprintf( + "%s-%s-%s-%s", + firehoseResource.Name, // firehose urn + "firehose", // firehose / dagger + topic, // + date, // + ), + Project: firehoseResource.Project, + Labels: map[string]string{ + "resource_id": resourceID, + "resource_type": resourceType, + "date": date, + "topic": topic, + "job_type": "dlq", + "group": group, + "prometheus_host": config.PrometheusHost, + }, + CreatedBy: jobResource.CreatedBy, + UpdatedBy: jobResource.UpdatedBy, + Spec: &entropyv1beta1.ResourceSpec{ + Configs: jobConfig, + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, // from firehose configs.kube_cluster + }, + }, + }, + } + + // set conditions + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "GetResource", mock.Anything, &entropyv1beta1.GetResourceRequest{Urn: resourceID}, + ).Return(&entropyv1beta1.GetResourceResponse{ + Resource: firehoseResource, + }, nil) + entropyClient.On("CreateResource", mock.Anything, &entropyv1beta1.CreateResourceRequest{ + Resource: newJobResourcePayload, + }).Return(&entropyv1beta1.CreateResourceResponse{ + Resource: jobResource, + }, nil) + defer entropyClient.AssertExpectations(t) + + assert.NoError(t, err) + requestBody := bytes.NewReader([]byte(jsonPayload)) + + response := httptest.NewRecorder() + request := httptest.NewRequest(method, path, requestBody) + request.Header.Set(emailHeaderKey, userEmail) + router := getRouter() + dlq.Routes(entropyClient, nil, config)(router) + router.ServeHTTP(response, request) + // assertions + expectedDlqJob := models.DlqJob{ + // from input + BatchSize: int64(batchSize), + ResourceID: resourceID, + ResourceType: resourceType, + Topic: topic, + Name: fmt.Sprintf( + "%s-%s-%s-%s", + firehoseResource.Name, // firehose title + "firehose", // firehose / dagger + topic, // + date, // + ), + + NumThreads: int64(numThreads), + Date: date, + ErrorTypes: errorTypes, + + // firehose resource + ContainerImage: config.DlqJobImage, + DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"], + EnvVars: expectedEnvVars, + Group: "", // + KubeCluster: kubeCluster, + Namespace: namespace, + Project: firehoseResource.Project, + PrometheusHost: config.PrometheusHost, + + // hardcoded + Replicas: 0, + + // job resource + Urn: jobResource.Urn, + Status: jobResource.GetState().GetStatus().String(), + CreatedAt: strfmt.DateTime(jobResource.CreatedAt.AsTime()), + CreatedBy: jobResource.CreatedBy, + UpdatedAt: strfmt.DateTime(jobResource.UpdatedAt.AsTime()), + UpdatedBy: jobResource.UpdatedBy, + } + assert.Equal(t, http.StatusOK, response.Code) + resultJSON := response.Body.Bytes() + expectedJSON, err := json.Marshal(map[string]interface{}{ + "dlq_job": expectedDlqJob, + }) + require.NoError(t, err) + assert.JSONEq(t, string(expectedJSON), string(resultJSON)) + }) +} + +func getRouter() *chi.Mux { + router := chi.NewRouter() + router.Use(reqctx.WithRequestCtx()) + + return router +} diff --git a/internal/server/v1/dlq/mapper.go b/internal/server/v1/dlq/mapper.go index 7a7e88e..9db8c0d 100644 --- a/internal/server/v1/dlq/mapper.go +++ b/internal/server/v1/dlq/mapper.go @@ -3,6 +3,7 @@ package dlq import ( "fmt" "strconv" + "time" entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1" "github.com/go-openapi/strfmt" @@ -20,7 +21,7 @@ const ( dlqTelegrafConfigName = "dlq-processor-telegraf" ) -func EnrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg *DlqJobConfig) error { +func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobConfig) error { var kubeCluster string for _, dep := range res.Spec.GetDependencies() { if dep.GetKey() == kubeClusterDependenciesKey { @@ -41,10 +42,15 @@ func EnrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg *DlqJobC if !ok { return ErrFirehoseNamespaceInvalid } + status := res.GetState().GetStatus().String() envs := modConf.EnvVariables - job.ResourceID = res.GetUrn() + job.Name = buildEntropyResourceName(res.Name, "firehose", job.Topic, job.Date) job.Namespace = namespace + job.Status = status + job.CreatedAt = strfmt.DateTime(res.CreatedAt.AsTime()) + job.UpdatedAt = strfmt.DateTime(res.UpdatedAt.AsTime()) + job.Project = res.Project job.KubeCluster = kubeCluster job.ContainerImage = cfg.DlqJobImage job.PrometheusHost = cfg.PrometheusHost @@ -88,7 +94,7 @@ func EnrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg *DlqJobC } // DlqJob param here is expected to have been enriched with firehose config -func MapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) { +func mapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) { cfgStruct, err := makeConfigStruct(job) if err != nil { return nil, err @@ -97,7 +103,7 @@ func MapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) { return &entropyv1beta1.Resource{ Urn: job.Urn, Kind: entropy.ResourceKindJob, - Name: buildEntropyResourceName(job), + Name: job.Name, Project: job.Project, Labels: buildResourceLabels(job), CreatedBy: job.CreatedBy, @@ -194,12 +200,19 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) { }) } -func MapToDlqJob(r *entropyv1beta1.Resource) (*models.DlqJob, error) { +func mapToDlqJob(r *entropyv1beta1.Resource) (models.DlqJob, error) { labels := r.Labels - var modConf entropy.JobConfig - if err := utils.ProtoStructToGoVal(r.Spec.GetConfigs(), &modConf); err != nil { - return nil, err + var envVars map[string]string + if r.GetSpec().Configs != nil { + var modConf entropy.JobConfig + if err := utils.ProtoStructToGoVal(r.GetSpec().GetConfigs(), &modConf); err != nil { + return models.DlqJob{}, fmt.Errorf("error parsing proto value: %w", err) + } + + if len(modConf.Containers) > 0 { + envVars = modConf.Containers[0].EnvVariables + } } var kubeCluster string @@ -209,56 +222,78 @@ func MapToDlqJob(r *entropyv1beta1.Resource) (*models.DlqJob, error) { } } - envVars := modConf.Containers[0].EnvVariables - batchSize, err := strconv.ParseInt(envVars["DLQ_BATCH_SIZE"], 10, 64) + batchSize, err := strconv.Atoi(labels["batch_size"]) if err != nil { - return nil, err + batchSize = 0 } - numThreads, err := strconv.ParseInt(envVars["DLQ_NUM_THREADS"], 10, 64) + + numThreads, err := strconv.Atoi(labels["num_threads"]) if err != nil { - return nil, err + numThreads = 0 + } + + replicas, err := strconv.Atoi(labels["replicas"]) + if err != nil { + replicas = 0 } - errorTypes := envVars["DLQ_ERROR_TYPES"] job := models.DlqJob{ - Urn: r.Urn, - ResourceID: labels["resource_id"], - ResourceType: labels["resource_type"], - Date: labels["date"], - Topic: labels["topic"], - Namespace: modConf.Namespace, - ErrorTypes: errorTypes, - BatchSize: batchSize, - NumThreads: numThreads, - Replicas: int64(modConf.Replicas), - KubeCluster: kubeCluster, - Project: r.Project, - CreatedBy: r.CreatedBy, - UpdatedBy: r.UpdatedBy, - Status: string(r.GetState().Status), - CreatedAt: strfmt.DateTime(r.CreatedAt.AsTime()), - UpdatedAt: strfmt.DateTime(r.UpdatedAt.AsTime()), + Urn: r.Urn, + Name: r.Name, + ResourceID: labels["resource_id"], + ResourceType: labels["resource_type"], + Date: labels["date"], + Topic: labels["topic"], + PrometheusHost: labels["prometheus_host"], + Group: labels["group"], + Namespace: labels["namespace"], + ContainerImage: labels["container_image"], + ErrorTypes: labels["error_types"], + BatchSize: int64(batchSize), + NumThreads: int64(numThreads), + Replicas: int64(replicas), + KubeCluster: kubeCluster, + Project: r.Project, + CreatedBy: r.CreatedBy, + UpdatedBy: r.UpdatedBy, + Status: r.GetState().GetStatus().String(), + CreatedAt: strfmt.DateTime(r.CreatedAt.AsTime()), + UpdatedAt: strfmt.DateTime(r.UpdatedAt.AsTime()), + EnvVars: envVars, + DlqGcsCredentialPath: labels["dlq_gcs_credential_path"], } - return &job, nil + return job, nil } func buildResourceLabels(job models.DlqJob) map[string]string { return map[string]string{ - "resource_id": job.ResourceID, - "type": job.ResourceType, - "date": job.Date, - "topic": job.Topic, - "job_type": "dlq", + "resource_id": job.ResourceID, + "resource_type": job.ResourceType, + "date": job.Date, + "topic": job.Topic, + "job_type": "dlq", + "group": job.Group, + "prometheus_host": job.PrometheusHost, + "replicas": fmt.Sprintf("%d", job.Replicas), + "batch_size": fmt.Sprintf("%d", job.BatchSize), + "num_threads": fmt.Sprintf("%d", job.NumThreads), + "error_types": job.ErrorTypes, + "dlq_gcs_credential_path": job.DlqGcsCredentialPath, + "container_image": job.ContainerImage, + "namespace": job.Namespace, } } -func buildEntropyResourceName(job models.DlqJob) string { +func buildEntropyResourceName(resourceTitle, resourceType, topic, date string) string { + timestamp := time.Now().Unix() + return fmt.Sprintf( - "%s-%s-%s-%s", - job.ResourceID, // firehose urn - job.ResourceType, // firehose / dagger - job.Topic, // - job.Date, // + "%s-%s-%s-%s-%d", + resourceTitle, // firehose title + resourceType, // firehose / dagger + topic, // + date, // + timestamp, ) } diff --git a/internal/server/v1/dlq/routes.go b/internal/server/v1/dlq/routes.go index 758a1df..328f138 100644 --- a/internal/server/v1/dlq/routes.go +++ b/internal/server/v1/dlq/routes.go @@ -10,7 +10,7 @@ import ( func Routes( entropyClient entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.BlobStorageClient, - cfg *DlqJobConfig, + cfg DlqJobConfig, ) func(r chi.Router) { service := NewService(entropyClient, gcsClient, cfg) handler := NewHandler(service) diff --git a/internal/server/v1/dlq/service.go b/internal/server/v1/dlq/service.go index 21e9de1..b51bbc2 100644 --- a/internal/server/v1/dlq/service.go +++ b/internal/server/v1/dlq/service.go @@ -1,8 +1,17 @@ package dlq import ( + "context" + "fmt" + entropyv1beta1rpc "buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/entropy/v1beta1/entropyv1beta1grpc" + entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "github.com/goto/dex/entropy" + "github.com/goto/dex/generated/models" "github.com/goto/dex/internal/server/gcs" ) @@ -14,13 +23,81 @@ type DlqJobConfig struct { type Service struct { client entropyv1beta1rpc.ResourceServiceClient gcsClient gcs.BlobStorageClient - cfg *DlqJobConfig + cfg DlqJobConfig } -func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.BlobStorageClient, cfg *DlqJobConfig) *Service { +func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.BlobStorageClient, cfg DlqJobConfig) *Service { return &Service{ client: client, gcsClient: gcsClient, cfg: cfg, } } + +// TODO: replace *DlqJob with a generated models.DlqJob +func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob models.DlqJob) (models.DlqJob, error) { + if s.cfg.DlqJobImage == "" { + return models.DlqJob{}, ErrEmptyConfigImage + } + if s.cfg.PrometheusHost == "" { + return models.DlqJob{}, ErrEmptyConfigPrometheusHost + } + + dlqJob.Replicas = 1 + + def, err := s.client.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID}) + if err != nil { + st := status.Convert(err) + if st.Code() == codes.NotFound { + return models.DlqJob{}, ErrFirehoseNotFound + } + return models.DlqJob{}, fmt.Errorf("error getting firehose resource: %w", err) + } + // enrich DlqJob with firehose details + if err := enrichDlqJob(&dlqJob, def.GetResource(), s.cfg); err != nil { + return models.DlqJob{}, fmt.Errorf("error enriching dlq job: %w", err) + } + + // map DlqJob to entropy resource -> return entropy.Resource (kind = job) + resource, err := mapToEntropyResource(dlqJob) + if err != nil { + return models.DlqJob{}, fmt.Errorf("error mapping to entropy resource: %w", err) + } + // entropy create resource + ctx = metadata.AppendToOutgoingContext(ctx, "user-id", userEmail) + req := &entropyv1beta1.CreateResourceRequest{Resource: resource} + res, err := s.client.CreateResource(ctx, req) + if err != nil { + return models.DlqJob{}, fmt.Errorf("error creating resource: %w", err) + } + + updatedDlqJob, err := mapToDlqJob(res.GetResource()) + if err != nil { + return models.DlqJob{}, fmt.Errorf("error mapping back to dlq job: %w", err) + } + + return updatedDlqJob, nil +} + +func (s *Service) ListDlqJob(ctx context.Context, labelFilter map[string]string) ([]models.DlqJob, error) { + dlqJob := []models.DlqJob{} + + rpcReq := &entropyv1beta1.ListResourcesRequest{ + Kind: entropy.ResourceKindJob, + Labels: labelFilter, + } + + rpcResp, err := s.client.ListResources(ctx, rpcReq) + if err != nil { + return nil, fmt.Errorf("error getting job resource list: %w", err) + } + for _, res := range rpcResp.GetResources() { + def, err := mapToDlqJob(res) + if err != nil { + return nil, fmt.Errorf("error mapping to dlq job: %w", err) + } + dlqJob = append(dlqJob, def) + } + + return dlqJob, nil +} diff --git a/internal/server/v1/dlq/service_test.go b/internal/server/v1/dlq/service_test.go new file mode 100644 index 0000000..130317f --- /dev/null +++ b/internal/server/v1/dlq/service_test.go @@ -0,0 +1,528 @@ +package dlq_test + +import ( + "context" + "fmt" + "testing" + "time" + + entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1" + "github.com/go-openapi/strfmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/goto/dex/entropy" + "github.com/goto/dex/generated/models" + "github.com/goto/dex/internal/server/utils" + "github.com/goto/dex/internal/server/v1/dlq" + "github.com/goto/dex/mocks" + "github.com/goto/dex/pkg/test" +) + +func TestServiceListDLQJob(t *testing.T) { + var ( + project = "test-project-1" + resourceID = "test-resource-id" + resourceType = "test-resource-type" + kubeCluster = "test-kube-cluster" + date = "2022-10-21" + topic = "test-topic" + group = "test-group" + ) + + dummyEntropyResources := []*entropyv1beta1.Resource{ + { + Urn: "test-urn-1", + Kind: entropy.ResourceKindJob, + Name: fmt.Sprintf( + "%s-%s-%s-%s", + "test1", // firehose title + "firehose", // firehose / dagger + topic, // + date, // + ), + Project: project, + Labels: map[string]string{ + "resource_id": resourceID, + "resource_type": resourceType, + "date": date, + "topic": topic, + "job_type": "dlq", + "group": group, + "prometheus_host": "test.prom.com", + "replicas": "1", + "batch_size": "3", + "num_threads": "5", + "error_types": "SCHEMA_ERROR", + "dlq_gcs_credential_path": "/etc/test/123", + "container_image": "test-image-dlq:1.0.0", + "namespace": "dlq-namespace", + }, + Spec: &entropyv1beta1.ResourceSpec{ + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, // from firehose configs.kube_cluster + }, + }, + }, + CreatedBy: "user@test.com", + UpdatedBy: "user@test.com", + }, + { + Urn: "test-urn-2", + Kind: entropy.ResourceKindJob, + Name: fmt.Sprintf( + "%s-%s-%s-%s", + "test2", // firehose title + "firehose", // firehose / dagger + topic, // + date, // + ), + Project: project, + Labels: map[string]string{ + "resource_id": resourceID, + "resource_type": resourceType, + "date": date, + "topic": topic, + "job_type": "dlq", + "group": group, + "prometheus_host": "test2.prom.com", + "replicas": "12", + "batch_size": "4", + "num_threads": "12", + "error_types": "TEST_ERROR", + "dlq_gcs_credential_path": "/etc/test/312", + "container_image": "test-image-dlq:2.0.0", + "namespace": "dlq-namespace-2", + }, + Spec: &entropyv1beta1.ResourceSpec{ + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, // from firehose configs.kube_cluster + }, + }, + }, + CreatedBy: "user@test.com", + UpdatedBy: "user@test.com", + }, + } + + expectedDlqJob := []models.DlqJob{ + { + // from input + BatchSize: 3, + ResourceID: "test-resource-id", + ResourceType: "test-resource-type", + Topic: "test-topic", + Name: "test1-firehose-test-topic-2022-10-21", + NumThreads: 5, + Date: "2022-10-21", + ErrorTypes: "SCHEMA_ERROR", + + // firehose resource + ContainerImage: "test-image-dlq:1.0.0", + DlqGcsCredentialPath: "/etc/test/123", + Group: "test-group", // + KubeCluster: kubeCluster, + Namespace: "dlq-namespace", + Project: project, + PrometheusHost: "test.prom.com", + + // hardcoded + Replicas: 1, + + // job resource + Urn: "test-urn-1", + Status: "STATUS_UNSPECIFIED", + CreatedAt: strfmt.DateTime(dummyEntropyResources[0].CreatedAt.AsTime()), + CreatedBy: "user@test.com", + UpdatedAt: strfmt.DateTime(dummyEntropyResources[0].UpdatedAt.AsTime()), + UpdatedBy: "user@test.com", + }, + { + // from input + BatchSize: 4, + ResourceID: "test-resource-id", + ResourceType: "test-resource-type", + Topic: "test-topic", + Name: "test2-firehose-test-topic-2022-10-21", + NumThreads: 12, + Date: "2022-10-21", + ErrorTypes: "TEST_ERROR", + + // firehose resource + ContainerImage: "test-image-dlq:2.0.0", + DlqGcsCredentialPath: "/etc/test/312", + Group: "test-group", // + KubeCluster: kubeCluster, + Namespace: "dlq-namespace-2", + Project: project, + PrometheusHost: "test2.prom.com", + + // hardcoded + Replicas: 12, + + // job resource + Urn: "test-urn-2", + Status: "STATUS_UNSPECIFIED", + CreatedAt: strfmt.DateTime(dummyEntropyResources[0].CreatedAt.AsTime()), + CreatedBy: "user@test.com", + UpdatedAt: strfmt.DateTime(dummyEntropyResources[0].UpdatedAt.AsTime()), + UpdatedBy: "user@test.com", + }, + } + + t.Run("should return dlqjob list", func(t *testing.T) { + ctx := context.TODO() + + expectedRPCResp := &entropyv1beta1.ListResourcesResponse{ + Resources: dummyEntropyResources, + } + + labelFilter := map[string]string{ + "resource_id": resourceID, + "resource_type": resourceType, + "date": date, + } + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "ListResources", ctx, &entropyv1beta1.ListResourcesRequest{ + Kind: entropy.ResourceKindJob, Labels: labelFilter, + }, + ).Return(expectedRPCResp, nil) + defer entropyClient.AssertExpectations(t) + + service := dlq.NewService(entropyClient, nil, dlq.DlqJobConfig{}) + + dlqJob, err := service.ListDlqJob(ctx, labelFilter) + assert.NoError(t, err) + assert.Equal(t, expectedDlqJob, dlqJob) + }) +} + +// nolint +func skipTestServiceCreateDLQJob(t *testing.T) { + t.Run("should return ErrFirehoseNotFound if resource cannot be found in entropy", func(t *testing.T) { + // inputs + ctx := context.TODO() + dlqJob := models.DlqJob{ + ResourceID: "test-resource-id", + ResourceType: "firehose", + } + expectedErr := status.Error(codes.NotFound, "Not found") + + // set conditions + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "GetResource", ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID}, + ).Return(nil, expectedErr) + defer entropyClient.AssertExpectations(t) + service := dlq.NewService(entropyClient, nil, dlq.DlqJobConfig{}) + + _, err := service.CreateDLQJob(ctx, "", dlqJob) + assert.ErrorIs(t, err, dlq.ErrFirehoseNotFound) + }) + + t.Run("should return error when there is an error getting firehose in entropy", func(t *testing.T) { + // inputs + ctx := context.TODO() + dlqJob := models.DlqJob{ + ResourceID: "test-resource-id", + ResourceType: "firehose", + } + expectedErr := status.Error(codes.Internal, "Any Error") + + // set conditions + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "GetResource", ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID}, + ).Return(nil, expectedErr) + defer entropyClient.AssertExpectations(t) + service := dlq.NewService(entropyClient, nil, dlq.DlqJobConfig{}) + + _, err := service.CreateDLQJob(ctx, "", dlqJob) + assert.ErrorIs(t, err, expectedErr) + }) + + t.Run("should create a entropy resource with job kind", func(t *testing.T) { + // inputs + ctx := context.TODO() + namespace := "test-namespace" + kubeCluster := "test-kube-cluster" + userEmail := "test@example.com" + config := dlq.DlqJobConfig{ + PrometheusHost: "http://sample-prom-host", + DlqJobImage: "test-image", + } + + dlqJob := models.DlqJob{ + BatchSize: int64(5), + Date: "2012-10-30", + ErrorTypes: "DESERILIAZATION_ERROR", + Group: "", + NumThreads: 2, + ResourceID: "test-resource-id", + ResourceType: "firehose", + Topic: "test-create-topic", + } + + // setup firehose resource + firehoseEnvVars := map[string]string{ + "SINK_TYPE": "bigquery", + "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", + "DLQ_GCS_BUCKET_NAME": "g-pilotdata-gl-dlq", + "DLQ_GCS_CREDENTIAL_PATH": "/etc/secret/gcp/token", + "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration", + "JAVA_TOOL_OPTIONS": "-javaagent:jolokia-jvm-agent.jar=port=8778,host=localhost", + "_JAVA_OPTIONS": "-Xmx1800m -Xms1800m", + "INPUT_SCHEMA_PROTO_CLASS": "gojek.esb.booking.GoFoodBookingLogMessage", + "METRIC_STATSD_TAGS": "a=b", + "SCHEMA_REGISTRY_STENCIL_ENABLE": "true", + "SCHEMA_REGISTRY_STENCIL_URLS": "http://p-godata-systems-stencil-v1beta1-ingress.golabs.io/v1beta1/namespaces/gojek/schemas/esb-log-entities", + "SINK_BIGQUERY_ADD_METADATA_ENABLED": "true", + "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": "-1", + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": "-1", + "SINK_BIGQUERY_CREDENTIAL_PATH": "/etc/secret/gcp/token", + "SINK_BIGQUERY_DATASET_LABELS": "shangchi=legend,lord=voldemort", + "SINK_BIGQUERY_DATASET_LOCATION": "US", + "SINK_BIGQUERY_DATASET_NAME": "bq_test", + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": "pilotdata-integration", + "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": "false", + "SINK_BIGQUERY_STORAGE_API_ENABLE": "true", + "SINK_BIGQUERY_TABLE_LABELS": "hello=world,john=doe", + "SINK_BIGQUERY_TABLE_NAME": "bq_dlq_test1", + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": "2629800000", + "SINK_BIGQUERY_TABLE_PARTITION_KEY": "event_timestamp", + "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": "true", + } + firehoseConfig, err := utils.GoValToProtoStruct(entropy.FirehoseConfig{ + EnvVariables: firehoseEnvVars, + }) + require.NoError(t, err) + firehoseResource := &entropyv1beta1.Resource{ + Name: "test-firehose-name", + Spec: &entropyv1beta1.ResourceSpec{ + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, + }, + }, + Configs: firehoseConfig, + }, + State: &entropyv1beta1.ResourceState{ + Output: structpb.NewStructValue(test.NewStruct(t, map[string]interface{}{ + "namespace": namespace, + })), + }, + } + + updatedEnvVars := map[string]string{ + "DLQ_BATCH_SIZE": "5", + "DLQ_NUM_THREADS": "2", + "DLQ_ERROR_TYPES": dlqJob.ErrorTypes, + "DLQ_INPUT_DATE": dlqJob.Date, + "DLQ_TOPIC_NAME": dlqJob.Topic, + "METRIC_STATSD_TAGS": "a=b", // TBA + "DLQ_PREFIX_DIR": "test-firehose", + "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", + "SINK_TYPE": firehoseEnvVars["SINK_TYPE"], + "DLQ_GCS_BUCKET_NAME": firehoseEnvVars["DLQ_GCS_BUCKET_NAME"], + "DLQ_GCS_CREDENTIAL_PATH": firehoseEnvVars["DLQ_GCS_CREDENTIAL_PATH"], + "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": firehoseEnvVars["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"], + "JAVA_TOOL_OPTIONS": firehoseEnvVars["JAVA_TOOL_OPTIONS"], + "_JAVA_OPTIONS": firehoseEnvVars["_JAVA_OPTIONS"], + "INPUT_SCHEMA_PROTO_CLASS": firehoseEnvVars["INPUT_SCHEMA_PROTO_CLASS"], + "SCHEMA_REGISTRY_STENCIL_ENABLE": firehoseEnvVars["SCHEMA_REGISTRY_STENCIL_ENABLE"], + "SCHEMA_REGISTRY_STENCIL_URLS": firehoseEnvVars["SCHEMA_REGISTRY_STENCIL_URLS"], + "SINK_BIGQUERY_ADD_METADATA_ENABLED": firehoseEnvVars["SINK_BIGQUERY_ADD_METADATA_ENABLED"], + "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": firehoseEnvVars["SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"], + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": firehoseEnvVars["SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"], + "SINK_BIGQUERY_CREDENTIAL_PATH": firehoseEnvVars["SINK_BIGQUERY_CREDENTIAL_PATH"], + "SINK_BIGQUERY_DATASET_LABELS": firehoseEnvVars["SINK_BIGQUERY_DATASET_LABELS"], + "SINK_BIGQUERY_DATASET_LOCATION": firehoseEnvVars["SINK_BIGQUERY_DATASET_LOCATION"], + "SINK_BIGQUERY_DATASET_NAME": firehoseEnvVars["SINK_BIGQUERY_DATASET_NAME"], + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": firehoseEnvVars["SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"], + "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": firehoseEnvVars["SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"], + "SINK_BIGQUERY_STORAGE_API_ENABLE": firehoseEnvVars["SINK_BIGQUERY_STORAGE_API_ENABLE"], + "SINK_BIGQUERY_TABLE_LABELS": firehoseEnvVars["SINK_BIGQUERY_TABLE_LABELS"], + "SINK_BIGQUERY_TABLE_NAME": firehoseEnvVars["SINK_BIGQUERY_TABLE_NAME"], + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": firehoseEnvVars["SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"], + "SINK_BIGQUERY_TABLE_PARTITION_KEY": firehoseEnvVars["SINK_BIGQUERY_TABLE_PARTITION_KEY"], + "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": firehoseEnvVars["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"], + } + + // setup expected request to entropy + jobConfig, err := utils.GoValToProtoStruct(entropy.JobConfig{ + Replicas: 1, + Namespace: namespace, + Containers: []entropy.JobContainer{ + { + Name: "dlq-job", + Image: config.DlqJobImage, + ImagePullPolicy: "Always", + SecretsVolumes: []entropy.JobSecret{ + { + Name: "firehose-bigquery-sink-credential", + Mount: updatedEnvVars["DLQ_GCS_CREDENTIAL_PATH"], + }, + }, + Limits: entropy.UsageSpec{ + CPU: "0.5", // user + Memory: "2gb", // user + }, + Requests: entropy.UsageSpec{ + CPU: "0.5", // user + Memory: "2gb", // user + }, + EnvVariables: updatedEnvVars, + }, + { + Name: "telegraf", + Image: "telegraf:1.18.0-alpine", + ConfigMapsVolumes: []entropy.JobConfigMap{ + { + Name: "dlq-processor-telegraf", + Mount: "/etc/telegraf", + }, + }, + EnvVariables: map[string]string{ + // To be updated by streaming + "APP_NAME": "", // TBA + "PROMETHEUS_HOST": config.PrometheusHost, + "DEPLOYMENT_NAME": "deployment-name", + "TEAM": dlqJob.Group, + "TOPIC": dlqJob.Topic, + "environment": "production", // TBA + "organization": "de", // TBA + "projectID": dlqJob.Project, + }, + Command: []string{ + "/bin/bash", + }, + Args: []string{ + "-c", + "telegraf & while [ ! -f /shared/job-finished ]; do sleep 5; done; sleep 20 && exit 0", + }, + Limits: entropy.UsageSpec{ + CPU: "100m", // user + Memory: "300Mi", // user + }, + Requests: entropy.UsageSpec{ + CPU: "100m", // user + Memory: "300Mi", // user + }, + }, + }, + JobLabels: map[string]string{ + "firehose": dlqJob.ResourceID, + "topic": dlqJob.Topic, + "date": dlqJob.Date, + }, + Volumes: []entropy.JobVolume{ + { + Name: "firehose-bigquery-sink-credential", + Kind: "secret", + }, + { + Name: "dlq-processor-telegraf", + Kind: "configMap", + }, + }, + }) + require.NoError(t, err) + expectedJobRequestToEntropy := &entropyv1beta1.Resource{ + Kind: entropy.ResourceKindJob, + Name: "test-firehose-name-firehose-test-create-topic-2012-10-30", + Project: firehoseResource.Project, + Labels: map[string]string{ + "resource_id": dlqJob.ResourceID, + "resource_type": dlqJob.ResourceType, + "date": dlqJob.Date, + "topic": dlqJob.Topic, + "job_type": "dlq", + "group": dlqJob.Group, + "prometheus_host": config.PrometheusHost, + "batch_size": "5", + "num_threads": "2", + "error_types": "DESERILIAZATION_ERROR", + "dlq_gcs_credential_path": updatedEnvVars["DLQ_GCS_CREDENTIAL_PATH"], + "container_image": config.DlqJobImage, + "namespace": namespace, + "replicas": "1", + }, + Spec: &entropyv1beta1.ResourceSpec{ + Configs: jobConfig, + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, // from firehose configs.kube_cluster + }, + }, + }, + } + entropyCtx := metadata.AppendToOutgoingContext(ctx, "user-id", userEmail) + + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "GetResource", ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID}, + ).Return(&entropyv1beta1.GetResourceResponse{ + Resource: firehoseResource, + }, nil) + entropyClient.On("CreateResource", entropyCtx, &entropyv1beta1.CreateResourceRequest{ + Resource: expectedJobRequestToEntropy, + }).Return(&entropyv1beta1.CreateResourceResponse{ + Resource: &entropyv1beta1.Resource{ + Urn: "test-urn", + CreatedBy: "test-created-by", + UpdatedBy: "test-updated-by", + CreatedAt: timestamppb.New(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), + UpdatedAt: timestamppb.New(time.Date(2022, 2, 2, 1, 1, 1, 1, time.UTC)), + Kind: expectedJobRequestToEntropy.Kind, + Name: expectedJobRequestToEntropy.Name, + Project: expectedJobRequestToEntropy.Project, + Labels: expectedJobRequestToEntropy.GetLabels(), + Spec: expectedJobRequestToEntropy.GetSpec(), + }, + }, nil) + defer entropyClient.AssertExpectations(t) + service := dlq.NewService(entropyClient, nil, config) + + result, err := service.CreateDLQJob(ctx, userEmail, dlqJob) + assert.NoError(t, err) + + // assertions + expectedDlqJob := models.DlqJob{ + Replicas: 1, + Urn: "test-urn", + Status: "STATUS_UNSPECIFIED", + Name: expectedJobRequestToEntropy.Name, + NumThreads: dlqJob.NumThreads, + Date: dlqJob.Date, + ErrorTypes: dlqJob.ErrorTypes, + BatchSize: dlqJob.BatchSize, + ResourceID: dlqJob.ResourceID, + ResourceType: dlqJob.ResourceType, + Topic: dlqJob.Topic, + ContainerImage: config.DlqJobImage, + DlqGcsCredentialPath: updatedEnvVars["DLQ_GCS_CREDENTIAL_PATH"], + EnvVars: updatedEnvVars, + KubeCluster: kubeCluster, + Namespace: namespace, + Project: firehoseResource.Project, + PrometheusHost: config.PrometheusHost, + CreatedAt: strfmt.DateTime(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), + UpdatedAt: strfmt.DateTime(time.Date(2022, 2, 2, 1, 1, 1, 1, time.UTC)), + CreatedBy: "test-created-by", + UpdatedBy: "test-updated-by", + } + assert.Equal(t, expectedDlqJob, result) + }) +} diff --git a/pkg/test/helpers.go b/pkg/test/helpers.go new file mode 100644 index 0000000..c8c2cc3 --- /dev/null +++ b/pkg/test/helpers.go @@ -0,0 +1,16 @@ +package test + +import ( + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/structpb" +) + +func NewStruct(t *testing.T, d map[string]interface{}) *structpb.Struct { + t.Helper() + + strct, err := structpb.NewStruct(d) + require.NoError(t, err) + return strct +} diff --git a/swagger.yml b/swagger.yml index 86d8104..c226d18 100644 --- a/swagger.yml +++ b/swagger.yml @@ -1287,6 +1287,8 @@ definitions: DlqJob: type: object properties: + name: + type: string urn: type: string project: @@ -1345,6 +1347,37 @@ definitions: type: string dlq_gcs_credential_path: type: string + DlqJobForm: + type: object + required: + - date + - topic + - resource_id + - resource_type + - batch_size + - num_threads + properties: + date: + type: string + example: "2012-10-30" + minLength: 1 + topic: + type: string + minLength: 1 + resource_id: + type: string + minLength: 1 + resource_type: + type: string + enum: + - firehose + error_types: + type: string + description: "List of firehose error types, comma separated" + batch_size: + type: integer + num_threads: + type: integer Subscription: type: object description: "Siren subscription model: https://github.com/goto/siren/blob/v0.6.4/proto/siren.swagger.yaml#L1325"