Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion filter/clausefilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var ClauseFilterDesc = baker.FilterDesc{
}

type ClauseFilterConfig struct {
Clause string `help:"Boolean formula describing which events to let through. Empty = let everything through."`
Clause string `help:"Boolean formula describing which events to let through. If empty, let everything through."`
}

/*
Expand Down
2 changes: 1 addition & 1 deletion filter/clear_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var ClearFieldsDesc = baker.FilterDesc{

// ClearFieldsConfig holds config parameters of the ClearFields filter.
type ClearFieldsConfig struct {
Fields []string `help:"set of fields to clear" default:"[]"`
Fields []string `help:"set of fields to clear" required:"true"`
}

// ClearFields filter clears (i.e set to the empty string) a set of fields.
Expand Down
16 changes: 0 additions & 16 deletions filter/set_string_from_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,6 @@ type setStringFromURLConfig struct {
Strings []string `help:"Strings to look for in the URL. Discard records not containing any of them." required:"true"`
}

func (cfg *setStringFromURLConfig) fillDefaults() error {
if cfg.Field == "" {
return fmt.Errorf("Field is a required parameter")
}

if len(cfg.Strings) == 0 {
return fmt.Errorf("Strings is a required parameter")
}

return nil
}

type setStringFromURL struct {
numProcessedLines int64
numFilteredLines int64
Expand All @@ -50,10 +38,6 @@ func newSetStringFromURL(cfg baker.FilterParams) (baker.Filter, error) {
}
dcfg := cfg.DecodedConfig.(*setStringFromURLConfig)

if err := dcfg.fillDefaults(); err != nil {
return nil, fmt.Errorf("can't configure SetStringFromURL filter: %v", err)
}

f, ok := cfg.FieldByName(dcfg.Field)
if !ok {
return nil, fmt.Errorf("SetStringFromURL: unknow field %q", dcfg.Field)
Expand Down
11 changes: 2 additions & 9 deletions input/kcl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package input

import (
"bytes"
"errors"
"fmt"
"math"
"os"
Expand Down Expand Up @@ -40,8 +39,8 @@ var KCLDesc = baker.InputDesc{
// KCLConfig is the configuration for the KCL input.
type KCLConfig struct {
AwsRegion string `help:"AWS region to connect to" default:"us-west-2"`
Stream string `help:"Name of Kinesis stream" default:""`
AppName string `help:"Used by KCL to allow multiple app to consume the same stream." default:""`
Stream string `help:"Name of Kinesis stream" required:"true"`
AppName string `help:"Used by KCL to allow multiple app to consume the same stream." required:"true"`
MaxShards int `help:"Max shards this Worker can handle at a time" default:"32767"`
ShardSync time.Duration `help:"Time between tasks to sync leases and Kinesis shards" default:"60s"`
InitialPosition string `help:"Position in the stream where a new application should start from. Values: LATEST or TRIM_HORIZON" default:"LATEST"`
Expand All @@ -51,12 +50,6 @@ type KCLConfig struct {
var appNameRx = regexp.MustCompile(`^[a-zA-Z_0-9]+$`)

func (cfg *KCLConfig) validate() error {
if cfg.Stream == "" {
return errors.New("'Stream' is required")
}
if cfg.AppName == "" {
return errors.New("'AppName' is required")
}
if !appNameRx.MatchString(cfg.AppName) {
return fmt.Errorf("invalid 'AppName' '%s', accepts only [A-Za-z0-9_]+", cfg.AppName)
}
Expand Down
15 changes: 4 additions & 11 deletions input/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package input

import (
"bytes"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -28,7 +27,7 @@ var KTailDesc = baker.InputDesc{

type KTailConfig struct {
AwsRegion string `help:"AWS region to connect to" default:"us-west-2"`
Stream string `help:"Stream name on Kinesis" default:"(required)"`
Stream string `help:"Stream name on Kinesis" required:"true"`
IdleTime time.Duration `help:"Time between polls of each shard" default:"100ms"`
}

Expand All @@ -41,10 +40,6 @@ func (cfg *KTailConfig) fillDefaults() error {
cfg.IdleTime = 100 * time.Millisecond
}

if cfg.Stream == "" {
return fmt.Errorf("Stream field is required")
}

return nil
}

Expand All @@ -58,16 +53,14 @@ type KTail struct {
numLines int64
}

// Create a Kinesis tail, and immediately do a first connection to get the current shard list
// NewKTail creates a Kinesis tail, and immediately does a first connection to
// get the current shard list.
func NewKTail(cfg baker.InputParams) (baker.Input, error) {
if cfg.DecodedConfig == nil {
cfg.DecodedConfig = &KTailConfig{}
}
dcfg := cfg.DecodedConfig.(*KTailConfig)
if dcfg.Stream == "" {
return nil, errors.New("'Stream' is required")
}

dcfg := cfg.DecodedConfig.(*KTailConfig)
if err := dcfg.fillDefaults(); err != nil {
return nil, fmt.Errorf("Kinesis: %s", err)
}
Expand Down
19 changes: 8 additions & 11 deletions input/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package input

import (
"encoding/json"
"fmt"
"net/url"
"regexp"
"strings"
Expand All @@ -28,13 +27,15 @@ var SQSDesc = baker.InputDesc{
"It never exits.\n",
}

const MessageFormatPlain = "plain"
const MessageFormatSNS = "sns"
const (
sqsFormatPlain = "plain"
sqsFormatSNS = "sns"
)

type SQSConfig struct {
AwsRegion string `help:"AWS region to connect to" default:"us-west-2"`
Bucket string `help:"S3 Bucket to use for processing" default:""`
QueuePrefixes []string `help:"Prefixes of the names of the SQS queues to monitor"`
QueuePrefixes []string `help:"Prefixes of the names of the SQS queues to monitor" required:"true"`
MessageFormat string `help:"The format of the SQS messages.\n'plain' the SQS messages received have the S3 file path as a plain string.\n'sns' the SQS messages were produced by a SNS notification." default:"sns"`
FilePathFilter string `help:"If provided, will only use S3 files with the given path."`
}
Expand All @@ -44,7 +45,7 @@ func (cfg *SQSConfig) fillDefaults() {
cfg.AwsRegion = "us-west-2"
}
if cfg.MessageFormat == "" {
cfg.MessageFormat = MessageFormatSNS
cfg.MessageFormat = sqsFormatSNS
} else {
cfg.MessageFormat = strings.ToLower(cfg.MessageFormat)
}
Expand All @@ -67,10 +68,6 @@ func NewSQS(cfg baker.InputParams) (baker.Input, error) {
dcfg := cfg.DecodedConfig.(*SQSConfig)
dcfg.fillDefaults()

if len(dcfg.QueuePrefixes) == 0 {
return nil, fmt.Errorf("\"queues\" not specified in SQS configuration")
}

sess := session.New(&aws.Config{Region: aws.String(dcfg.AwsRegion)})
svc := sqs.New(sess)

Expand Down Expand Up @@ -162,14 +159,14 @@ func (s *SQS) parseMessage(Body *string, ctxLog *log.Entry) (string, string, err
var snsMsgTimestamp string

switch s.Cfg.MessageFormat {
case MessageFormatPlain:
case sqsFormatPlain:
// The SQS queue is populated by a lambda function that
// just provides the path to the S3 file in the message's
// body.
s3FilePath = string(*Body)
snsMsgTimestamp = ""

case MessageFormatSNS:
case sqsFormatSNS:
// The SQS queue is populated by SNS messages. So the
// body is a JSON document with several fields; we only
// care about one field: "Message", which is the URL of
Expand Down
9 changes: 4 additions & 5 deletions output/dyndb.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (dp *dynamoProcess) Wait() bool {

type DynamoWriterConfig struct {
Regions []string `help:"DynamoDB regions to connect to" default:"us-west-2"`
Table string `help:"Name of the table to modify"`
Table string `help:"Name of the table to modify" required:"true"`
Columns []string `help:"Table columns that correspond to each of the fields being written"`
FlushInterval time.Duration `help:"Interval at which flush the data to DynamoDB even if we have not reached 25 records" default:"1s"`
MaxWritesPerSec int64 `help:"Maximum number of writes per second that DynamoDB can accept (0 for unlimited)" default:"0"`
Expand Down Expand Up @@ -210,7 +210,9 @@ type DynamoWriter struct {
errn int64 // number of lines that were skipped because of errors
}

// Create a new DynamoWriter. TableName is the name of the DynamoDB table to be written.
// NewDynamoWriter creates a new DynamoWriter output.
//
// TableName is the name of the DynamoDB table to be written.
// Columns is a slice listing the columns that will be written; the first item in the slice
// *MUST* be the primary key of the table.
func NewDynamoWriter(cfg baker.OutputParams) (baker.Output, error) {
Expand All @@ -220,9 +222,6 @@ func NewDynamoWriter(cfg baker.OutputParams) (baker.Output, error) {
dcfg := cfg.DecodedConfig.(*DynamoWriterConfig)
dcfg.fillDefaults()

if dcfg.Table == "" {
return nil, fmt.Errorf("\"table\" not specified in DynamoDB configuration")
}
if len(cfg.Fields) == 0 {
return nil, fmt.Errorf("\"fields\" not specified in [output] configuration")
}
Expand Down
1 change: 0 additions & 1 deletion output/filewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var FileWriterDesc = baker.OutputDesc{
type FileWriterConfig struct {
PathString string `help:"Template to describe location of the output directory: supports .Year, .Month, .Day and .Rotation. Also .Field0 if a field name has been specified in the output's fields list."`
RotateInterval time.Duration `help:"Time after which data will be rotated. If -1, it will not rotate until the end." default:"60s"`
StagingPathString string `help:"Staging directory for the upload functionality"`
ZstdCompressionLevel int `help:"zstd compression level, ranging from 1 (best speed) to 19 (best compression)." default:"3"`
ZstdWindowLog int `help:"Enable zstd long distance matching. Increase memory usage for both compressor/decompressor. If more than 27 the decompressor requires special treatment. 0:disabled." default:"0"`
}
Expand Down
2 changes: 1 addition & 1 deletion output/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var StatsDesc = baker.OutputDesc{
type StatsConfig struct {
CountEmptyFields bool `help:"Whether fields with empty values are counted or not" default:"false"`
CSVPath string `help:"Path of the CSV file to create" default:"stats.csv"`
TimestampField string `help:"Name of a field containing a POSIX timestamp (in seconds) used to build the times stats" default:"(none)"`
TimestampField string `help:"Name of a field containing a POSIX timestamp (in seconds) used to build the times stats" required:"true"`
}

func (cfg *StatsConfig) fillDefaults() {
Expand Down
8 changes: 2 additions & 6 deletions upload/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ var S3Desc = baker.UploadDesc{
// SourceBasePath.
type S3Config struct {
SourceBasePath string `help:"Base path used to consider the final S3 path." default:"/tmp/baker/ologs/"`
Region string `help:"S3 region to upload to." default:"us-east-1"`
Bucket string `help:"S3 bucket to upload to. (required)"`
Region string `help:"S3 region to upload to" default:"us-east-1"`
Bucket string `help:"S3 bucket to upload to" required:"true"`
Prefix string `help:"Prefix on the destination bucket" default:"/"`
StagingPath string `help:"Local staging area to copy files to before upload." default:"/tmp/baker/ologs/staging/"`
Retries int `help:"Number of retries before a failed upload" default:"3"`
Expand All @@ -52,10 +52,6 @@ type S3Config struct {
}

func (cfg *S3Config) fillDefaults() error {
if cfg.Bucket == "" {
return fmt.Errorf("Bucket is required")
}

if cfg.Region == "" {
cfg.Region = "us-east-1"
}
Expand Down