Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions redisgraph-bechmark-go.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ func main() {

var rateLimiter = rate.NewLimiter(requestRate, requestBurst)
samplesPerClient := *numberRequests / *clients
samplesPerClientRemainder := *numberRequests % *clients
client_update_tick := 1

connectionStr := fmt.Sprintf("%s:%d", *host, *port)
// a WaitGroup for the goroutines to tell us they've stopped
wg := sync.WaitGroup{}
if !*loop {
log.Printf("Total clients: %d. Commands per client: %d Total commands: %d\n", *clients, samplesPerClient, *numberRequests)
if samplesPerClientRemainder != 0 {
log.Printf("Last client will issue: %d commands.\n", samplesPerClientRemainder+samplesPerClient)
}
} else {
log.Printf("Running in loop until you hit Ctrl+C\n")
}
Expand Down Expand Up @@ -98,12 +102,19 @@ func main() {

dataPointProcessingWg.Add(1)
go processGraphDatapointsChannel(graphDatapointsChann, c1, *numberRequests, &dataPointProcessingWg, &instantHistogramsResetMutex)

// Total commands to be issue per client. Equal for all clients with exception of the last one ( see comment bellow )
clientTotalCmds := samplesPerClient
startTime := time.Now()
for client_id := 0; uint64(client_id) < *clients; client_id++ {
wg.Add(1)
rgs[client_id], conns[client_id] = getStandaloneConn(*graphKey, "tcp", connectionStr, *password)
go ingestionRoutine(&rgs[client_id], true, queries, cdf, samplesPerClient, *loop, *debug, &wg, useRateLimiter, rateLimiter, graphDatapointsChann)
// Given the total commands might not be divisible by the #clients
// the last client will send the remainder commands to match the desired request count.
// It's OK to alter clientTotalCmds given this is the last time we use it's value
if uint64(client_id) == (*clients - uint64(1)) {
clientTotalCmds = samplesPerClientRemainder + samplesPerClient
}
go ingestionRoutine(&rgs[client_id], true, queries, cdf, clientTotalCmds, *loop, *debug, &wg, useRateLimiter, rateLimiter, graphDatapointsChann)
}

// enter the update loop
Expand Down