diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000..b9bade5
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,34 @@
+# Golang CircleCI 2.0 configuration file
+#
+# Check https://circleci.com/docs/2.0/language-go/ for more details
+version: 2.1
+
+jobs:
+  build:
+    docker:
+      - image: circleci/golang:1.14
+
+      - image: redislabs/redisgraph:edge
+
+    working_directory: /go/src/github.com/RedisGraph/redisgraph-benchmark-go
+    steps:
+      - checkout
+      - run: make test
+      - run: make flow-test
+      - run: bash <(curl -s https://codecov.io/bash)
+
+workflows:
+  version: 2
+  commit:
+    jobs:
+      - build
+  nightly:
+    triggers:
+      - schedule:
+          cron: "0 0 * * *"
+          filters:
+            branches:
+              only:
+                - master
+    jobs:
+      - build
diff --git a/.gitignore b/.gitignore
index 17f277a..0353dce 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,12 +9,15 @@ redisgraph-benchmark-go
 # Test binary, built with `go test -c`
 *.test
 
+# dist folder
+#dist
+
 # Output of the go coverage tool, specifically when used with LiteIDE
 *.out
 coverage.txt
 
 # Dependency directories (remove the comment below to include it)
-# vendor/
+ vendor/
 
 # Go sum
 go.sum
diff --git a/Makefile b/Makefile
index 3522366..b12ada5 100644
--- a/Makefile
+++ b/Makefile
@@ -1,12 +1,14 @@
 # Go parameters
 GOCMD=GO111MODULE=on go
 GOBUILD=$(GOCMD) build
+GOBUILDRACE=$(GOCMD) build -race
 GOINSTALL=$(GOCMD) install
 GOCLEAN=$(GOCMD) clean
 GOTEST=$(GOCMD) test
 GOGET=$(GOCMD) get
 GOMOD=$(GOCMD) mod
 GOFMT=$(GOCMD) fmt
+BIN_NAME=redisgraph-benchmark-go
 
 .PHONY: all test coverage build checkfmt fmt
 all: test coverage build checkfmt fmt
@@ -14,6 +16,9 @@ all: test coverage build checkfmt fmt
 build:
 	$(GOBUILD) .
 
+build-race:
+	$(GOBUILDRACE) .
+
 checkfmt:
 	@echo 'Checking gofmt';\
 	bash -c "diff -u <(echo -n) <(gofmt -d .)";\
@@ -40,6 +45,9 @@ test: get
 coverage: get test
 	$(GOTEST) -race -coverprofile=coverage.txt -covermode=atomic .
 
+flow-test: build-race
+	./$(BIN_NAME) -n 100000 -query "CREATE(n)" -query-ratio 1 -query "MATCH (n) RETURN n LIMIT 1" -query-ratio 2
+
 release:
 	$(GOGET) github.com/mitchellh/gox
 	$(GOGET) github.com/tcnksm/ghr
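The new `flow-test` target runs the race-enabled binary end to end against the RedisGraph container that the CircleCI job starts as a sidecar, mixing the two `-query`/`-query-ratio` pairs at a 1:2 ratio. Those flags are repeatable because cli.go (below) registers them with `flag.Var` on a string-slice type. A minimal, standalone sketch of that pattern (the program and flag name here are illustrative, not part of the change):

```go
package main

import (
	"flag"
	"fmt"
)

// stringList mirrors the arrayStringParameters type added in cli.go:
// every occurrence of the flag appends another value to the slice.
type stringList []string

func (s *stringList) String() string     { return fmt.Sprint(*s) }
func (s *stringList) Set(v string) error { *s = append(*s, v); return nil }

func main() {
	var queries stringList
	flag.Var(&queries, "query", "query to benchmark (repeatable)")
	flag.Parse()
	// e.g. -query "CREATE(n)" -query "MATCH (n) RETURN n" -> [CREATE(n) MATCH (n) RETURN n]
	fmt.Println("queries:", queries)
}
```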
diff --git a/cli.go b/cli.go
new file mode 100644
index 0000000..dac6941
--- /dev/null
+++ b/cli.go
@@ -0,0 +1,202 @@
+package main
+
+import (
+    "fmt"
+    "github.com/HdrHistogram/hdrhistogram-go"
+    "github.com/olekukonko/tablewriter"
+    "os"
+    "sync/atomic"
+    "time"
+)
+
+type arrayStringParameters []string
+
+func (i *arrayStringParameters) String() string {
+    return "my string representation"
+}
+
+func (i *arrayStringParameters) Set(value string) error {
+    *i = append(*i, value)
+    return nil
+}
+
+func printFinalSummary(queries []string, queryRates []int, totalMessages uint64, duration time.Duration) {
+    writer := os.Stdout
+    messageRate := float64(totalMessages) / float64(duration.Seconds())
+
+    fmt.Printf("\n")
+    fmt.Printf("################# RUNTIME STATS #################\n")
+    fmt.Printf("Total Duration %.3f Seconds\n", duration.Seconds())
+    fmt.Printf("Total Commands issued %d\n", totalCommands)
+    fmt.Printf("Total Errors %d ( %3.3f %%)\n", totalErrors, float64(totalErrors)/float64(totalCommands)*100.0)
+    fmt.Printf("Throughput summary: %.0f requests per second\n", messageRate)
+    renderGraphResultSetTable(queries, writer, "## Overall RedisGraph resultset stats table\n")
+    renderGraphInternalExecutionTimeTable(queries, writer, "## Overall RedisGraph Internal Execution Time summary table\n", serverSide_PerQuery_GraphInternalTime_OverallLatencies, serverSide_AllQueries_GraphInternalTime_OverallLatencies)
+    renderTable(queries, writer, "## Overall Client Latency summary table\n", true, true, errorsPerQuery, duration, clientSide_PerQuery_OverallLatencies, clientSide_AllQueries_OverallLatencies)
+}
+
+func renderTable(queries []string, writer *os.File, tableTitle string, includeCalls bool, includeErrors bool, errorSlice []uint64, duration time.Duration, detailedHistogram []*hdrhistogram.Histogram, overallHistogram *hdrhistogram.Histogram) {
+    fmt.Fprintf(writer, tableTitle)
+    data := make([][]string, len(queries)+1)
+    for i := 0; i < len(queries); i++ {
+        insertTableLine(queries[i], data, i, includeCalls, includeErrors, errorSlice, duration, detailedHistogram[i])
+    }
+    insertTableLine("Total", data, len(queries), includeCalls, includeErrors, errorSlice, duration, overallHistogram)
+    table := tablewriter.NewWriter(writer)
+    initialHeader := []string{"Query"}
+    if includeCalls {
+        initialHeader = append(initialHeader, "Ops/sec")
+        initialHeader = append(initialHeader, "Total Calls")
+    }
+    if includeErrors {
+        initialHeader = append(initialHeader, "Total Errors")
+    }
+    initialHeader = append(initialHeader, "Avg. latency(ms)", "p50 latency(ms)", "p95 latency(ms)", "p99 latency(ms)")
+    table.SetHeader(initialHeader)
+    table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false})
+    table.SetCenterSeparator("|")
+    table.AppendBulk(data)
+    table.Render()
+}
+func renderGraphInternalExecutionTimeTable(queries []string, writer *os.File, tableTitle string, detailedHistogram []*hdrhistogram.Histogram, overallHistogram *hdrhistogram.Histogram) {
+    fmt.Fprintf(writer, tableTitle)
latency(ms)", "Internal p50 latency(ms)", "Internal p95 latency(ms)", "Internal p99 latency(ms)"} + data := make([][]string, len(queries)+1) + i := 0 + for i = 0; i < len(queries); i++ { + data[i] = make([]string, 5) + data[i][0] = queries[i] + data[i][1] = fmt.Sprintf("%.3f", float64(detailedHistogram[i].Mean()/1000.0)) + data[i][2] = fmt.Sprintf("%.3f", float64(detailedHistogram[i].ValueAtQuantile(50.0))/1000.0) + data[i][3] = fmt.Sprintf("%.3f", float64(detailedHistogram[i].ValueAtQuantile(95.0))/1000.0) + data[i][4] = fmt.Sprintf("%.3f", float64(detailedHistogram[i].ValueAtQuantile(99.0))/1000.0) + } + data[i] = make([]string, 5) + data[i][0] = "Total" + data[i][1] = fmt.Sprintf("%.3f", float64(overallHistogram.Mean()/1000.0)) + data[i][2] = fmt.Sprintf("%.3f", float64(overallHistogram.ValueAtQuantile(50.0))/1000.0) + data[i][3] = fmt.Sprintf("%.3f", float64(overallHistogram.ValueAtQuantile(95.0))/1000.0) + data[i][4] = fmt.Sprintf("%.3f", float64(overallHistogram.ValueAtQuantile(99.0))/1000.0) + table := tablewriter.NewWriter(writer) + table.SetHeader(initialHeader) + table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false}) + table.SetCenterSeparator("|") + table.AppendBulk(data) // Add Bulk Data + table.Render() +} + +func insertTableLine(queryName string, data [][]string, i int, includeCalls, includeErrors bool, errorsSlice []uint64, duration time.Duration, histogram *hdrhistogram.Histogram) { + data[i] = make([]string, 5) + latencyPadding := 0 + data[i][0] = queryName + if includeCalls { + totalCmds := histogram.TotalCount() + cmdRate := float64(totalCmds) / float64(duration.Seconds()) + data[i][1] = fmt.Sprintf("%.f", cmdRate) + data[i][2] = fmt.Sprintf("%d", histogram.TotalCount()) + data[i] = append(data[i], "", "") + latencyPadding += 2 + + } + if includeErrors { + var errorV uint64 + // total errors + if i == (len(data) - 1) { + errorV = totalErrors + } else { + errorV = errorsSlice[i] + } + data[i][1+latencyPadding] = fmt.Sprintf("%d", errorV) + data[i] = append(data[i], "") + latencyPadding++ + } + data[i][1+latencyPadding] = fmt.Sprintf("%.3f", float64(histogram.Mean()/1000.0)) + data[i][2+latencyPadding] = fmt.Sprintf("%.3f", float64(histogram.ValueAtQuantile(50.0))/1000.0) + data[i][3+latencyPadding] = fmt.Sprintf("%.3f", float64(histogram.ValueAtQuantile(95.0))/1000.0) + data[i][4+latencyPadding] = fmt.Sprintf("%.3f", float64(histogram.ValueAtQuantile(99.0))/1000.0) +} + +func renderGraphResultSetTable(queries []string, writer *os.File, tableTitle string) { + fmt.Fprintf(writer, tableTitle) + initialHeader := []string{"Query", "Nodes created", "Nodes deleted", "Labels added", "Properties set", " Relationships created", " Relationships deleted"} + data := make([][]string, len(queries)+1) + i := 0 + for i = 0; i < len(queries); i++ { + data[i] = make([]string, 7) + data[i][0] = queries[i] + data[i][1] = fmt.Sprintf("%d", totalNodesCreatedPerQuery[i]) + data[i][2] = fmt.Sprintf("%d", totalNodesDeletedPerQuery[i]) + data[i][3] = fmt.Sprintf("%d", totalLabelsAddedPerQuery[i]) + data[i][4] = fmt.Sprintf("%d", totalPropertiesSetPerQuery[i]) + data[i][5] = fmt.Sprintf("%d", totalRelationshipsCreatedPerQuery[i]) + data[i][6] = fmt.Sprintf("%d", totalRelationshipsDeletedPerQuery[i]) + } + data[i] = make([]string, 7) + data[i][0] = "Total" + data[i][1] = fmt.Sprintf("%d", totalNodesCreated) + data[i][2] = fmt.Sprintf("%d", totalNodesDeleted) + data[i][3] = fmt.Sprintf("%d", totalLabelsAdded) + data[i][4] = fmt.Sprintf("%d", totalPropertiesSet) + 
+    data[i][5] = fmt.Sprintf("%d", totalRelationshipsCreated)
+    data[i][6] = fmt.Sprintf("%d", totalRelationshipsDeleted)
+    table := tablewriter.NewWriter(writer)
+    table.SetHeader(initialHeader)
+    table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false})
+    table.SetCenterSeparator("|")
+    table.AppendBulk(data) // Add Bulk Data
+    table.Render()
+}
+
+func updateCLI(startTime time.Time, tick *time.Ticker, c chan os.Signal, message_limit uint64, loop bool) bool {
+
+    start := startTime
+    prevTime := startTime
+    prevMessageCount := uint64(0)
+    var currentCmds uint64
+    var currentErrs uint64
+    messageRateTs := []float64{}
+    fmt.Printf("%26s %7s %25s %25s %7s %25s %25s %26s\n", "Test time", " ", "Total Commands", "Total Errors", "", "Command Rate", "Client p50 with RTT(ms)", "Graph Internal Time p50 (ms)")
+    for {
+        select {
+        case <-tick.C:
+            {
+                now := time.Now()
+                took := now.Sub(prevTime)
+                currentCmds = atomic.LoadUint64(&totalCommands)
+                currentErrs = atomic.LoadUint64(&totalErrors)
+                messageRate := calculateRateMetrics(int64(currentCmds), int64(prevMessageCount), took)
+                completionPercentStr := "[----%]"
+                if !loop {
+                    completionPercent := float64(currentCmds) / float64(message_limit) * 100.0
+                    completionPercentStr = fmt.Sprintf("[%3.1f%%]", completionPercent)
+                }
+                errorPercent := float64(currentErrs) / float64(currentCmds) * 100.0
+
+                instantHistogramsResetMutex.Lock()
+                p50 := float64(clientSide_AllQueries_OverallLatencies.ValueAtQuantile(50.0)) / 1000.0
+                p50RunTimeGraph := float64(serverSide_AllQueries_GraphInternalTime_OverallLatencies.ValueAtQuantile(50.0)) / 1000.0
+                instantP50 := float64(clientSide_AllQueries_InstantLatencies.ValueAtQuantile(50.0)) / 1000.0
+                instantP50RunTimeGraph := float64(serverSide_AllQueries_GraphInternalTime_InstantLatencies.ValueAtQuantile(50.0)) / 1000.0
+                instantHistogramsResetMutex.Unlock()
+                if currentCmds != 0 {
+                    messageRateTs = append(messageRateTs, messageRate)
+                }
+                prevMessageCount = currentCmds
+                prevTime = now
+
+                fmt.Printf("%25.0fs %s %25d %25d [%3.1f%%] %25.2f %19.3f (%3.3f) %20.3f (%3.3f)\t", time.Since(start).Seconds(), completionPercentStr, currentCmds, currentErrs, errorPercent, messageRate, instantP50, p50, instantP50RunTimeGraph, p50RunTimeGraph)
+                fmt.Printf("\r")
+                if message_limit > 0 && currentCmds >= message_limit && !loop {
+                    return true
+                }
+                // The locks we acquire here do not affect the clients
+                resetInstantHistograms()
+                break
+            }
+
+        case <-c:
+            fmt.Println("\nReceived Ctrl-c - shutting down cli updater go-routine")
+            return false
+        }
+    }
+}
diff --git a/dist/redisgraph-benchmark-go_darwin_amd64 b/dist/redisgraph-benchmark-go_darwin_amd64
deleted file mode 100755
index db94a46..0000000
Binary files a/dist/redisgraph-benchmark-go_darwin_amd64 and /dev/null differ
diff --git a/dist/redisgraph-benchmark-go_linux_amd64 b/dist/redisgraph-benchmark-go_linux_amd64
deleted file mode 100755
index 958c0e0..0000000
Binary files a/dist/redisgraph-benchmark-go_linux_amd64 and /dev/null differ
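The summary tables above record latencies in microseconds and convert them to milliseconds by dividing by 1000 before printing p50/p95/p99 via `ValueAtQuantile`. A small self-contained sketch of that conversion, using the same hdrhistogram-go calls (the recorded values here are made up):

```go
package main

import (
	"fmt"

	"github.com/HdrHistogram/hdrhistogram-go"
)

func main() {
	// Same configuration as the benchmark histograms: values tracked in microseconds.
	h := hdrhistogram.New(1, 90000000000, 4)
	for _, us := range []int64{900, 1100, 1500, 4800, 12000} { // fabricated client latencies in microseconds
		_ = h.RecordValue(us)
	}
	// The tables print milliseconds, hence the /1000.0 on every quantile.
	fmt.Printf("avg=%.3fms p50=%.3fms p95=%.3fms p99=%.3fms\n",
		h.Mean()/1000.0,
		float64(h.ValueAtQuantile(50.0))/1000.0,
		float64(h.ValueAtQuantile(95.0))/1000.0,
		float64(h.ValueAtQuantile(99.0))/1000.0)
}
```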
diff --git a/globals.go b/globals.go
new file mode 100644
index 0000000..682ec19
--- /dev/null
+++ b/globals.go
@@ -0,0 +1,73 @@
+package main
+
+import (
+    "github.com/HdrHistogram/hdrhistogram-go"
+    "golang.org/x/time/rate"
+    "math"
+    "sync"
+)
+
+var totalCommands uint64
+var totalEmptyResultsets uint64
+var totalErrors uint64
+var errorsPerQuery []uint64
+
+var totalNodesCreated uint64
+var totalNodesDeleted uint64
+var totalLabelsAdded uint64
+var totalPropertiesSet uint64
+var totalRelationshipsCreated uint64
+var totalRelationshipsDeleted uint64
+
+var totalNodesCreatedPerQuery []uint64
+var totalNodesDeletedPerQuery []uint64
+var totalLabelsAddedPerQuery []uint64
+var totalPropertiesSetPerQuery []uint64
+var totalRelationshipsCreatedPerQuery []uint64
+var totalRelationshipsDeletedPerQuery []uint64
+
+// No locking is required when using the histograms; data is duplicated on the instant and overall histograms
+var clientSide_AllQueries_OverallLatencies *hdrhistogram.Histogram
+var serverSide_AllQueries_GraphInternalTime_OverallLatencies *hdrhistogram.Histogram
+
+var clientSide_PerQuery_OverallLatencies []*hdrhistogram.Histogram
+var serverSide_PerQuery_GraphInternalTime_OverallLatencies []*hdrhistogram.Histogram
+
+// This mutex does not affect any of the client go-routines ( it only syncs the main thread and the datapoints processor go-routines )
+var instantHistogramsResetMutex sync.Mutex
+var clientSide_AllQueries_InstantLatencies *hdrhistogram.Histogram
+var serverSide_AllQueries_GraphInternalTime_InstantLatencies *hdrhistogram.Histogram
+
+var benchmarkQueries arrayStringParameters
+var benchmarkQueryRates arrayStringParameters
+
+const Inf = rate.Limit(math.MaxFloat64)
+
+func createRequiredGlobalStructs(totalDifferentCommands int) {
+    errorsPerQuery = make([]uint64, totalDifferentCommands)
+    totalNodesCreatedPerQuery = make([]uint64, totalDifferentCommands)
+    totalNodesDeletedPerQuery = make([]uint64, totalDifferentCommands)
+    totalLabelsAddedPerQuery = make([]uint64, totalDifferentCommands)
+    totalPropertiesSetPerQuery = make([]uint64, totalDifferentCommands)
+    totalRelationshipsCreatedPerQuery = make([]uint64, totalDifferentCommands)
+    totalRelationshipsDeletedPerQuery = make([]uint64, totalDifferentCommands)
+
+    clientSide_AllQueries_OverallLatencies = hdrhistogram.New(1, 90000000000, 4)
+    clientSide_AllQueries_InstantLatencies = hdrhistogram.New(1, 90000000000, 4)
+    serverSide_AllQueries_GraphInternalTime_OverallLatencies = hdrhistogram.New(1, 90000000000, 4)
+    serverSide_AllQueries_GraphInternalTime_InstantLatencies = hdrhistogram.New(1, 90000000000, 4)
+
+    clientSide_PerQuery_OverallLatencies = make([]*hdrhistogram.Histogram, totalDifferentCommands)
+    serverSide_PerQuery_GraphInternalTime_OverallLatencies = make([]*hdrhistogram.Histogram, totalDifferentCommands)
+    for i := 0; i < totalDifferentCommands; i++ {
+        clientSide_PerQuery_OverallLatencies[i] = hdrhistogram.New(1, 90000000000, 4)
+        serverSide_PerQuery_GraphInternalTime_OverallLatencies[i] = hdrhistogram.New(1, 90000000000, 4)
+    }
+}
+
+func resetInstantHistograms() {
+    instantHistogramsResetMutex.Lock()
+    clientSide_AllQueries_InstantLatencies.Reset()
+    serverSide_AllQueries_GraphInternalTime_InstantLatencies.Reset()
+    instantHistogramsResetMutex.Unlock()
+}
diff --git a/go.mod b/go.mod
index b3fc469..1e7cb66 100644
--- a/go.mod
+++ b/go.mod
@@ -7,5 +7,7 @@ require (
 	github.com/RedisGraph/redisgraph-go v1.0.1-0.20210122150500-aa0feaa960ce
 	github.com/gomodule/redigo v2.0.0+incompatible
 	github.com/google/go-cmp v0.5.4 // indirect
+	github.com/olekukonko/tablewriter v0.0.4
+	github.com/stretchr/testify v1.7.0 // indirect
 	golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
 )
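go.mod now pulls in `olekukonko/tablewriter`, which the render functions in cli.go configure with a header, a `|` center separator and no top/bottom border. A minimal sketch of that table setup outside the benchmark (the rows are placeholders):

```go
package main

import (
	"os"

	"github.com/olekukonko/tablewriter"
)

func main() {
	table := tablewriter.NewWriter(os.Stdout)
	table.SetHeader([]string{"Query", "Ops/sec", "p50 latency(ms)"})
	// Same styling as renderTable: no top/bottom border, '|' as the center separator.
	table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false})
	table.SetCenterSeparator("|")
	table.AppendBulk([][]string{
		{"CREATE(n)", "12345", "0.350"},          // placeholder numbers
		{"MATCH (n) RETURN n", "23456", "0.420"}, // placeholder numbers
	})
	table.Render()
}
```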
diff --git a/multi-query.go b/multi-query.go
new file mode 100644
index 0000000..594b358
--- /dev/null
+++ b/multi-query.go
@@ -0,0 +1,51 @@
+package main
+
+import (
+    "log"
+    "math/rand"
+    "strconv"
+)
+
+func sample(cdf []float32) int {
+    r := rand.Float32()
+    bucket := 0
+    for r > cdf[bucket] {
+        bucket++
+    }
+    return bucket
+}
+
+func prepareCommandsDistribution(cmds []string, cmdRates []int) (int, []float32) {
+    var totalRates int = 0
+    var totalDifferentCommands = len(cmds)
+    var err error
+    for i, rawCmdString := range benchmarkQueries {
+        cmds[i] = rawCmdString
+        if i >= len(benchmarkQueryRates) {
+            cmdRates[i] = 1
+
+        } else {
+            cmdRates[i], err = strconv.Atoi(benchmarkQueryRates[i])
+            if err != nil {
+                log.Fatalf("Error while converting query-rate param %s: %v", benchmarkQueryRates[i], err)
+            }
+        }
+        totalRates += cmdRates[i]
+    }
+    // probability density function
+    if len(benchmarkQueryRates) > 0 && (len(benchmarkQueryRates) != len(benchmarkQueries)) {
+        log.Fatalf("When specifying the -query-ratio parameter, you need to have the same number of -query and -query-ratio parameters. Number of times -query ( %d ) != Number of times -query-ratio ( %d )", len(benchmarkQueries), len(benchmarkQueryRates))
+    }
+    pdf := make([]float32, len(benchmarkQueries))
+    cdf := make([]float32, len(benchmarkQueries))
+    for i := 0; i < len(cmdRates); i++ {
+        pdf[i] = float32(cmdRates[i]) / float32(totalRates)
+        cdf[i] = 0
+    }
+    // get cdf
+    cdf[0] = pdf[0]
+    for i := 1; i < len(cmdRates); i++ {
+        cdf[i] = cdf[i-1] + pdf[i]
+    }
+    return totalDifferentCommands, cdf
+}
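prepareCommandsDistribution normalizes the per-query ratios into a PDF, accumulates it into a CDF, and sample() then picks a query index by walking that CDF with a uniform random number (inverse-transform sampling). A standalone sketch of the same idea, with an explicit clamp for float rounding at the upper edge (the 1:2 ratios are only an example):

```go
package main

import (
	"fmt"
	"math/rand"
)

// buildCDF turns integer ratios (e.g. 1:2) into a cumulative distribution.
func buildCDF(rates []int) []float32 {
	total := 0
	for _, r := range rates {
		total += r
	}
	cdf := make([]float32, len(rates))
	acc := float32(0)
	for i, r := range rates {
		acc += float32(r) / float32(total)
		cdf[i] = acc
	}
	return cdf
}

// sample walks the CDF much like multi-query.go does, falling back to the last
// bucket in case floating-point rounding leaves cdf[len-1] marginally below 1.0.
func sample(cdf []float32) int {
	r := rand.Float32()
	for i, edge := range cdf {
		if r <= edge {
			return i
		}
	}
	return len(cdf) - 1
}

func main() {
	rand.Seed(12345) // the benchmark defaults to this seed as well
	cdf := buildCDF([]int{1, 2})
	counts := make([]int, 2)
	for i := 0; i < 100000; i++ {
		counts[sample(cdf)]++
	}
	fmt.Println(counts) // roughly a 1:2 split, e.g. ~33k vs ~67k
}
```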
diff --git a/redisgraph-bechmark-go.go b/redisgraph-bechmark-go.go
index 80008cd..d388f62 100644
--- a/redisgraph-bechmark-go.go
+++ b/redisgraph-bechmark-go.go
@@ -3,101 +3,18 @@ package main
 import (
 	"flag"
 	"fmt"
-	hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
 	"github.com/RedisGraph/redisgraph-go"
 	"github.com/gomodule/redigo/redis"
 	"golang.org/x/time/rate"
 	"log"
-	"math"
+	"math/rand"
 	"os"
 	"os/signal"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 )
 
-var totalCommands uint64
-var totalEmptyResultsets uint64
-var totalErrors uint64
-
-var totalNodesCreated uint64
-var totalNodesDeleted uint64
-var totalLabelsAdded uint64
-var totalPropertiesSet uint64
-var totalRelationshipsCreated uint64
-var totalRelationshipsDeleted uint64
-
-var latencies *hdrhistogram.Histogram
-var instantLatencies *hdrhistogram.Histogram
-var graphRunTimeLatencies *hdrhistogram.Histogram
-var instantGraphRunTimeLatencies *hdrhistogram.Histogram
-
-const Inf = rate.Limit(math.MaxFloat64)
-
-func ingestionRoutine(rg *redisgraph.Graph, continueOnError bool, cmdS string, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter) {
-	defer wg.Done()
-	for i := 0; uint64(i) < number_samples || loop; i++ {
-		sendCmdLogic(rg, cmdS, continueOnError, debug_level, useLimiter, rateLimiter)
-	}
-}
-
-func sendCmdLogic(rg *redisgraph.Graph, query string, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter) {
-	if useRateLimiter {
-		r := rateLimiter.ReserveN(time.Now(), int(1))
-		time.Sleep(r.Delay())
-	}
-	var err error
-	var queryResult *redisgraph.QueryResult
-	startT := time.Now()
-	queryResult, err = rg.Query(query)
-	endT := time.Now()
-	atomic.AddUint64(&totalCommands, uint64(1))
-	duration := endT.Sub(startT)
-	if err != nil {
-		if continueOnError {
-			atomic.AddUint64(&totalErrors, uint64(1))
-			if debug_level > 0 {
-				log.Println(fmt.Sprintf("Received an error with the following query(s): %v, error: %v", query, err))
-			}
-		} else {
-			log.Fatalf("Received an error with the following query(s): %v, error: %v", query, err)
-		}
-	} else {
-		err = graphRunTimeLatencies.RecordValue(int64(queryResult.InternalExecutionTime() * 1000.0))
-		if err != nil {
-			log.Fatalf("Received an error while recording RedisGraph InternalExecutionTime latencies: %v", err)
-		}
-		err = instantGraphRunTimeLatencies.RecordValue(int64(queryResult.InternalExecutionTime() * 1000.0))
-		if err != nil {
-			log.Fatalf("Received an error while recording RedisGraph instant (last sec ) InternalExecutionTime latencies: %v", err)
-		}
-		if debug_level > 1 {
-			fmt.Printf("Issued query: %s\n", query)
-			fmt.Printf("Pretty printing result:\n")
-			queryResult.PrettyPrint()
-			fmt.Printf("\n")
-		}
-		if queryResult.Empty() {
-			atomic.AddUint64(&totalEmptyResultsets, uint64(1))
-		}
-		atomic.AddUint64(&totalNodesCreated, uint64(queryResult.NodesCreated()))
-		atomic.AddUint64(&totalNodesDeleted, uint64(queryResult.NodesDeleted()))
-		atomic.AddUint64(&totalLabelsAdded, uint64(queryResult.LabelsAdded()))
-		atomic.AddUint64(&totalPropertiesSet, uint64(queryResult.PropertiesSet()))
-		atomic.AddUint64(&totalRelationshipsCreated, uint64(queryResult.RelationshipsCreated()))
-		atomic.AddUint64(&totalRelationshipsDeleted, uint64(queryResult.RelationshipsDeleted()))
-	}
-	err = latencies.RecordValue(duration.Microseconds())
-	if err != nil {
-		log.Fatalf("Received an error while recording latencies: %v", err)
-	}
-	err = instantLatencies.RecordValue(duration.Microseconds())
-	if err != nil {
-		log.Fatalf("Received an error while recording latencies: %v", err)
-	}
-}
-
 func main() {
 	host := flag.String("h", "127.0.0.1", "Server hostname.")
 	port := flag.Int("p", 6379, "Server port.")
@@ -106,14 +23,25 @@ func main() {
 	clients := flag.Uint64("c", 50, "number of clients.")
 	numberRequests := flag.Uint64("n", 1000000, "Total number of requests")
 	debug := flag.Int("debug", 0, "Client debug level.")
-	loop := flag.Bool("l", false, "Loop. Run the tests forever.")
+	randomSeed := flag.Int64("random-seed", 12345, "Random seed to use.")
 	graphKey := flag.String("graph-key", "graph", "graph key.")
+	flag.Var(&benchmarkQueries, "query", "Specify a RedisGraph query to send in quotes. Each command that you specify is run with its ratio. For example: -query=\"CREATE (n)\" -query-ratio=2")
+	flag.Var(&benchmarkQueryRates, "query-ratio", "The query ratio vs other queries used in the same benchmark. Each command that you specify is run with its ratio. For example: -query=\"CREATE (n)\" -query-ratio=10 -query=\"MATCH (n) RETURN n\" -query-ratio=1")
+	jsonOutputFile := flag.String("json-out-file", "benchmark-results.json", "Name of json output file to output benchmark results. If set to an empty string, no json is written.")
+	//loop := flag.Bool("l", false, "Loop. Run the tests forever.")
+	// disabling this for now while we refactor the benchmark client (please use a very large total command number in the meantime)
+	// in the meantime we add these two placeholder vars
+	var loopV = false
+	var loop *bool = &loopV
 	flag.Parse()
-	args := flag.Args()
-	if len(args) < 1 {
-		log.Fatalf("You need to specify a query after the flag command arguments.")
+	if len(benchmarkQueries) < 1 {
+		log.Fatalf("You need to specify at least a query with the -query parameter. For example: -query=\"CREATE (n)\"")
 	}
 	fmt.Printf("Debug level: %d.\n", *debug)
+	fmt.Printf("Using random seed: %d.\n", *randomSeed)
+	rand.Seed(*randomSeed)
+	testResult := NewTestResult("", uint(*clients), *numberRequests, uint64(*rps), "")
+	testResult.SetUsedRandomSeed(*randomSeed)
 
 	var requestRate = Inf
 	var requestBurst = 1
@@ -127,12 +55,8 @@ func main() {
 	var rateLimiter = rate.NewLimiter(requestRate, requestBurst)
 	samplesPerClient := *numberRequests / *clients
 	client_update_tick := 1
-	latencies = hdrhistogram.New(1, 90000000000, 3)
-	instantLatencies = hdrhistogram.New(1, 90000000000, 3)
-	graphRunTimeLatencies = hdrhistogram.New(1, 90000000000, 3)
-	instantGraphRunTimeLatencies = hdrhistogram.New(1, 90000000000, 3)
+
 	connectionStr := fmt.Sprintf("%s:%d", *host, *port)
-	stopChan := make(chan struct{})
 	// a WaitGroup for the goroutines to tell us they've stopped
 	wg := sync.WaitGroup{}
 	if !*loop {
@@ -140,119 +64,64 @@ func main() {
 	} else {
 		fmt.Printf("Running in loop until you hit Ctrl+C\n")
 	}
-	query := strings.Join(args, " ")
-	rgs := make([]redisgraph.Graph, *clients, *clients)
-	conns := make([]redis.Conn, *clients, *clients)
+	queries := make([]string, len(benchmarkQueries))
+	cmdRates := make([]int, len(benchmarkQueries))
+	totalDifferentCommands, cdf := prepareCommandsDistribution(queries, cmdRates)
 
-	for client_id := 0; uint64(client_id) < *clients; client_id++ {
-		wg.Add(1)
-		cmd := make([]string, len(args))
-		copy(cmd, args)
-		rgs[client_id], conns[client_id] = getStandaloneConn(*graphKey, "tcp", connectionStr, *password)
-		go ingestionRoutine(&rgs[client_id], true, query, samplesPerClient, *loop, *debug, &wg, useRateLimiter, rateLimiter)
-	}
+	createRequiredGlobalStructs(totalDifferentCommands)
+
+	rgs := make([]redisgraph.Graph, *clients)
+	conns := make([]redis.Conn, *clients)
+
+	// a WaitGroup for the goroutines to tell us they've stopped
+	dataPointProcessingWg := sync.WaitGroup{}
+	graphDatapointsChann := make(chan GraphQueryDatapoint, *numberRequests)
 
 	// listen for C-c
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt)
+	c1 := make(chan os.Signal, 1)
+	signal.Notify(c1, os.Interrupt)
+
 	tick := time.NewTicker(time.Duration(client_update_tick) * time.Second)
-	// enter the update loop
-	closed, _, duration, totalMessages, _ := updateCLI(tick, c, *numberRequests, *loop)
+	dataPointProcessingWg.Add(1)
+	go processGraphDatapointsChannel(graphDatapointsChann, c1, *numberRequests, &dataPointProcessingWg, &instantHistogramsResetMutex)
 
-	// benchmarked ended, close the connections
-	for _, standaloneConn := range conns {
-		standaloneConn.Close()
+	startTime := time.Now()
+	for client_id := 0; uint64(client_id) < *clients; client_id++ {
+		wg.Add(1)
+		rgs[client_id], conns[client_id] = getStandaloneConn(*graphKey, "tcp", connectionStr, *password)
+		go ingestionRoutine(&rgs[client_id], true, queries, cdf, samplesPerClient, *loop, *debug, &wg, useRateLimiter, rateLimiter, graphDatapointsChann)
 	}
-	messageRate := float64(totalMessages) / float64(duration.Seconds())
-	p50IngestionMs := float64(latencies.ValueAtQuantile(50.0)) / 1000.0
-	p95IngestionMs := float64(latencies.ValueAtQuantile(95.0)) / 1000.0
-	p99IngestionMs := float64(latencies.ValueAtQuantile(99.0)) / 1000.0
-	graph_p50IngestionMs := float64(graphRunTimeLatencies.ValueAtQuantile(50.0)) / 1000.0
-	graph_p95IngestionMs := float64(graphRunTimeLatencies.ValueAtQuantile(95.0)) / 1000.0
-	graph_p99IngestionMs := float64(graphRunTimeLatencies.ValueAtQuantile(99.0)) / 1000.0
 
+	// enter the update loop
+	updateCLI(startTime, tick, c, *numberRequests, *loop)
 
-	fmt.Printf("\n")
-	fmt.Printf("################# RUNTIME STATS #################\n")
-	fmt.Printf("Total Duration %.3f Seconds\n", duration.Seconds())
-	fmt.Printf("Total Commands issued %d\n", totalCommands)
-	fmt.Printf("Total Errors %d ( %3.3f %%)\n", totalErrors, float64(totalErrors/totalCommands*100.0))
-	fmt.Printf("Throughput summary: %.0f requests per second\n", messageRate)
-	fmt.Printf("Overall Client Latency summary (msec):\n")
-	fmt.Printf("    %9s %9s %9s\n", "p50", "p95", "p99")
-	fmt.Printf("    %9.3f %9.3f %9.3f\n", p50IngestionMs, p95IngestionMs, p99IngestionMs)
-	fmt.Printf("################## GRAPH STATS ##################\n")
-	fmt.Printf("Total Empty resultsets %d ( %3.3f %%)\n", totalEmptyResultsets, float64(totalEmptyResultsets/totalCommands*100.0))
-	fmt.Printf("Total Nodes created %d\n", totalNodesCreated)
-	fmt.Printf("Total Nodes deleted %d\n", totalNodesDeleted)
-	fmt.Printf("Total Labels added %d\n", totalLabelsAdded)
-	fmt.Printf("Total Properties set %d\n", totalPropertiesSet)
-	fmt.Printf("Total Relationships created %d\n", totalRelationshipsCreated)
-	fmt.Printf("Total Relationships deleted %d\n", totalRelationshipsDeleted)
-	fmt.Printf("Overall RedisGraph Internal Execution time Latency summary (msec):\n")
-	fmt.Printf("    %9s %9s %9s\n", "p50", "p95", "p99")
-	fmt.Printf("    %9.3f %9.3f %9.3f\n", graph_p50IngestionMs, graph_p95IngestionMs, graph_p99IngestionMs)
+	endTime := time.Now()
+	duration := time.Since(startTime)
 
-	if closed {
-		return
+	// benchmark ended, close the connections
+	for _, standaloneConn := range conns {
+		standaloneConn.Close()
 	}
 
-	// tell the goroutine to stop
-	close(stopChan)
-	// and wait for them both to reply back
-	wg.Wait()
-}
-
-func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64, loop bool) (bool, time.Time, time.Duration, uint64, []float64) {
-
-	start := time.Now()
-	prevTime := time.Now()
-	prevMessageCount := uint64(0)
-	messageRateTs := []float64{}
-	fmt.Printf("%26s %7s %25s %25s %7s %25s %25s %26s\n", "Test time", " ", "Total Commands", "Total Errors", "", "Command Rate", "Client p50 with RTT(ms)", "Graph Internal Time p50 (ms)")
-	for {
-		select {
-		case <-tick.C:
-			{
-				now := time.Now()
-				took := now.Sub(prevTime)
-				messageRate := float64(totalCommands-prevMessageCount) / float64(took.Seconds())
-				completionPercentStr := "[----%]"
-				if !loop {
-					completionPercent := float64(totalCommands) / float64(message_limit) * 100.0
-					completionPercentStr = fmt.Sprintf("[%3.1f%%]", completionPercent)
-				}
-				errorPercent := float64(totalErrors) / float64(totalCommands) * 100.0
-
-				p50 := float64(latencies.ValueAtQuantile(50.0)) / 1000.0
-				p50RunTimeGraph := float64(graphRunTimeLatencies.ValueAtQuantile(50.0)) / 1000.0
-				instantP50 := float64(instantLatencies.ValueAtQuantile(50.0)) / 1000.0
-				instantP50RunTimeGraph := float64(instantGraphRunTimeLatencies.ValueAtQuantile(50.0)) / 1000.0
-				instantGraphRunTimeLatencies.Reset()
-				instantLatencies.Reset()
-				if prevMessageCount == 0 && totalCommands != 0 {
-					start = time.Now()
-				}
-				if totalCommands != 0 {
-					messageRateTs = append(messageRateTs, messageRate)
-				}
-				prevMessageCount = totalCommands
-				prevTime = now
+	// wait for all stats to be processed
+	dataPointProcessingWg.Wait()
-				fmt.Printf("%25.0fs %s %25d %25d [%3.1f%%] %25.2f %19.3f (%3.3f) %20.3f (%3.3f)\t", time.Since(start).Seconds(), completionPercentStr, totalCommands, totalErrors, errorPercent, messageRate, instantP50, p50, instantP50RunTimeGraph, p50RunTimeGraph)
-				fmt.Printf("\r")
-				if message_limit > 0 && totalCommands >= uint64(message_limit) && !loop {
-					return true, start, time.Since(start), totalCommands, messageRateTs
-				}
+	testResult.FillDurationInfo(startTime, endTime, duration)
+	testResult.BenchmarkFullyRun = totalCommands == *numberRequests
+	testResult.IssuedCommands = totalCommands
+	testResult.OverallGraphInternalQuantiles = GetOverallQuantiles(queries, serverSide_PerQuery_GraphInternalTime_OverallLatencies, serverSide_AllQueries_GraphInternalTime_OverallLatencies)
+	testResult.OverallClientQuantiles = GetOverallQuantiles(queries, clientSide_PerQuery_OverallLatencies, clientSide_AllQueries_OverallLatencies)
+	testResult.OverallQueryRates = GetOverallRatesMap(duration, queries, clientSide_PerQuery_OverallLatencies, clientSide_AllQueries_OverallLatencies)
+	testResult.Totals = GetTotalsMap(queries, clientSide_PerQuery_OverallLatencies, clientSide_AllQueries_OverallLatencies, errorsPerQuery, totalNodesCreatedPerQuery, totalNodesDeletedPerQuery, totalLabelsAddedPerQuery, totalPropertiesSetPerQuery, totalRelationshipsCreatedPerQuery, totalRelationshipsDeletedPerQuery)
 
-				break
-			}
+	// final merge of pending stats
+	printFinalSummary(queries, cmdRates, totalCommands, duration)
 
-		case <-c:
-			fmt.Println("\nreceived Ctrl-c - shutting down")
-			return true, start, time.Since(start), totalCommands, messageRateTs
-		}
+	if strings.Compare(*jsonOutputFile, "") != 0 {
+		saveJsonResult(testResult, jsonOutputFile)
 	}
 }
diff --git a/standalone_conn.go b/standalone_conn.go
index c60c53a..7e5e8ef 100644
--- a/standalone_conn.go
+++ b/standalone_conn.go
@@ -7,7 +7,7 @@ import (
 )
 
 func getStandaloneConn(graphName, network, addr string, password string) (graph rg.Graph, conn redis.Conn) {
-	var err error = nil
+	var err error
 	if password != "" {
 		conn, err = redis.Dial(network, addr, redis.DialPassword(password))
 	} else {
@@ -16,6 +16,5 @@
 	if err != nil {
 		log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err)
 	}
-	graph = rg.GraphNew(graphName, conn)
-	return graph, conn
+	return rg.GraphNew(graphName, conn), conn
 }
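getStandaloneConn now returns the graph handle directly; each benchmark client gets its own redigo connection wrapped in a redisgraph-go Graph. A minimal sketch of that per-client setup (address and graph name are illustrative):

```go
package main

import (
	"fmt"
	"log"

	rg "github.com/RedisGraph/redisgraph-go"
	"github.com/gomodule/redigo/redis"
)

func main() {
	// One dedicated connection per benchmark client, as in getStandaloneConn.
	conn, err := redis.Dial("tcp", "127.0.0.1:6379") // add redis.DialPassword("...") when -a is set
	if err != nil {
		log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err)
	}
	defer conn.Close()

	graph := rg.GraphNew("graph", conn) // same default key as the -graph-key flag
	res, err := graph.Query("MATCH (n) RETURN count(n)")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("internal execution time: %.3fms\n", res.InternalExecutionTime())
}
```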
error = %v", err) } - graph = rg.GraphNew(graphName, conn) - return graph, conn + return rg.GraphNew(graphName, conn), conn } diff --git a/test_result.go b/test_result.go new file mode 100644 index 0000000..208ad7e --- /dev/null +++ b/test_result.go @@ -0,0 +1,237 @@ +package main + +import ( + "encoding/json" + "fmt" + "github.com/HdrHistogram/hdrhistogram-go" + "io/ioutil" + "log" + "os" + "sync" + "sync/atomic" + "time" +) + +const resultFormatVersion = "0.0.1" + +type GraphQueryDatapoint struct { + CmdPos int // command that was used + ClientDurationMicros int64 + GraphInternalDurationMicros int64 + Error bool + Empty bool + NodesCreated uint64 + NodesDeleted uint64 + LabelsAdded uint64 + PropertiesSet uint64 + RelationshipsCreated uint64 + RelationshipsDeleted uint64 +} + +type TestResult struct { + + // Test Configs + resultFormatVersion string `json:"ResultFormatVersion"` + Metadata string `json:"Metadata"` + Clients uint `json:"Clients"` + MaxRps uint64 `json:"MaxRps"` + RandomSeed int64 `json:"RandomSeed"` + BenchmarkConfiguredCommandsLimit uint64 `json:"BenchmarkConfiguredCommandsLimit"` + IssuedCommands uint64 `json:"IssuedCommands"` + BenchmarkFullyRun bool `json:"BenchmarkFullyRun"` + + // Test Description + TestDescription string `json:"TestDescription"` + + // DB Spefic Configs + DBSpecificConfigs map[string]interface{} `json:"DBSpecificConfigs"` + + StartTime int64 `json:"StartTime"` + EndTime int64 `json:"EndTime"` + DurationMillis int64 `json:"DurationMillis"` + + // Populated after benchmark + // Benchmark Totals + Totals map[string]interface{} `json:"Totals"` + + // Overall Rates + OverallQueryRates map[string]interface{} `json:"OverallQueryRates"` + + // Overall Client Quantiles + OverallClientQuantiles map[string]interface{} `json:"OverallClientQuantiles"` + + // Overall Graph Internal Quantiles + OverallGraphInternalQuantiles map[string]interface{} `json:"OverallGraphInternalQuantiles"` + + // Per second ( tick ) client stats + ClientRunTimeStats map[int64]interface{} `json:"ClientRunTimeStats"` + + // Per second ( tick ) server stats + ServerRunTimeStats map[int64]interface{} `json:"ServerRunTimeStats"` +} + +func NewTestResult(metadata string, clients uint, commandsLimit uint64, maxRps uint64, testDescription string) *TestResult { + return &TestResult{resultFormatVersion: resultFormatVersion, BenchmarkConfiguredCommandsLimit: commandsLimit, BenchmarkFullyRun: false, Metadata: metadata, Clients: clients, MaxRps: maxRps, TestDescription: testDescription} +} + +func (r *TestResult) SetUsedRandomSeed(seed int64) *TestResult { + r.RandomSeed = seed + return r +} + +func (r *TestResult) FillDurationInfo(startTime time.Time, endTime time.Time, duration time.Duration) { + r.StartTime = startTime.Unix() + r.EndTime = endTime.Unix() + r.DurationMillis = duration.Milliseconds() +} + +func processGraphDatapointsChannel(graphStatsChann chan GraphQueryDatapoint, c chan os.Signal, numberRequests uint64, wg *sync.WaitGroup, instantMutex *sync.Mutex) { + defer wg.Done() + var totalProcessedCommands uint64 = 0 + for { + select { + case dp := <-graphStatsChann: + { + cmdPos := dp.CmdPos + clientDurationMicros := dp.ClientDurationMicros + instantMutex.Lock() + clientSide_PerQuery_OverallLatencies[cmdPos].RecordValue(clientDurationMicros) + clientSide_AllQueries_OverallLatencies.RecordValue(clientDurationMicros) + graphInternalDurationMicros := dp.GraphInternalDurationMicros + serverSide_PerQuery_GraphInternalTime_OverallLatencies[cmdPos].RecordValue(graphInternalDurationMicros) + 
+                serverSide_AllQueries_GraphInternalTime_OverallLatencies.RecordValue(graphInternalDurationMicros)
+                instantMutex.Unlock()
+                // Only needs to be atomic due to CLI print
+                atomic.AddUint64(&totalCommands, uint64(1))
+                if dp.Error {
+                    // Only needs to be atomic due to CLI print
+                    atomic.AddUint64(&totalErrors, uint64(1))
+                    errorsPerQuery[cmdPos]++
+                } else {
+                    totalNodesCreated = totalNodesCreated + dp.NodesCreated
+                    totalNodesDeleted = totalNodesDeleted + dp.NodesDeleted
+                    totalLabelsAdded = totalLabelsAdded + dp.LabelsAdded
+                    totalPropertiesSet = totalPropertiesSet + dp.PropertiesSet
+                    totalRelationshipsCreated = totalRelationshipsCreated + dp.RelationshipsCreated
+                    totalRelationshipsDeleted = totalRelationshipsDeleted + dp.RelationshipsDeleted
+
+                    totalNodesCreatedPerQuery[cmdPos] = totalNodesCreatedPerQuery[cmdPos] + dp.NodesCreated
+                    totalNodesDeletedPerQuery[cmdPos] = totalNodesDeletedPerQuery[cmdPos] + dp.NodesDeleted
+                    totalLabelsAddedPerQuery[cmdPos] = totalLabelsAddedPerQuery[cmdPos] + dp.LabelsAdded
+                    totalPropertiesSetPerQuery[cmdPos] = totalPropertiesSetPerQuery[cmdPos] + dp.PropertiesSet
+                    totalRelationshipsCreatedPerQuery[cmdPos] = totalRelationshipsCreatedPerQuery[cmdPos] + dp.RelationshipsCreated
+                    totalRelationshipsDeletedPerQuery[cmdPos] = totalRelationshipsDeletedPerQuery[cmdPos] + dp.RelationshipsDeleted
+
+                    if dp.Empty {
+                        totalEmptyResultsets++
+                    }
+                }
+
+                instantMutex.Lock()
+                clientSide_AllQueries_InstantLatencies.RecordValue(clientDurationMicros)
+                serverSide_AllQueries_GraphInternalTime_InstantLatencies.RecordValue(graphInternalDurationMicros)
+                instantMutex.Unlock()
+
+                totalProcessedCommands++
+                // if all commands have been processed return
+                // otherwise keep looping
+                if totalProcessedCommands >= numberRequests {
+                    return
+                }
+                break
+            }
+
+        case <-c:
+            fmt.Println("\nReceived Ctrl-c - shutting down datapoints processor go-routine")
+            return
+        }
+    }
+}
+
+func saveJsonResult(testResult *TestResult, jsonOutputFile *string) {
+    file, err := json.MarshalIndent(testResult, "", " ")
+    if err != nil {
+        log.Fatal(err)
+    }
+    fmt.Printf("Saving JSON results file to %s\n", *jsonOutputFile)
+    err = ioutil.WriteFile(*jsonOutputFile, file, 0644)
+    if err != nil {
+        log.Fatal(err)
+    }
+}
+
+func calculateRateMetrics(current, prev int64, took time.Duration) (rate float64) {
+    rate = float64(current-prev) / float64(took.Seconds())
+    return
+}
+
+func generateQuantileMap(hist *hdrhistogram.Histogram) (int64, map[string]float64) {
+    ops := hist.TotalCount()
+    q0 := 0.0
+    q50 := 0.0
+    q95 := 0.0
+    q99 := 0.0
+    q999 := 0.0
+    q100 := 0.0
+    if ops > 0 {
+        q0 = float64(hist.ValueAtQuantile(0.0)) / 10e2
+        q50 = float64(hist.ValueAtQuantile(50.0)) / 10e2
+        q95 = float64(hist.ValueAtQuantile(95.0)) / 10e2
+        q99 = float64(hist.ValueAtQuantile(99.0)) / 10e2
+        q999 = float64(hist.ValueAtQuantile(99.90)) / 10e2
+        q100 = float64(hist.ValueAtQuantile(100.0)) / 10e2
+    }
+
+    mp := map[string]float64{"q0": q0, "q50": q50, "q95": q95, "q99": q99, "q999": q999, "q100": q100}
+    return ops, mp
+}
+
+func GetOverallQuantiles(cmds []string, perQueryHistograms []*hdrhistogram.Histogram, totalsHistogram *hdrhistogram.Histogram) map[string]interface{} {
+    perQueryQuantileMap := map[string]interface{}{}
+    for i, query := range cmds {
+        _, quantileMap := generateQuantileMap(perQueryHistograms[i])
+        perQueryQuantileMap[query] = quantileMap
+    }
+    _, totalMap := generateQuantileMap(totalsHistogram)
+    perQueryQuantileMap["Total"] = totalMap
+    return perQueryQuantileMap
+}
+
+func GetOverallRatesMap(took time.Duration, cmds []string, perQueryHistograms []*hdrhistogram.Histogram, totalsHistogram *hdrhistogram.Histogram) map[string]interface{} {
+    /////////
+    // Overall Rates
+    /////////
+    perQueryRatesMap := map[string]interface{}{}
+    for i, query := range cmds {
+        count := perQueryHistograms[i].TotalCount()
+        rate := calculateRateMetrics(count, 0, took)
+        perQueryRatesMap[query] = rate
+    }
+    count := totalsHistogram.TotalCount()
+    rate := calculateRateMetrics(count, 0, took)
+    perQueryRatesMap["Total"] = rate
+    return perQueryRatesMap
+}
+
+func GetTotalsMap(queries []string, latenciesPerQuery []*hdrhistogram.Histogram, totalLatencies *hdrhistogram.Histogram, errorsPerQuery, totalNodesCreatedPerQuery, totalNodesDeletedPerQuery, totalLabelsAddedPerQuery, totalPropertiesSetPerQuery, totalRelationshipsCreatedPerQuery, totalRelationshipsDeletedPerQuery []uint64) map[string]interface{} {
+    totalsMap := map[string]interface{}{}
+
+    for i, query := range queries {
+        totalsMap[query] = generateTotalMap(uint64(latenciesPerQuery[i].TotalCount()), errorsPerQuery[i], totalNodesCreatedPerQuery[i], totalNodesDeletedPerQuery[i], totalLabelsAddedPerQuery[i], totalPropertiesSetPerQuery[i], totalRelationshipsCreatedPerQuery[i], totalRelationshipsDeletedPerQuery[i])
+    }
+    totalsMap["Total"] = generateTotalMap(uint64(totalLatencies.TotalCount()), CountTotal(errorsPerQuery), CountTotal(totalNodesCreatedPerQuery), CountTotal(totalNodesDeletedPerQuery), CountTotal(totalLabelsAddedPerQuery), CountTotal(totalPropertiesSetPerQuery), CountTotal(totalRelationshipsCreatedPerQuery), CountTotal(totalRelationshipsDeletedPerQuery))
+    return totalsMap
+}
+
+func CountTotal(slice []uint64) (res uint64) {
+    res = 0
+    for _, i2 := range slice {
+        res += i2
+    }
+    return
+}
+
+func generateTotalMap(IssuedQueries, Errors, NodesCreated, NodesDeleted, LabelsAdded, PropertiesSet, RelationshipsCreated, RelationshipsDeleted uint64) interface{} {
+    mp := map[string]uint64{"IssuedQueries": IssuedQueries, "Errors": Errors, "NodesCreated": NodesCreated, "NodesDeleted": NodesDeleted, "LabelsAdded": LabelsAdded, "PropertiesSet": PropertiesSet, "RelationshipsCreated": RelationshipsCreated, "RelationshipsDeleted": RelationshipsDeleted}
+    return mp
+}
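The TestResult built above is written with `json.MarshalIndent`, so the `-json-out-file` output is a nested map of quantiles, rates and totals keyed by query string. A cut-down sketch of the same marshaling path (the struct and values here are illustrative, not the full TestResult):

```go
package main

import (
	"encoding/json"
	"io/ioutil"
	"log"
)

type miniResult struct {
	IssuedCommands         uint64                 `json:"IssuedCommands"`
	OverallClientQuantiles map[string]interface{} `json:"OverallClientQuantiles"`
}

func main() {
	res := miniResult{
		IssuedCommands: 100000,
		OverallClientQuantiles: map[string]interface{}{
			"CREATE(n)": map[string]float64{"q50": 0.35, "q95": 0.61, "q99": 0.80}, // fabricated values
		},
	}
	out, err := json.MarshalIndent(res, "", " ")
	if err != nil {
		log.Fatal(err)
	}
	// saveJsonResult uses ioutil.WriteFile the same way.
	if err := ioutil.WriteFile("benchmark-results.json", out, 0644); err != nil {
		log.Fatal(err)
	}
}
```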
diff --git a/workers.go b/workers.go
new file mode 100644
index 0000000..80ea5b4
--- /dev/null
+++ b/workers.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+    "fmt"
+    "github.com/RedisGraph/redisgraph-go"
+    "golang.org/x/time/rate"
+    "log"
+    "sync"
+    "time"
+)
+
+func ingestionRoutine(rg *redisgraph.Graph, continueOnError bool, cmdS []string, commandsCDF []float32, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint) {
+    defer wg.Done()
+    for i := 0; uint64(i) < number_samples || loop; i++ {
+        cmdPos := sample(commandsCDF)
+        sendCmdLogic(rg, cmdS[cmdPos], cmdPos, continueOnError, debug_level, useLimiter, rateLimiter, statsChannel)
+    }
+}
+
+func sendCmdLogic(rg *redisgraph.Graph, query string, cmdPos int, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, statsChannel chan GraphQueryDatapoint) {
+    if useRateLimiter {
+        r := rateLimiter.ReserveN(time.Now(), int(1))
+        time.Sleep(r.Delay())
+    }
+    var err error
+    var queryResult *redisgraph.QueryResult
+    startT := time.Now()
+    queryResult, err = rg.Query(query)
+    endT := time.Now()
+
+    duration := endT.Sub(startT)
+    datapoint := GraphQueryDatapoint{
+        CmdPos:                      cmdPos,
+        ClientDurationMicros:        duration.Microseconds(),
+        GraphInternalDurationMicros: 0,
+        Error:                       false,
+        Empty:                       true,
+        NodesCreated:                0,
+        NodesDeleted:                0,
+        LabelsAdded:                 0,
+        PropertiesSet:               0,
+        RelationshipsCreated:        0,
+        RelationshipsDeleted:        0,
+    }
+    if err != nil {
+        datapoint.Error = true
+        if continueOnError {
+            if debug_level > 0 {
+                log.Println(fmt.Sprintf("Received an error with the following query(s): %v, error: %v", query, err))
+            }
+        } else {
+            log.Fatalf("Received an error with the following query(s): %v, error: %v", query, err)
+        }
+    } else {
+        datapoint.GraphInternalDurationMicros = int64(queryResult.InternalExecutionTime() * 1000.0)
+        if debug_level > 1 {
+            fmt.Printf("Issued query: %s\n", query)
+            fmt.Printf("Pretty printing result:\n")
+            queryResult.PrettyPrint()
+            fmt.Printf("\n")
+        }
+        datapoint.Empty = queryResult.Empty()
+        datapoint.NodesCreated = uint64(queryResult.NodesCreated())
+        datapoint.NodesDeleted = uint64(queryResult.NodesDeleted())
+        datapoint.LabelsAdded = uint64(queryResult.LabelsAdded())
+        datapoint.PropertiesSet = uint64(queryResult.PropertiesSet())
+        datapoint.RelationshipsCreated = uint64(queryResult.RelationshipsCreated())
+        datapoint.RelationshipsDeleted = uint64(queryResult.RelationshipsDeleted())
+    }
+    statsChannel <- datapoint
+}
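With this refactor the worker goroutines no longer touch shared counters or histograms: each request becomes a GraphQueryDatapoint pushed onto a buffered channel, and a single processing goroutine owns all aggregation. A stripped-down sketch of that producer/consumer shape (the payload and counts are simplified stand-ins for the real types):

```go
package main

import (
	"fmt"
	"sync"
)

type datapoint struct {
	CmdPos        int
	DurationMicro int64
}

func main() {
	const requests = 1000
	ch := make(chan datapoint, requests) // buffered like graphDatapointsChann

	var workers sync.WaitGroup
	for w := 0; w < 4; w++ {
		workers.Add(1)
		go func(id int) { // stand-in for ingestionRoutine/sendCmdLogic
			defer workers.Done()
			for i := 0; i < requests/4; i++ {
				ch <- datapoint{CmdPos: id % 2, DurationMicro: int64(100 + i)}
			}
		}(w)
	}

	// A single consumer owns the aggregates, so the hot path needs no locks
	// (mirrors processGraphDatapointsChannel).
	var processed uint64
	perQuery := make([]uint64, 2)
	var consumer sync.WaitGroup
	consumer.Add(1)
	go func() {
		defer consumer.Done()
		for dp := range ch {
			perQuery[dp.CmdPos]++
			processed++
		}
	}()

	workers.Wait()
	close(ch)
	consumer.Wait()
	fmt.Println("processed:", processed, "per query:", perQuery)
}
```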