New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server/asset/eth: Fix run. #1322
Conversation
d173ff5
to
dd34de8
Compare
server/asset/eth/eth.go
Outdated
@@ -154,9 +141,8 @@ func unconnectedETH(logger dex.Logger, cfg *config) *Backend { | |||
rpcCtx: ctx, | |||
cancelRPCs: cancel, | |||
cfg: cfg, | |||
blockCache: newBlockCache(logger), | |||
hashCache: newHashCache(logger), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you feel about creating the hashCache
in (ExchangeWallet).Connect
, and passing the context.Context
and ethFetcher
to the constructor so you can handle prime
ing internally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also removed the constructor...
b6bbb48
to
e2ee2b2
Compare
server/asset/eth/eth_test.go
Outdated
// Ok if ctx was canceled above. Linters complain about calling t.Fatal | ||
// in the goroutine above. | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
default: | ||
t.Fatal("test timeout") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I put t.Fatal in the goroutine govet would tell me Error: testinggoroutine: call to (*T).Fatal from a non-test goroutine (govet)
and the wording suggests there's a test goroutine? Is there a way to fail in the goroutine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't looked at this pr at all yet, but generally you have to signal back to the main testing goroutine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, so the TestSomething(t *testing.T)
itself is the goroutine it is referring to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple tiny things.
We should consider some kind of block update metering, maybe in the consumer though. I imagine ethereum blocks come in pretty tight clusters sometimes.
e2ee2b2
to
4ec5308
Compare
What, tests are passing for me locally. Oh, client. Will leave since unrelated to this pr. |
The bug on master is fixed here: #1320 (comment) |
4ec5308
to
96190b0
Compare
rebased to correct a commit typo |
56fa200
to
0de334b
Compare
Rebased to fix the test error. |
server/asset/eth/eth.go
Outdated
signalMtx: ð.signalMtx, | ||
blockChans: eth.blockChans, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A pointer to the mutex and a reference to the parent's map are suspect. AFAICT Backend
no longer has any business with either of these.
server/asset/eth/eth.go:
diff --git a/server/asset/eth/eth.go b/server/asset/eth/eth.go
index e97c6fb6..7afcac10 100644
--- a/server/asset/eth/eth.go
+++ b/server/asset/eth/eth.go
@@ -104,10 +104,7 @@ type Backend struct {
cancelRPCs context.CancelFunc
cfg *config
node ethFetcher
- // The backend provides block notification channels through the BlockChannel
- // method. signalMtx locks the blockChans array.
- signalMtx sync.RWMutex
- blockChans map[chan *asset.BlockUpdate]struct{}
+
// The hash cache stores just enough info to detect reorgs.
hashCache *hashCache
// A logger will be provided by the DEX. All logging should use the provided
@@ -148,7 +145,6 @@ func unconnectedETH(logger dex.Logger, cfg *config) *Backend {
cancelRPCs: cancel,
cfg: cfg,
log: logger,
- blockChans: make(map[chan *asset.BlockUpdate]struct{}),
contractAddr: contractAddr,
initTxSize: uint32(dexeth.InitGas(1, version)),
}
@@ -184,8 +180,7 @@ func (eth *Backend) Connect(ctx context.Context) (*sync.WaitGroup, error) {
eth.hashCache = &hashCache{
log: eth.log.SubLogger("hashcache"),
node: &c,
- signalMtx: ð.signalMtx,
- blockChans: eth.blockChans,
+ blockChans: make(map[chan *asset.BlockUpdate]struct{}),
best: hashN{
height: hdr.Number.Uint64(),
hash: hdr.Hash(),
@@ -242,11 +237,7 @@ func (eth *Backend) FeeRate(ctx context.Context) (uint64, error) {
// updates. If the returned channel is ever blocking, there will be no error
// logged from the eth package. Part of the asset.Backend interface.
func (eth *Backend) BlockChannel(size int) <-chan *asset.BlockUpdate {
- c := make(chan *asset.BlockUpdate, size)
- eth.signalMtx.Lock()
- defer eth.signalMtx.Unlock()
- eth.blockChans[c] = struct{}{}
- return c
+ return eth.hashCache.blockChannel(size)
}
server/asset/eth/cache.go:
diff --git a/server/asset/eth/cache.go b/server/asset/eth/cache.go
index 99737f16..5a14692b 100644
--- a/server/asset/eth/cache.go
+++ b/server/asset/eth/cache.go
@@ -25,17 +25,22 @@ type hashN struct {
// listeners of new blocks. All methods are concurrent safe unless specified
// otherwise.
type hashCache struct {
- // signalMtx locks the blockChans array.
- signalMtx *sync.RWMutex
+ mtx sync.RWMutex
+ best hashN
blockChans map[chan *asset.BlockUpdate]struct{}
- mtx sync.RWMutex
- best hashN
-
log dex.Logger
node ethFetcher
}
+func (hc *hashCache) blockChannel(size int) <-chan *asset.BlockUpdate {
+ c := make(chan *asset.BlockUpdate, size)
+ hc.mtx.Lock()
+ defer hc.mtx.Unlock()
+ hc.blockChans[c] = struct{}{}
+ return c
+}
+
// poll pulls the best hash from an eth node and compares that to a stored
// hash. If the same does nothing. If different, updates the stored hash and
// notifies listeners on block chans.
@@ -44,7 +49,6 @@ func (hc *hashCache) poll(ctx context.Context) {
if err != nil {
hc.log.Error(err)
}
- hc.signalMtx.Lock()
for c := range hc.blockChans {
select {
case c <- &asset.BlockUpdate{
@@ -55,8 +59,8 @@ func (hc *hashCache) poll(ctx context.Context) {
hc.log.Error("failed to send block update on blocking channel")
}
}
- hc.signalMtx.Unlock()
}
+
hc.mtx.Lock()
defer hc.mtx.Unlock()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't still need to hold a read lock for ranging over the hc.blockChans
map?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hashCache
has it's own mutex, and the use pattern in poll
as written is that it will always be locked when both blockChans
and best
are accessed. Do you need two mutexes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you are right.
So originally this was created with the unconnected wallet, so a lot of the wierdness is to make the exported BlockChannel
not panic before hashCache is created in Connect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'll just remove the type since it doesn't really do that much anymore.
server/asset/eth/eth.go
Outdated
// The block cache stores just enough info about the blocks to prevent future | ||
// calls to Block. | ||
blockCache *blockCache | ||
// The hash cache stores just enough info to detect reorgs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... it also handles block polling and new block notifications to BlockChannel consumers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the hashCache in the last commit. Let me know how it looks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like an improved design to me. In run
there is a polling loop, and if that's the only caller of poll
and it's a synchronous call, then the fields of best
don't need a mutex any more as it appears you've concluded as well. And only needs to rlock blockChansMtx
if something actually changes, not each call to poll
.
👍
That the idea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
Only cache the best hash and use it to determine if we see a reorg. Move polling and notification logic to hashCache.
0de334b
to
6b4a2e8
Compare
part of #1154