Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
feat: fix issues on create APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
StewartJingga committed Oct 24, 2023
1 parent ef8aa6a commit e6da136
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 310 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ output:
linters:
enable-all: true
disable:
- goerr113
- gomnd
- depguard
- godot
Expand Down
3 changes: 1 addition & 2 deletions cli/server/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ type optimusConfig struct {
}

type dlqConfig struct {
DlqJobImage string `mapstructure:"dlq_job_image"`
PrometheusHost string `mapstructure:"prometheus_host"`
JobImage string `mapstructure:"job_image"`
}

type serveConfig struct {
Expand Down
3 changes: 1 addition & 2 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap

wardenClient := warden.NewClient(cfg.Warden.Addr)
dlqConfig := dlq.DlqJobConfig{
DlqJobImage: cfg.Dlq.DlqJobImage,
PrometheusHost: cfg.Dlq.PrometheusHost,
JobImage: cfg.Dlq.JobImage,
}

return server.Serve(ctx, cfg.Service.Addr(), nrApp, zapLog,
Expand Down
5 changes: 4 additions & 1 deletion generated/models/dlq_job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions internal/server/v1/dlq/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
})
}

func (h *Handler) GetDlqJob(w http.ResponseWriter, r *http.Request) {
func (h *Handler) getDlqJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
jobURN := h.jobURN(r)

Expand All @@ -145,7 +145,9 @@ func (h *Handler) GetDlqJob(w http.ResponseWriter, r *http.Request) {
return
}

utils.WriteJSON(w, http.StatusOK, dlqJob)
utils.WriteJSON(w, http.StatusOK, map[string]any{
"dlq_job": dlqJob,
})
}

func (*Handler) firehoseURN(r *http.Request) string {
Expand Down
13 changes: 6 additions & 7 deletions internal/server/v1/dlq/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,7 @@ func skipTestCreateDlqJob(t *testing.T) {
kubeCluster := "test-kube-cluster"
userEmail := "test@example.com"
config := dlq.DlqJobConfig{
PrometheusHost: "http://sample-prom-host",
DlqJobImage: "test-image",
JobImage: "test-image",
}
envVars := map[string]string{
"SINK_TYPE": "bigquery",
Expand Down Expand Up @@ -508,7 +507,7 @@ func skipTestCreateDlqJob(t *testing.T) {
Containers: []entropy.JobContainer{
{
Name: "dlq-job",
Image: config.DlqJobImage,
Image: config.JobImage,
ImagePullPolicy: "Always",
SecretsVolumes: []entropy.JobSecret{
{
Expand Down Expand Up @@ -538,7 +537,7 @@ func skipTestCreateDlqJob(t *testing.T) {
EnvVariables: map[string]string{
// To be updated by streaming
"APP_NAME": "", // TBA
"PROMETHEUS_HOST": config.PrometheusHost,
"PROMETHEUS_HOST": "",
"DEPLOYMENT_NAME": "deployment-name",
"TEAM": "",
"TOPIC": topic,
Expand Down Expand Up @@ -599,7 +598,7 @@ func skipTestCreateDlqJob(t *testing.T) {
"topic": topic,
"job_type": "dlq",
"group": group,
"prometheus_host": config.PrometheusHost,
"prometheus_host": "",
},
CreatedBy: jobResource.CreatedBy,
UpdatedBy: jobResource.UpdatedBy,
Expand Down Expand Up @@ -657,14 +656,14 @@ func skipTestCreateDlqJob(t *testing.T) {
ErrorTypes: errorTypes,

// firehose resource
ContainerImage: config.DlqJobImage,
ContainerImage: config.JobImage,
DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"],
EnvVars: expectedEnvVars,
Group: "", //
KubeCluster: kubeCluster,
Namespace: namespace,
Project: firehoseResource.Project,
PrometheusHost: config.PrometheusHost,
PrometheusHost: "",

// hardcoded
Replicas: 0,
Expand Down
93 changes: 55 additions & 38 deletions internal/server/v1/dlq/mapper.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package dlq

import (
"errors"
"fmt"
"strconv"
"time"
"strings"

entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1"
"github.com/go-openapi/strfmt"
Expand All @@ -21,7 +22,7 @@ const (
dlqTelegrafConfigName = "dlq-processor-telegraf"
)

func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobConfig) error {
func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource) error {
var kubeCluster string
for _, dep := range res.Spec.GetDependencies() {
if dep.GetKey() == kubeClusterDependenciesKey {
Expand All @@ -42,34 +43,68 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo
if !ok {
return ErrFirehoseNamespaceInvalid
}
status := res.GetState().GetStatus().String()

// TODO: populate prometheus host using firehose's
promRemoteWriteAny, exists := modConf.Telegraf.Config.Output["prometheus_remote_write"]
if !exists {
return errors.New("missing prometheus_remote_write")
}
promRemoteWriteMap, ok := promRemoteWriteAny.(map[string]any)
if !ok {
return errors.New("invalid prometheus_remote_write")
}
promURLAny, exists := promRemoteWriteMap["url"]
if !exists {
return errors.New("missing prometheus_remote_write.url")
}
promURLString, ok := promURLAny.(string)
if !ok {
return errors.New("invalid prometheus_remote_write.url")
}

envs := modConf.EnvVariables
job.Name = buildEntropyResourceName(res.Name, "firehose", job.Topic, job.Date)
firehoseLabels := res.Labels
sinkType := envs["SINK_TYPE"]
dataType := envs["INPUT_SCHEMA_PROTO_CLASS"]
groupID := firehoseLabels["group"]
groupSlug := firehoseLabels["team"]
dlqPrefixDirectory := strings.Replace(envs["DLQ_GCS_DIRECTORY_PREFIX"], "{{ .name }}", res.Name, 1)
metricStatsDTag := fmt.Sprintf(
"namespace=%s,app=%s,sink=%s,team=%s,proto=%s,firehose=%s",
namespace,
job.Name,
sinkType,
groupSlug,
dataType,
res.Urn,
)

job.PrometheusHost = promURLString
job.Namespace = namespace
job.Status = status
job.CreatedAt = strfmt.DateTime(res.CreatedAt.AsTime())
job.UpdatedAt = strfmt.DateTime(res.UpdatedAt.AsTime())
job.Project = res.Project
job.KubeCluster = kubeCluster
job.ContainerImage = cfg.DlqJobImage
job.PrometheusHost = cfg.PrometheusHost
job.Group = groupID
job.Team = groupSlug
job.DlqGcsCredentialPath = envs["DLQ_GCS_CREDENTIAL_PATH"]
if job.DlqGcsCredentialPath == "" {
return errors.New("missing DLQ_GCS_CREDENTIAL_PATH")
}
job.EnvVars = map[string]string{
"DLQ_BATCH_SIZE": fmt.Sprintf("%d", job.BatchSize),
"DLQ_NUM_THREADS": fmt.Sprintf("%d", job.NumThreads),
"DLQ_ERROR_TYPES": job.ErrorTypes,
"DLQ_INPUT_DATE": job.Date,
"DLQ_TOPIC_NAME": job.Topic,
"METRIC_STATSD_TAGS": "a=b", // TBA
"SINK_TYPE": envs["SINK_TYPE"],
"DLQ_PREFIX_DIR": "test-firehose",
"METRIC_STATSD_TAGS": metricStatsDTag,
"SINK_TYPE": sinkType,
"DLQ_PREFIX_DIR": dlqPrefixDirectory,
"DLQ_FINISHED_STATUS_FILE": "/shared/job-finished",
"DLQ_GCS_BUCKET_NAME": envs["DLQ_GCS_BUCKET_NAME"],
"DLQ_GCS_CREDENTIAL_PATH": envs["DLQ_GCS_CREDENTIAL_PATH"],
"DLQ_GCS_CREDENTIAL_PATH": job.DlqGcsCredentialPath,
"DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": envs["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"],
"JAVA_TOOL_OPTIONS": envs["JAVA_TOOL_OPTIONS"],
"_JAVA_OPTIONS": envs["_JAVA_OPTIONS"],
"INPUT_SCHEMA_PROTO_CLASS": envs["INPUT_SCHEMA_PROTO_CLASS"],
"INPUT_SCHEMA_PROTO_CLASS": dataType,
"SCHEMA_REGISTRY_STENCIL_ENABLE": envs["SCHEMA_REGISTRY_STENCIL_ENABLE"],
"SCHEMA_REGISTRY_STENCIL_URLS": envs["SCHEMA_REGISTRY_STENCIL_URLS"],
"SINK_BIGQUERY_ADD_METADATA_ENABLED": envs["SINK_BIGQUERY_ADD_METADATA_ENABLED"],
Expand All @@ -88,7 +123,6 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo
"SINK_BIGQUERY_TABLE_PARTITION_KEY": envs["SINK_BIGQUERY_TABLE_PARTITION_KEY"],
"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envs["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"],
}
job.DlqGcsCredentialPath = modConf.EnvVariables["DLQ_GCS_CREDENTIAL_PATH"]

return nil
}
Expand All @@ -113,7 +147,7 @@ func mapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) {
Dependencies: []*entropyv1beta1.ResourceDependency{
{
Key: "kube_cluster",
Value: job.KubeCluster, // from firehose configs.kube_cluster
Value: job.KubeCluster,
},
},
},
Expand Down Expand Up @@ -155,14 +189,11 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) {
},
},
EnvVariables: map[string]string{
// To be updated by streaming
"APP_NAME": "", // TBA
"APP_NAME": job.ResourceID,
"PROMETHEUS_HOST": job.PrometheusHost,
"DEPLOYMENT_NAME": "deployment-name",
"TEAM": job.Group,
"TEAM": job.Team,
"TOPIC": job.Topic,
"environment": "production", // TBA
"organization": "de", // TBA
"environment": "production",
"projectID": job.Project,
},
Command: []string{
Expand All @@ -183,9 +214,8 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) {
},
},
JobLabels: map[string]string{
"firehose": job.ResourceID,
"topic": job.Topic,
"date": job.Date,
"topic": job.Topic,
"date": job.Date,
},
Volumes: []entropy.JobVolume{
{
Expand Down Expand Up @@ -284,16 +314,3 @@ func buildResourceLabels(job models.DlqJob) map[string]string {
"namespace": job.Namespace,
}
}

func buildEntropyResourceName(resourceTitle, resourceType, topic, date string) string {
timestamp := time.Now().Unix()

return fmt.Sprintf(
"%s-%s-%s-%s-%d",
resourceTitle, // firehose title
resourceType, // firehose / dagger
topic, //
date, //
timestamp,
)
}
2 changes: 1 addition & 1 deletion internal/server/v1/dlq/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func Routes(
return func(r chi.Router) {
r.Get("/firehose/{firehose_urn}", handler.ListFirehoseDLQ)
r.Get("/jobs", handler.listDlqJobs)
r.Get("/jobs/{job_urn}", handler.GetDlqJob)
r.Get("/jobs/{job_urn}", handler.getDlqJob)
r.Post("/jobs", handler.createDlqJob)
}
}
14 changes: 7 additions & 7 deletions internal/server/v1/dlq/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dlq
import (
"context"
"fmt"
"time"

entropyv1beta1rpc "buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/entropy/v1beta1/entropyv1beta1grpc"
entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1"
Expand All @@ -16,8 +17,7 @@ import (
)

type DlqJobConfig struct {
DlqJobImage string `mapstructure:"dlq_job_image"`
PrometheusHost string `mapstructure:"prometheus_host"`
JobImage string
}

type Service struct {
Expand All @@ -36,14 +36,14 @@ func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.Bl

// TODO: replace *DlqJob with a generated models.DlqJob
func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob models.DlqJob) (models.DlqJob, error) {
if s.cfg.DlqJobImage == "" {
if s.cfg.JobImage == "" {
return models.DlqJob{}, ErrEmptyConfigImage
}
if s.cfg.PrometheusHost == "" {
return models.DlqJob{}, ErrEmptyConfigPrometheusHost
}

timestamp := time.Now().Unix()
dlqJob.Name = fmt.Sprintf("dlq-%s-%d", dlqJob.Date, timestamp)
dlqJob.Replicas = 1
dlqJob.ContainerImage = s.cfg.JobImage

def, err := s.client.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID})
if err != nil {
Expand All @@ -54,7 +54,7 @@ func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob mod
return models.DlqJob{}, fmt.Errorf("error getting firehose resource: %w", err)
}
// enrich DlqJob with firehose details
if err := enrichDlqJob(&dlqJob, def.GetResource(), s.cfg); err != nil {
if err := enrichDlqJob(&dlqJob, def.GetResource()); err != nil {
return models.DlqJob{}, fmt.Errorf("error enriching dlq job: %w", err)
}

Expand Down
Loading

0 comments on commit e6da136

Please sign in to comment.