New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make sure broadcast happens #1871
Conversation
# Conflicts: # consensus/broadcast/shardChainMessenger.go
# Conflicts: # process/interceptors/processor/miniblockInterceptorProcessor.go
# Conflicts: # epochStart/bootstrap/epochStartMetaBlockProcessor.go # epochStart/bootstrap/epochStartMetaBlockProcessor_test.go # epochStart/mock/metaBlockInterceptorProcessorStub.go # process/interceptors/processor/hdrInterceptorProcessor.go # process/interceptors/processor/miniblockInterceptorProcessor.go # process/interceptors/processor/trieNodeInterceptorProcessor.go # process/interceptors/processor/txInterceptorProcessor.go # process/interface.go # process/mock/interceptorProcessorStub.go
if check.IfNil(args.HeadersSubscriber) { | ||
return spos.ErrNilHeadersSubscriber | ||
} | ||
if args.MaxDelayCacheSize == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MaxValidatorDelayCacheSize should not be checked for 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, done
return spos.ErrNilParameter | ||
} | ||
|
||
dbb.mutDataForBroadcast.RLock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lock/Unlock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
log.Debug("delayedBroadcast.headerReceived", "nbHeaderHashes", len(headerHashes)) | ||
for i := range headerHashes { | ||
log.Debug("delayedBroadcast.headerReceived", "headerHash", headerHashes[i]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be set to Trace after we are sure that everything will be ok
log.Debug("delayedBroadcast.scheduleValidatorBroadcast - header data for validator") | ||
|
||
for i := range dataForValidators { | ||
log.Debug("delayedBroadcast.scheduleValidatorBroadcast", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be set to Trace after we are sure that everything will be ok
|
||
log.Debug("delayedBroadcast.scheduleValidatorBroadcast - registered data for broadcast") | ||
for i := range dbb.valBroadcastData { | ||
log.Debug("delayedBroadcast.scheduleValidatorBroadcast", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be set to Trace after we are sure that everything will be ok
dbb.cacheHeaders.Put(headerHash, struct{}{}, 0) | ||
dbb.mutHeadersCache.Unlock() | ||
|
||
dbb.mutDataForBroadcast.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RLock/RUnlock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
log.Debug("delayedBroadcast.interceptedHeader canceling alarm for broadcasting header", | ||
"headerHash", headerHash, | ||
"alarmID-header", alarmID, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You add break here, if there could not be more than one header witth the same round and prevRandSeed in dbb.valHeaderBroadcastData
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
"headerHash", broadcastData.headerHash, | ||
"alarmID-delay", alarmID, | ||
) | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to iterate all dbb.valBroadcastData? There could not be more records with the same topic and hash in dbb.valBroadcastData ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should only be one, if the doEndRoundJob is called only once per consensus round, unless different leaders include the same miniblock hash "fromMe".
To cover also this case, removed the return.
core/alarm/alarm.go
Outdated
case <-ctx.Done(): | ||
return | ||
case evt := <-as.event: | ||
elapsedTime := time.Now().Sub(startTime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
time.Since(startTime)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -78,6 +83,8 @@ func (mip *MiniblockInterceptorProcessor) Save(data process.InterceptedData, _ c | |||
return err | |||
} | |||
|
|||
go mip.notify(miniblock, interceptedMiniblock.Hash(), topic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will notify even the cross miniblocks dest me and even if they are not whitelisted. It is ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The miniblocks "destination me" can be ignored, on subscriber (this is done on delayedBlockBroadcaster, as it waits for specific miniblocks on specific topics).
The whitelisting is done on the "cross shard destination me" miniblocks, whereas for delayed broadcaster the "cross shard source me" are relevant.
I agree that if the miniblock is received before the metablock notarizing the parent header then it could be ignored on destination, so waiting for suggestions here.
# Conflicts: # integrationTests/consensus/testInitializer.go # node/node_test.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
System tests passed.
This implementation adds a fallback for broadcasting cross shard data.
In case leader fails to broadcast the cross shard miniblocks and transactions then participants in consensus will schedule their slots for doing the broadcast instead of consensus leader in the order they were assigned in consensus.
The broadcast slots are canceled when the cross shard data is observed on the cross shard topics.