Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* @samcm
* @savid
* @mattevans
17 changes: 17 additions & 0 deletions .github/workflows/go-setup/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: 'Go Setup'
description: 'Sets up Go environment with caching'

inputs:
go-version:
description: 'Go version to use'
required: false
default: '1.23.4'

runs:
using: "composite"
steps:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ inputs.go-version }}
cache: true
18 changes: 18 additions & 0 deletions .github/workflows/go-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: go-test

on:
push:
branches:
- master
pull_request:

permissions:
contents: read

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: ./.github/workflows/go-setup
- run: go test -v -race ./...
89 changes: 85 additions & 4 deletions beacon_chain.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package ethwallclock

import (
"sync"
"time"
)

type EthereumBeaconChain struct {
slots *DefaultSlotCreator
epochs *DefaultEpochCreator

mu sync.RWMutex
epochChangedCallbacks []func(current Epoch)
slotChangedCallbacks []func(current Slot)

slotCh chan struct{}
epochCh chan struct{}
stopCh chan struct{}
stopped bool
}

func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, slotsPerEpoch uint64) *EthereumBeaconChain {
Expand All @@ -25,20 +29,39 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl

slotCh: make(chan struct{}),
epochCh: make(chan struct{}),
stopCh: make(chan struct{}),
stopped: false,
}

go func() {
for {
select {
case <-e.slotCh:
return
case <-e.stopCh:
return
default:
slot := e.slots.Current()

time.Sleep(time.Until(slot.TimeWindow().End()))

slot = e.slots.Current()
for _, callback := range e.slotChangedCallbacks {

// Take a read lock and copy the callbacks.
e.mu.RLock()

if e.stopped {
e.mu.RUnlock()

return
}

callbacks := make([]func(current Slot), len(e.slotChangedCallbacks))
copy(callbacks, e.slotChangedCallbacks)
e.mu.RUnlock()

// Execute callbacks from our copy.
for _, callback := range callbacks {
go callback(slot)
}
}
Expand All @@ -50,13 +73,30 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl
select {
case <-e.epochCh:
return
case <-e.stopCh:
return
default:
epoch := e.epochs.Current()

time.Sleep(time.Until(epoch.TimeWindow().End()))

epoch = e.epochs.Current()
for _, callback := range e.epochChangedCallbacks {

// Take a read lock and copy the callbacks.
e.mu.RLock()

if e.stopped {
e.mu.RUnlock()

return
}

callbacks := make([]func(current Epoch), len(e.epochChangedCallbacks))
copy(callbacks, e.epochChangedCallbacks)
e.mu.RUnlock()

// Execute callbacks from our copy.
for _, callback := range callbacks {
go callback(epoch)
}
}
Expand Down Expand Up @@ -89,16 +129,57 @@ func (e *EthereumBeaconChain) Epochs() *DefaultEpochCreator {
}

func (e *EthereumBeaconChain) OnEpochChanged(callback func(current Epoch)) {
e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return
}

e.epochChangedCallbacks = append(e.epochChangedCallbacks, callback)
}

func (e *EthereumBeaconChain) OnSlotChanged(callback func(current Slot)) {
e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return
}

e.slotChangedCallbacks = append(e.slotChangedCallbacks, callback)
}

func (e *EthereumBeaconChain) Stop() {
e.slotCh <- struct{}{}
e.epochCh <- struct{}{}
e.mu.Lock()

if e.stopped {
e.mu.Unlock()

return
}

e.stopped = true
e.mu.Unlock()

close(e.stopCh)

// Send a signal to the other channels, but don't close them yet
// to avoid "send on closed channel" panics from any other goroutines.
select {
case e.slotCh <- struct{}{}:
default:
}

select {
case e.epochCh <- struct{}{}:
default:
}

// Small delay to allow goroutines to exit
time.Sleep(100 * time.Millisecond)

// Now safe to close
close(e.slotCh)
close(e.epochCh)
}
102 changes: 96 additions & 6 deletions beacon_chain_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,125 @@
package ethwallclock

import (
"sync"
"testing"
"time"
)

// MetadataService mocks the metadata service we use across xatu.
type MetadataService struct {
wallclock *EthereumBeaconChain
}

// Wallclock returns the wallclock instance (can be nil).
func (m *MetadataService) Wallclock() *EthereumBeaconChain {
return m.wallclock
}

func TestBeaconChainEventCallbacks(t *testing.T) {
beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2)

t.Run("Event callbacks", func(t *testing.T) {
epochCallbacks := 0
slotCallbacks := 0
var (
mu sync.Mutex
epochCallbacks int
slotCallbacks int
)

beacon.OnEpochChanged(func(epoch Epoch) {
mu.Lock()
epochCallbacks++
mu.Unlock()
})

beacon.OnSlotChanged(func(slot Slot) {
mu.Lock()
slotCallbacks++
mu.Unlock()
})

time.Sleep(5100 * time.Millisecond)

if epochCallbacks != 2 {
t.Errorf("incorrect number of epoch callbacks: got %v, want %v", epochCallbacks, 2)
mu.Lock()
epochCount := epochCallbacks
slotCount := slotCallbacks
mu.Unlock()

if epochCount != 2 {
t.Errorf("incorrect number of epoch callbacks: got %v, want %v", epochCount, 2)
}

if slotCallbacks != 5 {
t.Errorf("incorrect number of slot callbacks: got %v, want %v", slotCallbacks, 5)
if slotCount != 5 {
t.Errorf("incorrect number of slot callbacks: got %v, want %v", slotCount, 5)
}
})

beacon.Stop()
}

// TestConcurrentStopAndCallback tests that there's no race condition
// between stopping the beacon chain and registering/executing callbacks.
func TestConcurrentStopAndCallback(t *testing.T) {
beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2)

// Set up a sync WaitGroup to coordinate goroutines.
var wg sync.WaitGroup

// Start multiple goroutines that try to register callbacks.
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// Try to register a callback - should not panic even if Stop is called concurrently.
beacon.OnEpochChanged(func(epoch Epoch) {})
}(i)
}

// Start a goroutine that stops the beacon chain.
wg.Add(1)
go func() {
defer wg.Done()
// Small delay to increase chance of concurrent execution
time.Sleep(5 * time.Millisecond)
beacon.Stop()
}()

// Wait for all goroutines to finish.
wg.Wait()
}

// TestNilWallclockScenario specifically tests for the panic seen in production:
// when OnEpochChanged is called on a nil receiver.
func TestNilWallclockScenario(t *testing.T) {
// Create a metadata service with a valid wallclock
metadata := &MetadataService{
wallclock: NewEthereumBeaconChain(time.Now(), time.Second*1, 2),
}

wc := metadata.Wallclock()
if wc == nil {
t.Fatal("Wallclock should not be nil")
}

// Register a callback.
wc.OnEpochChanged(func(epoch Epoch) {})

// If the beacon chain connection fails/is-lost, the wallclock becomes nil.
// Subsequent callbacks then attempt to call the nil wallclock, which panics.
metadata.wallclock = nil

shouldPanic := func() {
defer func() {
if r := recover(); r == nil {
t.Error("Expected panic when using nil wallclock, but no panic occurred")
} else {
t.Logf("Got expected panic: %v", r)
}
}()

wc := metadata.Wallclock() // Get nil wallclock.
wc.OnEpochChanged(func(epoch Epoch) {})
}

shouldPanic()
}
Loading