Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/crawler: mostly cosmetic changes and refactors #54

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions cmd/crawler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,33 @@ var (
Usage: "API server for the crawler",
Action: startAPI,
Flags: []cli.Flag{
&apiDBFlag,
&apiListenAddrFlag,
&autovacuumFlag,
&busyTimeoutFlag,
&crawlerDBFlag,
&dropNodesTimeFlag,
apiDBFlag,
apiListenAddrFlag,
autovacuumFlag,
busyTimeoutFlag,
crawlerDBFlag,
dropNodesTimeFlag,
},
}
)

func startAPI(ctx *cli.Context) error {
autovacuum := ctx.String(autovacuumFlag.Name)
busyTimeout := ctx.Uint64(busyTimeoutFlag.Name)
var (
crawlerDBPath = ctx.String(crawlerDBFlag.Name)
apiDBPath = ctx.String(apiDBFlag.Name)
autovacuum = ctx.String(autovacuumFlag.Name)
busyTimeout = ctx.Uint64(busyTimeoutFlag.Name)
)

crawlerDB, err := openSQLiteDB(
ctx.String(crawlerDBFlag.Name),
crawlerDBPath,
autovacuum,
busyTimeout,
)
if err != nil {
return err
}

apiDBPath := ctx.String(apiDBFlag.Name)
shouldInit := false
if _, err := os.Stat(apiDBPath); os.IsNotExist(err) {
shouldInit = true
Expand All @@ -64,17 +67,28 @@ func startAPI(ctx *cli.Context) error {
return err
}
}

// Start daemons
var wg sync.WaitGroup
wg.Add(3)

// Start reading deamon
go newNodeDeamon(&wg, crawlerDB, nodeDB)
go dropDeamon(&wg, nodeDB, ctx.Duration(dropNodesTimeFlag.Name))

// Start reading daemon
go func() {
defer wg.Done()
newNodeDaemon(crawlerDB, nodeDB)
}()
// Start the drop daemon
go func() {
defer wg.Done()
dropDaemon(nodeDB, ctx.Duration(dropNodesTimeFlag.Name))
}()
	// Start the API daemon
apiAddress := ctx.String(apiListenAddrFlag.Name)
apiDeamon := api.New(apiAddress, nodeDB)
go apiDeamon.HandleRequests(&wg)
apiDaemon := api.New(apiAddress, nodeDB)
go func() {
defer wg.Done()
apiDaemon.HandleRequests()
}()
wg.Wait()

return nil
Expand Down Expand Up @@ -112,13 +126,10 @@ func transferNewNodes(crawlerDB, nodeDB *sql.DB) error {
return nil
}

// newNodeDeamon reads new nodes from the crawler and puts them in the db
// newNodeDaemon reads new nodes from the crawler and puts them in the db
// Might trigger the invalidation of caches for the api in the future
func newNodeDeamon(wg *sync.WaitGroup, crawlerDB, nodeDB *sql.DB) {
defer wg.Done()

// This is so that we can make some kind of exponential backoff for the
// retries.
func newNodeDaemon(crawlerDB, nodeDB *sql.DB) {
// Exponentially increase the backoff time
retryTimeout := time.Minute

for {
Expand All @@ -135,8 +146,7 @@ func newNodeDeamon(wg *sync.WaitGroup, crawlerDB, nodeDB *sql.DB) {
}
}

func dropDeamon(wg *sync.WaitGroup, db *sql.DB, dropTimeout time.Duration) {
defer wg.Done()
func dropDaemon(db *sql.DB, dropTimeout time.Duration) {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

Expand Down
26 changes: 13 additions & 13 deletions cmd/crawler/crawlercmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ var (
Usage: "Crawl the ethereum network",
Action: crawlNodes,
Flags: []cli.Flag{
&autovacuumFlag,
&bootnodesFlag,
&busyTimeoutFlag,
&crawlerDBFlag,
&geoipdbFlag,
&listenAddrFlag,
&nodeFileFlag,
&nodeURLFlag,
&nodedbFlag,
&nodekeyFlag,
&timeoutFlag,
&workersFlag,
autovacuumFlag,
bootnodesFlag,
busyTimeoutFlag,
crawlerDBFlag,
geoipdbFlag,
listenAddrFlag,
nodeFileFlag,
nodeURLFlag,
nodedbFlag,
nodekeyFlag,
timeoutFlag,
workersFlag,
utils.GoerliFlag,
utils.NetworkIdFlag,
utils.SepoliaFlag,
Expand All @@ -77,8 +77,8 @@ func crawlNodes(ctx *cli.Context) error {
if _, err := os.Stat(name); os.IsNotExist(err) {
shouldInit = true
}
var err error

var err error
db, err = openSQLiteDB(
name,
ctx.String(autovacuumFlag.Name),
Expand Down
30 changes: 15 additions & 15 deletions cmd/crawler/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,76 @@ import (
)

var (
apiDBFlag = cli.StringFlag{
apiDBFlag = &cli.StringFlag{
Name: "api-db",
Usage: "API SQLite file name",
Required: true,
}
apiListenAddrFlag = cli.StringFlag{
apiListenAddrFlag = &cli.StringFlag{
Name: "addr",
Usage: "Listening address",
Value: "0.0.0.0:10000",
}
autovacuumFlag = cli.StringFlag{
autovacuumFlag = &cli.StringFlag{
Name: "autovacuum",
Usage: ("Sets the autovacuum value for the databases. Possible values: " +
"NONE, FULL, or INCREMENTAL. " +
"https://www.sqlite.org/pragma.html#pragma_auto_vacuum"),
Value: "INCREMENTAL",
}
bootnodesFlag = cli.StringSliceFlag{
bootnodesFlag = &cli.StringSliceFlag{
Name: "bootnodes",
Usage: ("Comma separated nodes used for bootstrapping. " +
"Defaults to hard-coded values for the selected network"),
}
busyTimeoutFlag = cli.Uint64Flag{
busyTimeoutFlag = &cli.Uint64Flag{
Name: "busy-timeout",
Usage: ("Sets the busy_timeout value for the database in milliseconds. " +
"https://www.sqlite.org/pragma.html#pragma_busy_timeout"),
Value: 3000,
}
crawlerDBFlag = cli.StringFlag{
crawlerDBFlag = &cli.StringFlag{
Name: "crawler-db",
Usage: "Crawler SQLite file name",
Required: true,
}
dropNodesTimeFlag = cli.DurationFlag{
dropNodesTimeFlag = &cli.DurationFlag{
Name: "drop-time",
Usage: "Time to drop crawled nodes without any updates",
Value: 24 * time.Hour,
}
geoipdbFlag = cli.StringFlag{
geoipdbFlag = &cli.StringFlag{
Name: "geoipdb",
Usage: "geoip2 database location",
}
listenAddrFlag = cli.StringFlag{
listenAddrFlag = &cli.StringFlag{
Name: "addr",
Usage: "Listening address",
Value: "0.0.0.0:0",
}
nodedbFlag = cli.StringFlag{
nodedbFlag = &cli.StringFlag{
Name: "nodedb",
Usage: "Nodes database location. Defaults to in memory database",
}
nodeFileFlag = cli.StringFlag{
nodeFileFlag = &cli.StringFlag{
Name: "nodefile",
Usage: "Path to a node file containing nodes to be crawled",
}
nodekeyFlag = cli.StringFlag{
nodekeyFlag = &cli.StringFlag{
Name: "nodekey",
Usage: "Hex-encoded node key",
}
nodeURLFlag = cli.StringFlag{
nodeURLFlag = &cli.StringFlag{
Name: "nodeURL",
Usage: "URL of the node you want to connect to",
// Value: "http://localhost:8545",
}
timeoutFlag = cli.DurationFlag{
timeoutFlag = &cli.DurationFlag{
Name: "timeout",
Usage: "Timeout for the crawling in a round",
Value: 5 * time.Minute,
}
workersFlag = cli.Uint64Flag{
workersFlag = &cli.Uint64Flag{
Name: "workers",
Usage: "Number of workers to start for updating nodes",
Value: 16,
Expand Down
4 changes: 1 addition & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -49,8 +48,7 @@ func (a *Api) dropCacheLoop() {
}
}

func (a *Api) HandleRequests(wg *sync.WaitGroup) {
defer wg.Done()
func (a *Api) HandleRequests() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Hello")) })
router.HandleFunc("/v1/dashboard", a.handleDashboard).Queries("filter", "{filter}")
Expand Down
10 changes: 5 additions & 5 deletions pkg/common/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ func (ns NodeSet) TopN(n int) NodeSet {
return ns
}

byscore := make([]NodeJSON, 0, len(ns))
byScore := make([]NodeJSON, 0, len(ns))
for _, v := range ns {
byscore = append(byscore, v)
byScore = append(byScore, v)
}
sort.Slice(byscore, func(i, j int) bool {
return byscore[i].Score >= byscore[j].Score
sort.Slice(byScore, func(i, j int) bool {
return byScore[i].Score >= byScore[j].Score
})
result := make(NodeSet, n)
for _, v := range byscore[:n] {
for _, v := range byScore[:n] {
result[v.N.ID()] = v
}
return result
Expand Down
Loading