fix context aggregator bug (#377)
henryzhx8 committed Sep 8, 2022
1 parent 9a4791b commit 61b575d
Showing 12 changed files with 224 additions and 84 deletions.
2 changes: 1 addition & 1 deletion pluginmanager/plugin_manager_test.go
@@ -24,7 +24,7 @@ import (
_ "github.com/alibaba/ilogtail/pkg/logger/test"

// dependency packages
_ "github.com/alibaba/ilogtail/plugins/aggregator/defaultone"
_ "github.com/alibaba/ilogtail/plugins/aggregator"
"github.com/alibaba/ilogtail/plugins/flusher/checker"
_ "github.com/alibaba/ilogtail/plugins/flusher/statistics"
_ "github.com/alibaba/ilogtail/plugins/flusher/stdout"
26 changes: 26 additions & 0 deletions plugins/aggregator/aggregator_default.go
@@ -0,0 +1,26 @@
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregator

import (
"github.com/alibaba/ilogtail"
"github.com/alibaba/ilogtail/plugins/aggregator/baseagg"
)

func init() {
ilogtail.Aggregators["aggregator_default"] = func() ilogtail.Aggregator {
return baseagg.NewAggregatorBase()
}
}
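
This new file keeps the legacy plugin name `aggregator_default` resolvable by mapping it to the base aggregator, so configs that never specified an aggregator (or that name it explicitly) keep working while the context-preserving implementation moves to its own name. Below is a minimal sketch of how a name-based lookup could go through the `ilogtail.Aggregators` registry shown in this diff; the lookup code is illustrative only and not part of the commit.

```go
package main

import (
	"fmt"

	"github.com/alibaba/ilogtail"
	// The blank import triggers the init() above and registers "aggregator_default".
	_ "github.com/alibaba/ilogtail/plugins/aggregator"
)

func main() {
	// Resolve the legacy plugin name through the shared registry.
	factory, ok := ilogtail.Aggregators["aggregator_default"]
	if !ok {
		fmt.Println("aggregator_default is not registered")
		return
	}
	agg := factory() // returns the baseagg implementation behind the old name
	fmt.Println(agg.Description())
}
```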
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package defaultone
package context

import (
"strings"
@@ -37,9 +37,9 @@ type LogPackSeqInfo struct {
lastUpdateTime time.Time
}

// AggregatorDefault is the default aggregator in plugin system.
// AggregatorContext is the default aggregator in plugin system.
// If there is no specific aggregator in plugin config, it will be added.
type AggregatorDefault struct {
type AggregatorContext struct {
MaxLogGroupCount int // the maximum log group count to trigger flush operation
MaxLogCount int // the maximum log in a log group
Topic string // the output topic
@@ -62,20 +62,20 @@ type AggregatorDefault struct {
// Init method would be trigger before working.
// 1. context store the metadata of this Logstore config
// 2. que is a transfer channel for flushing LogGroup when reaches the maximum in the cache.
func (p *AggregatorDefault) Init(context ilogtail.Context, que ilogtail.LogGroupQueue) (int, error) {
func (p *AggregatorContext) Init(context ilogtail.Context, que ilogtail.LogGroupQueue) (int, error) {
p.defaultPack = util.NewPackIDPrefix(context.GetConfigName())
p.context = context
p.queue = que
p.lastCleanPackIDMapTime = time.Now()
return 0, nil
}

func (*AggregatorDefault) Description() string {
func (*AggregatorContext) Description() string {
return "default aggregator for logtail"
}

// Add adds @log with @ctx to aggregator.
func (p *AggregatorDefault) Add(log *protocol.Log, ctx map[string]interface{}) error {
func (p *AggregatorContext) Add(log *protocol.Log, ctx map[string]interface{}) error {
p.lock.Lock()
defer p.lock.Unlock()

@@ -116,6 +116,7 @@ func (p *AggregatorDefault) Add(log *protocol.Log, ctx map[string]interface{}) e
}
}
// New log group, reset size.
p.nowLogGroupSizeMap[source] = 0
logGroupList = append(logGroupList, p.newLogGroup(source, topic))
nowLogGroup = logGroupList[len(logGroupList)-1]
}
@@ -128,7 +129,7 @@ func (p *AggregatorDefault) Add(log *protocol.Log, ctx map[string]interface{}) e
}

// Flush ...
func (p *AggregatorDefault) Flush() []*protocol.LogGroup {
func (p *AggregatorContext) Flush() []*protocol.LogGroup {
p.lock.Lock()
defer p.lock.Unlock()
var ret []*protocol.LogGroup
@@ -160,13 +161,13 @@ func (p *AggregatorDefault) Flush() []*protocol.LogGroup {
}

// Reset ...
func (p *AggregatorDefault) Reset() {
func (p *AggregatorContext) Reset() {
p.lock.Lock()
defer p.lock.Unlock()
p.logGroupPoolMap = make(map[string][]*protocol.LogGroup)
}

func (p *AggregatorDefault) newLogGroup(pack string, topic string) *protocol.LogGroup {
func (p *AggregatorContext) newLogGroup(pack string, topic string) *protocol.LogGroup {
logGroup := &protocol.LogGroup{
Logs: make([]*protocol.Log, 0, p.MaxLogCount),
Topic: topic,
@@ -186,7 +187,7 @@ func (p *AggregatorDefault) newLogGroup(pack string, topic string) *protocol.Log
return logGroup
}

func (*AggregatorDefault) evaluateLogSize(log *protocol.Log) int {
func (*AggregatorContext) evaluateLogSize(log *protocol.Log) int {
var logSize = 6
for _, logC := range log.Contents {
logSize += 5 + len(logC.Key) + len(logC.Value)
@@ -195,8 +196,8 @@ func (*AggregatorDefault) evaluateLogSize(log *protocol.Log) int {
}

// NewAggregatorDefault create a default aggregator with default value.
func NewAggregatorDefault() *AggregatorDefault {
return &AggregatorDefault{
func NewAggregatorDefault() *AggregatorContext {
return &AggregatorContext{
MaxLogGroupCount: 4,
MaxLogCount: MaxLogCount,
ContextPresevationTolerance: 10,
@@ -211,7 +212,7 @@ func NewAggregatorDefault() *AggregatorDefault {

// Register the plugin to the Aggregators array.
func init() {
ilogtail.Aggregators["aggregator_default"] = func() ilogtail.Aggregator {
ilogtail.Aggregators["aggregator_context"] = func() ilogtail.Aggregator {
return NewAggregatorDefault()
}
}
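
The context-keeping implementation now registers under `aggregator_context` (with the type renamed from `AggregatorDefault` to `AggregatorContext`), while the constructor keeps its `NewAggregatorDefault` name. The sketch below walks through the Init → Add → Flush lifecycle described in the comments above; the `mock` import path and the nil queue are assumptions for demonstration and are not part of this commit.

```go
package main

import (
	"fmt"
	"time"

	"github.com/alibaba/ilogtail/pkg/protocol"
	aggcontext "github.com/alibaba/ilogtail/plugins/aggregator/context"
	"github.com/alibaba/ilogtail/plugins/test/mock"
)

func main() {
	// NewAggregatorDefault keeps its name but now returns *AggregatorContext.
	agg := aggcontext.NewAggregatorDefault()

	// Init receives the config context and a LogGroupQueue; nil is only a
	// placeholder here -- the runtime (or the SliceQueue helper in the tests
	// below) supplies a real queue for early flushes.
	if _, err := agg.Init(mock.NewEmptyContext("project", "logstore", "config"), nil); err != nil {
		panic(err)
	}

	log := &protocol.Log{Time: uint32(time.Now().Unix())}
	log.Contents = append(log.Contents, &protocol.Log_Content{Key: "content", Value: "hello"})

	// The ctx map carries the per-source pack ID prefix that the context
	// aggregator groups by (see the "source" key in the tests below).
	_ = agg.Add(log, map[string]interface{}{"source": "EXAMPLEPACKID-"})

	for _, group := range agg.Flush() {
		fmt.Println(group.Topic, len(group.Logs))
	}
}
```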
@@ -1,8 +1,21 @@
package defaultone
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package context

import (
"fmt"
"math/rand"
"strconv"
"strings"
"testing"
@@ -16,6 +29,11 @@ import (

var packIDPrefix = [3]string{"ABCDEFGHIJKLMNOP", "ALOEJDMGNYTDEWS", "VDSRGHUKMLQETGVD"}

var (
shortLog = "This is short log. This log comes from source "
longLog = strings.Repeat("This is long log. ", 200) + "This log comes from source "
)

type SliceQueue struct {
logGroups []*protocol.LogGroup
}
@@ -40,7 +58,7 @@ type contextInfo struct {
logSeq int
}

func newAggregatorDefault() (*AggregatorDefault, *SliceQueue, error) {
func newAggregatorDefault() (*AggregatorContext, *SliceQueue, error) {
ctx := mock.NewEmptyContext("p", "l", "c")
que := &SliceQueue{}
agg := NewAggregatorDefault()
@@ -53,34 +71,46 @@ func TestAggregatorDefault(t *testing.T) {
agg, que, err := newAggregatorDefault()
So(err, ShouldBeNil)

Convey("When no quick flush happens", func() {
Convey("When log producing pace is slow and each log is relatively small", func() {
logNo := make([]int, len(packIDPrefix))
generateLogs(agg, 1000, true, logNo)
logGroups := que.PopAll()
generateLogs(agg, 900, true, logNo, true)
logGroups := agg.Flush()
generateLogs(agg, 1800, true, logNo, true)
logGroups = append(logGroups, agg.Flush()...)
generateLogs(agg, 2000, true, logNo)
logGroups = append(logGroups, que.PopAll()...)

Convey("Then no quick flush happens, and each logGroup should contain logs from the same source with chronological order", func() {
So(logGroups, ShouldHaveLength, 6)
checkResult(logGroups, 2700)
})
})

Convey("When log producing pace is fast but each log is relatively small", func() {
logNo := make([]int, len(packIDPrefix))
generateLogs(agg, 18432, true, logNo, true) // 1024 * 6 * 3
logGroups := que.PopAll()
logGroups = append(logGroups, agg.Flush()...)

Convey("Then each logGroup should contain logs from the same source with chronological order", func() {
checkResult(logGroups, 3000)
Convey("Then quick flush happens, and each logGroup should contain logs from the same source with chronological order", func() {
So(logGroups, ShouldHaveLength, 18)
checkResult(logGroups, 18432)
})
})

Convey("When quick flush happens", func() {
Convey("When log producing pace is slow but each log is relatively large", func() {
logNo := make([]int, len(packIDPrefix))
generateLogs(agg, 50000, true, logNo)
generateLogs(agg, 9216, true, logNo, false) // 1024 * 3 * 3
logGroups := que.PopAll()
logGroups = append(logGroups, agg.Flush()...)

Convey("Then each logGroup should contain logs from the same source with chronological order", func() {
checkResult(logGroups, 50000)
Convey("Then quick flush happens, and each logGroup should contain logs from the same source with chronological order", func() {
So(logGroups, ShouldHaveLength, 12)
checkResult(logGroups, 9216)
})
})

Convey("When no source information is provided", func() {
logNo := make([]int, len(packIDPrefix))
generateLogs(agg, 20000, false, logNo)
generateLogs(agg, 20000, false, logNo, true)
logGroups := que.PopAll()
logGroups = append(logGroups, agg.Flush()...)

@@ -110,7 +140,7 @@ func TestAggregatorDefault(t *testing.T) {

Convey("When logs are added", func() {
logNo := make([]int, len(packIDPrefix))
generateLogs(agg, 20000, true, logNo)
generateLogs(agg, 20000, true, logNo, true)
logGroups := que.PopAll()
logGroups = append(logGroups, agg.Flush()...)

@@ -134,11 +164,15 @@ func TestAggregatorDefault(t *testing.T) {
})
}

func generateLogs(agg *AggregatorDefault, logNum int, withCtx bool, logNo []int) {
func generateLogs(agg *AggregatorContext, logNum int, withCtx bool, logNo []int, isShort bool) {
for i := 0; i < logNum; i++ {
index := rand.Intn(len(packIDPrefix))
index := i % len(packIDPrefix)
log := &protocol.Log{Time: uint32(time.Now().Unix())}
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "content", Value: "This log comes from source " + fmt.Sprintf("%d", index)})
if isShort {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "content", Value: shortLog + fmt.Sprintf("%d", index)})
} else {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "content", Value: longLog + fmt.Sprintf("%d", index)})
}
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "no", Value: fmt.Sprintf("%d", logNo[index]+1)})
if withCtx {
ctx := map[string]interface{}{"source": packIDPrefix[index] + "-"}
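
For context on the new assertions: the test now distinguishes count-triggered flushes (many small logs) from size-triggered flushes (few large logs), and `generateLogs` assigns sources round-robin (`i % 3`) instead of randomly so the expected group counts are deterministic. A rough, assumption-laden check of the count-triggered case follows; the per-group log cap of 1024 is inferred from the inline "1024 * 6 * 3" comment and is not shown in this diff.

```go
package main

import "fmt"

func main() {
	// Expectation arithmetic behind "So(logGroups, ShouldHaveLength, 18)",
	// assuming MaxLogCount = 1024 and the round-robin source assignment.
	const maxLogCount, sources = 1024, 3
	logs := 18432
	perSource := logs / sources                // 6144 short logs per source
	groupsPerSource := perSource / maxLogCount // 6 full groups per source
	fmt.Println(groupsPerSource * sources)     // 18

	// The long-log case (9216 logs -> 12 groups) flushes on the byte-size
	// cap instead, which is not visible in this diff, so it is not derived here.
}
```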
3 changes: 2 additions & 1 deletion plugins/all/all.go
@@ -15,8 +15,9 @@
package all

import (
_ "github.com/alibaba/ilogtail/plugins/aggregator"
_ "github.com/alibaba/ilogtail/plugins/aggregator/baseagg"
_ "github.com/alibaba/ilogtail/plugins/aggregator/defaultone"
_ "github.com/alibaba/ilogtail/plugins/aggregator/context"
_ "github.com/alibaba/ilogtail/plugins/aggregator/logstorerouter"
_ "github.com/alibaba/ilogtail/plugins/aggregator/shardhash"
_ "github.com/alibaba/ilogtail/plugins/aggregator/skywalking"
2 changes: 1 addition & 1 deletion plugins/test/common.go
@@ -24,7 +24,7 @@ import (
"github.com/alibaba/ilogtail/pkg/protocol"
"github.com/alibaba/ilogtail/pkg/util"
"github.com/alibaba/ilogtail/pluginmanager"
_ "github.com/alibaba/ilogtail/plugins/aggregator/defaultone"
_ "github.com/alibaba/ilogtail/plugins/aggregator" //
_ "github.com/alibaba/ilogtail/plugins/flusher/checker"
_ "github.com/alibaba/ilogtail/plugins/flusher/statistics"
_ "github.com/alibaba/ilogtail/plugins/flusher/stdout"
