
Commit 484b21b
Merge pull request #54 from MariusVanDerWijden/refactor-crawler
cmd/crawler: mostly cosmetic changes and refactors
MariusVanDerWijden authored Oct 16, 2023
2 parents 17086ce + 06d8580 commit 484b21b
Showing 5 changed files with 68 additions and 60 deletions.
58 changes: 34 additions & 24 deletions cmd/crawler/api.go
```diff
@@ -22,30 +22,33 @@ var (
 		Usage:  "API server for the crawler",
 		Action: startAPI,
 		Flags: []cli.Flag{
-			&apiDBFlag,
-			&apiListenAddrFlag,
-			&autovacuumFlag,
-			&busyTimeoutFlag,
-			&crawlerDBFlag,
-			&dropNodesTimeFlag,
+			apiDBFlag,
+			apiListenAddrFlag,
+			autovacuumFlag,
+			busyTimeoutFlag,
+			crawlerDBFlag,
+			dropNodesTimeFlag,
 		},
 	}
 )
 
 func startAPI(ctx *cli.Context) error {
-	autovacuum := ctx.String(autovacuumFlag.Name)
-	busyTimeout := ctx.Uint64(busyTimeoutFlag.Name)
+	var (
+		crawlerDBPath = ctx.String(crawlerDBFlag.Name)
+		apiDBPath     = ctx.String(apiDBFlag.Name)
+		autovacuum    = ctx.String(autovacuumFlag.Name)
+		busyTimeout   = ctx.Uint64(busyTimeoutFlag.Name)
+	)
 
 	crawlerDB, err := openSQLiteDB(
-		ctx.String(crawlerDBFlag.Name),
+		crawlerDBPath,
 		autovacuum,
 		busyTimeout,
 	)
 	if err != nil {
 		return err
 	}
 
-	apiDBPath := ctx.String(apiDBFlag.Name)
 	shouldInit := false
 	if _, err := os.Stat(apiDBPath); os.IsNotExist(err) {
 		shouldInit = true
@@ -64,17 +67,28 @@ func startAPI(ctx *cli.Context) error {
 			return err
 		}
 	}
 
 	// Start daemons
 	var wg sync.WaitGroup
 	wg.Add(3)
 
-	// Start reading deamon
-	go newNodeDeamon(&wg, crawlerDB, nodeDB)
-	go dropDeamon(&wg, nodeDB, ctx.Duration(dropNodesTimeFlag.Name))
+	// Start reading daemon
+	go func() {
+		defer wg.Done()
+		newNodeDaemon(crawlerDB, nodeDB)
+	}()
+	// Start the drop daemon
+	go func() {
+		defer wg.Done()
+		dropDaemon(nodeDB, ctx.Duration(dropNodesTimeFlag.Name))
+	}()
 	// Start the API deamon
 	apiAddress := ctx.String(apiListenAddrFlag.Name)
-	apiDeamon := api.New(apiAddress, nodeDB)
-	go apiDeamon.HandleRequests(&wg)
+	apiDaemon := api.New(apiAddress, nodeDB)
+	go func() {
+		defer wg.Done()
+		apiDaemon.HandleRequests()
+	}()
 	wg.Wait()
 
 	return nil
@@ -111,13 +125,10 @@ func transferNewNodes(crawlerDB, nodeDB *sql.DB) error {
 	return nil
 }
 
-// newNodeDeamon reads new nodes from the crawler and puts them in the db
+// newNodeDaemon reads new nodes from the crawler and puts them in the db
 // Might trigger the invalidation of caches for the api in the future
-func newNodeDeamon(wg *sync.WaitGroup, crawlerDB, nodeDB *sql.DB) {
-	defer wg.Done()
-
-	// This is so that we can make some kind of exponential backoff for the
-	// retries.
+func newNodeDaemon(crawlerDB, nodeDB *sql.DB) {
+	// Exponentially increase the backoff time
 	retryTimeout := time.Minute
 
 	for {
@@ -134,8 +145,7 @@ func newNodeDeamon(wg *sync.WaitGroup, crawlerDB, nodeDB *sql.DB) {
 		}
 	}
 
-func dropDeamon(wg *sync.WaitGroup, db *sql.DB, dropTimeout time.Duration) {
-	defer wg.Done()
+func dropDaemon(db *sql.DB, dropTimeout time.Duration) {
 	ticker := time.NewTicker(10 * time.Minute)
 	defer ticker.Stop()
```
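The main refactor in this file is ownership of the `sync.WaitGroup`: rather than passing `&wg` into every daemon and trusting each one to call `wg.Done()`, `startAPI` now wraps each daemon in an anonymous goroutine and signals completion itself, so the daemons carry no synchronization concerns. A minimal, self-contained sketch of that pattern (the worker names and durations are illustrative, not from the repo):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker stands in for a daemon like newNodeDaemon or dropDaemon: it
// knows nothing about the WaitGroup that tracks its lifetime.
func worker(name string, d time.Duration) {
	time.Sleep(d)
	fmt.Println(name, "finished")
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	// The caller owns the lifecycle: each daemon is wrapped in an
	// anonymous goroutine that signals completion on the way out.
	go func() {
		defer wg.Done()
		worker("reader", 10*time.Millisecond)
	}()
	go func() {
		defer wg.Done()
		worker("dropper", 20*time.Millisecond)
	}()
	wg.Wait() // blocks until both wrappers have called Done
}
```

Keeping `Add`, `Done`, and `Wait` in a single function makes it much easier to verify that the counter matches the number of goroutines actually launched.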
26 changes: 13 additions & 13 deletions cmd/crawler/crawlercmd.go
```diff
@@ -41,18 +41,18 @@ var (
 		Usage:  "Crawl the ethereum network",
 		Action: crawlNodes,
 		Flags: []cli.Flag{
-			&autovacuumFlag,
-			&bootnodesFlag,
-			&busyTimeoutFlag,
-			&crawlerDBFlag,
-			&geoipdbFlag,
-			&listenAddrFlag,
-			&nodeFileFlag,
-			&nodeURLFlag,
-			&nodedbFlag,
-			&nodekeyFlag,
-			&timeoutFlag,
-			&workersFlag,
+			autovacuumFlag,
+			bootnodesFlag,
+			busyTimeoutFlag,
+			crawlerDBFlag,
+			geoipdbFlag,
+			listenAddrFlag,
+			nodeFileFlag,
+			nodeURLFlag,
+			nodedbFlag,
+			nodekeyFlag,
+			timeoutFlag,
+			workersFlag,
 			utils.GoerliFlag,
 			utils.NetworkIdFlag,
 			utils.SepoliaFlag,
@@ -77,8 +77,8 @@ func crawlNodes(ctx *cli.Context) error {
 	if _, err := os.Stat(name); os.IsNotExist(err) {
 		shouldInit = true
 	}
-	var err error
 
+	var err error
 	db, err = openSQLiteDB(
 		name,
 		ctx.String(autovacuumFlag.Name),
```
30 changes: 15 additions & 15 deletions cmd/crawler/flags.go
```diff
@@ -7,76 +7,76 @@ import (
 )
 
 var (
-	apiDBFlag = cli.StringFlag{
+	apiDBFlag = &cli.StringFlag{
 		Name:     "api-db",
 		Usage:    "API SQLite file name",
 		Required: true,
 	}
-	apiListenAddrFlag = cli.StringFlag{
+	apiListenAddrFlag = &cli.StringFlag{
 		Name:  "addr",
 		Usage: "Listening address",
 		Value: "0.0.0.0:10000",
 	}
-	autovacuumFlag = cli.StringFlag{
+	autovacuumFlag = &cli.StringFlag{
 		Name: "autovacuum",
 		Usage: ("Sets the autovacuum value for the databases. Possible values: " +
 			"NONE, FULL, or INCREMENTAL. " +
 			"https://www.sqlite.org/pragma.html#pragma_auto_vacuum"),
 		Value: "INCREMENTAL",
 	}
-	bootnodesFlag = cli.StringSliceFlag{
+	bootnodesFlag = &cli.StringSliceFlag{
 		Name: "bootnodes",
 		Usage: ("Comma separated nodes used for bootstrapping. " +
 			"Defaults to hard-coded values for the selected network"),
 	}
-	busyTimeoutFlag = cli.Uint64Flag{
+	busyTimeoutFlag = &cli.Uint64Flag{
 		Name: "busy-timeout",
 		Usage: ("Sets the busy_timeout value for the database in milliseconds. " +
 			"https://www.sqlite.org/pragma.html#pragma_busy_timeout"),
 		Value: 3000,
 	}
-	crawlerDBFlag = cli.StringFlag{
+	crawlerDBFlag = &cli.StringFlag{
 		Name:     "crawler-db",
 		Usage:    "Crawler SQLite file name",
 		Required: true,
 	}
-	dropNodesTimeFlag = cli.DurationFlag{
+	dropNodesTimeFlag = &cli.DurationFlag{
 		Name:  "drop-time",
 		Usage: "Time to drop crawled nodes without any updates",
 		Value: 24 * time.Hour,
 	}
-	geoipdbFlag = cli.StringFlag{
+	geoipdbFlag = &cli.StringFlag{
 		Name:  "geoipdb",
 		Usage: "geoip2 database location",
 	}
-	listenAddrFlag = cli.StringFlag{
+	listenAddrFlag = &cli.StringFlag{
 		Name:  "addr",
 		Usage: "Listening address",
 		Value: "0.0.0.0:0",
 	}
-	nodedbFlag = cli.StringFlag{
+	nodedbFlag = &cli.StringFlag{
 		Name:  "nodedb",
 		Usage: "Nodes database location. Defaults to in memory database",
 	}
-	nodeFileFlag = cli.StringFlag{
+	nodeFileFlag = &cli.StringFlag{
 		Name:  "nodefile",
 		Usage: "Path to a node file containing nodes to be crawled",
 	}
-	nodekeyFlag = cli.StringFlag{
+	nodekeyFlag = &cli.StringFlag{
 		Name:  "nodekey",
 		Usage: "Hex-encoded node key",
 	}
-	nodeURLFlag = cli.StringFlag{
+	nodeURLFlag = &cli.StringFlag{
 		Name:  "nodeURL",
 		Usage: "URL of the node you want to connect to",
 		// Value: "http://localhost:8545",
 	}
-	timeoutFlag = cli.DurationFlag{
+	timeoutFlag = &cli.DurationFlag{
 		Name:  "timeout",
 		Usage: "Timeout for the crawling in a round",
 		Value: 5 * time.Minute,
 	}
-	workersFlag = cli.Uint64Flag{
+	workersFlag = &cli.Uint64Flag{
 		Name:  "workers",
 		Usage: "Number of workers to start for updating nodes",
 		Value: 16,
```
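The mechanical change across this file, moving the `&` from the use sites into the declarations, matches how urfave/cli v2 expects flags to be defined: the concrete flag types satisfy the `cli.Flag` interface through pointer receivers, so the `[]cli.Flag` slices need pointers either way, and declaring the flags as pointers up front keeps the call sites clean. A small sketch of the pattern under that assumption (the `verbose` flag and app are illustrative, not from the repo):

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

// Declared as a pointer, so every Flags list can reference it
// directly, with no & at the use site.
var verboseFlag = &cli.BoolFlag{
	Name:  "verbose",
	Usage: "Enable verbose output",
}

func main() {
	app := &cli.App{
		Flags: []cli.Flag{verboseFlag},
		Action: func(ctx *cli.Context) error {
			fmt.Println("verbose:", ctx.Bool(verboseFlag.Name))
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```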
4 changes: 1 addition & 3 deletions pkg/api/api.go
```diff
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"net/http"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/ethereum/go-ethereum/log"
@@ -49,8 +48,7 @@ func (a *Api) dropCacheLoop() {
 	}
 }
 
-func (a *Api) HandleRequests(wg *sync.WaitGroup) {
-	defer wg.Done()
+func (a *Api) HandleRequests() {
 	router := mux.NewRouter().StrictSlash(true)
 	router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Hello")) })
 	router.HandleFunc("/v1/dashboard", a.handleDashboard).Queries("filter", "{filter}")
```
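Dropping the `*sync.WaitGroup` parameter is what frees this file of its `"sync"` import; the caller in cmd/crawler/api.go now wraps `HandleRequests` in a goroutine instead. The surviving router lines use gorilla/mux's `Queries` matcher, which matches a route only when the named query parameter is present; a minimal standalone sketch of that matcher (the handler body and listen address are illustrative):

```go
package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func main() {
	router := mux.NewRouter().StrictSlash(true)
	// Queries restricts this route to requests carrying ?filter=...;
	// the captured value is exposed through mux.Vars.
	router.HandleFunc("/v1/dashboard", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "filter =", mux.Vars(r)["filter"])
	}).Queries("filter", "{filter}")
	log.Fatal(http.ListenAndServe("127.0.0.1:10000", router))
}
```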
10 changes: 5 additions & 5 deletions pkg/common/nodes.go
```diff
@@ -103,15 +103,15 @@ func (ns NodeSet) TopN(n int) NodeSet {
 		return ns
 	}
 
-	byscore := make([]NodeJSON, 0, len(ns))
+	byScore := make([]NodeJSON, 0, len(ns))
 	for _, v := range ns {
-		byscore = append(byscore, v)
+		byScore = append(byScore, v)
 	}
-	sort.Slice(byscore, func(i, j int) bool {
-		return byscore[i].Score >= byscore[j].Score
+	sort.Slice(byScore, func(i, j int) bool {
+		return byScore[i].Score >= byScore[j].Score
 	})
 	result := make(NodeSet, n)
-	for _, v := range byscore[:n] {
+	for _, v := range byScore[:n] {
 		result[v.N.ID()] = v
 	}
 	return result
```
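The `TopN` change is pure renaming (`byscore` to `byScore`), but the technique underneath — copy the set into a slice, sort it descending by score, keep the first n — is worth seeing in isolation. A sketch with a hypothetical `node` type standing in for `NodeJSON` (note it uses a strict `>` comparator rather than the original's `>=`, since `sort.Slice` expects a less-style function):

```go
package main

import (
	"fmt"
	"sort"
)

type node struct {
	ID    string
	Score int
}

// topN returns the n highest-scoring nodes, mirroring the
// sort-descending-then-truncate approach of NodeSet.TopN.
func topN(nodes []node, n int) []node {
	if n >= len(nodes) {
		return nodes
	}
	// Copy first so the caller's slice is left untouched.
	byScore := append([]node(nil), nodes...)
	sort.Slice(byScore, func(i, j int) bool {
		return byScore[i].Score > byScore[j].Score
	})
	return byScore[:n]
}

func main() {
	nodes := []node{{"a", 3}, {"b", 9}, {"c", 5}}
	fmt.Println(topN(nodes, 2)) // [{b 9} {c 5}]
}
```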
