Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/pandora orc client #67

Merged
merged 32 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6944854
miner integration with orchestrator client
meta-bot Jun 7, 2021
4cbb06f
delete header from container added
meta-bot Jun 8, 2021
63b547a
write block with state modified
meta-bot Jun 8, 2021
8f4f48d
log debug is printed. Code is broken
meta-bot Jun 10, 2021
d08b151
Update .goreleaser.yml
patred20 Jun 10, 2021
1563fd2
Update .goreleaser.yml
patred20 Jun 10, 2021
8b46f16
Restricted goreleaser to linux/amd64
Jun 10, 2021
8b2c79d
Provided config file to goreleaser
Jun 10, 2021
01dfde8
Deleted go generate
Jun 10, 2021
15d0f54
Temporarily disabled goreleaser
Jun 10, 2021
bc713c4
Added mulinet connection
Jun 10, 2021
bcd77ce
changed binary name
Jun 10, 2021
e0c6e50
pandora engine test
meta-bot Jun 10, 2021
f548931
Feature: pandora-orc-client
blazejkrzak Jun 10, 2021
d57ae04
changed tag source
Jun 11, 2021
e453084
provide more log to debug
meta-bot Jun 11, 2021
69e0a9f
pandora extra data bug fixed
meta-bot Jun 13, 2021
96bcfc8
Merge branch 'feature/gh-actions' into feature/pandora-orc-client
meta-bot Jun 13, 2021
db5ca5e
Reverted extra data fixed
Jun 14, 2021
042b65b
Fix log context len error
Jun 14, 2021
d983f39
Donot return error from confirmation loop
Jun 14, 2021
4f62bc8
Covert status type from int to string
Jun 14, 2021
e849928
Increase re-try time limit for confirmation
Jun 14, 2021
229289d
feature pandora client buffer removed from api.go
Jun 15, 2021
06e6fb9
pending logs are printed in the api.go
Jun 15, 2021
718e548
result channel size is made to 0 in the worker.go
Jun 15, 2021
1b372b6
log changed in blockchain.go
Jun 15, 2021
46f1897
logs are added in the sending and receiving flow
Jun 21, 2021
3cc2f00
more logs are added for download fetch
Jun 21, 2021
c10d3db
pandora now support skipped block
Jun 23, 2021
e6317b4
TestPendingBlockHeaderFullpath is removed due to pandora engine not h…
Jun 29, 2021
1f1cdce
merged with epic/pandora-consensus and conflict resolve for release.yml
Jun 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions .github/.goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ before:
hooks:
# You may remove this if you don't use go modules.
- go mod tidy
# you may remove this if you don't need go generate
- go generate ...
builds:
- env:
- CGO_ENABLED=0
goos:
- linux
- windows
- darwin
goarch:
- amd64
ignore:
- goos: darwin
- goos: linux
goarch: arm
flags:
- -tags=blst_enabled
archives:
- replacements:
darwin: Darwin
linux: Linux
windows: Windows
386: i386
amd64: x86_64
format: binary
checksum:
Expand Down
36 changes: 31 additions & 5 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,37 @@ jobs:
run: |
go get -v -t -d ./...

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@master
- name: Build binary
run: |
mkdir builds &&
go build -v --tags=blst_enabled -o ./builds/geth ./cmd/geth

# - name: Run GoReleaser
# uses: goreleaser/goreleaser-action@master
# with:
# version: latest
# workdir: ./cmd/geth
# args: release --rm-dist --snapshot --config ../../.github/.goreleaser.yml
# env:
# GITHUB_TOKEN: ${{ secrets.GH_TOKEN }}


- name: Release
uses: softprops/action-gh-release@v1
with:
version: latest
workdir: ./cmd/geth
args: release --rm-dist --snapshot
tag_name: ${{ github.event.release.tag_name }}
files: |
./builds/geth
env:
GITHUB_TOKEN: ${{ secrets.GH_TOKEN }}

- name: Request Multinet Upgrade
env:
GITHUB_TOKEN: ${{ secrets.GH_TOKEN }}
run: |
export GH_TAG=${GITHUB_REF/refs\/tags\//} &&
curl -X POST \
-H "Accept: application/vnd.github.v3+json" \
-d '{"event_type":"upgrade dev", "client_payload": {"repository": "PANDORA", "tag": "'$GH_TAG'"}}' \
-u "$GITHUB_TOKEN" \
https://api.github.com/repos/lukso-network/l16-multinet/dispatches
189 changes: 181 additions & 8 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
package core

import (
"context"
"errors"
"fmt"
"io"
"math/big"
mrand "math/rand"
"net/url"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/pandora_orcclient"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
Expand Down Expand Up @@ -117,6 +122,11 @@ const (
BlockChainVersion uint64 = 8
)

const (
// orchestratorConfirmationRetrievalLimit is the maximum limit of orchestrator client confirmation retrieval
orchestratorConfirmationRetrievalLimit = 5
)

// CacheConfig contains the configuration values for the trie caching/pruning
// that's resident in a blockchain.
type CacheConfig struct {
Expand All @@ -130,7 +140,8 @@ type CacheConfig struct {
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
OrcClientEndpoint interface{} // It is a temporary hack. Will change it when refactoring. It is the address of orchestrator client.
}

// defaultCacheConfig are the default caching values if none are specified by the
Expand Down Expand Up @@ -212,6 +223,11 @@ type BlockChain struct {
writeLegacyJournal bool // Testing flag used to flush the snapshot journal in legacy format.

pendingHeaderContainer *PandoraPendingHeaderContainer // in memory temporary header container which holds headers for orchestrator confirmation.
confirmedBlockHashes event.Feed // orchestrator client will pass fetched response using this event.

// pandora-orchestrator data passing
orcClientSubscriber event.Subscription
blockConfirmationCh chan []*pandora_orcclient.BlockStatus
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -251,11 +267,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
engine: engine,
vmConfig: vmConfig,
pendingHeaderContainer: headerContainer,
blockConfirmationCh: make(chan []*pandora_orcclient.BlockStatus),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)

bc.orcClientSubscriber = bc.SubscribeConfirmedBlockHashFetcher(bc.blockConfirmationCh)

var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
if err != nil {
Expand Down Expand Up @@ -400,6 +419,18 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
}()
}

// check if we are running pandora engine. If so, only then call the go routine
if bc.isPandora() {
// consensus engine is on pandora mode. Now run the orchestrator go routine
go func() {
err := bc.pandoraBlockHashConfirmationFetcher()
if err != nil {
log.Error("error found while fetching confirmed block hashes from orchestrator", err)
}
}()
}

return bc, nil
}

Expand All @@ -408,6 +439,119 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}

// isPandora returns if we are running pandora engine
func (bc *BlockChain) isPandora() bool {
ethashEngine, isEthashEngine := bc.engine.(*ethash.Ethash)
return isEthashEngine && ethashEngine.IsPandoraModeEnabled()
}

// pandoraBlockHashConfirmationFetcher is a ticker based loop. In every 2 sec, it calls
// the orchestrator client with a set of headers and fetch response from the orchestrator.
// If it gets any response, it will send that response in the feed.
// Miner and blockchain both are initialized from the backend. So this is a suitable place
// to write subscriber. Both Miner and blockchain can subscribe for the event from Ethereum.
func (bc *BlockChain) pandoraBlockHashConfirmationFetcher() error {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

var orcClient *pandora_orcclient.OrcClient

switch orcClientObject := bc.cacheConfig.OrcClientEndpoint.(type) {
case []string:
// This may not be a good solution. But for now we are using it temporarily. In future we will change it.
// Miner.Notify will take an HTTP address of the orchestrator client.
// pandora engine using an web socket address in 0-th index.
// Thus, we are using http address at index- 1.
if len(orcClientObject) < 1 {
return errors.New("orchestrator http endpoint not provided")
}

// expecting http / https endpoint address. If not given throw an error
parsedUrl, err := url.Parse(orcClientObject[1])
if err != nil {
return err
}

// first initialize orchestrator client
switch parsedUrl.Scheme {
case "http", "https":
orcClient, err = pandora_orcclient.Dial(orcClientObject[1])
if err != nil {
return err
}

default:
return fmt.Errorf("expecting http or https scheme. but provided %s", parsedUrl.Scheme)
}
case *pandora_orcclient.OrcClient:
// for testing purpose we will send in process orchestrator client. we have to use it
orcClient = orcClientObject
default:
return fmt.Errorf("unsupported orchestrator client type")
}

for {
select {
case <-ticker.C:
// prepare header requests
headers := bc.GetPendingHeaderContainer().ReadAllHeaders()
if len(headers) == 0 {
// no header found. nothing to prepare request. simply continue
continue
}
log.Debug("tick", "retrieving pending headers", headers)

request, err := preparePanBlockHashRequest(headers)
if err != nil {
return err
}

log.Debug("sending request to the orchestrator", "request", request)

blockHashResponse, err := orcClient.GetConfirmedPanBlockHashes(context.Background(), request)
if err != nil {
return err
}
log.Debug("got confirmation", "sending to the feed", blockHashResponse)
// send the blockhash in the feed. Thus, miner and insertchain will be able to listen it.
bc.confirmedBlockHashes.Send(blockHashResponse)

case <-bc.quit:
// Ethereum service is closed. Break the loop
//close orc client
orcClient.Close()
return nil
}
}
}

// preparePanBlockHashRequest prepares pandora BlockHash request for the orchestrator client.
// After preparing request it will sort the request in slot ascending order.
func preparePanBlockHashRequest(headers []*types.Header) ([]*pandora_orcclient.BlockHash, error) {
var blockHashes []*pandora_orcclient.BlockHash
for _, header := range headers {
pandoraExtraData := ethash.PandoraExtraData{}
err := rlp.DecodeBytes(header.Extra, &pandoraExtraData)
if err != nil {
log.Error("found error while decoding pandora extra data", err)
return nil, err
}
blockHashes = append(blockHashes, &pandora_orcclient.BlockHash{Slot: pandoraExtraData.Slot, Hash: header.Hash()})
}

if len(blockHashes) > 0 {
sort.SliceStable(blockHashes, func(i, j int) bool {
return blockHashes[i].Slot < blockHashes[j].Slot
})
}

return blockHashes, nil
}

func (bc *BlockChain) SubscribeConfirmedBlockHashFetcher(ch chan<- []*pandora_orcclient.BlockStatus) event.Subscription {
return bc.confirmedBlockHashes.Subscribe(ch)
}

// empty returns an indicator whether the blockchain is empty.
// Note, it's a special case that we connect a non-empty ancient
// database with an empty node, so that we can plugin the ancient
Expand Down Expand Up @@ -1509,6 +1653,42 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.wg.Add(1)
defer bc.wg.Done()

// verification is done. Now if pandora mode is running then push it into the queue
// After that, wait for response of the orchestrator client.
if bc.isPandora() {
log.Debug("writeBlockWithState", "sending header with header hash", block.Header().Hash())
bc.pendingHeaderContainer.WriteAndNotifyHeader(block.Header())

retryLimit := orchestratorConfirmationRetrievalLimit
status := pandora_orcclient.Status(0)
meta-bot marked this conversation as resolved.
Show resolved Hide resolved
for retryLimit > 0 && status == 0 {
// halt and get orchestrator confirmation
log.Debug("waiting to get block confirmation", "fetching...", retryLimit)
confirmedBlocks := <-bc.blockConfirmationCh
for _, confirmedBlock := range confirmedBlocks {
if confirmedBlock.Hash == block.Hash() {
status = confirmedBlock.Status
log.Debug("found confirmation", "confirmed block status", status)
}
if status != 0 {
// if status is invalid or correct then break the loop
break
}
}
retryLimit--
}
log.Debug("deleting from pending container", "header hash", block.Hash())
// remove the header from the pending queue
bc.GetPendingHeaderContainer().DeleteHeader(block.Header())
// if status is pending or invalid then just continue default work
if status == pandora_orcclient.Status(0) || status == pandora_orcclient.Status(2) {
log.Warn("failed to write block into the chain. block hash %v", block.Hash())
return CanonStatTy, consensus.ErrInvalidNumber
}
// if status is approved then write block in canonical chain.
log.Debug("block %v is verified. so continuing existing parts", block.Header().Hash())
}

// Calculate the total difficulty of the block
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
Expand Down Expand Up @@ -1735,13 +1915,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)

//save chain in the in-memory database
bc.pendingHeaderContainer.WriteHeaderBatch(headers)

// send chain to the subscribed orchestrator.
bc.pendingHeaderContainer.pndHeaderFeed.Send(PendingHeaderEvent{Headers: headers})
// TODO: in future we will halt execution here to get confirmation from orchestrator.

// Peek the error for the first block to decide the directing import logic
it := newInsertIterator(chain, results, bc.validator)

Expand Down
11 changes: 8 additions & 3 deletions core/pending_header_processor_pandora.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (container *PandoraPendingHeaderContainer) WriteHeader(header *types.Header
rawdb.WriteHeadHeaderHash(container.headerContainer, header.Hash())
}

// DeleteHeader deletes a single header from the container
func (container *PandoraPendingHeaderContainer) DeleteHeader(header *types.Header) {
rawdb.DeleteHeader(container.headerContainer, header.Hash(), header.Number.Uint64())
}

// ReadHeaderSince will receive a from header hash and return a batch of headers from that header.
func (container *PandoraPendingHeaderContainer) ReadHeaderSince(from common.Hash) []*types.Header {
fromHeaderNumber := rawdb.ReadHeaderNumber(container.headerContainer, from)
Expand All @@ -66,7 +71,7 @@ func (container *PandoraPendingHeaderContainer) ReadHeaderSince(from common.Hash
// 1. When requesting for empty hash. That is when orchestrator bootup it sends empty hash to the pandora. It is not present in the memory container
// 2. When orchestrator requesting a from hash, which is already confirmed and removed from the memory container.
// In both cases we are sending all available headers to the subscriber.
return container.readAllHeaders()
return container.ReadAllHeaders()
}

if lastHeaderNumber == nil {
Expand Down Expand Up @@ -94,8 +99,8 @@ func (container *PandoraPendingHeaderContainer) readHeader(headerNumber uint64)
return rawdb.ReadHeader(container.headerContainer, hashes[0], headerNumber)
}

// readAllHeaders reads all the headers from the memory
func (container *PandoraPendingHeaderContainer) readAllHeaders() []*types.Header {
// ReadAllHeaders reads all the headers from the memory
func (container *PandoraPendingHeaderContainer) ReadAllHeaders() []*types.Header {

// first retrieve the hashes of the headers
it := container.headerContainer.NewIterator([]byte("h"), nil)
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
TrieTimeLimit: config.TrieTimeout,
SnapshotLimit: config.SnapshotCache,
Preimages: config.Preimages,
OrcClientEndpoint: config.Miner.Notify,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit)
Expand Down
Loading