Skip to content

Commit

Permalink
add ws server command
Browse files Browse the repository at this point in the history
  • Loading branch information
benleb committed Sep 6, 2022
1 parent d596b8e commit 1343fa6
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 18 deletions.
7 changes: 5 additions & 2 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ var (

apiKeyMoralis string

queueEvents = make(chan *collections.Event, 1024)
queueLogs = make(chan types.Log, 1024)
queueListings = make(chan *models.ItemListedEvent, 1024)
queueOutput = make(chan string, 1024)

queueEvents = make(chan *collections.Event, 1024)

queueWS = make(chan *collections.Event, 1024)
queueOutput = make(chan string, 1024)

stats *glicker.Stats

Expand Down
2 changes: 1 addition & 1 deletion cmd/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func live(_ *cobra.Command, _ []string) {
// processes logs from the ethereum chain from our nodes
for _, node := range *nodes {
for workerID := 1; workerID <= viper.GetInt("workers.log_handler"); workerID++ {
go subscriptions.SubscriptionLogsHandler(ctx, node, nodes, ownCollections, logQueues[node.NodeID], queueEvents)
go subscriptions.SubscriptionLogsHandler(ctx, node, nodes, ownCollections, logQueues[node.NodeID], queueEvents, queueWS)
}
}

Expand Down
191 changes: 191 additions & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package cmd

import (
"context"
"fmt"
"math"
"net"
"time"

"github.com/benleb/gloomberg/internal/collections"
"github.com/benleb/gloomberg/internal/gbl"
"github.com/benleb/gloomberg/internal/server"
"github.com/benleb/gloomberg/internal/style"
"github.com/benleb/gloomberg/internal/subscriptions"
"github.com/benleb/gloomberg/internal/wwatcher"
"github.com/ethereum/go-ethereum/core/types"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

// serverCmd represents the server command.
var serverCmd = &cobra.Command{
Use: "server",
Short: "A brief description of your command",
Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:
Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: gbServer, // func(cmd *cobra.Command, args []string) { server() },
}

//nolint:gochecknoinits
func init() {
rootCmd.AddCommand(serverCmd)

// Here you will define your flags and configuration settings.

// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// serverCmd.PersistentFlags().String("foo", "", "A help for foo")

// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// serverCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")

// stats
viper.SetDefault("stats.enabled", true)
viper.SetDefault("stats.extended", false)
viper.SetDefault("stats.gas", true)
viper.SetDefault("stats.balances", true)
viper.SetDefault("stats.lines", 5)
viper.SetDefault("stats.interval", time.Second*90)

// worker settings
viper.SetDefault("workers.log_handler", 7)
viper.SetDefault("workers.listings_handler", 4)
viper.SetDefault("workers.output", 10)

// number of retries to resolve an ens name to an address or vice versa
viper.SetDefault("ens.resolve_max_retries", 5)

// ipfs gateway to fetch metadata/images
viper.SetDefault("ipfs.gateway", "https://ipfs.io/ipfs/")

// default collections and wallet
viper.SetDefault("collections", []any{map[string]string{"name": "OSF's Red Lite District", "mark": "#6A0F27", "address": "0x513cd71defc801b9c1aa763db47b5df223da77a2"}})
viper.SetDefault("wallets", []string{"pranksy.eth"})
viper.SetDefault("wwatcher", []any{map[string]string{}})

// show desktop notifications
serverCmd.Flags().Bool("notifications", false, "Show notifications?")
_ = viper.BindPFlag("show.notifications", serverCmd.Flags().Lookup("notifications"))

// types of events to show
serverCmd.Flags().Bool("sales", true, "Show sales?")
_ = viper.BindPFlag("show.sales", serverCmd.Flags().Lookup("sales"))
serverCmd.Flags().Bool("mints", false, "Show mints?")
_ = viper.BindPFlag("show.mints", serverCmd.Flags().Lookup("mints"))
serverCmd.Flags().Bool("listings", false, "Show listings?")
_ = viper.BindPFlag("show.listings", serverCmd.Flags().Lookup("listings"))
serverCmd.Flags().Bool("transfers", false, "Show transfers?")
_ = viper.BindPFlag("show.transfers", serverCmd.Flags().Lookup("transfers"))
serverCmd.Flags().Float64("min-price", 0.0, "Minimum price to show sales?")
_ = viper.BindPFlag("show.min_price", serverCmd.Flags().Lookup("min-price"))

// process & show just our own collections (from config/wallet)
serverCmd.Flags().Bool("own", false, "Show only own collections (from config/wallet)")
_ = viper.BindPFlag("show.own", serverCmd.Flags().Lookup("own"))

// websockets server
serverCmd.Flags().Bool("server", false, "Start websockets server")
_ = viper.BindPFlag("server.enabled", serverCmd.Flags().Lookup("server"))
serverCmd.Flags().IP("host", net.IPv4(0, 0, 0, 0), "Websockets server port")
_ = viper.BindPFlag("server.host", serverCmd.Flags().Lookup("host"))
serverCmd.Flags().Uint16("port", 42069, "Websockets server port")
_ = viper.BindPFlag("server.port", serverCmd.Flags().Lookup("port"))

// telegram bot
serverCmd.Flags().Bool("telegram", false, "Start telegram bot")
_ = viper.BindPFlag("telegram.enabled", serverCmd.Flags().Lookup("telegram"))
}

func gbServer(_ *cobra.Command, _ []string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*4200*10)
defer cancel()

gbl.GetSugaredLogger()

// wallets *models.Wallets

ownCollections := collections.New()

//
// config
//
if viper.GetString("server.host") != net.IPv4(0, 0, 0, 0).String() || viper.GetUint("server.port") != 42069 {
viper.Set("server.enabled", true)
}

if viper.GetFloat64("show.min_price") > 0.0 {
numGlickerLines := math.Max(float64(viper.GetInt("stats.lines")), 4.0)
viper.Set("stats.lines", numGlickerLines)
}

// show listings for own collections if an opensea api key is set
if viper.IsSet("api_keys.opensea") && !viper.IsSet("show.listings") {
viper.Set("show.listings", true)
}

// wallet watcher
wwatcher.InitWatcher()

// write settings to log
gbl.Log.Debug(viper.AllSettings())

// print header
header := style.GetHeader(Version)
fmt.Println(header)
gbl.Log.Info(header)

//
// initialize node connections
//
nodes := getNodes()

// if we subscribe to all chain-events, we can do it now
if !viper.GetBool("show.own") {
for _, node := range *nodes {
queue := make(chan types.Log, 1024)
logQueues[node.NodeID] = &queue
nodes.SubscribeToAllTransfers(ctx, *logQueues[node.NodeID])
}
}

//
// MIWs
//
miwSpinner := style.GetSpinner("setting up MIWs...")
_ = miwSpinner.Start()

wwatcher.LoadMIWs()

if len(wwatcher.MIWC.WeightedMIWs) > 0 {
miwSpinner.StopMessage(fmt.Sprint(fmt.Sprint(style.BoldStyle.Render(fmt.Sprint(len(wwatcher.MIWC.WeightedMIWs))), " MIWs loaded", "\n")))
_ = miwSpinner.Stop()
} else {
_ = miwSpinner.StopFail()
}

//
// set up workers print to process events from the chain
//

// processes logs from the ethereum chain from our nodes
for _, node := range *nodes {
for workerID := 1; workerID <= viper.GetInt("workers.log_handler"); workerID++ {
go subscriptions.SubscriptionLogsHandler(ctx, node, nodes, ownCollections, logQueues[node.NodeID], queueEvents, queueWS)
}
}

// websockets server
if viper.GetBool("server.enabled") {
viper.Set("mode.server", true)

go server.StartWebsocketServer(&queueWS)
}

select {}
}
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ require (
github.com/gen2brain/beeep v0.0.0-20220518085355-d7852edf42fc
github.com/go-redis/redis/v8 v8.11.5
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1
github.com/gobwas/ws v1.1.0
github.com/jinzhu/copier v0.3.5
github.com/juliangruber/go-intersect v1.1.0
github.com/nshafer/phx v0.2.0
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.12.0
github.com/spf13/viper v1.13.0
github.com/theckman/yacspin v0.13.12
github.com/wealdtech/go-ens/v3 v3.5.5
go.uber.org/zap v1.23.0
Expand All @@ -35,6 +37,8 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/go-toast/toast v0.0.0-20190211030409-01e6764cf0a4 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down Expand Up @@ -78,7 +82,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
17 changes: 13 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 h1:wG8n/XJQ07TmjbITcGi
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1/go.mod h1:A2S0CWkNylc2phvKXWBBdD3K0iGnDBGbzRpISP2zBl8=
github.com/go-toast/toast v0.0.0-20190211030409-01e6764cf0a4 h1:qZNfIGkIANxGv/OqtnntR4DfOY2+BgwR60cAcu/i3SE=
github.com/go-toast/toast v0.0.0-20190211030409-01e6764cf0a4/go.mod h1:kW3HQ4UdaAyrUCSSDR4xUzBKW6O2iA4uHhk7AtyYp10=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.1.0 h1:7RFti/xnNkMJnrK7D1yQ/iCIB5OrrY/54/H930kIbHA=
github.com/gobwas/ws v1.1.0/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang-jwt/jwt/v4 v4.3.0 h1:kHL1vqdqWNfATmA0FNMdmZNMyZI1U6O31X4rlIPoBog=
Expand Down Expand Up @@ -190,6 +196,8 @@ github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf
github.com/ipfs/go-cid v0.3.2 h1:OGgOd+JCFM+y1DjWPmVH+2/4POtpDzwcr7VgnB7mZXc=
github.com/ipfs/go-cid v0.3.2/go.mod h1:gQ8pKqT/sUxGY+tIwy1RPpAojYu7jAyCp5Tz1svoupw=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/juliangruber/go-intersect v1.1.0 h1:sc+y5dCjMMx0pAdYk/N6KBm00tD/f3tq+Iox7dYDUrY=
Expand Down Expand Up @@ -289,8 +297,8 @@ github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmq
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=
github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI=
github.com/spf13/viper v1.13.0 h1:BWSJ/M+f+3nmdz9bxB+bWX28kkALN2ok11D0rSo8EJU=
github.com/spf13/viper v1.13.0/go.mod h1:Icm2xNL3/8uyh/wFuB1jI7TiTNKp8632Nwegu+zgdYw=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down Expand Up @@ -471,6 +479,7 @@ golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -485,8 +494,8 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 h1:C1tElbkWrsSkn3IRl1GCW/gETw1TywWIPgwZtXTZbYg=
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 h1:Q5284mrmYTpACcm+eAKjKJH48BBwSyfJqmmGDTtT8Vc=
Expand Down
64 changes: 64 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package server

import (
"encoding/json"
"fmt"
"net/http"

"github.com/benleb/gloomberg/internal/collections"
"github.com/benleb/gloomberg/internal/gbl"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/jinzhu/copier"
"github.com/spf13/viper"
)

func StartWebsocketServer(queueWS *chan *collections.Event) {
listenHost := viper.GetString("server.host")
listenPort := viper.GetUint("server.port")
listenOn := fmt.Sprint(listenHost) + ":" + fmt.Sprint(listenPort)

gbl.Log.Infof("starting websocket server on %s\n", listenOn)

http.ListenAndServe(listenOn, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
// handle error
gbl.Log.Error(err)
}

go func() {
defer conn.Close()

for event := range *queueWS {

var publishEvent collections.Event

copier.Copy(&publishEvent, &event)

// salira := publishEvent.Collection.SaLiRa
// publishEvent.Collection.Sal = salira.Value()
// publishEvent.Collection.SaLiRa = nil

// wannabeFloor := publishEvent.Collection.ArtificialFloor
// publishEvent.Collection.MovingAverageValue = wannabeFloor.Value()
// publishEvent.Collection.MovingAverage = nil

// msg := fmt.Sprintf("%+v", event)
marshalledEvent, _ := json.Marshal(publishEvent)
err = wsutil.WriteServerText(conn, marshalledEvent)
if err != nil {
// handle error
gbl.Log.Error(err)
}

}

// msg, op, err := wsutil.ReadClientData(conn)
// if err != nil {
// // handle error
// }
// fmt.Printf("server msg: %s | op: %s | err: %s\n", string(msg), string(op), err)
}()
}))
}
9 changes: 6 additions & 3 deletions internal/subscriptions/sales.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
ensContractAddress = common.HexToAddress("0x57f1887a8BF19b14fC0dF6Fd9B2acc9Af147eA85")
)

func SubscriptionLogsHandler(ctx context.Context, node *gbnode.ChainNode, nodes *gbnode.NodeCollection, gOwnCollections *collections.Collections, queueLogs *chan types.Log, queueEvents chan *collections.Event) {
func SubscriptionLogsHandler(ctx context.Context, node *gbnode.ChainNode, nodes *gbnode.NodeCollection, gOwnCollections *collections.Collections, queueLogs *chan types.Log, queueEvents chan *collections.Event, queueWS chan *collections.Event) {
for subLog := range *queueLogs {
// atomic.AddUint64(&stats.queueEvents, 1)
gbl.Log.Debugf("%s | new subscription log (%d): %+v", time.Now().String(), len(*queueLogs), subLog)
Expand Down Expand Up @@ -75,11 +75,11 @@ func SubscriptionLogsHandler(ctx context.Context, node *gbnode.ChainNode, nodes
continue
}

go parseLog(ctx, node, nodes, gOwnCollections, subLog, queueEvents)
go parseLog(ctx, node, nodes, gOwnCollections, subLog, queueEvents, queueWS)
}
}

func parseLog(ctx context.Context, node *gbnode.ChainNode, nodes *gbnode.NodeCollection, ownCollections *collections.Collections, subLog types.Log, queueEvents chan *collections.Event) {
func parseLog(ctx context.Context, node *gbnode.ChainNode, nodes *gbnode.NodeCollection, ownCollections *collections.Collections, subLog types.Log, queueEvents chan *collections.Event, queueWS chan *collections.Event) {
// transaction collector to "recognize" multi-item txs
var transco *TransactionCollector

Expand Down Expand Up @@ -253,6 +253,9 @@ func parseLog(ctx context.Context, node *gbnode.ChainNode, nodes *gbnode.NodeCol
// send to formatting
queueEvents <- event

// send to websockets output
queueWS <- event

gbCache := cache.New(ctx)
// cache.StoreEvent(event.Collection.Name, event.TokenID, event.PriceWei.Uint64(), event.TxItemCount, event.Time, eventType.String())
gbCache.StoreEvent(event.Collection.ContractAddress, event.Collection.Name, event.TokenID, event.PriceWei.Uint64(), event.TxItemCount, event.Time, int64(eventType))
Expand Down
Loading

0 comments on commit 1343fa6

Please sign in to comment.