This repository has been archived by the owner on Aug 21, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 56
/
blockhash.go
94 lines (85 loc) · 2.42 KB
/
blockhash.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package blockhash
import (
"encoding/json"
"github.com/notegio/openrelay/channels"
"github.com/notegio/openrelay/monitor/blocks"
"gopkg.in/redis.v3"
"log"
"fmt"
)
// BlockHash provides the latest block hash from the ethereum blockchain.
type BlockHash interface {
// Get returns the block hash as a string.
Get() string
}
// ChanneledBlockHashConsumer listens to a consumerChannel for block hashes,
// and sends them over the provided channel.
type ChanneledBlockHashConsumer struct {
// channel receives the formatted hash of each delivery that Consume
// parses successfully.
channel chan string
}
// Consume handles a single delivery from the consumer channel. The payload is
// decoded as a JSON blocks.MiniBlock; when decoding fails the error is logged
// and the delivery is rejected. Otherwise the block hash is formatted as
// 0x-prefixed hex, sent on the consumer's channel, and the delivery is
// acknowledged. The send blocks until a receiver takes the value, so the
// acknowledgement only happens after the hash has been handed off.
func (rbhc *ChanneledBlockHashConsumer) Consume(delivery channels.Delivery) {
	raw := []byte(delivery.Payload())
	var parsed blocks.MiniBlock
	if decodeErr := json.Unmarshal(raw, &parsed); decodeErr != nil {
		log.Printf("Error Parsing Payload: %v - '%v'", decodeErr.Error(), string(raw))
		delivery.Reject()
		return
	}
	hexHash := fmt.Sprintf("%#x", parsed.Hash[:])
	rbhc.channel <- hexHash
	delivery.Ack()
}
// ChanneledBlockHash is a BlockHash implementation that gets the latest
// block hash by watching a ConsumerChannel
type ChanneledBlockHash struct {
// channel is the consumer channel that Start attaches a
// ChanneledBlockHashConsumer to and begins consuming from.
channel channels.ConsumerChannel
// sourceChan carries hashes from the consumer to the goroutine launched by
// Start.
sourceChan chan string
// sinkChan carries reply channels from Get to that goroutine, which answers
// each one with the current hash.
sinkChan chan chan string
// started is set to true by Start and checked by Get to decide whether
// Start still needs to be called. It is a plain bool with no
// synchronization around it.
started bool
}
// Start attaches a ChanneledBlockHashConsumer to the consumer channel, begins
// consuming, and launches a goroutine that tracks the most recent block hash.
// The goroutine remembers the last hash received on sourceChan and answers
// every reply channel received on sinkChan with that value; until the first
// hash arrives it answers with the placeholder "initializing". The goroutine
// loops forever and has no exit path.
//
// Start is a no-op once it has already completed. Without this guard a second
// call would register another consumer and spawn a second goroutine holding
// its own copy of the current hash; the two goroutines would then split the
// incoming hashes and requests between them, so Get could return stale
// values. The started flag is not synchronized, so this guard only protects
// against repeated sequential calls.
func (rbh *ChanneledBlockHash) Start() {
	if rbh.started {
		return
	}
	rbh.channel.AddConsumer(&ChanneledBlockHashConsumer{rbh.sourceChan})
	rbh.channel.StartConsuming()
	go func() {
		// TODO: Make this a random value
		currentHash := "initializing"
		for {
			select {
			case msg := <-rbh.sourceChan:
				// A new block hash arrived from the consumer.
				currentHash = msg
			case channel := <-rbh.sinkChan:
				// A Get call is waiting for the current hash.
				channel <- currentHash
			}
		}
	}()
	rbh.started = true
}
// Get returns the hash currently held by the monitoring goroutine. If Start
// has not been called yet, Get calls it first. The request is made by handing
// a fresh reply channel to the goroutine over sinkChan and then waiting for
// the answer on that channel.
func (rbh *ChanneledBlockHash) Get() string {
	if !rbh.started {
		rbh.Start()
	}
	reply := make(chan string)
	rbh.sinkChan <- reply
	hash := <-reply
	return hash
}
// NewChanneledBlockHash builds a BlockHash backed by the given
// ConsumerChannel. The returned value is not started; it creates unbuffered
// channels for incoming hashes and for Get requests.
func NewChanneledBlockHash(channel channels.ConsumerChannel) BlockHash {
	hashes := make(chan string)
	requests := make(chan chan string)
	return &ChanneledBlockHash{
		channel:    channel,
		sourceChan: hashes,
		sinkChan:   requests,
		started:    false,
	}
}
// NewRedisBlockHash resolves channelURI into a ConsumerChannel using the
// supplied redisClient and wraps it in a BlockHash. If the URI cannot be
// turned into a consumer channel, the error from channels.ConsumerFromURI is
// returned unchanged alongside a nil BlockHash.
func NewRedisBlockHash(channelURI string, redisClient *redis.Client) (BlockHash, error) {
	source, uriErr := channels.ConsumerFromURI(channelURI, redisClient)
	if uriErr != nil {
		return nil, uriErr
	}
	hash := NewChanneledBlockHash(source)
	return hash, nil
}