Skip to content

Commit

Permalink
go2go: overhaul the Buffer and PubSub types to use generics
Browse files Browse the repository at this point in the history
Use the go2go generics experiment to enforce type checking at compile
time instead of runtime.

Switch the Buffer type to manipulate a slice of comparable T's instead
of a slice of empty interfaces. Remove the global EmptyMarker in favor
of a per-Buffer instance empty marker value.

Add an any T type parameter to the PubSub type. Use an internal Buffer
of cells of T, which hold a T value or a marker channel. The marker
channels indicate an unsubscribing consumer or the closing of the
PubSub instance.

Drop the Publisher & Subscriber API, it is no longer needed now that the
publish & subscribe methods for both functions & channels support the
instance's type parameter.
  • Loading branch information
benburkert committed Jan 25, 2021
1 parent 86eaf77 commit 73593d5
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 202 deletions.
56 changes: 26 additions & 30 deletions buffer.go2
Expand Up @@ -7,17 +7,12 @@ import (
"github.com/benburkert/pubsub/cursor"
)

type Marker byte
type ReaderFunc[T comparable] func(T) bool

const (
EmptyMarker Marker = iota
)

type ReaderFunc func(interface{}) bool

type Buffer struct {
mu sync.RWMutex
data []interface{}
type Buffer[T comparable] struct {
mu sync.RWMutex
data []T
empty T

wcond *sync.Cond
wcursor *cursor.Cursor
Expand All @@ -26,26 +21,27 @@ type Buffer struct {
rcursors cursor.Slice
}

func NewBuffer(minSize, maxReaders int) *Buffer {
size := calcBufferSize(minSize)
func NewBuffer[T comparable](empty T, minSize, maxReaders int) *Buffer[T] {
size := CalcBufferSize(minSize)
mask := size - 1

b := &Buffer{
data: make([]interface{}, size),
b := &Buffer[T]{
data: make([]T, size),
empty: empty,
wcursor: cursor.New(0, mask),
rcursors: cursor.MakeSlice(maxReaders, mask),
}

for i := range b.data {
b.data[i] = EmptyMarker
b.data[i] = empty
}

b.wcond = sync.NewCond(&b.mu)
b.rcond = sync.NewCond(b.mu.RLocker())
return b
}

func (b *Buffer) FullReadTo(rfn ReaderFunc) []interface{} {
func (b *Buffer[T]) FullReadTo(rfn ReaderFunc[T]) []T {
b.mu.RLock() // unlocked in readTo

c := b.getCursor() // reset in readTo
Expand All @@ -55,7 +51,7 @@ func (b *Buffer) FullReadTo(rfn ReaderFunc) []interface{} {
return s
}

func (b *Buffer) Read() []interface{} {
func (b *Buffer[T]) Read() []T {
b.mu.RLock()
defer b.mu.RUnlock()

Expand All @@ -65,20 +61,20 @@ func (b *Buffer) Read() []interface{} {
return b.read(c)
}

func (b *Buffer) ReadTo(rfn ReaderFunc) {
func (b *Buffer[T]) ReadTo(rfn ReaderFunc[T]) {
b.mu.RLock() // unlocked in readTo

go b.readTo(b.getCursor(), rfn)
}

func (b *Buffer) Write(v interface{}) {
func (b *Buffer[T]) Write(v T) {
b.mu.Lock()
defer b.mu.Unlock()

b.write(v)
}

func (b *Buffer) WriteSlice(vs []interface{}) {
func (b *Buffer[T]) WriteSlice(vs []T) {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -88,33 +84,33 @@ func (b *Buffer) WriteSlice(vs []interface{}) {
}

// assumes b.mu RLock held
func (b *Buffer) getCursor() *cursor.Cursor {
func (b *Buffer[T]) getCursor() *cursor.Cursor {
return b.rcursors.Alloc(b.wcursor.Pos())
}

// assumes b.mu Rlock held
func (b *Buffer) read(c *cursor.Cursor) []interface{} {
func (b *Buffer[T]) read(c *cursor.Cursor) []T {
rpos := c.Pos()
if b.data[rpos] == EmptyMarker {
s := make([]interface{}, rpos)
if b.data[rpos] == b.empty {
s := make([]T, rpos)
copy(s, b.data[:rpos])
return s
}

size := int(len(b.data))
s := make([]interface{}, size)
s := make([]T, size)
copy(s[:(size-rpos)], b.data[rpos:])
copy(s[(size-rpos):], b.data[:rpos])
return s
}

// assumes b.mu RLock held
func (b *Buffer) readBarrier(c *cursor.Cursor) bool {
func (b *Buffer[T]) readBarrier(c *cursor.Cursor) bool {
return c.Pos() == b.wcursor.Pos()
}

// asumes b.mu RLock held
func (b *Buffer) readTo(c *cursor.Cursor, rfn ReaderFunc) {
func (b *Buffer[T]) readTo(c *cursor.Cursor, rfn ReaderFunc[T]) {
defer b.mu.RUnlock()
defer b.wcond.Signal()
defer c.Reset()
Expand All @@ -136,7 +132,7 @@ func (b *Buffer) readTo(c *cursor.Cursor, rfn ReaderFunc) {
}

// asumes b.mu Lock held
func (b *Buffer) write(v interface{}) {
func (b *Buffer[T]) write(v T) {
for b.writeBarrier() {
b.wcond.Wait()
}
Expand All @@ -149,7 +145,7 @@ func (b *Buffer) write(v interface{}) {
}

// assumes b.mu Lock held
func (b *Buffer) writeBarrier() bool {
func (b *Buffer[T]) writeBarrier() bool {
npos := b.wcursor.Next()
for _, c := range b.rcursors {
if npos == c.Pos() {
Expand All @@ -159,6 +155,6 @@ func (b *Buffer) writeBarrier() bool {
return false
}

func calcBufferSize(minSize int) int {
func CalcBufferSize(minSize int) int {
return int(math.Pow(2, math.Ceil(math.Log2(float64(minSize)))))
}

0 comments on commit 73593d5

Please sign in to comment.