Skip to content

Commit

Permalink
Merge pull request #721 from forta-network/ali/wait-init-to-process
Browse files Browse the repository at this point in the history
Wait for agent initalization to start processing
  • Loading branch information
aomerk committed May 18, 2023
2 parents 600462b + 0abe85f commit 3f766fb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
6 changes: 3 additions & 3 deletions services/scanner/agentpool/agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (ap *AgentPool) SendEvaluateTxRequest(req *protocol.EvaluateTxRequest) {
}
var metricsList []*protocol.AgentMetric
for _, agent := range agents {
if !agent.IsInitialized() || !agent.ShouldProcessBlock(req.Event.Block.BlockNumber) {
if !agent.IsReady() || !agent.ShouldProcessBlock(req.Event.Block.BlockNumber) {
continue
}
lg.WithFields(log.Fields{
Expand Down Expand Up @@ -212,7 +212,7 @@ func (ap *AgentPool) SendEvaluateBlockRequest(req *protocol.EvaluateBlockRequest

var metricsList []*protocol.AgentMetric
for _, agent := range agents {
if !agent.IsInitialized() || !agent.ShouldProcessBlock(req.Event.BlockNumber) {
if !agent.IsReady() || !agent.ShouldProcessBlock(req.Event.BlockNumber) {
continue
}

Expand Down Expand Up @@ -293,7 +293,7 @@ func (ap *AgentPool) SendEvaluateAlertRequest(req *protocol.EvaluateAlertRequest
continue
}

if !agent.IsInitialized() {
if !agent.IsReady() {
continue
}
target = agent
Expand Down
7 changes: 7 additions & 0 deletions services/scanner/agentpool/poolagent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ func (agent *Agent) processTransactions() {
"evaluate": "transaction",
},
)
// wait for initialization to start processing
<-agent.Initialized()

for request := range agent.txRequests {
if exit := agent.processTransaction(lg, request); exit {
Expand Down Expand Up @@ -447,6 +449,8 @@ func (agent *Agent) processBlocks() {
"evaluate": "block",
},
)
// wait for initialization to start processing
<-agent.Initialized()

for request := range agent.blockRequests {
if exit := agent.processBlock(lg, request); exit {
Expand Down Expand Up @@ -528,6 +532,9 @@ func (agent *Agent) processCombinationAlerts() {
},
)

// wait for initialization to start processing
<-agent.Initialized()

for request := range agent.combinationRequests {
if exit := agent.processCombinationAlert(lg, request); exit {
return
Expand Down

0 comments on commit 3f766fb

Please sign in to comment.