Skip to content

Commit

Permalink
Changes for Upgrade tests for systest/21million
Browse files Browse the repository at this point in the history
  • Loading branch information
jbhamra1 committed Aug 4, 2023
1 parent d8d661f commit 352f44b
Show file tree
Hide file tree
Showing 14 changed files with 628 additions and 158 deletions.
14 changes: 14 additions & 0 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type ClusterConfig struct {
portOffset int
bulkOutDir string
lambdaURL string
postingDir string
detectRace bool
}

func NewClusterConfig() ClusterConfig {
Expand All @@ -112,6 +114,7 @@ func NewClusterConfig() ClusterConfig {
refillInterval: 20 * time.Second,
uidLease: 50,
portOffset: -1,
detectRace: false,
}
}

Expand Down Expand Up @@ -186,3 +189,14 @@ func (cc ClusterConfig) WithGraphqlLambdaURL(url string) ClusterConfig {
cc.lambdaURL = url
return cc
}

func (cc ClusterConfig) WithPostingListDir(dir string) ClusterConfig {
cc.postingDir = dir
return cc
}

// WithDetectRace detects data race from the alpha logs
func (cc ClusterConfig) WithDetectRace() ClusterConfig {
cc.detectRace = true
return cc
}
3 changes: 3 additions & 0 deletions dgraphtest/dgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ func (a *alpha) cmd(c *LocalCluster) []string {
if c.conf.lambdaURL != "" {
acmd = append(acmd, fmt.Sprintf(`--graphql=lambda-url=%s`, c.conf.lambdaURL))
}
if len(c.conf.postingDir) > 0 {
acmd = append(acmd, fmt.Sprintf(`-p=%s`, c.conf.postingDir))
}

return acmd
}
Expand Down
35 changes: 35 additions & 0 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package dgraphtest

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"strings"
"time"

Expand Down Expand Up @@ -224,6 +226,10 @@ func (c *LocalCluster) Cleanup(verbose bool) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()

if c.conf.detectRace {
c.DetectRaceInAlphas()
}

ro := types.ContainerRemoveOptions{RemoveVolumes: true, Force: true}
for _, aa := range c.alphas {
if err := c.dcli.ContainerRemove(ctx, aa.cid(), ro); err != nil {
Expand Down Expand Up @@ -864,3 +870,32 @@ func (c *LocalCluster) inspectContainer(containerID string) (string, error) {
}
return string(raw), nil
}

func (c *LocalCluster) DetectRaceInAlphas() bool {
for _, a := range c.alphas {
contLogs, err := c.getLogs(a.containerID)
if err != nil {
continue
}
if CheckIfRace([]byte(contLogs)) {
return true
}
}
return false
}

func CheckIfRace(output []byte) bool {
awkCmd := exec.Command("awk", "/WARNING: DATA RACE/{flag=1}flag;/==================/{flag=0}")
awkCmd.Stdin = bytes.NewReader(output)
out, err := awkCmd.CombinedOutput()
if err != nil {
log.Printf("[ERROR] while getting race content %v", err)
return false
}

if len(out) > 0 {
log.Printf("[WARNING] DATA RACE DETECTED %s\n", string(out))
return true
}
return false
}
43 changes: 43 additions & 0 deletions systest/21million/bulk/integration_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//go:build integration

/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bulk

import (
"path/filepath"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgraph/testutil"
)

func (bsuite *TOMbulkTestSuite) bulkLoader() error {
rdfFile := filepath.Join(testutil.TestDataDirectory, "21million.rdf.gz")
schemaFile := filepath.Join(testutil.TestDataDirectory, "21million.schema")
require.NoError(bsuite.T(), testutil.MakeDirEmpty([]string{"out/0", "out/1", "out/2"}))
return testutil.BulkLoad(testutil.BulkOpts{
Zero: testutil.SockAddrZero,
Shards: 1,
RdfFile: rdfFile,
SchemaFile: schemaFile,
})
}

func (bsuite *TOMbulkTestSuite) StartAlpha() error {
return testutil.StartAlphas("./alpha.yml")
}
49 changes: 49 additions & 0 deletions systest/21million/bulk/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//go:build integration

/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bulk

import (
"testing"

"github.com/stretchr/testify/suite"

"github.com/dgraph-io/dgraph/dgraphtest"
"github.com/dgraph-io/dgraph/testutil"
)

type TOMbulkTestSuite struct {
suite.Suite
dc dgraphtest.Cluster
}

func (bsuite *TOMbulkTestSuite) SetupTest() {
bsuite.dc = dgraphtest.NewComposeCluster()
}

func (bsuite *TOMbulkTestSuite) TearDownTest() {
testutil.DetectRaceInAlphas(testutil.DockerPrefix)
}

func (bsuite *TOMbulkTestSuite) Upgrade() {
// Not implemented for integration tests
}

func TestTOMbulkTestSuite(t *testing.T) {
suite.Run(t, new(TOMbulkTestSuite))
}
115 changes: 83 additions & 32 deletions systest/21million/bulk/run_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:build integration
//go:build integration || upgrade

/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
Expand All @@ -19,47 +19,98 @@
package bulk

import (
"context"
"io"
"os"
"path/filepath"
"testing"
"runtime"
"strings"
"time"

"github.com/dgraph-io/dgraph/systest/21million/common"
"github.com/dgraph-io/dgraph/testutil"
"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/dgraphtest"
)

func TestQueries(t *testing.T) {
t.Run("Run queries", common.TestQueriesFor21Million)
}
// JSON output can be hundreds of lines and diffs can scroll off the terminal before you
// can look at them. This option allows saving the JSON to a specified directory instead
// for easier reviewing after the test completes.
//var savedir = flag.String("savedir", "",
// "directory to save json from test failures in")
//var quiet = flag.Bool("quiet", false,
// "just output whether json differs, not a diff")

func TestMain(m *testing.M) {
schemaFile := filepath.Join(testutil.TestDataDirectory, "21million.schema")
rdfFile := filepath.Join(testutil.TestDataDirectory, "21million.rdf.gz")
if err := testutil.MakeDirEmpty([]string{"out/0", "out/1", "out/2"}); err != nil {
os.Exit(1)
}
func (bsuite *TOMbulkTestSuite) TestQueriesFor21Million() {
t := bsuite.T()
_, thisFile, _, _ := runtime.Caller(0)
queryDir := filepath.Join(filepath.Dir(thisFile), "../queries")

if err := testutil.BulkLoad(testutil.BulkOpts{
Zero: testutil.SockAddrZero,
Shards: 1,
RdfFile: rdfFile,
SchemaFile: schemaFile,
}); err != nil {
cleanupAndExit(1)
files, err := os.ReadDir(queryDir)
if err != nil {
t.Fatalf("Error reading directory: %s", err.Error())
}

if err := testutil.StartAlphas("./alpha.yml"); err != nil {
cleanupAndExit(1)
}
//savepath := ""
//diffs := 0
for _, file := range files {
if !strings.HasPrefix(file.Name(), "query-") {
continue
}

exitCode := m.Run()
cleanupAndExit(exitCode)
}
bsuite.Run(file.Name(), func() {
require.NoError(t, bsuite.bulkLoader())

// start alphas
//require.NoError(t, c.Start())
require.NoError(t, bsuite.StartAlpha())

// Upgrade
bsuite.Upgrade()

// For this test we DON'T want to start with an empty database.
dg, cleanup, err := bsuite.dc.Client()
defer cleanup()
require.NoError(t, err)

filename := filepath.Join(queryDir, file.Name())
reader, cleanup := chunker.FileReader(filename, nil)
bytes, err := io.ReadAll(reader)
if err != nil {
t.Fatalf("Error reading file: %s", err.Error())
}
contents := string(bytes[:])
cleanup()

// The test query and expected result are separated by a delimiter.
bodies := strings.SplitN(contents, "\n---\n", 2)
// Dgraph can get into unhealthy state sometime. So, add retry for every query.
for retry := 0; retry < 3; retry++ {
// If a query takes too long to run, it probably means dgraph is stuck and there's
// no point in waiting longer or trying more tests.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
resp, err := dg.NewTxn().Query(ctx, bodies[0])
cancel()

if retry < 2 && (err != nil || ctx.Err() == context.DeadlineExceeded) {
continue
}

if ctx.Err() == context.DeadlineExceeded {
t.Fatal("aborting test due to query timeout")
}

t.Logf("running %s", file.Name())
//if *savedir != "" {
// savepath = filepath.Join(*savedir, file.Name())
//}

func cleanupAndExit(exitCode int) {
if testutil.StopAlphasAndDetectRace([]string{"alpha1"}) {
// if there is race fail the test
exitCode = 1
dgraphtest.CompareJSON(bodies[1], string(resp.GetJson()))
}
})
}
_ = os.RemoveAll("out")
os.Exit(exitCode)
//
//if *savedir != "" && diffs > 0 {
// t.Logf("test json saved in directory: %s", *savedir)
//}
}
44 changes: 44 additions & 0 deletions systest/21million/bulk/upgrade_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//go:build upgrade

/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bulk

import (
"os"
"path/filepath"

"github.com/dgraph-io/dgraph/dgraphtest"
)

func (bsuite *TOMbulkTestSuite) bulkLoader() error {
dataDir := os.Getenv("TEST_DATA_DIRECTORY")
rdfFile := filepath.Join(dataDir, "21million.rdf.gz")
schemaFile := filepath.Join(dataDir, "21million.schema")
return bsuite.lc.BulkLoad(dgraphtest.BulkOpts{
DataFiles: []string{rdfFile},
SchemaFiles: []string{schemaFile},
})
}

func (lsuite *TOMbulkTestSuite) StartAlpha() error {
c := lsuite.lc
if err := c.Start(); err != nil {
return err
}
return c.HealthCheck(false)
}

0 comments on commit 352f44b

Please sign in to comment.