-
Notifications
You must be signed in to change notification settings - Fork 13
/
subscriber.go
127 lines (103 loc) · 2.86 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// SPDX-License-Identifier: ISC
// Copyright (c) 2014-2019 Bitmark Inc.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"encoding/json"
"fmt"
zmq "github.com/pebbe/zmq4"
"github.com/bitmark-inc/bitmarkd/blockrecord"
"github.com/bitmark-inc/logger"
)
// PublishedItem is the message sent by bitmarkd on the publish socket.
// Subscribe decodes each received payload into this type with
// encoding/json; as the fields carry no struct tags the JSON keys are
// matched against the Go field names.
//
// ***** FIX THIS: need to refactor
type PublishedItem struct {
// NOTE(review): presumably the identifier bitmarkd assigned to this
// piece of work - confirm against the publisher
Job string
// NOTE(review): presumably the block header to be hashed - confirm
// against the publisher
Header blockrecord.Header
}
// Subscribe starts the subscriber thread.
//
// It creates a CURVE-secured SUB socket, connects it to connectTo, creates
// a PUSH socket connected to proofRequest, and then starts a background
// goroutine that forwards every received message to the PUSH socket as a
// two-part message: the submitter identity followed by the raw payload.
//
// Parameters:
//   - i: index used to build the "subscriber-<i>" and "submitter-<i>" identities
//   - connectTo: ZeroMQ endpoint the SUB socket connects to
//   - v6: value passed to the socket's IPv6 option
//   - serverPublicKey: CURVE public key of the publishing server
//   - publicKey: CURVE public key of this client, also used as socket identity
//   - privateKey: CURVE secret key of this client
//   - log: logger receiving progress messages
//   - proofer: consulted before each forward; while IsWorking reports false
//     the received message is discarded
//
// An error is returned if either socket cannot be created, configured or
// connected; every socket opened so far is closed and no goroutine is
// started. On success nil is returned and the goroutine owns both sockets,
// closing them when it exits. Receive and send errors inside the goroutine
// are handed to logger.PanicIfError.
func Subscribe(
	i int,
	connectTo string,
	v6 bool,
	serverPublicKey []byte,
	publicKey []byte,
	privateKey []byte,
	log *logger.L,
	proofer Proofer,
) error {

	log.Info("starting…")

	socket, err := zmq.NewSocket(zmq.SUB)
	if nil != err {
		return err
	}

	log.Infof("connect to: %q", connectTo)

	err = configureSubscriptionSocket(socket, v6, serverPublicKey, publicKey, privateKey)
	if nil != err {
		socket.Close()
		return err
	}

	// the connect result must be captured: testing a stale err here would
	// let a failed connect go unnoticed
	err = socket.Connect(connectTo)
	if nil != err {
		socket.Close()
		return err
	}

	// to submit hashing requests
	proof, err := zmq.NewSocket(zmq.PUSH)
	if nil != err {
		socket.Close()
		return err
	}

	identity := fmt.Sprintf("subscriber-%d", i)
	mySubmitterIdentity := fmt.Sprintf("submitter-%d", i) // ***** FIX THIS: sync up with submitter so names match *****

	err = proof.SetLinger(0)
	if nil == err {
		err = proof.SetIdentity(identity)
	}
	if nil == err {
		err = proof.Connect(proofRequest)
	}
	if nil != err {
		// do not start the background process on closed sockets
		socket.Close()
		proof.Close()
		return err
	}

	// background process
	go func() {
		defer socket.Close()
		defer proof.Close()

		for {
			data, err := socket.Recv(0)
			logger.PanicIfError("subscriber", err)
			log.Infof("received data: %s", data)

			// prevent queuing outdated request
			if !proofer.IsWorking() {
				log.Infof("Rest time, discard request")
				continue
			}

			// ***** FIX THIS: just debugging? or really split block into multiple nonce ranges
			var item PublishedItem
			if err := json.Unmarshal([]byte(data), &item); nil != err {
				// the decoded item only feeds the log line below, so
				// the raw payload is still forwarded unchanged
				log.Warnf("decode published item error: %s", err)
			}
			log.Infof("received : %v", item)

			// initial try just forward block
			_, err = proof.Send(mySubmitterIdentity, zmq.SNDMORE)
			logger.PanicIfError("subscriber sending 1", err)
			_, err = proof.Send(data, 0)
			logger.PanicIfError("subscriber sending 2", err)

			ProofQueueIncrement()
			log.Infof("queue depth: %d", proofQueueDepth)
		}
	}()

	return nil
}

// configureSubscriptionSocket applies the CURVE client credentials, socket
// identity, IPv6 flag, TCP keep-alive values and the receive-everything
// subscription to socket, returning the first error reported by a setter.
func configureSubscriptionSocket(
	socket *zmq.Socket,
	v6 bool,
	serverPublicKey []byte,
	publicKey []byte,
	privateKey []byte,
) error {

	// act as a CURVE client
	if err := socket.SetCurveServer(0); nil != err {
		return err
	}
	if err := socket.SetCurvePublickey(string(publicKey)); nil != err {
		return err
	}
	if err := socket.SetCurveSecretkey(string(privateKey)); nil != err {
		return err
	}
	if err := socket.SetCurveServerkey(string(serverPublicKey)); nil != err {
		return err
	}

	// just use public key for identity
	if err := socket.SetIdentity(string(publicKey)); nil != err {
		return err
	}

	// basic socket options
	if err := socket.SetIpv6(v6); nil != err {
		return err
	}

	// keep-alive settings
	if err := socket.SetTcpKeepalive(1); nil != err {
		return err
	}
	if err := socket.SetTcpKeepaliveCnt(5); nil != err {
		return err
	}
	if err := socket.SetTcpKeepaliveIdle(60); nil != err {
		return err
	}
	if err := socket.SetTcpKeepaliveIntvl(60); nil != err {
		return err
	}

	// ***** FIX THIS: enabling this causes complete failure
	// ***** FIX THIS: socket disconnects, perhaps after IVL value
	// heartbeat
	// socket.SetHeartbeatIvl(heartbeatInterval)
	// socket.SetHeartbeatTimeout(heartbeatTimeout)
	// socket.SetHeartbeatTtl(heartbeatTTL)

	// set subscription prefix - empty => receive everything
	return socket.SetSubscribe("")
}