Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/main' into DFEXP-118
Browse files Browse the repository at this point in the history
  • Loading branch information
StewartJingga committed Oct 24, 2023
2 parents 2d4d58f + cf39840 commit 58ea2a7
Show file tree
Hide file tree
Showing 23 changed files with 4,864 additions and 81 deletions.
14 changes: 7 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ generate:
@make format

generate-mocks:
@mockery --srcpkg=buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/siren/v1beta1/sirenv1beta1grpc --name=SirenServiceClient
@mockery --srcpkg=buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/shield/v1beta1/shieldv1beta1grpc --name=ShieldServiceClient
@mockery --srcpkg=buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/optimus/core/v1beta1/corev1beta1grpc --name=JobSpecificationServiceClient
@mockery --srcpkg=buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/entropy/v1beta1/entropyv1beta1grpc --name=ResourceServiceClient
@mockery --srcpkg=./internal/server/gcs --name=BlobStorageClient
@mockery --srcpkg=./internal/server/gcs --name=BlobObjectClient
@mockery --srcpkg=./internal/server/gcs --name=ObjectIterator
@mockery --with-expecter --srcpkg=buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/siren/v1beta1/sirenv1beta1grpc --name=SirenServiceClient
@mockery --with-expecter --srcpkg=buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/shield/v1beta1/shieldv1beta1grpc --name=ShieldServiceClient
@mockery --with-expecter --srcpkg=buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/optimus/core/v1beta1/corev1beta1grpc --name=JobSpecificationServiceClient
@mockery --with-expecter --srcpkg=buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/entropy/v1beta1/entropyv1beta1grpc --name=ResourceServiceClient
@mockery --with-expecter --srcpkg=./internal/server/gcs --name=BlobStorageClient
@mockery --with-expecter --srcpkg=./internal/server/gcs --name=BlobObjectClient
@mockery --with-expecter --srcpkg=./internal/server/gcs --name=ObjectIterator

clean: tidy
@echo "Cleaning up build directories..."
Expand Down
5 changes: 5 additions & 0 deletions cli/server/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ type serverConfig struct {
Optimus optimusConfig `mapstructure:"optimus"`
StencilAddr string `mapstructure:"stencil_addr"`
Dlq dlqConfig `mapstructure:"dlq"`
Warden wardenConfig `mapstructure:"warden"`
}

type wardenConfig struct {
Addr string `mapstructure:"addr"`
}

type odinConfig struct {
Expand Down
8 changes: 5 additions & 3 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/goto/dex/internal/server/v1/optimus"
"github.com/goto/dex/pkg/logger"
"github.com/goto/dex/pkg/telemetry"
"github.com/goto/dex/warden"
)

func Commands() *cobra.Command {
Expand Down Expand Up @@ -102,8 +103,8 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap
return err
}

dlqConfig := &dlq.DlqJobConfig{
// TODO: map cfg.Dlq\
wardenClient := warden.NewClient(cfg.Warden.Addr)
dlqConfig := dlq.DlqJobConfig{
DlqJobImage: cfg.Dlq.DlqJobImage,
PrometheusHost: cfg.Dlq.PrometheusHost,
}
Expand All @@ -117,6 +118,7 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap
&gcs.Client{StorageClient: gcsClient},
cfg.Odin.Addr,
cfg.StencilAddr,
*dlqConfig,
wardenClient,
dlqConfig,
)
}
4 changes: 4 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
alertsv1 "github.com/goto/dex/internal/server/v1/alert"
dlqv1 "github.com/goto/dex/internal/server/v1/dlq"
firehosev1 "github.com/goto/dex/internal/server/v1/firehose"
iamv1 "github.com/goto/dex/internal/server/v1/iam"
kubernetesv1 "github.com/goto/dex/internal/server/v1/kubernetes"
optimusv1 "github.com/goto/dex/internal/server/v1/optimus"
projectsv1 "github.com/goto/dex/internal/server/v1/project"
"github.com/goto/dex/warden"
)

// Serve initialises all the HTTP API routes, starts listening for requests at addr, and blocks until
Expand All @@ -37,6 +39,7 @@ func Serve(ctx context.Context, addr string,
gcsClient gcs.BlobStorageClient,
odinAddr string,
stencilAddr string,
wardenClient *warden.Client,
dlqConfig dlqv1.DlqJobConfig,
) error {
alertSvc := alertsv1.NewService(sirenClient)
Expand Down Expand Up @@ -66,6 +69,7 @@ func Serve(ctx context.Context, addr string,
r.Route("/dlq", dlqv1.Routes(entropyClient, gcsClient, dlqConfig))
r.Route("/firehoses", firehosev1.Routes(entropyClient, shieldClient, alertSvc, compassClient, odinAddr, stencilAddr))
r.Route("/kubernetes", kubernetesv1.Routes(entropyClient))
r.Route("/iam", iamv1.Routes(shieldClient, wardenClient))
})

logger.Info("starting server", zap.String("addr", addr))
Expand Down
11 changes: 5 additions & 6 deletions internal/server/v1/firehose/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ const (
// Some of firehose Configs used/modified in more than one place by dex
// Refer https://goto.github.io/firehose/advance/generic/
const (
configSourceKafkaTopic = "SOURCE_KAFKA_TOPIC"
configSourceKafkaBrokers = "SOURCE_KAFKA_BROKERS"
configSourceKafkaConsumerGroup = "SOURCE_KAFKA_CONSUMER_GROUP_ID"
configSinkType = "SINK_TYPE"
configStreamName = "STREAM_NAME"
configStencilURL = "SCHEMA_REGISTRY_STENCIL_URLS"
configSourceKafkaTopic = "SOURCE_KAFKA_TOPIC"
configSourceKafkaBrokers = "SOURCE_KAFKA_BROKERS"
configSinkType = "SINK_TYPE"
configStreamName = "STREAM_NAME"
configStencilURL = "SCHEMA_REGISTRY_STENCIL_URLS"
)

const (
Expand Down
65 changes: 27 additions & 38 deletions internal/server/v1/firehose/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,7 @@ func (api *firehoseAPI) handleCreate(w http.ResponseWriter, r *http.Request) {
return
}

def.Labels = cloneAndMergeMaps(def.Labels, map[string]string{
labelTitle: *def.Title,
labelGroup: groupID,
labelTeam: groupSlug,
labelStream: *def.Configs.StreamName,
labelDescription: def.Description,
})
def.Labels = cloneAndMergeMaps(def.Labels, api.buildLabels(def, groupSlug))

prj, err := project.GetProject(ctx, def.Project, api.Shield)
if err != nil {
Expand Down Expand Up @@ -179,43 +173,31 @@ func (api *firehoseAPI) handleList(w http.ResponseWriter, r *http.Request) {
return
}

includeEnv := []string{
configSinkType,
configSourceKafkaTopic,
configSourceKafkaConsumerGroup,
}

topicName := q.Get("topic_name")
kubeCluster := q.Get("kube_cluster")
sinkTypes := sinkTypeSet(q.Get("sink_type"))
var arr []models.Firehose

arr := []models.Firehose{}
for _, res := range rpcResp.GetResources() {
def, err := mapEntropyResourceToFirehose(res)
if err != nil {
utils.WriteErr(w, err)
return
}

if kubeCluster != "" && *def.Configs.KubeCluster != kubeCluster {
if kubeCluster != "" && def.Labels[labelKubeCluster] != kubeCluster {
continue
}

if topicName != "" && def.Configs.EnvVars[configSourceKafkaTopic] != topicName {
if topicName != "" && def.Labels[labelTopic] != topicName {
continue
}

_, include := sinkTypes[def.Configs.EnvVars[configSinkType]]
_, include := sinkTypes[def.Labels[labelSinkType]]
if len(sinkTypes) > 0 && !include {
continue
}

// only return selected keys to reduce list response-size.
returnEnv := map[string]string{}
for _, key := range includeEnv {
returnEnv[key] = def.Configs.EnvVars[key]
}
def.Configs.EnvVars = returnEnv

arr = append(arr, def)
}

Expand Down Expand Up @@ -263,20 +245,15 @@ func (api *firehoseAPI) handleUpdate(w http.ResponseWriter, r *http.Request) {
utils.WriteErr(w, err)
return
}
labels := cloneAndMergeMaps(existingFirehose.Labels, map[string]string{
labelGroup: groupID,
labelTeam: groupSlug,
})
if updates.Description != "" {
labels[labelDescription] = existingFirehose.Description
}

err = api.buildEnvVars(r.Context(), &existingFirehose, reqCtx.UserID, false)
if err != nil {
utils.WriteErr(w, fmt.Errorf("error building env vars: %w", err))
return
}

labels := cloneAndMergeMaps(existingFirehose.Labels, api.buildLabels(existingFirehose, groupSlug))

cfgStruct, err := makeConfigStruct(existingFirehose.Configs)
if err != nil {
utils.WriteErr(w, err)
Expand Down Expand Up @@ -334,19 +311,16 @@ func (api *firehoseAPI) handlePartialUpdate(w http.ResponseWriter, r *http.Reque
return
}

labels := existing.Labels
if req.Description != "" {
labels[labelDescription] = req.Description
}
var groupSlug string
if req.Group != "" {
groupID := req.Group
groupSlug, err := api.getGroupSlug(ctx, groupID)
groupSlug, err = api.getGroupSlug(ctx, groupID)
if err != nil {
utils.WriteErr(w, err)
return
}
labels[labelGroup] = groupID
labels[labelTeam] = groupSlug
} else {
groupSlug = existing.Labels[labelGroup]
}

if req.Configs.Stopped != nil {
Expand Down Expand Up @@ -394,6 +368,8 @@ func (api *firehoseAPI) handlePartialUpdate(w http.ResponseWriter, r *http.Reque
return
}

labels := cloneAndMergeMaps(existing.Labels, api.buildLabels(existing, groupSlug))

cfgStruct, err := makeConfigStruct(existing.Configs)
if err != nil {
utils.WriteErr(w, err)
Expand Down Expand Up @@ -430,6 +406,19 @@ func (api *firehoseAPI) handlePartialUpdate(w http.ResponseWriter, r *http.Reque
utils.WriteJSON(w, http.StatusOK, updatedFirehose)
}

func (*firehoseAPI) buildLabels(firehose models.Firehose, groupSlug string) map[string]string {
return map[string]string{
labelTitle: *firehose.Title,
labelGroup: string(*firehose.Group),
labelTeam: groupSlug,
labelStream: *firehose.Configs.StreamName,
labelDescription: firehose.Description,
labelSinkType: firehose.Configs.EnvVars[configSinkType],
labelTopic: firehose.Configs.EnvVars[configSourceKafkaTopic],
labelKubeCluster: *firehose.Configs.KubeCluster,
}
}

func (api *firehoseAPI) handleGetHistory(w http.ResponseWriter, r *http.Request) {
urn := chi.URLParam(r, pathParamURN)

Expand Down
60 changes: 36 additions & 24 deletions internal/server/v1/firehose/mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
shieldv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/shield/v1beta1"
"github.com/go-openapi/strfmt"
"github.com/mitchellh/mapstructure"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"github.com/goto/dex/entropy"
Expand All @@ -26,6 +27,9 @@ const (
labelTeam = "team"
labelStream = "stream_name"
labelDescription = "description"
labelSinkType = "sink_type"
labelKubeCluster = "kube_cluster"
labelTopic = "topic"
)

var nonAlphaNumPattern = regexp.MustCompile("[^a-zA-Z0-9]+")
Expand Down Expand Up @@ -120,17 +124,6 @@ func mapEntropySpecAndLabels(firehose models.Firehose, spec *entropyv1beta1.Reso
firehose.Labels = labels
firehose.Description = labels[labelDescription]

var modConf entropy.FirehoseConfig
if err := utils.ProtoStructToGoVal(spec.GetConfigs(), &modConf); err != nil {
return firehose, err
}

var stopTime *strfmt.DateTime
if modConf.StopTime != nil {
dt := strfmt.DateTime(*modConf.StopTime)
stopTime = &dt
}

var kubeCluster string
for _, dep := range spec.GetDependencies() {
if dep.GetKey() == kubeClusterDependencyKey {
Expand All @@ -139,20 +132,39 @@ func mapEntropySpecAndLabels(firehose models.Firehose, spec *entropyv1beta1.Reso
}

streamName := labels[labelStream]
if streamName == "" {
streamName = modConf.EnvVariables[configStreamName]
}

firehose.Configs = &models.FirehoseConfig{
Image: modConf.ChartValues.ImageTag,
EnvVars: modConf.EnvVariables,
Stopped: modConf.Stopped,
StopTime: stopTime,
ResetOffset: modConf.ResetOffset,
Replicas: float64(modConf.Replicas),
StreamName: &streamName,
DeploymentID: modConf.DeploymentID,
KubeCluster: &kubeCluster,
if proto.Equal(spec.GetConfigs(), structpb.NewNullValue()) {
// Handle the "null_value" case
firehose.Configs = &models.FirehoseConfig{
StreamName: &streamName,
KubeCluster: &kubeCluster,
}
} else {
var modConf entropy.FirehoseConfig
if err := utils.ProtoStructToGoVal(spec.GetConfigs(), &modConf); err != nil {
return firehose, err
}

stopTime := strfmt.DateTime{}
if modConf.StopTime != nil {
stopTime = strfmt.DateTime(*modConf.StopTime)
}

if streamName == "" {
streamName = modConf.EnvVariables[configStreamName]
}

firehose.Configs = &models.FirehoseConfig{
Image: modConf.ChartValues.ImageTag,
EnvVars: modConf.EnvVariables,
Stopped: modConf.Stopped,
StopTime: &stopTime,
ResetOffset: modConf.ResetOffset,
Replicas: float64(modConf.Replicas),
StreamName: &streamName,
DeploymentID: modConf.DeploymentID,
KubeCluster: &kubeCluster,
}
}

return firehose, nil
Expand Down
61 changes: 61 additions & 0 deletions internal/server/v1/iam/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package iam

import (
"net/http"

"github.com/go-chi/chi/v5"

"github.com/goto/dex/internal/server/reqctx"
"github.com/goto/dex/internal/server/utils"
)

type handler struct {
service *Service
}

func NewHandler(service *Service) *handler {
return &handler{service: service}
}

func (h *handler) listUserWardenTeams(w http.ResponseWriter, r *http.Request) {
reqCtx := reqctx.From(r.Context())
const errEmailMissedInHeader = "user email not in header"

if reqCtx.UserEmail == "" {
utils.WriteErrMsg(w, http.StatusUnauthorized, errEmailMissedInHeader)
return
}

teamListResp, err := h.service.UserWardenTeamList(r.Context(), reqCtx.UserEmail)
if err != nil {
utils.WriteErr(w, err)
return
}

utils.WriteJSON(w, http.StatusOK, map[string]any{
"teams": teamListResp,
})
}

func (h *handler) linkGroupToWarden(w http.ResponseWriter, r *http.Request) {
groupID := chi.URLParam(r, "group_id")

var body struct {
WardenTeamID string `json:"warden_team_id"`
}
if err := utils.ReadJSON(r, &body); err != nil {
utils.WriteErr(w, err)
return
} else if body.WardenTeamID == "" {
utils.WriteErrMsg(w, http.StatusBadRequest, "missing warden_team_id")
return
}

resShield, err := h.service.LinkGroupToWarden(r.Context(), groupID, body.WardenTeamID)
if err != nil {
utils.WriteErr(w, err)
return
}

utils.WriteJSON(w, http.StatusOK, resShield)
}
Loading

0 comments on commit 58ea2a7

Please sign in to comment.