Skip to content

Commit

Permalink
add mvp of a grpc model for remote usage of seawatcher and similar
Browse files Browse the repository at this point in the history
  • Loading branch information
benleb committed Jul 25, 2023
1 parent b27ab72 commit e565589
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 294 deletions.
71 changes: 60 additions & 11 deletions cmd/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"math/big"
"net"
"strings"
"time"
Expand Down Expand Up @@ -378,12 +379,20 @@ func runGloomberg(_ *cobra.Command, _ []string) {

//
// subscribe to OpenSea API
if viper.GetBool("seawatcher.enabled") || viper.GetBool("listings.enabled") {
if viper.GetBool("seawatcher.local") || viper.GetBool("listings.enabled") {
go trapri.OpenseaEventsHandler(gb)

go gb.SendSlugsToServer()
}

if viper.GetBool("seawatcher.grpc.client.enabled") {
gb.Prf("starting grpc client...")

// go seawa.GetEvents()

go testGRPC()
}

//
// subscribe to redis pubsub channel to receive events from gloomberg central
if viper.GetBool("seawatcher.pubsub") || viper.GetBool("pubsub.listings.subscribe") {
Expand Down Expand Up @@ -469,11 +478,6 @@ func runGloomberg(_ *cobra.Command, _ []string) {
gb.Prf("wallet watcher started: %+v", wawa)
}()

if viper.IsSet("seawatcher.grpc.server") {
gb.Prf("starting grpc client...")
go testGRPC()
}

// loop forever
select {}
}
Expand Down Expand Up @@ -501,7 +505,7 @@ func testGRPC() {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

grpcAddress := fmt.Sprintf("%s:%d", viper.GetString("seawatcher.grpc.server"), viper.GetUint("seawatcher.grpc.port"))
grpcAddress := fmt.Sprintf("%s:%d", viper.GetString("seawatcher.grpc.client.host"), viper.GetUint("seawatcher.grpc.port"))

gb.Prf("connecting to gRPC %s...", style.BoldAlmostWhite(grpcAddress))

Expand All @@ -515,7 +519,7 @@ func testGRPC() {
gb.Prf("subscribing via grpc to: %s", style.BoldAlmostWhite(degendb.Listing.OpenseaEventName()))

subsriptionRequest := &seawatcher.SubscriptionRequest{EventTypes: []seawatcher.EventType{seawatcher.EventType_ITEM_LISTED}, Collections: gb.CollectionDB.OpenseaSlugs()} //nolint:nosnakecase
stream, err := client.GetEvents(context.Background(), subsriptionRequest)
stream, err := client.GetItemListedEvents(context.Background(), subsriptionRequest)
if err != nil {
log.Errorf("client.GetEvents failed: %v", err)

Expand All @@ -536,7 +540,49 @@ func testGRPC() {
time.Sleep(time.Second * 1)
}

gb.Prf("🐔 RECEIVED: %+v", event)
basePrice, ok := new(big.Int).SetString(event.Payload.BasePrice, 10)
if !ok {
log.Errorf("error parsing base price: %v", err)

continue
}

// transform event back to seawaModel.ItemListed
itemListedEvent := seawaModels.ItemListed{
EventType: strings.ToLower(event.EventType.String()),
SentAt: event.SentAt.AsTime(),
Payload: seawaModels.ItemListedPayload{
Item: seawaModels.Item{
NftID: *seawaModels.ParseNftID(event.Payload.Item.NftId),
Chain: seawaModels.Chain{Name: event.Payload.Item.Chain.Name},
Permalink: event.Payload.Item.Permalink,
Metadata: seawaModels.Metadata{
Name: event.Payload.Item.Metadata.Name,
ImageURL: event.Payload.Item.Metadata.ImageUrl,
AnimationURL: event.Payload.Item.Metadata.AnimationUrl,
MetadataURL: event.Payload.Item.Metadata.MetadataUrl,
},
},
IsPrivate: event.Payload.IsPrivate,
ListingDate: event.Payload.ListingDate.AsTime(),
EventPayload: seawaModels.EventPayload{
EventTimestamp: event.Payload.EventTimestamp.AsTime(),
BasePrice: basePrice,
Maker: seawaModels.Account{Address: common.HexToAddress(event.Payload.Maker.Address)},
Taker: seawaModels.Account{Address: common.HexToAddress(event.Payload.Taker.Address)},
Quantity: int(event.Payload.Quantity),
OrderHash: common.HexToHash(event.Payload.OrderHash),
ExpirationDate: event.Payload.ExpirationDate.AsTime(),
CollectionCriteria: seawaModels.CollectionCriteria{Slug: event.Payload.Collection.Slug},
PaymentToken: seawaModels.PaymentToken{Address: common.HexToAddress(event.Payload.PaymentToken.Address), Symbol: event.Payload.PaymentToken.Symbol, Decimals: int(event.Payload.PaymentToken.Decimals)},
},
},
}

gb.In.ItemListed <- &itemListedEvent

// gb.Prf("🐔 RECEIVED: %+v", event)
gb.Prf("🐔🐔🐔 RECEIVED: %+v", itemListedEvent)
}
}

Expand Down Expand Up @@ -615,15 +661,18 @@ func init() { //nolint:gochecknoinits
// worker settings
viper.SetDefault("trapri.numOpenSeaEventhandlers", 3)

// eventhub
viper.SetDefault("gloomberg.eventhub.numHandler", 3)
viper.SetDefault("gloomberg.eventhub.inQueuesSize", 256)
viper.SetDefault("gloomberg.eventhub.outQueuesSize", 32)

// first txs
viper.SetDefault("gloomberg.firstTxs.min_value", 0.5)

// job runner
viper.SetDefault("jobs.numRunner", 3)
viper.SetDefault("jobs.defaults.intervals", jobs.DefaultIntervals)

viper.SetDefault("etherscan.fetchInterval", time.Second*3)
viper.SetDefault("jobs.status_every", 137)

// OLD worker settings OLD
viper.SetDefault("server.workers.newHeadHandler", 2)
Expand Down
4 changes: 2 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var (
keyPath string

// grpc.
grpcServer net.IP
grpcListen net.IP
grpcServerListen net.IP
grpcClientHost net.IP

gb *gloomberg.Gloomberg
)
Expand Down
11 changes: 3 additions & 8 deletions cmd/seawatcherCmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@ func init() {
seaWatcherCmd.Flags().Uint16("grpc-port", 31337, "gRPC server port")
_ = viper.BindPFlag("seawatcher.grpc.port", seaWatcherCmd.Flags().Lookup("grpc-port"))
// grpc server
seaWatcherCmd.Flags().IPVar(&grpcServer, "grpc-listen", nil, "gRPC server listen address")
seaWatcherCmd.Flags().IPVar(&grpcServerListen, "grpc-listen", nil, "gRPC server listen address")
_ = viper.BindPFlag("grpc.listen", seaWatcherCmd.Flags().Lookup("grpc-listen"))
// grpc client
seaWatcherCmd.Flags().IPVar(&grpcListen, "grpc", nil, "server gRPC client connects to")
_ = viper.BindPFlag("grpc.server", seaWatcherCmd.Flags().Lookup("grpc"))

// // metrics/prometheus
// viper.SetDefault("metrics.enabled", false)
// viper.SetDefault("metrics.host", net.IPv4(0, 0, 0, 0))
// viper.SetDefault("metrics.port", 9090)
seaWatcherCmd.Flags().IPVar(&grpcClientHost, "grpc", nil, "server gRPC client connects to")
_ = viper.BindPFlag("seawatcher.grpc.client.host", seaWatcherCmd.Flags().Lookup("grpc"))
}

func runSeawatcher(cmd *cobra.Command, _ []string) {
Expand Down
4 changes: 2 additions & 2 deletions internal/seawa/models/item_listed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import "time"
type ItemListed struct {
EventType string `json:"event_type" mapstructure:"event_type"`
SentAt time.Time `json:"sent_at" mapstructure:"sent_at"`
Payload itemListedPayload `json:"payload" mapstructure:"payload"`
Payload ItemListedPayload `json:"payload" mapstructure:"payload"`

Other map[string]interface{} `mapstructure:",remain"`
}

type itemListedPayload struct {
type ItemListedPayload struct {
Item Item `json:"item" mapstructure:"item"`
EventPayload `mapstructure:",squash"`

Expand Down
32 changes: 27 additions & 5 deletions internal/seawa/models/nftid.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,46 @@ func NewNftID(chain string, contractAddress common.Address, tokenID *big.Int) *N
func ParseNftID(combinedNftID string) *NftID {
nftID := strings.Split(combinedNftID, "/")

if len(nftID) != 3 {
gbl.Log.Error("Invalid NFT ID: %s", combinedNftID)
chain := "ethereum"

var collection common.Address

var tokenID *big.Int

var rawTokenID string

switch len(nftID) {
case 3:
chain = nftID[0]

collection = common.HexToAddress(nftID[1])

rawTokenID = nftID[2]

case 2:
collection = common.HexToAddress(nftID[0])

rawTokenID = nftID[1]

default:
gbl.Log.Errorf("Invalid NFT ID: %s", combinedNftID)

empty := []string{"", "", ""}

return (*NftID)(&empty)
}

tokenID, ok := new(big.Int).SetString(nftID[2], 10)
var ok bool
tokenID, ok = new(big.Int).SetString(rawTokenID, 10)
if !ok {
gbl.Log.Error("Invalid NFT ID: %s", combinedNftID)
gbl.Log.Errorf("Invalid NFT ID - error parsing tokenID: %s", combinedNftID)

empty := []string{"", "", ""}

return (*NftID)(&empty)
}

return NewNftID(nftID[0], common.HexToAddress(nftID[1]), tokenID)
return NewNftID(chain, collection, tokenID)
}

// Chain returns the chain of the token.
Expand Down

0 comments on commit e565589

Please sign in to comment.