Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1543,8 +1543,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.

if status == CanonStatTy {
logs := bc.collectLogs(block, false)
//bc.chainFeed.Send(ChainEvent{Header: block.Header(), Hash: block.Hash(), Logs: logs})
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
bc.chainFeed.Send(ChainEvent{
Header: block.Header(),
Receipts: receipts,
Transactions: block.Transactions(),
})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
Expand Down Expand Up @@ -2102,6 +2105,13 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// the processing of the block that corresponds with the given hash.
// These logs are later announced as deleted or reborn.
func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
_, logs := bc.collectReceiptsAndLogs(b, removed)
return logs
}

// collectReceiptsAndLogs retrieves receipts from the database and returns both receipts and logs.
// This avoids duplicate database reads when both are needed.
func (bc *BlockChain) collectReceiptsAndLogs(b *types.Block, removed bool) ([]*types.Receipt, []*types.Log) {
receipts := rawdb.ReadRawReceipts(bc.db, b.Hash(), b.NumberU64())
if err := receipts.DeriveFields(bc.chainConfig, b.Hash(), b.NumberU64(), b.Time(), b.Transactions()); err != nil {
log.Error("Failed to derive block receipts fields", "hash", b.Hash(), "number", b.NumberU64(), "err", err)
Expand All @@ -2116,7 +2126,7 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
logs = append(logs, log)
}
}
return logs
return receipts, logs
}

// reorg takes two blocks, an old chain and a new chain and will reconstruct the
Expand Down
6 changes: 3 additions & 3 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type NewMinedBlockEvent struct{ Block *types.Block }
type RemovedLogsEvent struct{ Logs []*types.Log }

type ChainEvent struct {
Header *types.Header
//Hash common.Hash
//Logs []*types.Log
Header *types.Header
Receipts []*types.Receipt
Transactions []*types.Transaction
}

type ChainSideEvent struct {
Expand Down
82 changes: 76 additions & 6 deletions ctxc/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/common/hexutil"
"github.com/CortexFoundation/CortexTheseus/core/types"
"github.com/CortexFoundation/CortexTheseus/internal/ctxcapi"
"github.com/CortexFoundation/CortexTheseus/rpc"
)

Expand All @@ -38,15 +39,20 @@ var (
errInvalidBlockRange = errors.New("invalid block range params")
errUnknownBlock = errors.New("unknown block")
errBlockHashWithRange = errors.New("can't specify fromBlock/toBlock with blockHash")
errExceedMaxTopics = errors.New("exceed max topics")
errPendingLogsUnsupported = errors.New("pending logs are not supported")
errExceedMaxTopics = errors.New("exceed max topics")
errExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position")
errExceedMaxTxHashes = errors.New("exceed max number of transaction hashes allowed per transactionReceipts subscription")
)

// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
const maxTopics = 4

// The maximum number of allowed topics within a topic criteria
const maxSubTopics = 1000
const (
// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
maxTopics = 4
// The maximum number of allowed topics within a topic criteria
maxSubTopics = 1000
// The maximum number of transaction hash criteria allowed in a single subscription
maxTxHashes = 200
)

// filter is a helper struct that holds meta information over the filter type
// and associated subscription in the event system.
Expand Down Expand Up @@ -280,6 +286,70 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
return rpcSub, nil
}

// TransactionReceiptsFilter defines criteria for transaction receipts subscription.
// If TransactionHashes is nil or empty, receipts for all transactions included in new blocks will be delivered.
// Otherwise, only receipts for the specified transactions will be delivered.
type TransactionReceiptsFilter struct {
TransactionHashes []common.Hash `json:"transactionHashes,omitempty"`
}

// TransactionReceipts creates a subscription that fires transaction receipts when transactions are included in blocks.
func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *TransactionReceiptsFilter) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

// Validate transaction hashes limit
if filter != nil && len(filter.TransactionHashes) > maxTxHashes {
return nil, errExceedMaxTxHashes
}

var (
rpcSub = notifier.CreateSubscription()
matchedReceipts = make(chan []*ReceiptWithTx)
txHashes []common.Hash
)

if filter != nil {
txHashes = filter.TransactionHashes
}

receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, matchedReceipts)

go func() {
defer receiptsSub.Unsubscribe()

signer := types.LatestSigner(api.sys.backend.ChainConfig())

for {
select {
case receiptsWithTxs := <-matchedReceipts:
if len(receiptsWithTxs) > 0 {
marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTxs))
for i, receiptWithTx := range receiptsWithTxs {
marshaledReceipts[i] = ctxcapi.MarshalReceipt(
receiptWithTx.Receipt,
receiptWithTx.Receipt.BlockHash,
receiptWithTx.Receipt.BlockNumber.Uint64(),
signer,
receiptWithTx.Transaction,
int(receiptWithTx.Receipt.TransactionIndex),
)
}

// Send a batch of tx receipts in one notification
notifier.Notify(rpcSub.ID, marshaledReceipts)
}
case <-rpcSub.Err():
return
}
}
}()

return rpcSub, nil
}

// FilterCriteria represents a request to create a new filter.
// Same as cortex.FilterQuery but with UnmarshalJSON() method.
type FilterCriteria cortex.FilterQuery
Expand Down
69 changes: 69 additions & 0 deletions ctxc/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"slices"

"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/core"
"github.com/CortexFoundation/CortexTheseus/core/bloombits"
"github.com/CortexFoundation/CortexTheseus/core/types"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/CortexTheseus/rpc"
)

Expand Down Expand Up @@ -398,3 +400,70 @@ func flatten(list [][]*types.Log) []*types.Log {
}
return flat
}

// ReceiptWithTx contains a receipt and its corresponding transaction
type ReceiptWithTx struct {
Receipt *types.Receipt
Transaction *types.Transaction
}

// filterReceipts returns the receipts matching the given criteria
// In addition to returning receipts, it also returns the corresponding transactions.
// This is because receipts only contain low-level data, while user-facing data
// may require additional information from the Transaction.
func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx {
var ret []*ReceiptWithTx

receipts := ev.Receipts
txs := ev.Transactions

if len(receipts) != len(txs) {
log.Warn("Receipts and transactions length mismatch", "receipts", len(receipts), "transactions", len(txs))
return ret
}

if len(txHashes) == 0 {
// No filter, send all receipts with their transactions.
ret = make([]*ReceiptWithTx, len(receipts))
for i, receipt := range receipts {
ret[i] = &ReceiptWithTx{
Receipt: receipt,
Transaction: txs[i],
}
}
} else if len(txHashes) == 1 {
// Filter by single transaction hash.
// This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization.
for i, receipt := range receipts {
if receipt.TxHash == txHashes[0] {
ret = append(ret, &ReceiptWithTx{
Receipt: receipt,
Transaction: txs[i],
})
break
}
}
} else {
// Filter by multiple transaction hashes.
txHashMap := make(map[common.Hash]bool, len(txHashes))
for _, hash := range txHashes {
txHashMap[hash] = true
}

for i, receipt := range receipts {
if txHashMap[receipt.TxHash] {
ret = append(ret, &ReceiptWithTx{
Receipt: receipt,
Transaction: txs[i],
})

// Early exit if all receipts are found
if len(ret) == len(txHashes) {
break
}
}
}
}

return ret
}
39 changes: 39 additions & 0 deletions ctxc/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/CortexFoundation/CortexTheseus/ctxcdb"
"github.com/CortexFoundation/CortexTheseus/event"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/CortexTheseus/params"
"github.com/CortexFoundation/CortexTheseus/rpc"
)

Expand Down Expand Up @@ -62,6 +63,8 @@ type Backend interface {
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)

CurrentHeader() *types.Header
ChainConfig() *params.ChainConfig
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
Expand Down Expand Up @@ -128,6 +131,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// TransactionReceiptsSubscription queries for transaction receipts when transactions are included in blocks
TransactionReceiptsSubscription
// LastIndexSubscription keeps track of the last index
LastIndexSubscription
)
Expand All @@ -150,8 +155,11 @@ type subscription struct {
created time.Time
logsCrit cortex.FilterQuery
logs chan []*types.Log
txs chan []*types.Transaction
hashes chan []common.Hash
headers chan *types.Header
receipts chan []*ReceiptWithTx
txHashes []common.Hash // contains transaction hashes for transactionReceipts subscription filtering
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
Expand Down Expand Up @@ -245,6 +253,7 @@ func (sub *Subscription) Unsubscribe() {
case <-sub.f.logs:
case <-sub.f.hashes:
case <-sub.f.headers:
case <-sub.f.receipts:
}
}

Expand Down Expand Up @@ -315,6 +324,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit cortex.FilterQuery, logs c
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
receipts: make(chan []*ReceiptWithTx),
installed: make(chan struct{}),
err: make(chan error),
}
Expand All @@ -332,6 +342,7 @@ func (es *EventSystem) subscribeLogs(crit cortex.FilterQuery, logs chan []*types
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
receipts: make(chan []*ReceiptWithTx),
installed: make(chan struct{}),
err: make(chan error),
}
Expand Down Expand Up @@ -365,6 +376,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
receipts: make(chan []*ReceiptWithTx),
installed: make(chan struct{}),
err: make(chan error),
}
Expand All @@ -381,6 +393,26 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
receipts: make(chan []*ReceiptWithTx),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribeTransactionReceipts creates a subscription that writes transaction receipts for
// transactions when they are included in blocks. If txHashes is provided, only receipts
// for those specific transaction hashes will be delivered.
func (es *EventSystem) SubscribeTransactionReceipts(txHashes []common.Hash, receipts chan []*ReceiptWithTx) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: TransactionReceiptsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
receipts: receipts,
txHashes: txHashes,
installed: make(chan struct{}),
err: make(chan error),
}
Expand Down Expand Up @@ -436,6 +468,13 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
}
})
}
// Handle transaction receipts subscriptions when a new block is added
for _, f := range filters[TransactionReceiptsSubscription] {
matchedReceipts := filterReceipts(f.txHashes, ev)
if len(matchedReceipts) > 0 {
f.receipts <- matchedReceipts
}
}
}

func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
Expand Down
4 changes: 4 additions & 0 deletions ctxc/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (b *testBackend) ChainDb() ctxcdb.Database {
return b.db
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
return nil
}

func (b *testBackend) CurrentHeader() *types.Header {
hdr, _ := b.HeaderByNumber(context.TODO(), rpc.LatestBlockNumber)
return hdr
Expand Down
6 changes: 3 additions & 3 deletions internal/ctxcapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func (api *PublicBlockChainAPI) GetBlockReceipts(ctx context.Context, blockNrOrH

result := make([]map[string]interface{}, len(receipts))
for i, receipt := range receipts {
result[i] = marshalReceipt(receipt, block.Hash(), block.NumberU64(), signer, txs[i], i)
result[i] = MarshalReceipt(receipt, block.Hash(), block.NumberU64(), signer, txs[i], i)
}

return result, nil
Expand Down Expand Up @@ -1919,8 +1919,8 @@ func checkTxFee(gasPrice *big.Int, gas uint64, cap float64) error {
return nil
}

// marshalReceipt marshals a transaction receipt into a JSON object.
func marshalReceipt(receipt *types.Receipt, blockHash common.Hash, blockNumber uint64, signer types.Signer, tx *types.Transaction, txIndex int) map[string]interface{} {
// MarshalReceipt marshals a transaction receipt into a JSON object.
func MarshalReceipt(receipt *types.Receipt, blockHash common.Hash, blockNumber uint64, signer types.Signer, tx *types.Transaction, txIndex int) map[string]interface{} {
from, _ := types.Sender(signer, tx)

fields := map[string]interface{}{
Expand Down