Skip to content

Commit

Permalink
Queue the requests
Browse files Browse the repository at this point in the history
  • Loading branch information
musabgultekin committed Oct 22, 2021
1 parent 6415a77 commit d4d9c07
Showing 1 changed file with 42 additions and 2 deletions.
44 changes: 42 additions & 2 deletions geziyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Geziyor struct {
sync.RWMutex
hostSems map[string]chan struct{}
}
requestsQueue chan struct {
*client.Request
callback func(g *Geziyor, r *client.Response)
}
shutdown bool
}

Expand Down Expand Up @@ -112,6 +116,11 @@ func NewGeziyor(opt *Options) *Geziyor {
hostSems map[string]chan struct{}
}{hostSems: make(map[string]chan struct{})}
}
queueSize := 1000000
geziyor.requestsQueue = make(chan struct {
*client.Request
callback func(g *Geziyor, r *client.Response)
}, queueSize)

// Base Middlewares
metricsMiddleware := &middleware.Metrics{Metrics: geziyor.metrics}
Expand Down Expand Up @@ -154,6 +163,9 @@ func (g *Geziyor) Start() {
signal.Notify(shutdownChan, os.Interrupt)
go g.interruptSignalWaiter(shutdownChan, shutdownDoneChan)

// Start workers
go g.startWorkers()

// Start Requests
if g.Opt.StartRequestsFunc != nil {
g.Opt.StartRequestsFunc(g)
Expand Down Expand Up @@ -212,7 +224,10 @@ func (g *Geziyor) Do(req *client.Request, callback func(g *Geziyor, r *client.Re
if req.Synchronized {
g.do(req, callback)
} else {
go g.do(req, callback)
g.requestsQueue <- struct {
*client.Request
callback func(g *Geziyor, r *client.Response)
}{Request: req, callback: callback}
}
}

Expand Down Expand Up @@ -303,7 +318,6 @@ func (g *Geziyor) interruptSignalWaiter(shutdownChan chan os.Signal, shutdownDon
case <-shutdownChan:
internal.Logger.Println("Received SIGINT, shutting down gracefully. Send again to force")
g.shutdown = true
signal.Stop(shutdownChan)
case <-shutdownDoneChan:
return
}
Expand Down Expand Up @@ -349,3 +363,29 @@ func (g *Geziyor) startExporters() {
}()
}
}

func (g *Geziyor) startWorkers() {
if g.Opt.ConcurrentRequests != 0 {
for i := 0; i < g.Opt.ConcurrentRequests; i++ {
go func() {
for req := range g.requestsQueue {
if g.shutdown {
g.wgRequests.Done()
continue
}
g.do(req.Request, req.callback)
}
}()
}
} else {
go func() {
for req := range g.requestsQueue {
if g.shutdown {
g.wgRequests.Done()
continue
}
go g.do(req.Request, req.callback)
}
}()
}
}

0 comments on commit d4d9c07

Please sign in to comment.