Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ coverage.txt
# Dependency directories (remove the comment below to include it)
vendor/

# Dist binaries
dist/

# Go sum
go.sum

Expand Down
22 changes: 18 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,33 @@ GOGET=$(GOCMD) get
GOMOD=$(GOCMD) mod
GOFMT=$(GOCMD) fmt
BIN_NAME=redisgraph-benchmark-go
DISTDIR = ./dist

# Build-time GIT variables
ifeq ($(GIT_SHA),)
GIT_SHA:=$(shell git rev-parse HEAD)
endif

ifeq ($(GIT_DIRTY),)
GIT_DIRTY:=$(shell git diff --no-ext-diff 2> /dev/null | wc -l)
endif

LDFLAGS = "-X 'main.GitSHA1=$(GIT_SHA)' -X 'main.GitDirty=$(GIT_DIRTY)'"

.PHONY: all test coverage build checkfmt fmt
all: test coverage build checkfmt fmt

build:
$(GOBUILD) .
$(GOBUILD) \
-ldflags=$(LDFLAGS) .

build-race:
$(GOBUILDRACE) .
$(GOBUILDRACE) \
-ldflags=$(LDFLAGS) .

checkfmt:
@echo 'Checking gofmt';\
bash -c "diff -u <(echo -n) <(gofmt -d .)";\
bash -c "diff -u <(echo -n) <(go fmt .)";\
EXIT_CODE=$$?;\
if [ "$$EXIT_CODE" -ne 0 ]; then \
echo '$@: Go files must be formatted with gofmt'; \
Expand All @@ -33,7 +47,7 @@ lint:
golangci-lint run

fmt:
$(GOFMT) ./...
$(GOFMT) .

get:
$(GOGET) -t -v ./...
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# redisgraph-benchmark-go

[![license](https://img.shields.io/github/license/RedisGraph/redisgraph-benchmark-go.svg)](https://github.com/RedisGraph/redisgraph-benchmark-go)
[![GitHub issues](https://img.shields.io/github/release/RedisGraph/redisgraph-benchmark-go.svg)](https://github.com/RedisGraph/redisgraph-benchmark-go/releases/latest)
[![Discord](https://img.shields.io/discord/697882427875393627?style=flat-square)](https://discord.gg/gWBRT6P)

## Overview

This repo contains code to quick benchmark RedisGraph, using the official [redisgraph-go](https://github.com/RedisGraph/redisgraph-go) client.
Expand Down
26 changes: 26 additions & 0 deletions bin_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"strconv"
"strings"
)

// Vars only for git sha and diff handling
var GitSHA1 string = ""
var GitDirty string = "0"

// internal function to return value of GitSHA1 var, which is filled in link time
func toolGitSHA1() string {
return GitSHA1
}

// this internal function will check for the number of altered lines that are not yet committed
// and return true in that case
func toolGitDirty() (dirty bool) {
dirty = false
dirtyLines, err := strconv.Atoi(strings.TrimSpace(GitDirty))
if err == nil {
dirty = (dirtyLines != 0)
}
return
}
22 changes: 21 additions & 1 deletion cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"github.com/HdrHistogram/hdrhistogram-go"
redistimeseries "github.com/RedisTimeSeries/redistimeseries-go"
"github.com/olekukonko/tablewriter"
"os"
"sync/atomic"
Expand Down Expand Up @@ -147,7 +148,7 @@ func renderGraphResultSetTable(queries []string, writer *os.File, tableTitle str
table.Render()
}

func updateCLI(startTime time.Time, tick *time.Ticker, c chan os.Signal, message_limit uint64, loop bool) bool {
func updateCLI(startTime time.Time, tick *time.Ticker, c chan os.Signal, message_limit uint64, loop bool, client *redistimeseries.Client, suffix string) bool {

start := startTime
prevTime := startTime
Expand Down Expand Up @@ -183,6 +184,25 @@ func updateCLI(startTime time.Time, tick *time.Ticker, c chan os.Signal, message
}
prevMessageCount = currentCmds
prevTime = now
if client != nil {
opts := redistimeseries.DefaultCreateOptions
for _, percentile := range []float64{0, 50.0, 95, 99, 99.9, 100.0} {
overallIncludingRTT := float64(clientSide_AllQueries_OverallLatencies.ValueAtQuantile(percentile)) / 1000.0
overallRunTimeGraph := float64(serverSide_AllQueries_GraphInternalTime_OverallLatencies.ValueAtQuantile(percentile)) / 1000.0
instantIncludingRTT := float64(clientSide_AllQueries_InstantLatencies.ValueAtQuantile(percentile)) / 1000.0
instantRunTimeGraph := float64(serverSide_AllQueries_GraphInternalTime_InstantLatencies.ValueAtQuantile(percentile)) / 1000.0
opts.Labels = map[string]string{"metric": "overallIncludingRTT"}
client.AddWithOptions(fmt.Sprintf("%s:overallIncludingRTT:p%.3f", suffix, percentile), now.UTC().Unix()*1000, overallIncludingRTT, opts)
opts.Labels = map[string]string{"metric": "overallRunTimeGraph"}
client.AddWithOptions(fmt.Sprintf("%s:overallRunTimeGraph:p%.3f", suffix, percentile), now.UTC().Unix()*1000, overallRunTimeGraph, opts)
opts.Labels = map[string]string{"metric": "instantIncludingRTT"}
client.AddWithOptions(fmt.Sprintf("%s:instantIncludingRTT:p%.3f", suffix, percentile), now.UTC().Unix()*1000, instantIncludingRTT, opts)
opts.Labels = map[string]string{"metric": "instantRunTimeGraph"}
client.AddWithOptions(fmt.Sprintf("%s:instantRunTimeGraph:p%.3f", suffix, percentile), now.UTC().Unix()*1000, instantRunTimeGraph, opts)
}
opts.Labels = map[string]string{"metric": "messageRate"}
client.AddWithOptions(fmt.Sprintf("%s:messageRate", suffix), now.UTC().Unix()*1000, messageRate, opts)
}

fmt.Printf("%25.0fs %s %25d %25d [%3.1f%%] %25.2f %19.3f (%3.3f) %20.3f (%3.3f)\t", time.Since(start).Seconds(), completionPercentStr, currentCmds, currentErrs, errorPercent, messageRate, instantP50, p50, instantP50RunTimeGraph, p50RunTimeGraph)
fmt.Printf("\r")
Expand Down
2 changes: 2 additions & 0 deletions globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var totalPropertiesSetPerQuery []uint64
var totalRelationshipsCreatedPerQuery []uint64
var totalRelationshipsDeletedPerQuery []uint64

var randIntPlaceholder string = "__rand_int__"

// no locking is required when using the histograms. data is duplicated on the instant and overall histograms
var clientSide_AllQueries_OverallLatencies *hdrhistogram.Histogram
var serverSide_AllQueries_GraphInternalTime_OverallLatencies *hdrhistogram.Histogram
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ go 1.13
require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1
github.com/RedisGraph/redisgraph-go v1.0.1-0.20210122150500-aa0feaa960ce
github.com/RedisTimeSeries/redistimeseries-go v1.4.4
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/go-cmp v0.5.4 // indirect
github.com/mitchellh/gox v1.0.1 // indirect
github.com/olekukonko/tablewriter v0.0.4
github.com/stretchr/testify v1.7.0 // indirect
github.com/tcnksm/ghr v0.13.0 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
)
3 changes: 2 additions & 1 deletion multi-query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"log"
"math"
"math/rand"
"strconv"
)
Expand Down Expand Up @@ -33,7 +34,7 @@ func prepareCommandsDistribution(cmds []string, cmdRates []float64) (int, []floa
totalRateSum += cmdRates[i]
}
// probability density function
if totalRateSum != 1.0 {
if math.Abs(1.0-totalRateSum) > 0.01 {
log.Fatalf("Total ratio should be 1.0 ( currently is %f )", totalRateSum)
}
// probability density function
Expand Down
48 changes: 39 additions & 9 deletions redisgraph-bechmark-go.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"github.com/RedisGraph/redisgraph-go"
redistimeseries "github.com/RedisTimeSeries/redistimeseries-go"
"github.com/gomodule/redigo/redis"
"golang.org/x/time/rate"
"log"
Expand All @@ -24,22 +25,52 @@ func main() {
numberRequests := flag.Uint64("n", 1000000, "Total number of requests")
debug := flag.Int("debug", 0, "Client debug level.")
randomSeed := flag.Int64("random-seed", 12345, "Random seed to use.")
randomIntMin := flag.Int64("random-int-min", 1, "__rand_int__ lower value limit. __rand_int__ distribution is uniform Random")
randomIntMax := flag.Int64("random-int-max", 1000000, "__rand_int__ upper value limit. __rand_int__ distribution is uniform Random")
graphKey := flag.String("graph-key", "graph", "graph key.")
flag.Var(&benchmarkQueries, "query", "Specify a RedisGraph query to send in quotes. Each command that you specify is run with its ratio. For example: -query=\"CREATE (n)\" -query-ratio=2")
flag.Var(&benchmarkQueryRates, "query-ratio", "The query ratio vs other queries used in the same benchmark. Each command that you specify is run with its ratio. For example: -query=\"CREATE (n)\" -query-ratio=10 -query=\"MATCH (n) RETURN n\" -query-ratio=1")
jsonOutputFile := flag.String("json-out-file", "benchmark-results.json", "Name of json output file to output benchmark results. If not set, will not print to json.")
//loop := flag.Bool("l", false, "Loop. Run the tests forever.")
// disabling this for now while we refactor the benchmark client (please use a very large total command number in the meantime )
// in the meantime added this two fake vars
cliUpdateTick := flag.Duration("reporting-period", time.Second*10, "Period to report stats.")
// data sink
runName := flag.String("exporter-run-name", "perf-run", "Run name.")
rtsHost := flag.String("exporter-rts-host", "127.0.0.1", "RedisTimeSeries hostname.")
rtsPort := flag.Int("exporter-rts-port", 6379, "RedisTimeSeries port.")
rtsPassword := flag.String("exporter-rts-auth", "", "RedisTimeSeries Password for Redis Auth.")
var rtsAuth *string = nil
rtsEnabled := flag.Bool("enable-exporter-rps", false, "Push results to redistimeseries exporter in real-time. Time granularity is set via the -reporting-period parameter.")
continueOnError := flag.Bool("continue-on-error", false, "Continue benchmark in case of error replies.")

var loopV = false
var loop *bool = &loopV
version := flag.Bool("v", false, "Output version and exit")
flag.Parse()
git_sha := toolGitSHA1()
git_dirty_str := ""
if toolGitDirty() {
git_dirty_str = "-dirty"
}
log.Printf("redisgraph-benchmark-go (git_sha1:%s%s)\n", git_sha, git_dirty_str)
if *version {
os.Exit(0)
}
if *rtsPassword != "" {
rtsAuth = rtsPassword
}
var rtsClient *redistimeseries.Client = nil
if *rtsEnabled == true {
log.Printf("Creating RTS client.\n")
rtsClient = redistimeseries.NewClient(fmt.Sprintf("%s:%d", *rtsHost, *rtsPort), "redisgraph-rts-client", rtsAuth)
} else {
log.Printf("RTS export disabled.\n")
}
if len(benchmarkQueries) < 1 {
log.Fatalf("You need to specify at least a query with the -query parameter. For example: -query=\"CREATE (n)\"")
}
log.Printf("Debug level: %d.\n", *debug)
log.Printf("Using random seed: %d.\n", *randomSeed)
rand.Seed(*randomSeed)
randLimit := *randomIntMax - *randomIntMin
testResult := NewTestResult("", uint(*clients), *numberRequests, uint64(*rps), "")
testResult.SetUsedRandomSeed(*randomSeed)

Expand All @@ -55,7 +86,6 @@ func main() {
var rateLimiter = rate.NewLimiter(requestRate, requestBurst)
samplesPerClient := *numberRequests / *clients
samplesPerClientRemainder := *numberRequests % *clients
client_update_tick := 1

connectionStr := fmt.Sprintf("%s:%d", *host, *port)
// a WaitGroup for the goroutines to tell us they've stopped
Expand All @@ -79,7 +109,7 @@ func main() {

// a WaitGroup for the goroutines to tell us they've stopped
dataPointProcessingWg := sync.WaitGroup{}
graphDatapointsChann := make(chan GraphQueryDatapoint, *numberRequests)
graphDatapointsChann := make(chan GraphQueryDatapoint, *clients)

// listen for C-c
c := make(chan os.Signal, 1)
Expand All @@ -98,7 +128,7 @@ func main() {
log.Println(fmt.Sprintf("Detected RedisGraph version %d\n", redisgraphVersion))
}

tick := time.NewTicker(time.Duration(client_update_tick) * time.Second)
tick := time.NewTicker(*cliUpdateTick)

dataPointProcessingWg.Add(1)
go processGraphDatapointsChannel(graphDatapointsChann, c1, *numberRequests, &dataPointProcessingWg, &instantHistogramsResetMutex)
Expand All @@ -114,11 +144,11 @@ func main() {
if uint64(client_id) == (*clients - uint64(1)) {
clientTotalCmds = samplesPerClientRemainder + samplesPerClient
}
go ingestionRoutine(&rgs[client_id], true, queries, cdf, clientTotalCmds, *loop, *debug, &wg, useRateLimiter, rateLimiter, graphDatapointsChann)
go ingestionRoutine(&rgs[client_id], *continueOnError, queries, cdf, *randomIntMin, randLimit, clientTotalCmds, *loop, *debug, &wg, useRateLimiter, rateLimiter, graphDatapointsChann)
}

// enter the update loop
updateCLI(startTime, tick, c, *numberRequests, *loop)
// enter the update loopupdateCLIupdateCLI
updateCLI(startTime, tick, c, *numberRequests, *loop, rtsClient, *runName)

endTime := time.Now()
duration := time.Since(startTime)
Expand Down
19 changes: 15 additions & 4 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,30 @@ import (
"github.com/RedisGraph/redisgraph-go"
"golang.org/x/time/rate"
"log"
"math/rand"
"strings"
"sync"
"time"
)

func ingestionRoutine(rg *redisgraph.Graph, continueOnError bool, cmdS []string, commandsCDF []float32, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint) {
func ingestionRoutine(rg *redisgraph.Graph, continueOnError bool, cmdS []string, commandsCDF []float32, randomIntPadding, randomIntMax int64, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint) {
defer wg.Done()
for i := 0; uint64(i) < number_samples || loop; i++ {
cmdPos := sample(commandsCDF)
sendCmdLogic(rg, cmdS[cmdPos], cmdPos, continueOnError, debug_level, useLimiter, rateLimiter, statsChannel)
sendCmdLogic(rg, cmdS[cmdPos], randomIntPadding, randomIntMax, cmdPos, continueOnError, debug_level, useLimiter, rateLimiter, statsChannel)
}
}

func sendCmdLogic(rg *redisgraph.Graph, query string, cmdPos int, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint) {
func sendCmdLogic(rg *redisgraph.Graph, query string, randomIntPadding, randomIntMax int64, cmdPos int, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint) {
if useRateLimiter {
r := rateLimiter.ReserveN(time.Now(), int(1))
time.Sleep(r.Delay())
}
var err error
var queryResult *redisgraph.QueryResult
processedQuery := processQuery(query, randomIntPadding, randomIntMax)
startT := time.Now()
queryResult, err = rg.Query(query)
queryResult, err = rg.Query(processedQuery)
endT := time.Now()

duration := endT.Sub(startT)
Expand Down Expand Up @@ -69,3 +72,11 @@ func sendCmdLogic(rg *redisgraph.Graph, query string, cmdPos int, continueOnErro
}
statsChannel <- datapoint
}

func processQuery(query string, randomIntPadding int64, randomIntMax int64) string {
for strings.Index(query, randIntPlaceholder) != -1 {
randIntString := fmt.Sprintf("%d", rand.Int63n(randomIntMax)+randomIntPadding)
query = strings.Replace(query, randIntPlaceholder, randIntString, 1)
}
return query
}
35 changes: 35 additions & 0 deletions workers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"math/rand"
"testing"
)

func Test_processQuery(t *testing.T) {
type args struct {
query string
randomIntPadding int64
randomIntMax int64
}
rand.Seed(12345)
tests := []struct {
name string
args args
want string
}{
{"no-replacing", args{"CREATE(n)", 0, 0}, "CREATE(n)"},
{"no-replacing", args{"ProblemList=[29849199,27107682]", 0, 0}, "ProblemList=[29849199,27107682]"},
{"no-replacing", args{"ProblemList=[29849199,__rand_int__]", 0, 1}, "ProblemList=[29849199,0]"},
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", 0, 1}, "ProblemList=[0,0]"},
{"no-replacing", args{"ProblemList=[__rand_int__,11]", 0, 1}, "ProblemList=[0,11]"},
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", 0, 10}, "ProblemList=[3,4]"},
{"no-replacing", args{"ProblemList=[__rand_int__,__rand_int__]", -1, 10}, "ProblemList=[7,-1]"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := processQuery(tt.args.query, tt.args.randomIntPadding, tt.args.randomIntMax); got != tt.want {
t.Errorf("processQuery() = %v, want %v", got, tt.want)
}
})
}
}