Skip to content

Commit

Permalink
feat: add tolerations feature
Browse files Browse the repository at this point in the history
  • Loading branch information
spy16 committed Apr 14, 2023
1 parent 4e03452 commit 0bcabc6
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 9 deletions.
113 changes: 106 additions & 7 deletions modules/firehose/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,36 @@ type (
)

type driverConf struct {
Labels map[string]string `json:"labels,omitempty"`
Telegraf *Telegraf `json:"telegraf"`
Namespace string `json:"namespace" validate:"required"`
ChartValues ChartValues `json:"chart_values" validate:"required"`
Limits UsageSpec `json:"limits,omitempty" validate:"required"`
Requests UsageSpec `json:"requests,omitempty" validate:"required"`
Labels map[string]string `json:"labels,omitempty"`
Telegraf *Telegraf `json:"telegraf"`
Namespace string `json:"namespace" validate:"required"`
ChartValues ChartValues `json:"chart_values" validate:"required"`
Limits UsageSpec `json:"limits,omitempty" validate:"required"`
Requests UsageSpec `json:"requests,omitempty" validate:"required"`
Tolerations map[string]Toleration `json:"tolerations"`
InitContainer InitContainer `json:"init_container"`

GCSSinkCredential string `json:"gcs_sink_credential,omitempty"`
DLQGCSSinkCredential string `json:"dlq_gcs_sink_credential,omitempty"`
BigQuerySinkCredential string `json:"big_query_sink_credential,omitempty"`
}

type InitContainer struct {
Enabled bool `json:"enabled"`

Args []string `json:"args"`
Command []string `json:"command"`

Repository string `json:"repository"`
ImageTag string `json:"image_tag"`
PullPolicy string `json:"pull_policy"`
}

type Toleration struct {
Key string `json:"key"`
Value string `json:"value"`
Effect string `json:"effect"`
Operator string `json:"operator"`
}

type UsageSpec struct {
Expand All @@ -94,7 +118,8 @@ type transientData struct {
ResetOffsetTo string `json:"reset_offset_to,omitempty"`
}

func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config) (*helm.ReleaseConfig, error) {
func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config,
kubeOut kubernetes.Output) (*helm.ReleaseConfig, error) {
var telegrafConf Telegraf
if conf.Telegraf != nil && conf.Telegraf.Enabled {
telegrafTags, err := renderLabels(conf.Telegraf.Config.AdditionalGlobalTags, res.Labels)
Expand All @@ -112,6 +137,17 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config) (*h
}
}

sinkType := conf.EnvVariables["SINK_TYPE"]
var tolerations []map[string]any
if t, ok := fd.conf.Tolerations[sinkType]; ok {
tolerations = append(tolerations, map[string]any{
"key": t.Key,
"value": t.Value,
"effect": t.Effect,
"operator": t.Operator,
})
}

entropyLabels := map[string]string{
labelDeployment: conf.DeploymentID,
labelOrchestrator: orchestratorLabelValue,
Expand All @@ -122,6 +158,56 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config) (*h
return nil, err
}

var volumes []map[string]any
var volumeMounts []map[string]any

newVolume := func(name string) map[string]any {
return map[string]any{
"name": name,
"defaultMode": 420,
"items": []map[string]any{
{"key": "token", "path": "auth.json"},
},
"secretName": name,
}
}

if fd.conf.GCSSinkCredential != "" {
const mountPath = "/etc/secret/blob-gcs-sink"
const credentialPath = mountPath + "/auth.json"

volumes = append(volumes, newVolume(fd.conf.GCSSinkCredential))
volumeMounts = append(volumeMounts, map[string]any{
"name": fd.conf.GCSSinkCredential,
"mountPath": mountPath,
})
conf.EnvVariables["SINK_BLOB_GCS_CREDENTIAL_PATH"] = credentialPath
}

if fd.conf.DLQGCSSinkCredential != "" {
const mountPath = "/etc/secret/dlq-gcs"
const credentialPath = mountPath + "/auth.json"

volumes = append(volumes, newVolume(fd.conf.DLQGCSSinkCredential))
volumeMounts = append(volumeMounts, map[string]any{
"name": fd.conf.DLQGCSSinkCredential,
"mountPath": mountPath,
})
conf.EnvVariables["DLQ_GCS_CREDENTIAL_PATH"] = credentialPath
}

if fd.conf.BigQuerySinkCredential != "" {
const mountPath = "/etc/secret/bigquery-sink"
const credentialPath = mountPath + "/auth.json"

volumes = append(volumes, newVolume(fd.conf.BigQuerySinkCredential))
volumeMounts = append(volumeMounts, map[string]any{
"name": fd.conf.BigQuerySinkCredential,
"mountPath": mountPath,
})
conf.EnvVariables["SINK_BIGQUERY_CREDENTIAL_PATH"] = credentialPath
}

rc := helm.DefaultReleaseConfig()
rc.Name = conf.DeploymentID
rc.Repository = chartRepo
Expand Down Expand Up @@ -149,6 +235,19 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config) (*h
"memory": conf.Requests.Memory,
},
},
"tolerations": tolerations,
"volumeMounts": volumeMounts,
"volumes": volumes,
},
"init-firehose": map[string]any{
"enabled": fd.conf.InitContainer.Enabled,
"image": map[string]any{
"repository": fd.conf.InitContainer.Repository,
"pullPolicy": fd.conf.InitContainer.PullPolicy,
"tag": fd.conf.InitContainer.ImageTag,
},
"command": fd.conf.InitContainer.Command,
"args": fd.conf.InitContainer.Args,
},
"telegraf": map[string]any{
"enabled": telegrafConf.Enabled,
Expand Down
2 changes: 1 addition & 1 deletion modules/firehose/driver_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (fd *firehoseDriver) Output(ctx context.Context, exr module.ExpandedResourc
func (fd *firehoseDriver) refreshOutput(ctx context.Context, r resource.Resource,
conf Config, output Output, kubeOut kubernetes.Output,
) (json.RawMessage, error) {
rc, err := fd.getHelmRelease(r, conf)
rc, err := fd.getHelmRelease(r, conf, kubeOut)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion modules/firehose/driver_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (fd *firehoseDriver) Sync(ctx context.Context, exr module.ExpandedResource)
func (fd *firehoseDriver) releaseSync(ctx context.Context, r resource.Resource,
isCreate bool, conf Config, kubeOut kubernetes.Output,
) error {
rc, err := fd.getHelmRelease(r, conf)
rc, err := fd.getHelmRelease(r, conf, kubeOut)
if err != nil {
return err
}
Expand Down

0 comments on commit 0bcabc6

Please sign in to comment.