-
Notifications
You must be signed in to change notification settings - Fork 5
/
pusu.go
132 lines (100 loc) Β· 3.72 KB
/
pusu.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package pusu
import (
"context"
"encoding/json"
"strings"
"github.com/benleb/gloomberg/internal"
"github.com/benleb/gloomberg/internal/gbl"
"github.com/benleb/gloomberg/internal/nemo/gloomberg"
"github.com/benleb/gloomberg/internal/nemo/osmodels"
"github.com/benleb/gloomberg/internal/nemo/totra"
"github.com/benleb/gloomberg/internal/trapri"
"github.com/ethereum/go-ethereum/common"
"github.com/go-redis/redis/v8"
"github.com/spf13/viper"
)
func SubscribeToSales(gb *gloomberg.Gloomberg, channel string, queueTokenTransactions chan *totra.TokenTransaction) {
pubsub := gb.Rdb.Subscribe(context.Background(), channel)
ch := pubsub.Channel(redis.WithChannelSize(1024))
for msg := range ch {
gbl.Log.Infof("π subscribe channel %s (%d)", msg.Channel, len(ch))
// validate json
if !json.Valid([]byte(msg.Payload)) {
gbl.Log.Warnf("βοΈ invalid json: %s", msg.Payload)
continue
}
// create the event transaction
var ttx totra.TokenTransaction
// unmarshal event transaction from json
err := json.Unmarshal([]byte(msg.Payload), &ttx)
if err != nil {
gbl.Log.Warnf("βοΈ error unmarshalling event Tx: %+v | %s", msg.Payload, err)
continue
}
queueTokenTransactions <- &ttx
}
}
// SubscribeToListings subscribes to all collections for which we have a slug
func SubscribeToListings(gb *gloomberg.Gloomberg, queueTokenTransactions chan *totra.TokenTransaction) {
slugAddresses := gb.CollectionDB.OpenseaSlugAddresses()
if len(slugAddresses) == 0 {
gbl.Log.Warn("β no slugs to send to gloomberg server")
return
}
// create a list of channels to subscribe to
channels := make([]string, 0)
for _, collectionAddress := range slugAddresses {
channelPattern := internal.TopicSeaWatcher + "/" + collectionAddress.String() + "/*"
channels = append(channels, channelPattern)
}
pubsub := gb.Rdb.PSubscribe(context.Background(), channels...)
ch := pubsub.Channel(redis.WithChannelSize(1024))
for i := 0; i < viper.GetInt("server.pubsub.listings"); i++ {
go func(i int) {
gbl.Log.Infof("π starting pusu listings receiver #%d | subscriptions: %d", i, len(channels))
for msg := range ch {
gbl.Log.Debugf("π received msg on channel %s (%d): %s", msg.Channel, len(ch), msg.Payload)
var itemListedEvent osmodels.ItemListedEvent
// validate json
if !json.Valid([]byte(msg.Payload)) {
gbl.Log.Warnf("βοΈ invalid json: %s", msg.Payload)
continue
}
// unmarshal
if err := json.Unmarshal([]byte(msg.Payload), &itemListedEvent); err != nil {
gbl.Log.Errorf("β error json.Unmarshal: %+v\n", err.Error())
continue
}
// nftID is a string in the format <chain>/<contract>/<tokenID>
nftID := strings.Split(itemListedEvent.Payload.Item.NftID, "/")
if len(nftID) != 3 {
gbl.Log.Warnf("π€·ββοΈ error parsing nftID: %s | %+v", itemListedEvent.Payload.Item.NftID, nftID)
continue
}
//
// discard listings for ignored collections
if collection, ok := gb.CollectionDB.Collections[common.HexToAddress(nftID[1])]; ok && collection.IgnorePrinting {
gbl.Log.Debugf("ποΈ ignoring printing for collection %s", collection.Name)
continue
}
// print
trapri.FormatListing(gb, &itemListedEvent, queueTokenTransactions)
}
}(i)
}
}
func Publish(gb *gloomberg.Gloomberg, channel string, event any) {
// marshal event to json
marshalledEvent, err := json.Marshal(event)
if err != nil {
gbl.Log.Warnf("βοΈ error marshalling event: %s", err)
return
}
// publish event to redis pubsub
err = gb.Rdb.Publish(context.Background(), channel, marshalledEvent).Err()
if err != nil {
gbl.Log.Warnf("βοΈ error publishing event to redis: %s", err)
} else {
gbl.Log.Debug("published event to redis")
}
}