Skip to content

Commit

Permalink
feat(plc4go/bacnet): fix some open issues regarding task processing
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jan 16, 2023
1 parent 7ff73a2 commit eb59f7c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 16 deletions.
63 changes: 61 additions & 2 deletions plc4go/internal/bacnetip/Core.go
Expand Up @@ -19,9 +19,68 @@

package bacnetip

import "github.com/rs/zerolog/log"
import (
"github.com/rs/zerolog/log"
"math"
"time"
)

var running bool
var spin = 10 * time.Millisecond
var sleepTime = 0 * time.Nanosecond
var deferredFunctions []func() error

func init() {
running = true
go func() {
for running {
// get the next task
task, delta := _taskManager.getNextTask()
if task != nil {
_taskManager.processTask(task)
}

// if delta is None, there are no tasks, default to spinning
if delta == 0 {
delta = spin
}

// there may be threads around, sleep for a bit
if sleepTime > 0 && delta > sleepTime {
time.Sleep(sleepTime)
delta -= sleepTime
}

// delta should be no more than the spin value
delta = time.Duration(math.Min(float64(delta), float64(spin)))

// if there are deferred functions, use a small delta
if len(deferredFunctions) > 0 {
delta = time.Duration(math.Min(float64(delta), float64(1*time.Millisecond)))
}

// wait for socket
time.Sleep(delta)

// check for deferred functions
fnlist := deferredFunctions
// empty list
deferredFunctions = nil
for _, fn := range fnlist {
if err := fn(); err != nil {
log.Debug().Err(err).Msg("error executing deferred function")
}
}
}
}()
}

func Deferred(fn func() error) {
log.Debug().Msg("Deferred")
// TODO: implement me

// append it to the list
deferredFunctions = append(deferredFunctions, fn)

// trigger the task manager event
// TODO: there is no trigger
}
14 changes: 0 additions & 14 deletions plc4go/internal/bacnetip/Task.go
Expand Up @@ -223,20 +223,6 @@ func RecurringFunctionTask(interval *time.Duration, fn func() error) *RecurringT

var _taskManager = TaskManager{}

func init() {
go func() {
for {
task, delta := _taskManager.getNextTask()
if task == nil {
time.Sleep(10 * time.Millisecond)
continue
}
_taskManager.processTask(task)
time.Sleep(delta)
}
}()
}

type TaskManager struct {
sync.Mutex
tasks []_TaskRequirements
Expand Down

0 comments on commit eb59f7c

Please sign in to comment.