Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #500 to avoid potential hang and event loss #501

Merged
merged 5 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions pkg/event_processor/iworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"ecapture/user/event"
"encoding/hex"
"sync/atomic"
"time"
)

Expand All @@ -30,6 +31,9 @@ type IWorker interface {
// 收包
Write(event.IEventStruct) error
GetUUID() string
IfUsed() bool
Get()
Put()
}

const (
Expand All @@ -49,6 +53,7 @@ type eventWorker struct {
processor *EventProcessor
parser IParser
payload *bytes.Buffer
used atomic.Bool
}

func NewEventWorker(uuid string, processor *EventProcessor) IWorker {
Expand Down Expand Up @@ -127,12 +132,46 @@ func (ew *eventWorker) parserEvents() []byte {
func (ew *eventWorker) Run() {
for {
select {
case _ = <-ew.ticker.C:
case <-ew.ticker.C:
// 输出包
if ew.tickerCount > MaxTickerCount {
//ew.processor.GetLogger().Printf("eventWorker TickerCount > %d, event closed.", MaxTickerCount)
ew.Close()
return
ew.processor.delWorkerByUUID(ew)

/*
When returned from delWorkerByUUID(), there are two possibilities:
1) no routine can touch it.
2) one routine can still touch ew because getWorkerByUUID()
*happen before* delWorkerByUUID()

When no routine can touch it (i.e.,ew.IfUsed == false),
we just drain the ew.incoming and return.

When one routine can touch it (i.e.,ew.IfUsed == true), we ensure
that we only return after the routine can not touch it
(i.e.,ew.IfUsed == false). At this point, we can ensure that no
other routine will touch it and send events through the ew.incoming.
So, we return.

Because eworker has been deleted from workqueue after delWorkerByUUID()
(ordered by a workqueue lock), at this point, we can ensure that
ew will not be touched even **in the future**. So the return is
safe.

*/
for {
select {
case e := <-ew.incoming:
ew.writeEvent(e)
default:
if ew.IfUsed() {
time.Sleep(10 * time.Millisecond)
continue
ruitianzhong marked this conversation as resolved.
Show resolved Hide resolved
}
ew.Close()
return
}
}
}
ew.tickerCount++
case e := <-ew.incoming:
Expand All @@ -149,5 +188,22 @@ func (ew *eventWorker) Close() {
ew.ticker.Stop()
ew.Display()
ew.tickerCount = 0
ew.processor.delWorkerByUUID(ew)
}

// Get marks the worker as in use by atomically switching the used flag
// from false to true. It panics if the flag was already true, since that
// means Get was called twice without an intervening Put.
func (ew *eventWorker) Get() {
	if ew.used.CompareAndSwap(false, true) {
		return
	}
	panic("unexpected behavior and incorrect usage for eventWorker")
}

// Put releases the worker by atomically switching the used flag from true
// back to false. It panics if the flag was not set, since that means Put
// was called without a matching Get.
func (ew *eventWorker) Put() {
	if ew.used.CompareAndSwap(true, false) {
		return
	}
	panic("unexpected behavior and incorrect usage for eventWorker")
}

// IfUsed reports whether the used flag is currently set, i.e. whether a
// Get has been issued that has not yet been balanced by a Put.
func (ew *eventWorker) IfUsed() bool {
	inUse := ew.used.Load()
	return inUse
}
3 changes: 3 additions & 0 deletions pkg/event_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (this *EventProcessor) dispatch(e event.IEventStruct) {
}

err := eWorker.Write(e)
eWorker.Put() // never touch eWorker again
if err != nil {
//...
this.GetLogger().Fatalf("write event failed , error:%v", err)
Expand All @@ -89,13 +90,15 @@ func (this *EventProcessor) getWorkerByUUID(uuid string) (bool, IWorker) {
if !found {
return false, eWorker
}
eWorker.Get()
return true, eWorker
}

// addWorkerByUUID stores worker in workerQueue under the key returned by
// worker.GetUUID(), then calls worker.Get(). Both steps run while the
// processor lock is held, so the worker is marked as in use before the
// lock is released.
func (this *EventProcessor) addWorkerByUUID(worker IWorker) {
	this.Lock()
	defer this.Unlock()

	key := worker.GetUUID()
	this.workerQueue[key] = worker
	worker.Get()
}

// 每个worker调用该方法,从处理器中删除自己
Expand Down
4 changes: 3 additions & 1 deletion pkg/event_processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"strings"
Expand Down Expand Up @@ -70,10 +71,11 @@ func TestEventProcessor_Serve(t *testing.T) {
ep.Write(&BaseEvent{Data_len: eventSSL.Data_len, Data: eventSSL.Data, DataType: eventSSL.DataType, Timestamp: eventSSL.Timestamp, Pid: eventSSL.Pid, Tid: eventSSL.Tid, Comm: eventSSL.Comm, Fd: eventSSL.Fd, Version: eventSSL.Version})
}

tick := time.NewTicker(time.Second * 3)
tick := time.NewTicker(time.Second * 10)
<-tick.C

err = ep.Close()
logger.SetOutput(io.Discard)
lines = strings.Split(buf.String(), "\n")
ok := true
for _, line := range lines {
Expand Down
Loading