Skip to content

Commit

Permalink
refactor(plc4go/spi): move worker starting into a own method
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 21, 2023
1 parent e9d705a commit 430655f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 23 deletions.
20 changes: 12 additions & 8 deletions plc4go/spi/default/DefaultCodec.go
Expand Up @@ -149,8 +149,7 @@ func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
}

m.log.Debug().Msg("Message codec currently not running, starting worker now")
m.activeWorker.Add(1)
go m.Work(m.DefaultCodecRequirements)
m.startWorker()
m.running.Store(true)
m.log.Trace().Msg("connected")
return nil
Expand Down Expand Up @@ -277,7 +276,13 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
return messageHandled
}

func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
func (m *defaultCodec) startWorker() {
m.log.Trace().Msg("starting worker")
m.activeWorker.Add(1)
go m.Work()
}

func (m *defaultCodec) Work() {
defer m.activeWorker.Done()
workerLog := m.log.With().Logger()
if !m.traceDefaultMessageCodecWorker {
Expand All @@ -286,19 +291,18 @@ func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
workerLog.Trace().Msg("Starting work")
defer workerLog.Trace().Msg("work ended")

defer func(workerLog zerolog.Logger) {
defer func() {
if err := recover(); err != nil {
// TODO: If this is an error, cast it to an error and log it with "Err(err)"
m.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
}
if m.running.Load() {
workerLog.Warn().Msg("Keep running")
m.activeWorker.Add(1)
go m.Work(codec)
m.startWorker()
} else {
workerLog.Info().Msg("Worker terminated")
}
}(workerLog)
}()

// Start an endless loop
mainLoop:
Expand Down Expand Up @@ -361,7 +365,7 @@ mainLoop:
if m.customMessageHandling != nil {
workerLog.Trace().Msg("Executing custom handling")
start := time.Now()
handled := m.customMessageHandling(codec, message)
handled := m.customMessageHandling(m.DefaultCodecRequirements, message)
workerLog.Trace().Msgf("custom handling took %s", time.Since(start))
if handled {
workerLog.Trace().Msg("Custom handling handled the message")
Expand Down
61 changes: 46 additions & 15 deletions plc4go/spi/default/DefaultCodec_test.go
Expand Up @@ -884,14 +884,10 @@ func Test_defaultCodec_Work(t *testing.T) {
running bool
customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool
}
type args struct {
codec DefaultCodecRequirements
}
tests := []struct {
name string
fields fields
args args
mockSetup func(t *testing.T, fields *fields, args *args)
mockSetup func(t *testing.T, fields *fields)
manipulator func(t *testing.T, codec *defaultCodec)
}{
{
Expand All @@ -900,7 +896,7 @@ func Test_defaultCodec_Work(t *testing.T) {
codec.running.Store(true)
codec.activeWorker.Add(1)
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
fields.DefaultCodecRequirements = requirements
},
Expand Down Expand Up @@ -941,7 +937,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
fields.DefaultCodecRequirements = requirements
Expand Down Expand Up @@ -987,7 +983,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(nil, nil)
fields.DefaultCodecRequirements = requirements
Expand Down Expand Up @@ -1033,7 +1029,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
fields.DefaultCodecRequirements = requirements
Expand All @@ -1057,7 +1053,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
fields.DefaultCodecRequirements = requirements
Expand Down Expand Up @@ -1103,7 +1099,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
fields.DefaultCodecRequirements = requirements
Expand Down Expand Up @@ -1152,7 +1148,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
fields.DefaultCodecRequirements = requirements
Expand Down Expand Up @@ -1201,7 +1197,7 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultCodecRequirements(t)
requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
fields.DefaultCodecRequirements = requirements
Expand All @@ -1215,7 +1211,7 @@ func Test_defaultCodec_Work(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.mockSetup != nil {
tt.mockSetup(t, &tt.fields, &tt.args)
tt.mockSetup(t, &tt.fields)
}
m := &defaultCodec{
DefaultCodecRequirements: tt.fields.DefaultCodecRequirements,
Expand All @@ -1232,7 +1228,7 @@ func Test_defaultCodec_Work(t *testing.T) {
time.Sleep(200 * time.Millisecond)
m.running.Store(false)
}()
m.Work(tt.args.codec)
m.Work()
})
}
}
Expand Down Expand Up @@ -1300,3 +1296,38 @@ func Test_defaultCodec_String(t *testing.T) {
})
}
}

func Test_defaultCodec_startWorker(t *testing.T) {
type fields struct {
DefaultCodecRequirements DefaultCodecRequirements
transportInstance transports.TransportInstance
expectations []spi.Expectation
defaultIncomingMessageChannel chan spi.Message
customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool
receiveTimeout time.Duration
traceDefaultMessageCodecWorker bool
}
tests := []struct {
name string
fields fields
}{
{
name: "start it not running",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &defaultCodec{
DefaultCodecRequirements: tt.fields.DefaultCodecRequirements,
transportInstance: tt.fields.transportInstance,
expectations: tt.fields.expectations,
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
customMessageHandling: tt.fields.customMessageHandling,
receiveTimeout: tt.fields.receiveTimeout,
traceDefaultMessageCodecWorker: tt.fields.traceDefaultMessageCodecWorker,
log: testutils.ProduceTestingLogger(t),
}
m.startWorker()
})
}
}

0 comments on commit 430655f

Please sign in to comment.