Skip to content

Commit

Permalink
feat: add tolerations feature (#35)
Browse files Browse the repository at this point in the history
* feat: add tolerations feature

* Update modules/firehose/driver.go

Co-authored-by: StewartJingga <stewart_jingga@yahoo.com>

* fix: fix sink types for firehose module

---------

Co-authored-by: StewartJingga <stewart_jingga@yahoo.com>
  • Loading branch information
spy16 and StewartJingga committed Apr 18, 2023
1 parent 4e03452 commit b013688
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 18 deletions.
114 changes: 107 additions & 7 deletions modules/firehose/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"text/template"
"time"
Expand Down Expand Up @@ -70,12 +71,36 @@ type (
)

type driverConf struct {
Labels map[string]string `json:"labels,omitempty"`
Telegraf *Telegraf `json:"telegraf"`
Namespace string `json:"namespace" validate:"required"`
ChartValues ChartValues `json:"chart_values" validate:"required"`
Limits UsageSpec `json:"limits,omitempty" validate:"required"`
Requests UsageSpec `json:"requests,omitempty" validate:"required"`
Labels map[string]string `json:"labels,omitempty"`
Telegraf *Telegraf `json:"telegraf"`
Namespace string `json:"namespace" validate:"required"`
ChartValues ChartValues `json:"chart_values" validate:"required"`
Limits UsageSpec `json:"limits,omitempty" validate:"required"`
Requests UsageSpec `json:"requests,omitempty" validate:"required"`
Tolerations map[string]Toleration `json:"tolerations"`
InitContainer InitContainer `json:"init_container"`

GCSSinkCredential string `json:"gcs_sink_credential,omitempty"`
DLQGCSSinkCredential string `json:"dlq_gcs_sink_credential,omitempty"`
BigQuerySinkCredential string `json:"big_query_sink_credential,omitempty"`
}

type InitContainer struct {
Enabled bool `json:"enabled"`

Args []string `json:"args"`
Command []string `json:"command"`

Repository string `json:"repository"`
ImageTag string `json:"image_tag"`
PullPolicy string `json:"pull_policy"`
}

type Toleration struct {
Key string `json:"key"`
Value string `json:"value"`
Effect string `json:"effect"`
Operator string `json:"operator"`
}

type UsageSpec struct {
Expand All @@ -94,7 +119,9 @@ type transientData struct {
ResetOffsetTo string `json:"reset_offset_to,omitempty"`
}

func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config) (*helm.ReleaseConfig, error) {
func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config,
kubeOut kubernetes.Output,
) (*helm.ReleaseConfig, error) {
var telegrafConf Telegraf
if conf.Telegraf != nil && conf.Telegraf.Enabled {
telegrafTags, err := renderLabels(conf.Telegraf.Config.AdditionalGlobalTags, res.Labels)
Expand All @@ -112,6 +139,17 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config) (*h
}
}

tolerationKey := fmt.Sprintf("firehose_%s", conf.EnvVariables["SINK_TYPE"])
var tolerations []map[string]any
for _, t := range kubeOut.Tolerations[tolerationKey] {
tolerations = append(tolerations, map[string]any{
"key": t.Key,
"value": t.Value,
"effect": t.Effect,
"operator": t.Operator,
})
}

entropyLabels := map[string]string{
labelDeployment: conf.DeploymentID,
labelOrchestrator: orchestratorLabelValue,
Expand All @@ -122,6 +160,55 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config) (*h
return nil, err
}

var volumes []map[string]any
var volumeMounts []map[string]any

newVolume := func(name string) map[string]any {
const mountMode = 420
return map[string]any{
"name": name,
"items": []map[string]any{{"key": "token", "path": "auth.json"}},
"secretName": name,
"defaultMode": mountMode,
}
}

if fd.conf.GCSSinkCredential != "" {
const mountPath = "/etc/secret/blob-gcs-sink"
const credentialPath = mountPath + "/auth.json"

volumes = append(volumes, newVolume(fd.conf.GCSSinkCredential))
volumeMounts = append(volumeMounts, map[string]any{
"name": fd.conf.GCSSinkCredential,
"mountPath": mountPath,
})
conf.EnvVariables["SINK_BLOB_GCS_CREDENTIAL_PATH"] = credentialPath
}

if fd.conf.DLQGCSSinkCredential != "" {
const mountPath = "/etc/secret/dlq-gcs"
const credentialPath = mountPath + "/auth.json"

volumes = append(volumes, newVolume(fd.conf.DLQGCSSinkCredential))
volumeMounts = append(volumeMounts, map[string]any{
"name": fd.conf.DLQGCSSinkCredential,
"mountPath": mountPath,
})
conf.EnvVariables["DLQ_GCS_CREDENTIAL_PATH"] = credentialPath
}

if fd.conf.BigQuerySinkCredential != "" {
const mountPath = "/etc/secret/bigquery-sink"
const credentialPath = mountPath + "/auth.json"

volumes = append(volumes, newVolume(fd.conf.BigQuerySinkCredential))
volumeMounts = append(volumeMounts, map[string]any{
"name": fd.conf.BigQuerySinkCredential,
"mountPath": mountPath,
})
conf.EnvVariables["SINK_BIGQUERY_CREDENTIAL_PATH"] = credentialPath
}

rc := helm.DefaultReleaseConfig()
rc.Name = conf.DeploymentID
rc.Repository = chartRepo
Expand Down Expand Up @@ -149,6 +236,19 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config) (*h
"memory": conf.Requests.Memory,
},
},
"tolerations": tolerations,
"volumeMounts": volumeMounts,
"volumes": volumes,
},
"init-firehose": map[string]any{
"enabled": fd.conf.InitContainer.Enabled,
"image": map[string]any{
"repository": fd.conf.InitContainer.Repository,
"pullPolicy": fd.conf.InitContainer.PullPolicy,
"tag": fd.conf.InitContainer.ImageTag,
},
"command": fd.conf.InitContainer.Command,
"args": fd.conf.InitContainer.Args,
},
"telegraf": map[string]any{
"enabled": telegrafConf.Enabled,
Expand Down
2 changes: 1 addition & 1 deletion modules/firehose/driver_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (fd *firehoseDriver) Output(ctx context.Context, exr module.ExpandedResourc
func (fd *firehoseDriver) refreshOutput(ctx context.Context, r resource.Resource,
conf Config, output Output, kubeOut kubernetes.Output,
) (json.RawMessage, error) {
rc, err := fd.getHelmRelease(r, conf)
rc, err := fd.getHelmRelease(r, conf, kubeOut)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion modules/firehose/driver_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (fd *firehoseDriver) Sync(ctx context.Context, exr module.ExpandedResource)
func (fd *firehoseDriver) releaseSync(ctx context.Context, r resource.Resource,
isCreate bool, conf Config, kubeOut kubernetes.Output,
) error {
rc, err := fd.getHelmRelease(r, conf)
rc, err := fd.getHelmRelease(r, conf, kubeOut)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions modules/firehose/schema/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@
"SINK_TYPE": {
"type": "string",
"enum": [
"LOG",
"JDBC",
"HTTP",
"POSTGRES",
"INFLUXDB",
"ELASTIC",
"REDIS",
"ELASTICSEARCH",
"GRPC",
"PROMETHEUS",
"BIGQUERY",
"BLOB",
"BIGTABLE",
"JDBC"
"MONGODB",
"LOG",
"REDIS",
"BIGQUERY",
"BIGTABLE"
]
},
"KAFKA_RECORD_PARSER_MODE": {
Expand Down
12 changes: 10 additions & 2 deletions modules/kubernetes/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ import (
)

type Output struct {
Configs kube.Config `json:"configs"`
ServerInfo version.Info `json:"server_info"`
Configs kube.Config `json:"configs"`
ServerInfo version.Info `json:"server_info"`
Tolerations map[string][]Toleration `json:"tolerations"`
}

type Toleration struct {
Key string `json:"key"`
Value string `json:"value"`
Effect string `json:"effect"`
Operator string `json:"operator"`
}

func (out Output) JSON() []byte {
Expand Down

0 comments on commit b013688

Please sign in to comment.