Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
Merge branch 'DFEXP-118' of github.com:goto/dex into DFEXP-118
Browse files Browse the repository at this point in the history
  • Loading branch information
Lifosmin Simon authored and Lifosmin Simon committed Oct 23, 2023
2 parents 16f2557 + 7d351fb commit d8a0024
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 10 deletions.
208 changes: 208 additions & 0 deletions generated/models/dlq_job_form.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion internal/server/v1/dlq/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ var (
ErrFirehoseNamespaceNotFound = errors.New("could not find firehose namespace from resource output")
ErrFirehoseNamespaceInvalid = errors.New("invalid firehose namespace from resource output")
ErrFirehoseNotFound = errors.New("firehose not found")
ErrInternal = errors.New("internal_error")
)
26 changes: 21 additions & 5 deletions internal/server/v1/dlq/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,33 @@ func (h *Handler) listDlqJobs(w http.ResponseWriter, r *http.Request) {
}

func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
// transform request body into DlqJob (validation?)
ctx := r.Context()
reqCtx := reqctx.From(ctx)
var dlqJob models.DlqJob
if reqCtx.UserEmail == "" {
utils.WriteErrMsg(w, http.StatusUnauthorized, "user header is required")
return
}

if err := utils.ReadJSON(r, &dlqJob); err != nil {
var body models.DlqJobForm
if err := utils.ReadJSON(r, &body); err != nil {
utils.WriteErr(w, err)
return
}
if err := body.Validate(nil); err != nil {
utils.WriteErrMsg(w, http.StatusBadRequest, err.Error())
return
}

dlqJob := models.DlqJob{
BatchSize: *body.BatchSize,
Date: *body.Date,
ErrorTypes: body.ErrorTypes,
NumThreads: *body.NumThreads,
ResourceID: *body.ResourceID,
ResourceType: *body.ResourceType,
Topic: *body.Topic,
}

// call service.CreateDLQJob
err := h.service.CreateDLQJob(ctx, reqCtx.UserEmail, &dlqJob)
if err != nil {
if errors.Is(err, ErrFirehoseNotFound) {
Expand All @@ -109,7 +125,7 @@ func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
utils.WriteErr(w, err)
return
}
// return

utils.WriteJSON(w, http.StatusOK, map[string]interface{}{
"dlq_job": dlqJob,
})
Expand Down
2 changes: 1 addition & 1 deletion internal/server/v1/dlq/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobCo
job.Status = status
job.CreatedAt = strfmt.DateTime(res.CreatedAt.AsTime())
job.UpdatedAt = strfmt.DateTime(res.UpdatedAt.AsTime())

job.Project = res.Project
job.KubeCluster = kubeCluster
job.ContainerImage = cfg.DlqJobImage
job.PrometheusHost = cfg.PrometheusHost
Expand Down
6 changes: 3 additions & 3 deletions internal/server/v1/dlq/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob *mo
entropyCtx := metadata.AppendToOutgoingContext(ctx, "user-id", userEmail)
rpcReq := &entropyv1beta1.CreateResourceRequest{Resource: res}
rpcResp, err := s.client.CreateResource(entropyCtx, rpcReq)
dlqJob.Urn = rpcResp.Resource.Urn
if err != nil {
outErr := ErrInternal
return outErr
return err
}

dlqJob.Urn = rpcResp.Resource.Urn

return nil
}

Expand Down
31 changes: 31 additions & 0 deletions swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,37 @@ definitions:
type: string
dlq_gcs_credential_path:
type: string
DlqJobForm:
type: object
required:
- date
- topic
- resource_id
- resource_type
- batch_size
- num_threads
properties:
date:
type: string
example: "2012-10-30"
minLength: 1
topic:
type: string
minLength: 1
resource_id:
type: string
minLength: 1
resource_type:
type: string
enum:
- firehose
error_types:
type: string
description: "List of firehose error types, comma separated"
batch_size:
type: integer
num_threads:
type: integer
Subscription:
type: object
description: "Siren subscription model: https://github.com/goto/siren/blob/v0.6.4/proto/siren.swagger.yaml#L1325"
Expand Down

0 comments on commit d8a0024

Please sign in to comment.