Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
benleb committed Aug 21, 2022
1 parent 3babc50 commit daf2321
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 10 deletions.
7 changes: 7 additions & 0 deletions internal/collections/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ const (
Purchase
)

func (et EventType) String() string {
return map[EventType]string{
Sale: "Sale", Mint: "Mint", Transfer: "Transfer", Listing: "Listing", Purchase: "Purchase",
}[et]
}

func (dt EventType) Icon() string {
switch dt {
case Sale:
Expand Down Expand Up @@ -58,6 +64,7 @@ func (dt EventType) ActionName() string {
}

type Event struct {
NodeID int
EventType EventType
Topic string
TxHash common.Hash
Expand Down
2 changes: 2 additions & 0 deletions internal/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func FormatTokenInfo(tokenID uint64, collection *collections.GbCollection, faint
collectionName = strings.ReplaceAll(collectionName, "Psychedelics Anonymous", "PA")
collectionName = strings.ReplaceAll(collectionName, "Open Edition", "OE")
collectionName = strings.ReplaceAll(collectionName, "Genesis Edition", "Genesis")
collectionName = strings.ReplaceAll(collectionName, "Golid and Deca", "G&D")
collectionName = strings.ReplaceAll(collectionName, "[ Ledger ] Market Pass", "Ledger Market Pass")

collectionName = strings.ReplaceAll(collectionName, " Collection", "")

Expand Down
2 changes: 1 addition & 1 deletion internal/subscriptions/listings.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func StreamListingsHandler(workerID int, gOwnCollections *collections.Collection
continue
}

patternTokenID := regexp.MustCompile(`^(.*?)\ #?(\d*)(\/.*)?$`)
patternTokenID := regexp.MustCompile(`^(.*?)\ ?#?(\d*)(\/.*)?$`)
tokenIDRaw := patternTokenID.ReplaceAllString(event.Payload.Item.Metadata.Name, "$2")
gbl.Log.Debugf("tokenIDRaw: %+v", tokenIDRaw)

Expand Down
56 changes: 47 additions & 9 deletions internal/subscriptions/sales.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/benleb/gloomberg/internal/cache"
"github.com/benleb/gloomberg/internal/collections"
"github.com/benleb/gloomberg/internal/gbl"
"github.com/benleb/gloomberg/internal/gbnode"
Expand All @@ -24,10 +25,40 @@ var (
ensContractAddress = common.HexToAddress("0x57f1887a8BF19b14fC0dF6Fd9B2acc9Af147eA85")
)

func SubscriptionLogsHandler(nodes *gbnode.NodeCollection, gOwnCollections *collections.Collections, queueLogs chan types.Log, queueEvents chan *collections.Event) {
for subLog := range queueLogs {
func SubscriptionLogsHandler(ctx context.Context, node *gbnode.ChainNode, nodes *gbnode.NodeCollection, gOwnCollections *collections.Collections, queueLogs *chan types.Log, queueEvents chan *collections.Event) {
for subLog := range *queueLogs {
// atomic.AddUint64(&stats.queueEvents, 1)
gbl.Log.Debugf("%s | new subscription log (%d): %+v", time.Now().String(), len(queueLogs), subLog)
gbl.Log.Debugf("%s | new subscription log (%d): %+v", time.Now().String(), len(*queueLogs), subLog)

// if subLog.Address == common.HexToAddress("0x8297d8e55c27aa6ce2d8a65b1fa3debb02410efc") && subLog.Topics[1] == common.HexToHash("0xDcaE87821FA6CAEA05dBc2811126f4bc7fF73bd1") {
// fmt.Println("")
// fmt.Println("")
// fmt.Println(" ‼️ OSF's 7 Deadly Sins ‼️ OSF's 7 Deadly Sins ‼️ OSF's 7 Deadly Sins ‼️")
// fmt.Println("")
// fmt.Println(" https://opensea.io/collection/osf-7-sins")
// fmt.Println("")
// fmt.Println("")
// notifications.SendAlert("OSF's 7 Deadly Sins", "https://opensea.io/collection/osf-7-sins", true)

// if msg, err := notifications.SendTelegramMessage(1320669206, " @benleb ‼️ OSF's 7 Deadly Sins !!\n\nhttps://opensea.io/collection/osf-7-sins", ""); err != nil {
// gbl.Log.Errorf("failed to send telegram message: %s | imageURI: %s | err: %s\n", msg, "", err)
// }
// }

// if subLog.Address == common.HexToAddress("0x495f947276749ce646f68ac8c248420045cb7b5e") {
// fmt.Println("")
// fmt.Println("")
// fmt.Println(" ‼️ the saddest collection ‼️ the saddest collection ‼️ the saddest collection ‼️")
// fmt.Println("")
// fmt.Println(" https://opensea.io/collection/thesaddestcollection")
// fmt.Println("")
// fmt.Println("")
// notifications.SendAlert("the saddest collection", "https://opensea.io/collection/thesaddestcollection", true)

// if msg, err := notifications.SendTelegramMessage(1320669206, " @benleb ‼️ the saddest collection !!\n\nhttps://opensea.io/collection/thesaddestcollection", ""); err != nil {
// gbl.Log.Errorf("failed to send telegram message: %s | imageURI: %s | err: %s\n", msg, "", err)
// }
// }

// erc721 has 0-3, (erc1155 has topics 2?), erc20 has topics 0-2
if len(subLog.Topics) != 4 {
Expand All @@ -44,11 +75,11 @@ func SubscriptionLogsHandler(nodes *gbnode.NodeCollection, gOwnCollections *coll
continue
}

go parseLog(nodes, gOwnCollections, subLog, queueEvents)
go parseLog(ctx, node, nodes, gOwnCollections, subLog, queueEvents)
}
}

func parseLog(nodes *gbnode.NodeCollection, ownCollections *collections.Collections, subLog types.Log, queueEvents chan *collections.Event) {
func parseLog(ctx context.Context, node *gbnode.ChainNode, nodes *gbnode.NodeCollection, ownCollections *collections.Collections, subLog types.Log, queueEvents chan *collections.Event) {
// transaction collector to "recognize" multi-item txs
var transco *TransactionCollector

Expand Down Expand Up @@ -110,7 +141,7 @@ func parseLog(nodes *gbnode.NodeCollection, ownCollections *collections.Collecti

if collection == nil {
// atomic.AddUint64(&StatsBTV.DiscardedUnknownCollection, 1)
gbl.Log.Warnf("DiscardedUnknownCollection| %v | TxHash: %v / %d | %+v\n", subLog.Address.String(), subLog.TxHash, subLog.TxIndex, subLog)
gbl.Log.Warnf("‼️ DiscardedUnknownCollection| %v | TxHash: %v / %d | %+v\n", subLog.Address.String(), subLog.TxHash, subLog.TxIndex, subLog)

return
}
Expand All @@ -129,6 +160,7 @@ func parseLog(nodes *gbnode.NodeCollection, ownCollections *collections.Collecti
// mint
if !showMints && !collection.Show.Mints {
// atomic.AddUint64(&StatsBTV.DiscardedMints, 1)
gbl.Log.Debugf("‼️ mint not shown %v | TxHash: %v / %d | %+v\n", subLog.Address.String(), subLog.TxHash, subLog.TxIndex, subLog)
return
}

Expand All @@ -139,7 +171,7 @@ func parseLog(nodes *gbnode.NodeCollection, ownCollections *collections.Collecti
// get the transaction details
tx, _, err := nodes.GetRandomNode().Client.TransactionByHash(context.Background(), subLog.TxHash)
if err != nil {
gbl.Log.Infof("getting tx details failed | %v | TxHash: %v / %d | %+v", subLog.Address.String(), subLog.TxHash, subLog.TxIndex, subLog)
gbl.Log.Infof("‼️ getting tx details failed | %v | TxHash: %v / %d | %+v", subLog.Address.String(), subLog.TxHash, subLog.TxIndex, subLog)
// atomic.AddUint64(&StatsBTV.DiscardedTransactions, 1)
return
}
Expand All @@ -156,7 +188,7 @@ func parseLog(nodes *gbnode.NodeCollection, ownCollections *collections.Collecti

if !isMint && !isOwnCollection && WeiToEther(value).Cmp(big.NewFloat(viper.GetFloat64("show.min_price"))) < 0 {
// atomic.AddUint64(&StatsBTV.DiscardedLowPrice, 1)
gbl.Log.Debugf("DiscardedLowPrice| %v | TxHash: %v / %d | %+v", subLog.Address.String(), subLog.TxHash, subLog.TxIndex, subLog)
gbl.Log.Debugf("‼️ DiscardedLowPrice| %v | TxHash: %v / %d | %+v", subLog.Address.String(), subLog.TxHash, subLog.TxIndex, subLog)

return
}
Expand All @@ -165,6 +197,7 @@ func parseLog(nodes *gbnode.NodeCollection, ownCollections *collections.Collecti
// transfer
if !showTransfers {
// atomic.AddUint64(&StatsBTV.DiscardedTransfers, 1)
gbl.Log.Debugf("‼️ transfer not shown %v | TxHash: %v / %d | %+v\n", subLog.Address.String(), subLog.TxHash, subLog.TxIndex, subLog)
return
}

Expand Down Expand Up @@ -195,6 +228,7 @@ func parseLog(nodes *gbnode.NodeCollection, ownCollections *collections.Collecti
// tokenID := GetTokenIDFromTopics(subLog.Topics)

event := &collections.Event{
NodeID: node.NodeID,
EventType: eventType,
Topic: logTopic.String(),
TxHash: subLog.TxHash,
Expand All @@ -217,12 +251,16 @@ func parseLog(nodes *gbnode.NodeCollection, ownCollections *collections.Collecti
// send to formatting
queueEvents <- event

cache := cache.New(ctx)
// cache.StoreEvent(event.Collection.Name, event.TokenID, event.PriceWei.Uint64(), event.TxItemCount, event.Time, eventType.String())
cache.StoreEvent(event.Collection.ContractAddress, event.Collection.Name, event.TokenID, event.PriceWei.Uint64(), event.TxItemCount, event.Time, int64(eventType))

// *outputWs <- event

// xAddArgs := &redis.XAddArgs{
// Stream: "sales",
// MaxLen: 100000,
// Approx: true,
// Approx: true,c
// ID: "*",
// Values: map[string]any{
// "collection": event.Collection.Name,
Expand Down

0 comments on commit daf2321

Please sign in to comment.