Skip to content

Commit

Permalink
Merge pull request #13 from btccom/master-zmq
Browse files Browse the repository at this point in the history
Master zmq
  • Loading branch information
duguyifang committed Apr 15, 2019
2 parents 2846b78 + 3dc6733 commit 77502e4
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 18 deletions.
122 changes: 119 additions & 3 deletions mergedMiningProxy/AuxJobMaker.go
Expand Up @@ -11,6 +11,8 @@ import (

"merkle-tree-and-bitcoin/hash"
"merkle-tree-and-bitcoin/merkle"

zmq "github.com/pebbe/zmq4"
)

// AuxPowInfo 辅助工作量证明的信息
Expand Down Expand Up @@ -54,6 +56,7 @@ type AuxJobMaker struct {

minJobBits string
maxJobTarget hash.Byte32
blockHashChnel chan string
}

// NewAuxJobMaker 创建辅助挖矿任务构造器
Expand All @@ -73,6 +76,7 @@ func NewAuxJobMaker(config AuxJobMakerInfo, chains []ChainRPCInfo) (maker *AuxJo
maker.maxJobTarget.Assign(hexBytes)
maker.minJobBits, _ = TargetToBits(maker.maxJobTarget.Hex())
glog.Info("Max Job Target: ", maker.maxJobTarget.Hex(), ", Bits: ", maker.minJobBits)
maker.blockHashChnel = make(chan string)

return
}
Expand Down Expand Up @@ -137,7 +141,16 @@ func (maker *AuxJobMaker) updateAuxBlock(index int) {
maker.chainIDIndexSlots = nil
}

oldAuxBlockInfo := maker.currentAuxBlocks[index];

maker.currentAuxBlocks[index] = auxBlockInfo

if auxBlockInfo.Height > oldAuxBlockInfo.Height {
// glog.Info("send blockhash : ", auxBlockInfo.Hash.Hex())
auxBlockInfo.Hash = auxBlockInfo.Hash.Reverse()
maker.blockHashChnel <- auxBlockInfo.Hash.Hex()
auxBlockInfo.Hash = auxBlockInfo.Hash.Reverse()
}
maker.lock.Unlock()

if glog.V(3) {
Expand All @@ -148,13 +161,116 @@ func (maker *AuxJobMaker) updateAuxBlock(index int) {

// updateAuxBlockAllChains 持续更新所有链的辅助区块
func (maker *AuxJobMaker) updateAuxBlockAllChains() {


go func () {
txHashChnel := make(chan string)
defer close(txHashChnel)
notifyPublisher, err := zmq.NewSocket(zmq.PUB)
defer notifyPublisher.Close()
if err != nil {
glog.Info(" create notifyPublisher handle failed!", err)
return
}
address := "tcp://*:" + maker.config.BlockHashPublishPort
glog.Info("notifyPublisher address : ", address)
err = notifyPublisher.Bind(address)
if err != nil {
glog.Info(" bind notifyPublisher handle failed!", err)
return
}

go func (out chan<- string) {
for {
time.Sleep(time.Duration(maker.config.CreateAuxBlockIntervalSeconds) * time.Second)
out <- "connect ok!"
}
}(txHashChnel)

for {
select {
case txhashmsg := <- txHashChnel:
notifyPublisher.Send("hashtx", zmq.SNDMORE)
notifyPublisher.Send(txhashmsg, 0)
case blockhashmsg := <- maker.blockHashChnel:
hashByte, _ := hex.DecodeString(blockhashmsg)
notifyPublisher.Send("hashblock", zmq.SNDMORE)
notifyPublisher.SendBytes(hashByte, 0)
}
}
}()

for i := 0; i < len(maker.chains); i++ {
go func(index int) {
zmqsignalchanel := make(chan string)
timeoutchanel := make(chan string)
go func(out chan<- string) {
chainsupportzmq := maker.chains[index].IsSupportZmq
subscriber, _ := zmq.NewSocket(zmq.SUB)
connected := true
defer subscriber.Close()
if chainsupportzmq {
ip := maker.chains[index].SubBlockHashAddress
port := maker.chains[index].SubBlockHashPort
address := "tcp://" + ip + ":"+ port
glog.Info("address : ",address)
err := subscriber.Connect(address)
if err != nil {
glog.Info("[error] ", maker.chains[index].Name, " cannot connect to : ", address)
connected = false
}
glog.Info("[OK] ", maker.chains[index].Name, " connected to : ", address)
subscriber.SetSubscribe("hashblock")
}
if chainsupportzmq && connected {
for {
msgtype, err := subscriber.Recv(0)
if err != nil {
glog.Info("[error] when ", maker.chains[index].Name, " recv type msg ", msgtype)
continue
}
//glog.Info("[OK] ", maker.chains[index].Name, " receive msgtype : ", msgtype)
content, e := subscriber.Recv(0)
if e != nil {
glog.Info("[error] when ", maker.chains[index].Name, " recv content msg ", content)
continue
}
//glog.Info("[OK] ", maker.chains[index].Name, " receive first msgcontent : ", content)

content, e = subscriber.Recv(0)
if e != nil {
glog.Info("[error] when ", maker.chains[index].Name, " recv content msg ", content)
continue
}
//glog.Info("[OK] ", maker.chains[index].Name, " receive second msgcontent : ", content)

if msgtype != "hashblock" {
glog.Info("[ERROR] ", maker.chains[index].Name, " receive msgcontent : ", msgtype, "is not hashblock")
continue
}

out <- "ok"
}
}
}(zmqsignalchanel)
go func(out chan<- string) {
for {
time.Sleep(time.Duration(maker.config.CreateAuxBlockIntervalSeconds) * time.Second)
out <- "ok"
}
}(timeoutchanel)

for {
maker.updateAuxBlock(index)
time.Sleep(time.Duration(maker.config.CreateAuxBlockIntervalSeconds) * time.Second)
select {
case <- zmqsignalchanel:
//glog.Info("[ok] recv msg from zmq chanel ---> ")
maker.updateAuxBlock(index)
case <- timeoutchanel:
//glog.Info("[ok] recv msg from timeout chanel ")
maker.updateAuxBlock(index)
}
}
}(i)
}(i)
}
}

Expand Down
5 changes: 5 additions & 0 deletions mergedMiningProxy/ConfigData.go
Expand Up @@ -60,6 +60,10 @@ type ChainRPCInfo struct {
RPCServer ChainRPCServer
CreateAuxBlock RPCCreateAuxBlockInfo
SubmitAuxBlock RPCSubmitAuxBlockInfo
SubBlockHashAddress string
SubBlockHashPort string
IsSupportZmq bool

}

// ProxyRPCServer 该代理的RPC服务器信息
Expand All @@ -76,6 +80,7 @@ type AuxJobMakerInfo struct {
CreateAuxBlockIntervalSeconds uint
AuxPowJobListSize uint
MaxJobTarget string
BlockHashPublishPort string
}

// ConfigData 配置文件的数据结构
Expand Down
1 change: 0 additions & 1 deletion mergedMiningProxy/Main.go
Expand Up @@ -22,7 +22,6 @@ func main() {
// 运行任务生成器
auxJobMaker := NewAuxJobMaker(configData.AuxJobMaker, configData.Chains)
auxJobMaker.Run()

// 启动 RPC Server
runHTTPServer(configData.RPCServer, auxJobMaker)
}
32 changes: 19 additions & 13 deletions mergedMiningProxy/ProxyRPC.go
Expand Up @@ -249,23 +249,29 @@ func (handle *ProxyRPCHandle) submitAuxBlock(params []interface{}, response *RPC
}
}

response, err := RPCCall(chain.RPCServer, chain.SubmitAuxBlock.Method, params)
responseJSON, _ := RPCCall(chain.RPCServer, chain.SubmitAuxBlock.Method, params)

{
response, err := ParseRPCResponse(responseJSON)
var submitauxblockinfo SubmitAuxBlockInfo
submitauxblockinfo.AuxBlockTableName = handle.auxJobMaker.chains[index].AuxTableName
if handle.config.MainChain == "LTC" {
submitauxblockinfo.ParentChainBllockHash = HexToString(ArrayReverse(DoubleSHA256(auxPowData.parentBlock)))
if response.Error != nil {
glog.Warning("RPC result : " + string(responseJSON))
} else {
submitauxblockinfo.ParentChainBllockHash = auxPowData.blockHash.Hex()
}

submitauxblockinfo.AuxChainBlockHash = extAuxPow.Hash.Hex()
submitauxblockinfo.AuxPow = auxPowHex
submitauxblockinfo.CurrentTime = time.Now().Format("2006-01-02 15:04:05")
submitauxblockinfo.AuxBlockTableName = handle.auxJobMaker.chains[index].AuxTableName
if handle.config.MainChain == "LTC" {
submitauxblockinfo.ParentChainBllockHash = HexToString(ArrayReverse(DoubleSHA256(auxPowData.parentBlock)))
} else {
submitauxblockinfo.ParentChainBllockHash = auxPowData.blockHash.Hex()
}

submitauxblockinfo.AuxChainBlockHash = extAuxPow.Hash.Hex()
submitauxblockinfo.AuxPow = auxPowHex
submitauxblockinfo.CurrentTime = time.Now().Format("2006-01-02 15:04:05")

if ok = handle.dbhandle.InsertAuxBlock(submitauxblockinfo); !ok {
glog.Warning("Insert AuxBlock to db failed!")
if ok = handle.dbhandle.InsertAuxBlock(submitauxblockinfo); !ok {
glog.Warning("Insert AuxBlock to db failed!")
}
}

glog.Info(
Expand All @@ -274,7 +280,7 @@ func (handle *ProxyRPCHandle) submitAuxBlock(params []interface{}, response *RPC
", hash: ", extAuxPow.Hash.Hex(),
", parentBlockHash: ", submitauxblockinfo.ParentChainBllockHash,
", target: ", extAuxPow.Target.Hex(),
", response: ", string(response),
", response: ", string(responseJSON),
", errmsg: ", err)
}

Expand All @@ -295,8 +301,8 @@ func (handle *ProxyRPCHandle) submitAuxBlock(params []interface{}, response *RPC
}

func runHTTPServer(config ProxyRPCServer, auxJobMaker *AuxJobMaker) {
handle := NewProxyRPCHandle(config, auxJobMaker)

handle := NewProxyRPCHandle(config, auxJobMaker)
// HTTP监听
glog.Info("Listen HTTP ", config.ListenAddr)
err := http.ListenAndServe(config.ListenAddr, handle)
Expand Down
6 changes: 5 additions & 1 deletion mergedMiningProxy/config.default.json
Expand Up @@ -15,12 +15,16 @@
"AuxJobMaker": {
"CreateAuxBlockIntervalSeconds": 5,
"AuxPowJobListSize": 1000,
"MaxJobTarget": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
"MaxJobTarget": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
"BlockHashPublishPort": "5555",
},
"Chains": [
{
"Name": "Namecoin",
"AuxTableName" :"found_nmc_blocks",
"IsSupportZmq" : true,
"SubBlockHashAddress" : "127.0.0.1",
"SubBlockHashPort" :"1234",
"RPCServer":{
"URL": "http://127.0.0.1:8444/",
"User": "test",
Expand Down

0 comments on commit 77502e4

Please sign in to comment.