Skip to content

Commit

Permalink
Merge branch 'master' into ADG-7965
Browse files Browse the repository at this point in the history
  • Loading branch information
IldarKamalov committed Jan 12, 2024
2 parents df0f189 + 1e0ff4d commit e79b7bf
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 41 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -63,6 +63,7 @@ NOTE: Add new changes BELOW THIS COMMENT.

### Fixed

- Zero value in `querylog.size_memory` disables logging ([#6570]).
- Non-anonymized IP addresses on the dashboard ([#6584]).
- Maximum cache TTL requirement when editing minimum cache TTL in the Web UI
([#6409]).
Expand All @@ -81,6 +82,7 @@ NOTE: Add new changes BELOW THIS COMMENT.
[#6541]: https://github.com/AdguardTeam/AdGuardHome/issues/6541
[#6545]: https://github.com/AdguardTeam/AdGuardHome/issues/6545
[#6568]: https://github.com/AdguardTeam/AdGuardHome/issues/6568
[#6570]: https://github.com/AdguardTeam/AdGuardHome/issues/6570
[#6574]: https://github.com/AdguardTeam/AdGuardHome/issues/6574
[#6584]: https://github.com/AdguardTeam/AdGuardHome/issues/6584

Expand Down
18 changes: 5 additions & 13 deletions internal/aghalg/ringbuffer.go
@@ -1,23 +1,15 @@
package aghalg

import (
"github.com/AdguardTeam/golibs/errors"
)

// RingBuffer is the implementation of ring buffer data structure.
type RingBuffer[T any] struct {
buf []T
cur int
cur uint
full bool
}

// NewRingBuffer initializes the new instance of ring buffer. size must be
// greater or equal to zero.
func NewRingBuffer[T any](size int) (rb *RingBuffer[T]) {
if size < 0 {
panic(errors.Error("ring buffer: size must be greater or equal to zero"))
}

func NewRingBuffer[T any](size uint) (rb *RingBuffer[T]) {
return &RingBuffer[T]{
buf: make([]T, size),
}
Expand All @@ -30,7 +22,7 @@ func (rb *RingBuffer[T]) Append(e T) {
}

rb.buf[rb.cur] = e
rb.cur = (rb.cur + 1) % cap(rb.buf)
rb.cur = (rb.cur + 1) % uint(cap(rb.buf))
if rb.cur == 0 {
rb.full = true
}
Expand Down Expand Up @@ -87,12 +79,12 @@ func (rb *RingBuffer[T]) splitCur() (before, after []T) {
}

// Len returns a length of the buffer.
func (rb *RingBuffer[T]) Len() (l int) {
func (rb *RingBuffer[T]) Len() (l uint) {
if !rb.full {
return rb.cur
}

return cap(rb.buf)
return uint(cap(rb.buf))
}

// Clear clears the buffer.
Expand Down
32 changes: 14 additions & 18 deletions internal/aghalg/ringbuffer_test.go
Expand Up @@ -9,13 +9,13 @@ import (
)

// elements is a helper function that returns n elements of the buffer.
func elements(b *aghalg.RingBuffer[int], n int, reverse bool) (es []int) {
func elements(b *aghalg.RingBuffer[int], n uint, reverse bool) (es []int) {
fn := b.Range
if reverse {
fn = b.ReverseRange
}

i := 0
var i uint
fn(func(e int) (cont bool) {
if i >= n {
return false
Expand All @@ -42,29 +42,25 @@ func TestNewRingBuffer(t *testing.T) {
assert.Zero(t, b.Len())
})

t.Run("negative_size", func(t *testing.T) {
assert.PanicsWithError(t, "ring buffer: size must be greater or equal to zero", func() {
aghalg.NewRingBuffer[int](-5)
})
})

t.Run("zero", func(t *testing.T) {
b := aghalg.NewRingBuffer[int](0)
for i := 0; i < 10; i++ {
b.Append(i)
assert.Equal(t, 0, b.Len())
assert.Empty(t, elements(b, b.Len(), false))
assert.Empty(t, elements(b, b.Len(), true))
bufLen := b.Len()
assert.EqualValues(t, 0, bufLen)
assert.Empty(t, elements(b, bufLen, false))
assert.Empty(t, elements(b, bufLen, true))
}
})

t.Run("single", func(t *testing.T) {
b := aghalg.NewRingBuffer[int](1)
for i := 0; i < 10; i++ {
b.Append(i)
assert.Equal(t, 1, b.Len())
assert.Equal(t, []int{i}, elements(b, b.Len(), false))
assert.Equal(t, []int{i}, elements(b, b.Len(), true))
bufLen := b.Len()
assert.EqualValues(t, 1, bufLen)
assert.Equal(t, []int{i}, elements(b, bufLen, false))
assert.Equal(t, []int{i}, elements(b, bufLen, true))
}
})
}
Expand All @@ -78,7 +74,7 @@ func TestRingBuffer_Range(t *testing.T) {
name string
want []int
count int
length int
length uint
}{{
name: "three",
count: 3,
Expand Down Expand Up @@ -163,11 +159,11 @@ func TestRingBuffer_Range_increment(t *testing.T) {
for i, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
b.Append(i)

assert.Equal(t, tc.want, elements(b, b.Len(), false))
bufLen := b.Len()
assert.Equal(t, tc.want, elements(b, bufLen, false))

slices.Reverse(tc.want)
assert.Equal(t, tc.want, elements(b, b.Len(), true))
assert.Equal(t, tc.want, elements(b, bufLen, true))
})
}
}
2 changes: 1 addition & 1 deletion internal/home/config.go
Expand Up @@ -267,7 +267,7 @@ type queryLogConfig struct {

// MemSize is the number of entries kept in memory before they are flushed
// to disk.
MemSize int `yaml:"size_memory"`
MemSize uint `yaml:"size_memory"`

// Enabled defines if the query log is enabled.
Enabled bool `yaml:"enabled"`
Expand Down
5 changes: 3 additions & 2 deletions internal/querylog/qlog.go
Expand Up @@ -196,7 +196,7 @@ func newLogEntry(params *AddParams) (entry *logEntry) {
// Add implements the [QueryLog] interface for *queryLog.
func (l *queryLog) Add(params *AddParams) {
var isEnabled, fileIsEnabled bool
var memSize int
var memSize uint
func() {
l.confMu.RLock()
defer l.confMu.RUnlock()
Expand All @@ -205,7 +205,7 @@ func (l *queryLog) Add(params *AddParams) {
memSize = l.conf.MemSize
}()

if !isEnabled || memSize == 0 {
if !isEnabled {
return
}

Expand All @@ -230,6 +230,7 @@ func (l *queryLog) Add(params *AddParams) {
if !l.flushPending && fileIsEnabled && l.buffer.Len() >= memSize {
l.flushPending = true

// TODO(s.chzhen): Fix occasional rewrite of entires.
go func() {
flushErr := l.flushLogBuffer()
if flushErr != nil {
Expand Down
11 changes: 7 additions & 4 deletions internal/querylog/querylog.go
Expand Up @@ -63,7 +63,7 @@ type Config struct {

// MemSize is the number of entries kept in a memory buffer before they are
// flushed to disk.
MemSize int
MemSize uint

// Enabled tells if the query log is enabled.
Enabled bool
Expand Down Expand Up @@ -143,14 +143,17 @@ func newQueryLog(conf Config) (l *queryLog, err error) {
}
}

if conf.MemSize < 0 {
return nil, errors.Error("memory size must be greater or equal to zero")
memSize := conf.MemSize
if memSize == 0 {
// If query log is enabled, we still need to write entries to a file.
// And all writing goes through a buffer.
memSize = 1
}

l = &queryLog{
findClient: findClient,

buffer: aghalg.NewRingBuffer[*logEntry](conf.MemSize),
buffer: aghalg.NewRingBuffer[*logEntry](memSize),

conf: &Config{},
confMu: &sync.RWMutex{},
Expand Down
14 changes: 11 additions & 3 deletions internal/querylog/search.go
Expand Up @@ -47,7 +47,14 @@ func (l *queryLog) client(clientID, ip string, cache clientCache) (c *Client, er
// searchMemory looks up log records which are currently in the in-memory
// buffer. It optionally uses the client cache, if provided. It also returns
// the total amount of records in the buffer at the moment of searching.
// l.confMu is expected to be locked.
func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entries []*logEntry, total int) {
// We use this configuration check because a buffer can contain a single log
// record. See [newQueryLog].
if l.conf.MemSize == 0 {
return nil, 0
}

l.bufferLock.Lock()
defer l.bufferLock.Unlock()

Expand All @@ -73,11 +80,12 @@ func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entrie
return true
})

return entries, l.buffer.Len()
return entries, int(l.buffer.Len())
}

// search - searches log entries in the query log using specified parameters
// returns the list of entries found + time of the oldest entry
// search searches log entries in memory buffer and log file using specified
// parameters and returns the list of entries found and the time of the oldest
// entry. l.confMu is expected to be locked.
func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest time.Time) {
start := time.Now()

Expand Down

0 comments on commit e79b7bf

Please sign in to comment.