Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
peekpi committed Apr 12, 2019
1 parent 96fa1f5 commit fbc0449
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
8 changes: 4 additions & 4 deletions event/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ var printLock sync.Mutex

func Println(args ...interface{}) {
//return
printLock.Lock()
// printLock.Lock()
fmt.Printf("%d:", time.Now().Unix())
fmt.Println(args...)
printLock.Unlock()
// printLock.Unlock()
}

func Printf(sfmt string, args ...interface{}) {
//return
printLock.Lock()
// printLock.Lock()
fmt.Printf("%d:", time.Now().Unix())
fmt.Printf(sfmt, args...)
printLock.Unlock()
// printLock.Unlock()
}

// ProtoAdaptor used to send out event
Expand Down
53 changes: 50 additions & 3 deletions txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,61 @@ package txpool
import (
"time"

mapset "github.com/deckarep/golang-set"
"github.com/fractalplatform/fractal/common"
router "github.com/fractalplatform/fractal/event"
"github.com/fractalplatform/fractal/types"
)

const (
maxKonwnTxs = 1024
maxKonwnTxs = 32
txsSendDelay = 50 * time.Millisecond
txsSendThreshold = 32
)

type hashQueue struct {
write int
elements [maxKonwnTxs]common.Hash
}

func (q *hashQueue) add(hash common.Hash) {
q.elements[q.write] = hash
q.write++
q.write %= len(q.elements)
}

func (q *hashQueue) has(hash common.Hash) bool {
write := q.write
for {
if q.elements[write] == hash {
return true
}
write--
if write < 0 {
write = len(q.elements) - 1
}
if write == q.write {
break
}
}
return false
}

type peerInfo struct {
knownTxs hashQueue
peer router.Station
}

func (p *peerInfo) addTxs(txs []*types.Transaction) {
for _, tx := range txs {
p.knownTxs.add(tx.Hash())
}
}

func (p *peerInfo) hadTxs(tx *types.Transaction) bool {
return p.knownTxs.has(tx.Hash())
}

/*
type peerInfo struct {
knownTxs mapset.Set
peer router.Station
Expand All @@ -47,6 +91,7 @@ func (p *peerInfo) addTxs(txs []*types.Transaction) {
func (p *peerInfo) hadTxs(tx *types.Transaction) bool {
return p.knownTxs.Contains(tx.Hash())
}
*/

type TxpoolStation struct {
station router.Station
Expand Down Expand Up @@ -84,6 +129,7 @@ func (s *TxpoolStation) broadcast(txs []*types.Transaction) {
break
}
}
router.Println("broadcast:", sendCount)
}

/*
Expand Down Expand Up @@ -116,7 +162,8 @@ func (s *TxpoolStation) handleMsg() {
peerInfo.addTxs(txs)
s.txpool.AddRemotes(txs)
case router.NewPeerPassedNotify:
s.peers[e.From.Name()] = &peerInfo{knownTxs: mapset.NewSet(), peer: e.From}
//s.peers[e.From.Name()] = &peerInfo{knownTxs: mapset.NewSet(), peer: e.From}
s.peers[e.From.Name()] = &peerInfo{peer: e.From}
go s.syncTransactions(e)
case router.DelPeerNotify:
delete(s.peers, e.From.Name())
Expand Down

0 comments on commit fbc0449

Please sign in to comment.