Skip to content

Commit

Permalink
Merge pull request #1 from ThotaGopichandThota/Gopi
Browse files Browse the repository at this point in the history
v0.0.1
  • Loading branch information
ThotaGopichandThota authored Apr 30, 2024
2 parents 3a748ac + c32f5f0 commit efeadf8
Show file tree
Hide file tree
Showing 325 changed files with 70,780 additions and 1 deletion.
23 changes: 23 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories
/vendor

# IDE files
.idea/

# Go workspace file
go.work

.idea
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
test:
@echo " > Running unit tests"
go test -cover -race -coverprofile=coverage.txt -covermode=atomic -v ./...
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
# gn-core3
# gn-core3-go

mx-chain-go common components and data that can be used in other repositories as well
17 changes: 17 additions & 0 deletions core/accumulator/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package accumulator

import "time"

const MinimumAllowedTime = minimumAllowedTime

func (ta *timeAccumulator) Data() []interface{} {
ta.mut.Lock()
data := make([]interface{}, len(ta.data))
ta.mut.Unlock()

return data
}

func (ta *timeAccumulator) ComputeWaitTime() time.Duration {
return ta.computeWaitTime()
}
133 changes: 133 additions & 0 deletions core/accumulator/timeAccumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package accumulator

import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
"sync"
"time"

"github.com/ThotaGopichandThota/gn-core3/core"
"github.com/ThotaGopichandThota/gn-core3/core/check"
)

var _ core.Accumulator = (*timeAccumulator)(nil)

const minimumAllowedTime = time.Millisecond * 10

// timeAccumulator is a structure that is able to accumulate data and will try to write on the output channel
// once per provided interval
type timeAccumulator struct {
cancel func()
maxAllowedTime time.Duration
maxOffset time.Duration
mut sync.Mutex
data []interface{}
output chan []interface{}
log core.Logger
}

// NewTimeAccumulator returns a new accumulator instance
func NewTimeAccumulator(maxAllowedTime time.Duration, maxOffset time.Duration, logger core.Logger) (*timeAccumulator, error) {
if maxAllowedTime < minimumAllowedTime {
return nil, fmt.Errorf("%w for maxAllowedTime as minimum allowed time is %v",
core.ErrInvalidValue,
minimumAllowedTime,
)
}
if maxOffset < 0 {
return nil, fmt.Errorf("%w for maxOffset: should not be negative", core.ErrInvalidValue)
}
if check.IfNil(logger) {
return nil, core.ErrNilLogger
}

ctx, cancel := context.WithCancel(context.Background())

ta := &timeAccumulator{
cancel: cancel,
maxAllowedTime: maxAllowedTime,
output: make(chan []interface{}),
maxOffset: maxOffset,
log: logger,
}

go ta.continuousEviction(ctx)

return ta, nil
}

// AddData will append a new data on the queue
func (ta *timeAccumulator) AddData(data interface{}) {
ta.mut.Lock()
ta.data = append(ta.data, data)
ta.mut.Unlock()
}

// OutputChannel returns the output channel on which accumulated data will be sent periodically
func (ta *timeAccumulator) OutputChannel() <-chan []interface{} {
return ta.output
}

// will call do eviction periodically until the context is done
func (ta *timeAccumulator) continuousEviction(ctx context.Context) {
defer func() {
close(ta.output)
}()

for {
select {
case <-time.After(ta.computeWaitTime()):
isDone := ta.doEviction(ctx)
if isDone {
return
}
case <-ctx.Done():
ta.log.Debug("closing timeAccumulator.continuousEviction go routine")
return
}
}
}

func (ta *timeAccumulator) computeWaitTime() time.Duration {
if ta.maxOffset == 0 {
return ta.maxAllowedTime
}

randBuff := make([]byte, 4)
_, _ = rand.Read(randBuff)
randUint64 := binary.BigEndian.Uint32(randBuff)
offset := time.Duration(randUint64) % ta.maxOffset

return ta.maxAllowedTime - offset
}

// doEviction will do the eviction of all accumulated data
// if context.Done is triggered during the eviction, the whole operation will be aborted
func (ta *timeAccumulator) doEviction(ctx context.Context) bool {
ta.mut.Lock()
tempData := make([]interface{}, len(ta.data))
copy(tempData, ta.data)
ta.data = nil
ta.mut.Unlock()

select {
case ta.output <- tempData:
return false
case <-ctx.Done():
ta.log.Debug("closing timeAccumulator.doEviction go routine")
return true
}
}

// Close stops the time accumulator's eviction loop and closes the output chan
func (ta *timeAccumulator) Close() error {
ta.cancel()
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (ta *timeAccumulator) IsInterfaceNil() bool {
return ta == nil
}
184 changes: 184 additions & 0 deletions core/accumulator/timeAccumulator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package accumulator_test

import (
"errors"
"testing"
"time"

"github.com/ThotaGopichandThota/gn-core3/core"
"github.com/ThotaGopichandThota/gn-core3/core/accumulator"
"github.com/ThotaGopichandThota/gn-core3/core/check"
"github.com/ThotaGopichandThota/gn-core3/core/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var timeout = time.Second * 2

func TestNewTimeAccumulator_InvalidMaxWaitTimeShouldErr(t *testing.T) {
t.Parallel()

ta, err := accumulator.NewTimeAccumulator(accumulator.MinimumAllowedTime-1, 0, &mock.LoggerMock{})

assert.True(t, check.IfNil(ta))
assert.True(t, errors.Is(err, core.ErrInvalidValue))
}

func TestNewTimeAccumulator_InvalidMaxOffsetShouldErr(t *testing.T) {
t.Parallel()

ta, err := accumulator.NewTimeAccumulator(accumulator.MinimumAllowedTime, -1, &mock.LoggerMock{})

assert.True(t, check.IfNil(ta))
assert.True(t, errors.Is(err, core.ErrInvalidValue))
}

func TestNewTimeAccumulator_NilLoggerShouldErr(t *testing.T) {
t.Parallel()

ta, err := accumulator.NewTimeAccumulator(accumulator.MinimumAllowedTime, 0, nil)

assert.True(t, check.IfNil(ta))
assert.True(t, errors.Is(err, core.ErrNilLogger))
}

func TestNewTimeAccumulator_ShouldWork(t *testing.T) {
t.Parallel()

ta, err := accumulator.NewTimeAccumulator(accumulator.MinimumAllowedTime, 0, &mock.LoggerMock{})

assert.False(t, check.IfNil(ta))
assert.Nil(t, err)
}

//------- AddData

func TestTimeAccumulator_AddDataShouldWorkEvenIfTheChanIsBlocked(t *testing.T) {
t.Parallel()

chDone := make(chan struct{})
allowedTime := time.Millisecond * 100
ta, _ := accumulator.NewTimeAccumulator(allowedTime, 0, &mock.LoggerMock{})
go func() {
ta.AddData(struct{}{})
time.Sleep(allowedTime * 3)
ta.AddData(struct{}{})
ta.AddData(struct{}{})

existing := ta.Data()
assert.Equal(t, 2, len(existing))

chDone <- struct{}{}
}()

select {
case <-chDone:
case <-time.After(timeout):
assert.Fail(t, "test did not finish in a reasonable time span. "+
"Maybe problems with the used mutexes?")
}
}

//------- eviction

func TestTimeAccumulator_EvictionShouldStopWhenCloseIsCalled(t *testing.T) {
t.Parallel()

allowedTime := time.Millisecond * 100
ta, _ := accumulator.NewTimeAccumulator(allowedTime, 0, &mock.LoggerMock{})

ta.AddData(struct{}{})
time.Sleep(allowedTime * 3)

_ = ta.Close()
time.Sleep(allowedTime)

ch := ta.OutputChannel()
items, ok := <-ch

assert.False(t, ok)
assert.Equal(t, 0, len(items))
}

func TestTimeAccumulator_EvictionDuringWaitShouldStopWhenCloseIsCalled(t *testing.T) {
t.Parallel()

allowedTime := time.Millisecond * 100
ta, _ := accumulator.NewTimeAccumulator(allowedTime, 0, &mock.LoggerMock{})
ta.AddData(struct{}{})

_ = ta.Close()
time.Sleep(allowedTime)

ch := ta.OutputChannel()
items, ok := <-ch

assert.False(t, ok)
assert.Equal(t, 0, len(items))
}

func TestTimeAccumulator_EvictionShouldPreserveTheOrder(t *testing.T) {
t.Parallel()

allowedTime := time.Millisecond * 100
ta, _ := accumulator.NewTimeAccumulator(allowedTime, 0, &mock.LoggerMock{})

data := []interface{}{"data1", "data2", "data3"}
for _, d := range data {
ta.AddData(d)
}
time.Sleep(allowedTime * 3)

ch := ta.OutputChannel()
items, ok := <-ch

require.True(t, ok)
assert.Equal(t, data, items)
}

func TestTimeAccumulator_EvictionWithOffsetShouldPreserveTheOrder(t *testing.T) {
t.Parallel()

allowedTime := time.Millisecond * 100
ta, _ := accumulator.NewTimeAccumulator(allowedTime, time.Millisecond, &mock.LoggerMock{})

data := []interface{}{"data1", "data2", "data3"}
for _, d := range data {
ta.AddData(d)
}
time.Sleep(allowedTime * 3)

ch := ta.OutputChannel()
items, ok := <-ch

require.True(t, ok)
assert.Equal(t, data, items)
}

//------- computeWaitTime

func TestTimeAccumulator_ComputeWaitTimeWithMaxOffsetZeroShouldRetMaxWaitTime(t *testing.T) {
t.Parallel()

allowedTime := time.Millisecond * 56
ta, _ := accumulator.NewTimeAccumulator(allowedTime, 0, &mock.LoggerMock{})

assert.Equal(t, allowedTime, ta.ComputeWaitTime())
}

func TestTimeAccumulator_ComputeWaitTimeShouldWork(t *testing.T) {
t.Parallel()

allowedTime := time.Millisecond * 56
maxOffset := time.Millisecond * 12
ta, _ := accumulator.NewTimeAccumulator(allowedTime, maxOffset, &mock.LoggerMock{})

numComputations := 10000
for i := 0; i < numComputations; i++ {
waitTime := ta.ComputeWaitTime()
isInInterval := waitTime >= allowedTime-maxOffset &&
waitTime <= allowedTime

assert.True(t, isInInterval)
}
}
Loading

0 comments on commit efeadf8

Please sign in to comment.