Skip to content

Commit

Permalink
feat(quickwit_output): prepare index creation with mapping
Browse files Browse the repository at this point in the history
Signed-off-by: Idriss Neumann <idriss.neumann@comwork.io>
  • Loading branch information
idrissneumann committed Dec 29, 2023
1 parent 3fc81aa commit f583b90
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 1 deletion.
18 changes: 17 additions & 1 deletion main.go
Expand Up @@ -96,6 +96,7 @@ var (
config *types.Configuration
stats *types.Statistics
promStats *types.PromStatistics
initClientArgs *types.InitClientArgs

regPromLabels *regexp.Regexp
)
Expand Down Expand Up @@ -124,6 +125,13 @@ func init() {
DogstatsdClient: dogstatsdClient,
}

initClientArgs = &types.InitClientArgs{
Config: config,
Stats: stats,
DogstatsdClient: dogstatsdClient,
PromStats: promStats,
}

if config.Statsd.Forwarder != "" {
var err error
statsdClient, err = outputs.NewStatsdClient("StatsD", config, stats)
Expand Down Expand Up @@ -247,7 +255,15 @@ func init() {
config.Quickwit.Index = "falco"
}

quickwitClient, err = outputs.NewClient("Quickwit", config.Quickwit.HostPort+"/"+config.Quickwit.ApiEndpoint+"/"+config.Quickwit.Index, config.Quickwit.MutualTLS, config.Quickwit.CheckCert, config, stats, promStats, statsdClient, dogstatsdClient)
if config.Quickwit.IndexVersion == "" {
config.Quickwit.IndexVersion = "0.6"
}

endpointUrl := fmt.Sprintf("%s/%s/%s/ingest", config.Quickwit.HostPort, config.Quickwit.ApiEndpoint, config.Quickwit.Index)
quickwitClient, err = outputs.NewClient("Quickwit", endpointUrl, config.Quickwit.MutualTLS, config.Quickwit.CheckCert, config, stats, promStats, statsdClient, dogstatsdClient)
if err == nil {
err = quickwitClient.AutoCreateQuickwitIndex(*initClientArgs)
}

if err != nil {
config.Quickwit.HostPort = ""
Expand Down
5 changes: 5 additions & 0 deletions outputs/client.go
Expand Up @@ -143,6 +143,11 @@ type Client struct {
RedisClient *redis.Client
}

// InitClient returns a new output.Client for accessing the different API.
func InitClient(outputType string, defaultEndpointURL string, mutualTLSEnabled bool, checkCert bool, params types.InitClientArgs) (*Client, error) {
return NewClient(outputType, defaultEndpointURL, mutualTLSEnabled, checkCert, params.Config, params.Stats, params.PromStats, params.StatsdClient, params.DogstatsdClient)
}

// NewClient returns a new output.Client for accessing the different API.
func NewClient(outputType string, defaultEndpointURL string, mutualTLSEnabled bool, checkCert bool, config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {
reg := regexp.MustCompile(`(http|nats)(s?)://.*`)
Expand Down
107 changes: 107 additions & 0 deletions outputs/quickwit.go
Expand Up @@ -18,11 +18,118 @@ limitations under the License.
package outputs

import (
"fmt"
"log"

"github.com/falcosecurity/falcosidekick/types"
)

type QuickwitDynamicMapping struct {
Description string `json:"description"`
Fast bool `json:"fast"`
ExpendDots bool `json:"expand_dots"`
Indexed bool `json:"indexed"`
Record string `json:"record"`
Stored bool `json:"bool"`
Tokenizer string `json:"tokenizer"`
}

type QuickwitFieldMapping struct {
Name string `json:"name"`
Type string `json:"type"`
Fast bool `json:"fast"`
}

type QuickwitSearchSettings struct {
DefaultSearchFields []string `json:"default_search_fields"`
}

type QuickwitDocMapping struct {
DynamicMapping QuickwitDynamicMapping `json:"dynamic_mapping"`
FieldMappings []QuickwitFieldMapping `json:"field_mappings"`
Mode string `json:"mode"`
StoreSource bool `json:"store_source"`
TimestampField string `json:"timestamp_field"`
}

type QuickwitMappingPayload struct {
Id string `json:"index_id"`
Version string `json:"version"`
SearchSettings QuickwitSearchSettings `json:"search_settings"`
DocMapping QuickwitDocMapping `json:"doc_mapping"`
}

func (c *Client) AutoCreateQuickwitIndex(params types.InitClientArgs) error {
config := params.Config.Quickwit
if !config.AutoCreateIndex {
return nil
}

endpointUrl := fmt.Sprintf("%s/%s/indexes", config.HostPort, config.ApiEndpoint)
quickwitInitClient, err := InitClient("QuickwitInit", endpointUrl, config.MutualTLS, config.CheckCert, params)
if nil != err {
return err
}

mapping := &QuickwitMappingPayload{
Id: config.Index,
Version: config.IndexVersion,
DocMapping: QuickwitDocMapping{
Mode: "dynamic",
StoreSource: true,
TimestampField: "time",
DynamicMapping: QuickwitDynamicMapping{
Description: "Falco",
Fast: true,
ExpendDots: true,
Indexed: true,
Stored: true,
Record: "basic",
Tokenizer: "raw",
},
FieldMappings: []QuickwitFieldMapping{
{
Name: "time",
Type: "datetime",
Fast: true,
},
{
Name: "priority",
Type: "i64",
Fast: true,
},
{
Name: "source",
Type: "text",
Fast: true,
},
{
Name: "output",
Type: "string",
Fast: true,
},
{
Name: "rule",
Type: "string",
Fast: true,
},
},
},
SearchSettings: QuickwitSearchSettings{
DefaultSearchFields: []string{"source", "output"},
},
}

err = quickwitInitClient.Post(mapping)

// This error means it's an http 400 (meaning the index already exists, so no need to throw an error)
if err.Error() == "header missing" {
return nil
}

return err
}

func (c *Client) QuickwitPost(falcopayload types.FalcoPayload) {
c.Stats.Quickwit.Add(Total, 1)

Expand Down
12 changes: 12 additions & 0 deletions types/types.go
Expand Up @@ -24,6 +24,7 @@ import (
"text/template"
"time"

"github.com/DataDog/datadog-go/statsd"
"github.com/embano1/memlog"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -128,6 +129,15 @@ type Configuration struct {
Dynatrace DynatraceOutputConfig
}

// InitClientArgs represent a client parameters for initialization
type InitClientArgs struct {
Config *Configuration
Stats *Statistics
PromStats *PromStatistics
StatsdClient *statsd.Client
DogstatsdClient *statsd.Client
}

// MutualTLSClient represents parameters for mutual TLS as client
type MutualTLSClient struct {
CertFile string
Expand Down Expand Up @@ -281,10 +291,12 @@ type QuickwitOutputConfig struct {
HostPort string
ApiEndpoint string
Index string
IndexVersion string
CustomHeaders map[string]string
MinimumPriority string
CheckCert bool
MutualTLS bool
AutoCreateIndex bool
}

type influxdbOutputConfig struct {
Expand Down

0 comments on commit f583b90

Please sign in to comment.