Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
feat(dlq): get dlq job details api (#77)
Browse files Browse the repository at this point in the history
* feat(dlq): get dlq job details api

* chore: resolve comments

* feat(test): add tests for get dlq job details service

* feat(test): add test for get dlq job handler

* chore: fix linting issues

* feat: fix issues on create APIs

---------

Co-authored-by: Stewart Jingga <stewart_jingga@yahoo.com>
  • Loading branch information
abhishekv24 and StewartJingga committed Oct 24, 2023
1 parent cee9ad8 commit 9e4de5b
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 143 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ output:
linters:
enable-all: true
disable:
- goerr113
- gomnd
- depguard
- godot
Expand Down
3 changes: 1 addition & 2 deletions cli/server/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ type optimusConfig struct {
}

type dlqConfig struct {
DlqJobImage string `mapstructure:"dlq_job_image"`
PrometheusHost string `mapstructure:"prometheus_host"`
JobImage string `mapstructure:"job_image"`
}

type serveConfig struct {
Expand Down
3 changes: 1 addition & 2 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap

wardenClient := warden.NewClient(cfg.Warden.Addr)
dlqConfig := dlq.DlqJobConfig{
DlqJobImage: cfg.Dlq.DlqJobImage,
PrometheusHost: cfg.Dlq.PrometheusHost,
JobImage: cfg.Dlq.JobImage,
}

return server.Serve(ctx, cfg.Service.Addr(), nrApp, zapLog,
Expand Down
3 changes: 3 additions & 0 deletions generated/models/dlq_job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/server/v1/dlq/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ var (
ErrFirehoseNotFound = errors.New("firehose not found")
ErrEmptyConfigImage = errors.New("empty dlq job image")
ErrEmptyConfigPrometheusHost = errors.New("empty prometheus host")
ErrJobNotFound = errors.New("no dlq job is found for this URN")
)
18 changes: 14 additions & 4 deletions internal/server/v1/dlq/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,21 @@ func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
}

func (h *Handler) getDlqJob(w http.ResponseWriter, r *http.Request) {
// sample to get job urn from route params
_ = h.jobURN(r)
ctx := r.Context()
jobURN := h.jobURN(r)

utils.WriteJSON(w, http.StatusOK, map[string]interface{}{
"dlq_job": nil,
dlqJob, err := h.service.GetDlqJob(ctx, jobURN)
if err != nil {
if errors.Is(err, ErrJobNotFound) {
utils.WriteErrMsg(w, http.StatusNotFound, ErrJobNotFound.Error())
return
}
utils.WriteErr(w, err)
return
}

utils.WriteJSON(w, http.StatusOK, map[string]any{
"dlq_job": dlqJob,
})
}

Expand Down
88 changes: 6 additions & 82 deletions internal/server/v1/dlq/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,81 +131,6 @@ func TestListTopicDates(t *testing.T) {
assert.Equal(t, topicDates, expectedMap["dlq_list"])
}

func TestErrorFromGCSClient(t *testing.T) {
eService := &mocks.ResourceServiceClient{}
gClient := &mocks.BlobStorageClient{}
handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{}))
httpWriter := &testHTTPWriter{}
httpRequest := &http.Request{}
config := &entropy.FirehoseConfig{
Stopped: false,
StopTime: nil,
Replicas: 0,
Namespace: "",
DeploymentID: "",
EnvVariables: map[string]string{
firehose.ConfigDLQBucket: "test-bucket",
firehose.ConfigDLQDirectoryPrefix: "test-prefix",
},
ResetOffset: "",
Limits: entropy.UsageSpec{},
Requests: entropy.UsageSpec{},
Telegraf: nil,
ChartValues: nil,
InitContainer: entropy.FirehoseInitContainer{},
}
configProto, _ := utils.GoValToProtoStruct(config)
eService.On(
"GetResource",
context.Background(),
&entropyv1beta1.GetResourceRequest{Urn: ""}).Return(
&entropyv1beta1.GetResourceResponse{
Resource: &entropyv1beta1.Resource{
Urn: "",
Kind: "",
Name: "",
Project: "",
Labels: nil,
CreatedAt: nil,
UpdatedAt: nil,
Spec: &entropyv1beta1.ResourceSpec{
Configs: configProto,
Dependencies: nil,
},
State: nil,
CreatedBy: "",
UpdatedBy: "",
},
}, nil)
gClient.On("ListDlqMetadata", gcs.BucketInfo{
BucketName: "test-bucket",
Prefix: "test-prefix",
Delim: "",
}).Return(nil, fmt.Errorf("test-error"))
handler.ListFirehoseDLQ(httpWriter, httpRequest)
expectedMap := make(map[string]interface{})
err := json.Unmarshal([]byte(httpWriter.messages[0]), &expectedMap)
require.NoError(t, err)
assert.Equal(t, "test-error", expectedMap["cause"])
}

func TestErrorFromFirehoseResource(t *testing.T) {
eService := &mocks.ResourceServiceClient{}
gClient := &mocks.BlobStorageClient{}
handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{}))
httpWriter := &testHTTPWriter{}
httpRequest := &http.Request{}
eService.On(
"GetResource",
context.Background(),
mock.Anything).Return(nil, fmt.Errorf("test-error"))
handler.ListFirehoseDLQ(httpWriter, httpRequest)
expectedMap := make(map[string]interface{})
err := json.Unmarshal([]byte(httpWriter.messages[0]), &expectedMap)
require.NoError(t, err)
assert.Equal(t, "test-error", expectedMap["cause"])
}

func TestListDlqJob(t *testing.T) {
var (
method = http.MethodGet
Expand Down Expand Up @@ -472,8 +397,7 @@ func skipTestCreateDlqJob(t *testing.T) {
kubeCluster := "test-kube-cluster"
userEmail := "test@example.com"
config := dlq.DlqJobConfig{
PrometheusHost: "http://sample-prom-host",
DlqJobImage: "test-image",
JobImage: "test-image",
}
envVars := map[string]string{
"SINK_TYPE": "bigquery",
Expand Down Expand Up @@ -583,7 +507,7 @@ func skipTestCreateDlqJob(t *testing.T) {
Containers: []entropy.JobContainer{
{
Name: "dlq-job",
Image: config.DlqJobImage,
Image: config.JobImage,
ImagePullPolicy: "Always",
SecretsVolumes: []entropy.JobSecret{
{
Expand Down Expand Up @@ -613,7 +537,7 @@ func skipTestCreateDlqJob(t *testing.T) {
EnvVariables: map[string]string{
// To be updated by streaming
"APP_NAME": "", // TBA
"PROMETHEUS_HOST": config.PrometheusHost,
"PROMETHEUS_HOST": "",
"DEPLOYMENT_NAME": "deployment-name",
"TEAM": "",
"TOPIC": topic,
Expand Down Expand Up @@ -674,7 +598,7 @@ func skipTestCreateDlqJob(t *testing.T) {
"topic": topic,
"job_type": "dlq",
"group": group,
"prometheus_host": config.PrometheusHost,
"prometheus_host": "",
},
CreatedBy: jobResource.CreatedBy,
UpdatedBy: jobResource.UpdatedBy,
Expand Down Expand Up @@ -732,14 +656,14 @@ func skipTestCreateDlqJob(t *testing.T) {
ErrorTypes: errorTypes,

// firehose resource
ContainerImage: config.DlqJobImage,
ContainerImage: config.JobImage,
DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"],
EnvVars: expectedEnvVars,
Group: "", //
KubeCluster: kubeCluster,
Namespace: namespace,
Project: firehoseResource.Project,
PrometheusHost: config.PrometheusHost,
PrometheusHost: "",

// hardcoded
Replicas: 0,
Expand Down
93 changes: 55 additions & 38 deletions internal/server/v1/dlq/mapper.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package dlq

import (
"errors"
"fmt"
"strconv"
"time"
"strings"

entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1"
"github.com/go-openapi/strfmt"
Expand All @@ -21,7 +22,7 @@ const (
dlqTelegrafConfigName = "dlq-processor-telegraf"
)

func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobConfig) error {
func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource) error {
var kubeCluster string
for _, dep := range res.Spec.GetDependencies() {
if dep.GetKey() == kubeClusterDependenciesKey {
Expand All @@ -42,34 +43,68 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo
if !ok {
return ErrFirehoseNamespaceInvalid
}
status := res.GetState().GetStatus().String()

// TODO: populate prometheus host using firehose's
promRemoteWriteAny, exists := modConf.Telegraf.Config.Output["prometheus_remote_write"]
if !exists {
return errors.New("missing prometheus_remote_write")
}
promRemoteWriteMap, ok := promRemoteWriteAny.(map[string]any)
if !ok {
return errors.New("invalid prometheus_remote_write")
}
promURLAny, exists := promRemoteWriteMap["url"]
if !exists {
return errors.New("missing prometheus_remote_write.url")
}
promURLString, ok := promURLAny.(string)
if !ok {
return errors.New("invalid prometheus_remote_write.url")
}

envs := modConf.EnvVariables
job.Name = buildEntropyResourceName(res.Name, "firehose", job.Topic, job.Date)
firehoseLabels := res.Labels
sinkType := envs["SINK_TYPE"]
dataType := envs["INPUT_SCHEMA_PROTO_CLASS"]
groupID := firehoseLabels["group"]
groupSlug := firehoseLabels["team"]
dlqPrefixDirectory := strings.Replace(envs["DLQ_GCS_DIRECTORY_PREFIX"], "{{ .name }}", res.Name, 1)
metricStatsDTag := fmt.Sprintf(
"namespace=%s,app=%s,sink=%s,team=%s,proto=%s,firehose=%s",
namespace,
job.Name,
sinkType,
groupSlug,
dataType,
res.Urn,
)

job.PrometheusHost = promURLString
job.Namespace = namespace
job.Status = status
job.CreatedAt = strfmt.DateTime(res.CreatedAt.AsTime())
job.UpdatedAt = strfmt.DateTime(res.UpdatedAt.AsTime())
job.Project = res.Project
job.KubeCluster = kubeCluster
job.ContainerImage = cfg.DlqJobImage
job.PrometheusHost = cfg.PrometheusHost
job.Group = groupID
job.Team = groupSlug
job.DlqGcsCredentialPath = envs["DLQ_GCS_CREDENTIAL_PATH"]
if job.DlqGcsCredentialPath == "" {
return errors.New("missing DLQ_GCS_CREDENTIAL_PATH")
}
job.EnvVars = map[string]string{
"DLQ_BATCH_SIZE": fmt.Sprintf("%d", job.BatchSize),
"DLQ_NUM_THREADS": fmt.Sprintf("%d", job.NumThreads),
"DLQ_ERROR_TYPES": job.ErrorTypes,
"DLQ_INPUT_DATE": job.Date,
"DLQ_TOPIC_NAME": job.Topic,
"METRIC_STATSD_TAGS": "a=b", // TBA
"SINK_TYPE": envs["SINK_TYPE"],
"DLQ_PREFIX_DIR": "test-firehose",
"METRIC_STATSD_TAGS": metricStatsDTag,
"SINK_TYPE": sinkType,
"DLQ_PREFIX_DIR": dlqPrefixDirectory,
"DLQ_FINISHED_STATUS_FILE": "/shared/job-finished",
"DLQ_GCS_BUCKET_NAME": envs["DLQ_GCS_BUCKET_NAME"],
"DLQ_GCS_CREDENTIAL_PATH": envs["DLQ_GCS_CREDENTIAL_PATH"],
"DLQ_GCS_CREDENTIAL_PATH": job.DlqGcsCredentialPath,
"DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": envs["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"],
"JAVA_TOOL_OPTIONS": envs["JAVA_TOOL_OPTIONS"],
"_JAVA_OPTIONS": envs["_JAVA_OPTIONS"],
"INPUT_SCHEMA_PROTO_CLASS": envs["INPUT_SCHEMA_PROTO_CLASS"],
"INPUT_SCHEMA_PROTO_CLASS": dataType,
"SCHEMA_REGISTRY_STENCIL_ENABLE": envs["SCHEMA_REGISTRY_STENCIL_ENABLE"],
"SCHEMA_REGISTRY_STENCIL_URLS": envs["SCHEMA_REGISTRY_STENCIL_URLS"],
"SINK_BIGQUERY_ADD_METADATA_ENABLED": envs["SINK_BIGQUERY_ADD_METADATA_ENABLED"],
Expand All @@ -88,7 +123,6 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo
"SINK_BIGQUERY_TABLE_PARTITION_KEY": envs["SINK_BIGQUERY_TABLE_PARTITION_KEY"],
"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envs["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"],
}
job.DlqGcsCredentialPath = modConf.EnvVariables["DLQ_GCS_CREDENTIAL_PATH"]

return nil
}
Expand All @@ -113,7 +147,7 @@ func mapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) {
Dependencies: []*entropyv1beta1.ResourceDependency{
{
Key: "kube_cluster",
Value: job.KubeCluster, // from firehose configs.kube_cluster
Value: job.KubeCluster,
},
},
},
Expand Down Expand Up @@ -155,14 +189,11 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) {
},
},
EnvVariables: map[string]string{
// To be updated by streaming
"APP_NAME": "", // TBA
"APP_NAME": job.ResourceID,
"PROMETHEUS_HOST": job.PrometheusHost,
"DEPLOYMENT_NAME": "deployment-name",
"TEAM": job.Group,
"TEAM": job.Team,
"TOPIC": job.Topic,
"environment": "production", // TBA
"organization": "de", // TBA
"environment": "production",
"projectID": job.Project,
},
Command: []string{
Expand All @@ -183,9 +214,8 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) {
},
},
JobLabels: map[string]string{
"firehose": job.ResourceID,
"topic": job.Topic,
"date": job.Date,
"topic": job.Topic,
"date": job.Date,
},
Volumes: []entropy.JobVolume{
{
Expand Down Expand Up @@ -284,16 +314,3 @@ func buildResourceLabels(job models.DlqJob) map[string]string {
"namespace": job.Namespace,
}
}

func buildEntropyResourceName(resourceTitle, resourceType, topic, date string) string {
timestamp := time.Now().Unix()

return fmt.Sprintf(
"%s-%s-%s-%s-%d",
resourceTitle, // firehose title
resourceType, // firehose / dagger
topic, //
date, //
timestamp,
)
}

0 comments on commit 9e4de5b

Please sign in to comment.