Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add pkg/splitwriter and pkg/buffercache reusable components [#55](https://github.com/AdRoll/baker/pull/55)
- output: add SQLite output [#56](https://github.com/AdRoll/baker/pull/56)
- README: document KCL input [#59](https://github.com/AdRoll/baker/pull/59)
- Document how to specialize baker.LogLine [#63](https://github.com/AdRoll/baker/pull/63)

### Changed

37 changes: 36 additions & 1 deletion README.md
@@ -34,7 +34,9 @@ Baker is fully parallel and maximizes usage of both CPU-bound and I/O bound pipe
- [Provided Baker components](#provided-baker-components)
- [Inputs](#inputs-1)
- [KCL](#kcl)
- [Implementation and prevent throttling prevention](#implementation-and-prevent-throttling-prevention)
- [Implementation and throttling prevention](#implementation-and-throttling-prevention)
- [Working with baker.Record](#working-with-bakerrecord)
- [`baker.LogLine` CSV record](#bakerlogline-csv-record)
- [Tuning parallelism](#tuning-parallelism)
- [Sharding](#sharding)
- [How to implement a sharding function](#how-to-implement-a-sharding-function)
@@ -452,6 +454,39 @@ Doing so, we're guaranteed to never exceed the per-shard read throughput limit of
2MB/s, while being close to it on data peaks. This has the added advantage of
reducing the number of IO syscalls.

## Working with baker.Record

`baker.Record` is an interface that provides an abstraction over a record of
flattened data, where fields (columns) are indexed by integers.

At the moment, `baker` provides a single `Record` implementation, `baker.LogLine`.

### `baker.LogLine` CSV record

`baker.LogLine` is a highly optimized, CSV-compatible `Record` implementation. It
supports any single-byte field separator and doesn't handle quotes (neither
single nor double). The maximum number of fields is hard-coded by the
`LogLineNumFields` constant, which is 3000. 100 extra fields (also hard-coded,
by `NumFieldsBaker`) can be stored at runtime in a `LogLine`. These extra fields
are a fast way to exchange data between filters and/or outputs, but they are
handled neither during parsing (i.e. `LogLine.Parse`) nor serialization (`LogLine.ToText`).
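The split between parsed fields and runtime-only extra fields can be pictured with a small, self-contained sketch. This is an illustration of the idea only, not baker's actual implementation; the constants and names below are hypothetical stand-ins for `LogLineNumFields` and `NumFieldsBaker`:

```go
package main

import (
	"bytes"
	"fmt"
)

// Hypothetical stand-ins for baker's LogLineNumFields and NumFieldsBaker.
const (
	numFields      = 3000 // fields parsed from (and serialized to) CSV
	numExtraFields = 100  // runtime-only fields, never parsed nor serialized
)

// record sketches the fixed-layout idea behind baker.LogLine.
type record struct {
	fields [numFields + numExtraFields][]byte
	sep    byte
}

// parse fills only the first numFields slots; extra slots stay untouched.
func (r *record) parse(line []byte) {
	for i, f := range bytes.Split(line, []byte{r.sep}) {
		if i >= numFields {
			break
		}
		r.fields[i] = f
	}
}

func (r *record) get(i int) []byte    { return r.fields[i] }
func (r *record) set(i int, v []byte) { r.fields[i] = v }

func main() {
	r := &record{sep: ','}
	r.parse([]byte("value0,value1"))
	fmt.Printf("%s\n", r.get(0)) // value0

	// A filter can stash data in the extra range for a downstream output;
	// a real ToText-style serializer would ignore these slots.
	r.set(numFields, []byte("cached"))
	fmt.Printf("%s\n", r.get(numFields)) // cached
}
```

In baker itself the extra slots start at index `LogLineNumFields`, as described above.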

If the hard-coded values of `LogLineNumFields` and `NumFieldsBaker` do not suit
your needs, it's advised that you copy `logline.go` into your project and modify
the constants declared at the top of the file. Your specialized `LogLine` will
Contributor: the user should copy the logline but also rename the object, if I'm not mistaken. If true, I'd also change the code example below using something like MyCustomLogline

Collaborator (author): No, that's not necessary: the user copies the file into their own project, hence into a different package, so the definitions never clash.

still implement `baker.Record` and thus can be used in lieu of `baker.LogLine`.
To do so, you need to provide a [CreateRecord](https://pkg.go.dev/github.com/AdRoll/baker#Components)
function to `baker.Components` when calling `baker.NewConfigFromToml`.

For example:

```go
comp := baker.Components{}

comp.CreateRecord = func() baker.Record {
	return &LogLine{FieldSeparator: ','}
}
```

## Tuning parallelism

4 changes: 3 additions & 1 deletion logline.go
@@ -6,7 +6,9 @@ const (
// LogLineNumFields is the maximum number of standard fields in a log line.
LogLineNumFields FieldIndex = 3000
// NumFieldsBaker is an additional list of custom fields, not present
// in the input logline, that can be set during processing
// in the input logline nor in the output, that can be set during processing.
// Its main purpose is to quickly exchange values between filters (and
// possibly outputs) on a per-record basis.
NumFieldsBaker FieldIndex = 100

// DefaultLogLineFieldSeparator defines the default field separator, which is the comma
21 changes: 12 additions & 9 deletions stats.go
@@ -11,14 +11,6 @@ import (
log "github.com/sirupsen/logrus"
)

func countInvalid(invalid *[LogLineNumFields]int64) int64 {
sum := int64(0)
for _, i := range invalid {
sum = sum + i
}
return sum
}

// A StatsDumper gathers statistics about all baker components of topology.
type StatsDumper struct {
t *Topology
@@ -105,7 +97,7 @@ func (sd *StatsDumper) dumpNow() {
log.Fatalf("numUploads < prevUploads: %d < %d\n", numUploads, sd.prevUploads)
}

invalid := countInvalid(&t.invalid)
invalid := sd.countInvalid()
parseErrors := t.malformed
totalErrors := invalid + parseErrors + filtered + outErrors
sd.metrics.RawCount("error_lines", totalErrors)
@@ -173,6 +165,17 @@ func (sd *StatsDumper) dumpNow() {
sd.prevUploadErrors = numUploadErrors
}

func (sd *StatsDumper) countInvalid() int64 {
sd.t.mu.RLock()
defer sd.t.mu.RUnlock()

sum := int64(0)
for _, i := range sd.t.invalid {
sum = sum + i
}
return sum
}

// Run starts dumping stats every second on standard output. Call stop() to
// stop the periodic dumping; stats are printed one final time.
func (sd *StatsDumper) Run() (stop func()) {
109 changes: 109 additions & 0 deletions stats_test.go
@@ -7,12 +7,14 @@ import (
"math"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/AdRoll/baker"
"github.com/AdRoll/baker/filter/filtertest"
"github.com/AdRoll/baker/input/inputtest"
"github.com/AdRoll/baker/output"
"github.com/AdRoll/baker/output/outputtest"
"github.com/AdRoll/baker/testutil"
"github.com/AdRoll/baker/upload/uploadtest"
@@ -110,3 +112,110 @@ func TestStatsDumper(t *testing.T) {

testutil.DiffWithGolden(t, buf.Bytes(), golden)
}

func TestStatsDumperInvalidRecords(t *testing.T) {
// This test checks the correct integration of the StatsDumper with the
// Topology by counting the number of invalid fields, i.e. the number
// of fields which do not pass the user-specified validation function, as
// reported by the StatsDumper after the topology has finished its execution.
toml := `
[input]
name="logline"

[output]
name="nop"
fields=["field0"]
`

components := baker.Components{
Inputs: []baker.InputDesc{inputtest.LogLineDesc},
Outputs: []baker.OutputDesc{output.NopDesc},
FieldByName: func(n string) (baker.FieldIndex, bool) {
switch n {
case "field0":
return 0, true
case "field1":
return 1, true
}
return 0, false
},
FieldName: func(f baker.FieldIndex) string {
switch f {
case 0:
return "field0"
case 1:
return "field1"
}
return ""
},
Validate: func(r baker.Record) (bool, baker.FieldIndex) {
// field at index 0 must be "value0"
// field at index 1 must be "value1"
if !bytes.Equal(r.Get(0), []byte("value0")) {
return false, 0
}
if !bytes.Equal(r.Get(1), []byte("value1")) {
return false, 1
}
return true, 0
},
}

cfg, err := baker.NewConfigFromToml(strings.NewReader(toml), components)
if err != nil {
t.Fatal(err)
}

topo, err := baker.NewTopologyFromConfig(cfg)
if err != nil {
t.Fatal(err)
}

csvs := []string{
"value0,bar", // field1 invalid
"value0,bar", // field1 invalid
"value0,value1", // both valid
"foo,value1", // field0 invalid
"foo,bar", // both invalid (when that's the case the first one is reported)
}

in := topo.Input.(*inputtest.LogLine)
in.Lines = make([]*baker.LogLine, len(csvs))
for i, csv := range csvs {
ll := &baker.LogLine{FieldSeparator: ','}
ll.Parse([]byte(csv), nil)
in.Lines[i] = ll
}

buf := &bytes.Buffer{}
stats := baker.NewStatsDumper(topo)
stats.SetWriter(buf)
stop := stats.Run()

topo.Start()
topo.Wait()
if err = topo.Error(); err != nil {
t.Fatal(err)
}

// The StatsDumper needs at least one second to print anything.
time.Sleep(time.Second)
stop()

// Clean the stats dumper output.
out := strings.Split(strings.TrimSpace(buf.String()), "\n")
// Only take the last 2 lines (the final ones).
out = out[len(out)-2:]

// The stats line should contain the following string, indicating 4 invalid records.
want := `errors[p:0 i:4 f:0 o:0 u:0]`
if !strings.Contains(out[0], want) {
t.Errorf("StatsDumper stats line doesn't contain %q\nline:\n\t%q", want, out[0])
}

// The validation errors line should show 2 errors for each field.
want = `map[field0:2 field1:2]`
if !strings.Contains(out[1], want) {
t.Errorf("StatsDumper validation error line doesn't contain %q\nline:\n\t%q", want, out[1])
}
}
14 changes: 10 additions & 4 deletions topology.go
@@ -26,8 +26,10 @@ type Topology struct {
upch chan string

metrics MetricsClient
invalid [LogLineNumFields]int64 // count validation errors (by field)
malformed int64 // count parse or empty records
malformed int64 // count parse or empty records

mu sync.RWMutex // protects invalid map
invalid map[FieldIndex]int64 // tracks validation errors (by field)

shard func(l Record) uint64
chain func(l Record)
@@ -59,6 +61,7 @@ func NewTopologyFromConfig(cfg *Config) (*Topology, error) {
return cfg.createRecord()
},
},
invalid: make(map[FieldIndex]int64),
}

// Create the metrics client first since it's injected into components parameters.
@@ -369,8 +372,11 @@ func (t *Topology) runFilterChain() {
// Validate against patterns
if t.validate != nil {
// call external validation function
if ok, idx := t.validate(record); !ok {
atomic.AddInt64(&t.invalid[idx], 1)
ok, idx := t.validate(record)
if !ok {
Contributor: is it as fast as the previous version?

Collaborator (author): No, it's not: incrementing an atomic is faster than locking a mutex. However, this is the only correct way to do it now that `invalid` is a map. Moreover, the slowdown is only hit when a logline is invalid; the happy path stays the same.
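The tradeoff discussed in this thread can be sketched with a self-contained example (hypothetical names, not baker's code): a mutex-protected map supports arbitrary field indices, at the cost of a lock that is only taken on the error path, while readers aggregate under a shared read lock:

```go
package main

import (
	"fmt"
	"sync"
)

// counter sketches the mutex-protected map pattern from the diff above:
// writes lock only when a record is invalid; reads take a shared RLock.
type counter struct {
	mu      sync.RWMutex
	invalid map[int]int64 // validation errors, keyed by field index
}

func (c *counter) inc(idx int) {
	c.mu.Lock()
	c.invalid[idx]++
	c.mu.Unlock()
}

func (c *counter) sum() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	var s int64
	for _, v := range c.invalid {
		s += v
	}
	return s
}

func main() {
	c := &counter{invalid: make(map[int]int64)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				c.inc(j % 3) // concurrent increments stay consistent
			}
		}()
	}
	wg.Wait()
	fmt.Println(c.sum()) // 4000
}
```

An atomic increment on a fixed-size array avoids the lock entirely but cannot grow to arbitrary keys, which is the speed-versus-flexibility tradeoff the reviewers are weighing.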

t.mu.Lock()
t.invalid[idx]++
t.mu.Unlock()
continue
}
}