Skip to content

Commit

Permalink
workload/schemachange: make RSW more deterministic by using setseed
Browse files Browse the repository at this point in the history
The recently added setseed builtin lets us make the ORDER BY random()
clauses used in this workload deterministic.

Each worker goroutine of the RSW now runs a determistic sequence of
operations given the same COCKROACH_RANDOM_SEED. However, since the
workload is concurrent, the overall RSW is not deterministic, since the
interleaving of goroutine execution is random.

This also makes the "deck" used to decide which operation to execute be
per-worker rather than global.

Release note: None
  • Loading branch information
rafiss committed Feb 12, 2024
1 parent b51da9e commit a5d7d67
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 13 deletions.
56 changes: 51 additions & 5 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3256,6 +3256,9 @@ func (og *operationGenerator) getTableColumns(
func (og *operationGenerator) randColumn(
ctx context.Context, tx pgx.Tx, tableName tree.TableName, pctExisting int,
) (string, error) {
if err := og.setSeedInDB(ctx, tx); err != nil {
return "", err
}
if og.randIntn(100) >= pctExisting {
// We make a unique name for all columns by prefixing them with the table
// index to make it easier to reference columns from different tables.
Expand All @@ -3282,6 +3285,9 @@ ORDER BY random()
func (og *operationGenerator) randColumnWithMeta(
ctx context.Context, tx pgx.Tx, tableName tree.TableName, pctExisting int,
) (column, error) {
if err := og.setSeedInDB(ctx, tx); err != nil {
return column{}, err
}
if og.randIntn(100) >= pctExisting {
// We make a unique name for all columns by prefixing them with the table
// index to make it easier to reference columns from different tables.
Expand Down Expand Up @@ -3371,7 +3377,9 @@ func (og *operationGenerator) randChildColumnForFkRelation(
func (og *operationGenerator) randParentColumnForFkRelation(
ctx context.Context, tx pgx.Tx, unique bool,
) (*tree.TableName, *column, error) {

if err := og.setSeedInDB(ctx, tx); err != nil {
return nil, nil, err
}
subQuery := strings.Builder{}
subQuery.WriteString(`
SELECT table_schema, table_name, column_name, crdb_sql_type, is_nullable, contype, conkey
Expand Down Expand Up @@ -3444,6 +3452,9 @@ func (og *operationGenerator) randParentColumnForFkRelation(
func (og *operationGenerator) randConstraint(
ctx context.Context, tx pgx.Tx, tableName string,
) (string, error) {
if err := og.setSeedInDB(ctx, tx); err != nil {
return "", err
}
q := fmt.Sprintf(`
SELECT constraint_name
FROM [SHOW CONSTRAINTS FROM %s]
Expand All @@ -3461,6 +3472,9 @@ ORDER BY random()
func (og *operationGenerator) randIndex(
ctx context.Context, tx pgx.Tx, tableName tree.TableName, pctExisting int,
) (string, error) {
if err := og.setSeedInDB(ctx, tx); err != nil {
return "", err
}
if og.randIntn(100) >= pctExisting {
// We make a unique name for all indices by prefixing them with the table
// index to make it easier to reference columns from different tables.
Expand All @@ -3485,7 +3499,9 @@ ORDER BY random()
func (og *operationGenerator) randSequence(
ctx context.Context, tx pgx.Tx, pctExisting int, desiredSchema string,
) (*tree.TableName, error) {

if err := og.setSeedInDB(ctx, tx); err != nil {
return nil, err
}
if desiredSchema != "" {
if og.randIntn(100) >= pctExisting {
treeSeqName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{
Expand Down Expand Up @@ -3587,7 +3603,9 @@ ORDER BY random()
func (og *operationGenerator) randTable(
ctx context.Context, tx pgx.Tx, pctExisting int, desiredSchema string,
) (*tree.TableName, error) {

if err := og.setSeedInDB(ctx, tx); err != nil {
return nil, err
}
if desiredSchema != "" {
if og.randIntn(100) >= pctExisting {
treeTableName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{
Expand Down Expand Up @@ -3658,6 +3676,9 @@ ORDER BY random()
func (og *operationGenerator) randView(
ctx context.Context, tx pgx.Tx, pctExisting int, desiredSchema string,
) (*tree.TableName, error) {
if err := og.setSeedInDB(ctx, tx); err != nil {
return nil, err
}
if desiredSchema != "" {
if og.randIntn(100) >= pctExisting {
treeViewName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{
Expand Down Expand Up @@ -3702,6 +3723,9 @@ func (og *operationGenerator) randView(
}, tree.Name(fmt.Sprintf("view_%s", og.newUniqueSeqNumSuffix())))
return &treeViewName, nil
}
if err := og.setSeedInDB(ctx, tx); err != nil {
return nil, err
}
const q = `
SELECT schema_name, table_name
FROM [SHOW TABLES]
Expand Down Expand Up @@ -3812,6 +3836,9 @@ func (og *operationGenerator) createSchema(ctx context.Context, tx pgx.Tx) (*opS
func (og *operationGenerator) randSchema(
ctx context.Context, tx pgx.Tx, pctExisting int,
) (string, error) {
if err := og.setSeedInDB(ctx, tx); err != nil {
return "", err
}
if og.randIntn(100) >= pctExisting {
return fmt.Sprintf("schema_%s", og.newUniqueSeqNumSuffix()), nil
}
Expand Down Expand Up @@ -4309,6 +4336,11 @@ func (og *operationGenerator) randIntn(topBound int) int {
return og.params.rng.Intn(topBound)
}

// randFloat64 returns an float64 in the range [0,1.0).
func (og *operationGenerator) randFloat64() float64 {
return og.params.rng.Float64()
}

func (og *operationGenerator) newUniqueSeqNumSuffix() string {
og.params.seqNum++
return fmt.Sprintf("w%d_%d", og.params.workerID, og.params.seqNum)
Expand All @@ -4335,7 +4367,7 @@ func (og *operationGenerator) typeFromTypeName(
}

// Check if the test is running with a mixed version cluster, with a version
// less than or equal to the target version number. This can be used to detect
// less than the target version number. This can be used to detect
// in mixed version environments if certain errors should be encountered.
func isClusterVersionLessThan(
ctx context.Context, tx pgx.Tx, targetVersion roachpb.Version,
Expand All @@ -4349,5 +4381,19 @@ func isClusterVersionLessThan(
if err != nil {
return false, err
}
return clusterVersion.LessEq(targetVersion), nil
return clusterVersion.Less(targetVersion), nil
}

func (og *operationGenerator) setSeedInDB(ctx context.Context, tx pgx.Tx) error {
if notSupported, err := isClusterVersionLessThan(ctx, tx, clusterversion.V24_1.Version()); err != nil {
return err
} else if notSupported {
// To allow the schemachange workload to work in a mixed-version state,
// this should not be treated as an error.
return nil
}
if _, err := tx.Exec(ctx, "SELECT setseed($1)", og.randFloat64()); err != nil {
return err
}
return nil
}
17 changes: 9 additions & 8 deletions pkg/workload/schemachange/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,17 @@ func (s *schemaChange) Ops(
return workload.QueryLoad{}, err
}
stdoutLog := makeAtomicLog(os.Stdout)
rng, seed := randutil.NewTestRand()
_, seed := randutil.NewTestRand()
stdoutLog.printLn(fmt.Sprintf("using random seed: %d", seed))
ops := newDeck(rng, opWeights...)
// A separate deck is constructed of only schema changes supported
// by the declarative schema changer. This deck has equal weights,
// only for supported schema changes.
// A separate weighting is constructed of only schema changes supported by the
// declarative schema changer. This will be used to make a per-worker deck
// that has equal weights, only for supported schema changes.
declarativeOpWeights := make([]int, len(opWeights))
for idx, weight := range opWeights {
if _, ok := opDeclarativeVersion[opType(idx)]; ok {
declarativeOpWeights[idx] = weight
}
}
declarativeOps := newDeck(rng, declarativeOpWeights...)

ql := workload.QueryLoad{
SQLDatabase: sqlDatabase,
Expand Down Expand Up @@ -251,12 +249,15 @@ func (s *schemaChange) Ops(
// operations.
workerRng := randutil.NewTestRandWithSeed(seed + int64(i))

// Each worker needs its own sequence number generator so that the names of
// generated objects are deterministic across runs.
// Each worker needs its own sequence number generator and operation deck so
// that the names of generated objects and operations are deterministic
// across runs.
seqNum, err := s.initSeqNum(ctx, pool, i)
if err != nil {
return workload.QueryLoad{}, err
}
ops := newDeck(workerRng, opWeights...)
declarativeOps := newDeck(workerRng, declarativeOpWeights...)

opGeneratorParams := operationGeneratorParams{
workerID: i,
Expand Down

0 comments on commit a5d7d67

Please sign in to comment.