Skip to content

Commit

Permalink
[WIP] votr
Browse files Browse the repository at this point in the history
  • Loading branch information
bobvawter committed Oct 31, 2023
1 parent 785d167 commit 0a36db7
Show file tree
Hide file tree
Showing 11 changed files with 1,081 additions and 4 deletions.
116 changes: 116 additions & 0 deletions internal/cmd/votr/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# VOTR

The VOTR workload demonstrates a hub-and-spoke replication workload that
uses cdc-sink's merge features.

The core workload demonstrates a ballot-casting system. There are three tables:
* `candidates` is an infrequently-updated table that uses a version
column and last-one-wins behaviors
* `ballots` is an append-only table with a composite primary key of
`(candidate uuid, ballot uuid)`
* `totals` maintains per-candidate running totals. It is intended to
show how a high-contention table can be logically replicated; this is
not an ideal pattern for CockroachDB.

VOTR shows a hub-and-spoke replication model, but it can also operate in
a two-datacenter (2-DC) model. The `votr_0` database is used as the hub,
and an arbitrary number of "spoke" databases, `votr_N` run the workload.
Each hub-spoke pair has two unidirectional changefeeds, each of which
are processed by independent `cdc-sink` instances. There are a total of
six independent feeds that are operating in this demonstration.

![votr-diagram.png](votr-diagram.png)

The worker attached to each spoke selects a random number of candidates
and casts one or more ballots to each of them. The VOTR worker knows
nothing about the replication model. The workload is roughly:
* `BEGIN`
* For random Candidate and Number of votes:
* `UPDATE totals SET total = total + N WHERE candidate = C`
* `INSERT INTO ballots (candidate) VALUES (C, generate_series(1, N))`
* `COMMIT`

Each spoke also has a validation loop that checks whether or not the
total number of ballots equals the running total of all totals. That is
`SELECT count(*) FROM ballots` should equal `SELECT sum(total) FROM
totals` at any given point in time.

The only SQL client to interact with the `votr_0` hub is cdc-sink.

The tables used by the `VOTR` workload are augmented with a vector-clock
scheme. That is, each table has a `whence JSONB` column that shows which
worker(s) a row incorporates state from. A `src INT` column also allows
the `vectorMerge()` function defined in the [votr.ts user
script](./script/votr.ts) to ensure idempotency by ensuring that the
clock for a particular source only ever rolls forwards. Note that the
functionality does *not* depend on clock synchronization between the
workers, only that each worker has a reasonably well-behaved cluster
timestamp.

# Running the demo

Start one or more local CockroachDB instances. The VOTR workload can use
multiple SQL databases on a single CockroachDB node, or the databases
can be split across multiple clusters.
> `cockroach start-single-node --store type=mem,size=2G --advertise-addr 127.0.0.1:26257 --insecure `
Initialize the VOTR schema on each instance. The number of regions is
specified by repeating the `--connect` argument. It is valid to have
duplicate connection strings to use the same CockroachDB node or cluster
to host multiple VOTR databases. This command-line shows the local
CockroachDB node being used to host the hub and three spokes.
> `cdc-sink votr init
> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
> --connect 'postgresql://root@localhost:26257/?sslmode=disable'`
Run the workload. The `--drainDelay` flag sets a timeout after receiving
a `^C` for the workers to stop, but to allow the cdc-sink instances to
continue to process messages.
> `cdc-sink votr run
> --candidates 128 --workers 1 --drainDelay 1m
> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
> --connect 'postgresql://root@localhost:26257/?sslmode=disable'`
# Useful demo query

Show the count of ballots and sum total across multiple VOTR database on
the same node:
```sql
WITH t (n, t) AS (
SELECT 1 AS n, sum(total)
FROM votr_1.totals
UNION ALL SELECT 2 AS n, sum(total)
FROM votr_2.totals
UNION ALL SELECT 3 AS n, sum(total)
FROM votr_3.totals
UNION ALL SELECT 0 AS n, sum(total)
FROM votr_0.totals
),
c (n, c) AS (
SELECT 1, count(*) FROM votr_1.ballots
UNION ALL SELECT 2, count(*)
FROM votr_2.ballots
UNION ALL SELECT 3, count(*)
FROM votr_3.ballots
UNION ALL SELECT 0, count(*)
FROM votr_0.ballots
)
SELECT *
FROM t JOIN c USING (n) order by n;
```

# Known limitations

* VOTR is a one-shot demo. The cdc-sink HTTP ports are ephemeral, so
it's not straightforward to continue from a previous run. Since the
source changefeeds can't be reused, VOTR would have to pick the
correct changefeed cursor. With more development effort, VOTR could
query the jobs table to determine which port it should attempt to bind
a listener to, in order to use the existing state.
* VOTR cannot clean up the changefeeds that it creates. It's best to use
an in-memory CockroachDB instance(s) as shown above and restart
between runs.
277 changes: 277 additions & 0 deletions internal/cmd/votr/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
// Copyright 2023 The Cockroach Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package votr

import (
"context"
"database/sql"
"fmt"
"math/rand"

"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/cockroachdb/cdc-sink/internal/util/retry"
"github.com/google/uuid"
"github.com/pkg/errors"
)

var (
ballots = ident.New("ballots")
candidates = ident.New("candidates")
totals = ident.New("totals")

names = [...]string{
"Alice", "Bob", "Carol", "David", "Eve", "Frank", "Gil",
"Hillary", "Indira", "Jill", "Kyle", "Louis", "Mike", "Nancy",
"Oscar", "Paul", "Queen", "Romeo", "Sierra", "Toni", "Ursula",
"Vik", "Walter", "Xerxes", "Yolanda", "Zola",
}
connectors = [...]string{"le", "the"}
epithets = [...]string{
"Awesome", "Boor", "Concerned", "Dependable", "Elated", "Fancy",
"Grouch", "Hapless", "Indecisive", "Joyful", "Kleptocrat",
"Lesser", "Mannered", "Nice", "Opulent", "Purposeful", "Quiet",
"Remote", "Sulky", "Truthful", "Unfortunate", "Victorious",
"Wastrel", "XIVth", "Yankee", "Zoologist",
}
moods = [...]string{
"Active", "Bad", "Cheerful", "Down", "Elated", "Frightened",
"Good", "Happy", "Introspective", "Justified", "Kind", "Liked",
"Mad", "Naughty", "Open", "Puzzled", "Questioning", "Romantic",
"Sleepy", "Trusting", "Watchful", "XOXO", "Zen",
}
)

const (
// ballots are append-only.
ballotsSchema = `CREATE TABLE IF NOT EXISTS %[1]s (
candidate UUID NOT NULL REFERENCES %[2]s ON DELETE CASCADE,
ballot UUID NOT NULL DEFAULT gen_random_uuid(),
whence JSONB NOT NULL
DEFAULT jsonb_build_object(%[3]d, cluster_logical_timestamp()::string)
ON UPDATE jsonb_build_object(%[3]d, cluster_logical_timestamp()::string),
src INT NOT NULL DEFAULT %[3]d ON UPDATE %[3]d,
xyzzy INT NOT NULL DEFAULT 0, -- Hack to force a conflict, until CAS mode has "always" mode.
PRIMARY KEY (candidate, ballot)
)`

// candidates might be updated occasionally in a last-one-wins manner.
candidatesSchema = `CREATE TABLE IF NOT EXISTS %[1]s (
candidate UUID PRIMARY KEY,
whence JSONB NOT NULL
DEFAULT jsonb_build_object(%[2]d, cluster_logical_timestamp()::string)
ON UPDATE jsonb_build_object(%[2]d, cluster_logical_timestamp()::string),
src INT NOT NULL DEFAULT %[2]d ON UPDATE %[2]d,
name TEXT NOT NULL,
mood TEXT NOT NULL,
xyzzy INT NOT NULL DEFAULT 0 -- Hack to force a conflict, until CAS mode has "always" mode.
)`

// totals will show a high-conflict table with custom merge logic.
totalsSchema = `CREATE TABLE IF NOT EXISTS %[1]s (
candidate UUID PRIMARY KEY REFERENCES %[2]s ON DELETE CASCADE,
whence JSONB NOT NULL
DEFAULT jsonb_build_object(%[3]d, cluster_logical_timestamp()::string)
ON UPDATE jsonb_build_object(%[3]d, cluster_logical_timestamp()::string),
src INT NOT NULL DEFAULT %[3]d ON UPDATE %[3]d,
total INT NOT NULL DEFAULT 0,
xyzzy INT NOT NULL DEFAULT 0 -- Hack to force a conflict, until CAS mode has "always" mode.
)`
)

type schema struct {
ballots ident.Table
candidates ident.Table
enclosing ident.Ident
totals ident.Table

candidateIds map[uuid.UUID]struct{}
db *sql.DB
region region
}

func newSchema(db *sql.DB, enclosing ident.Ident, r region) *schema {
enclosing = ident.New(enclosing.Raw() + "_" + r.String())
s := ident.MustSchema(enclosing, ident.Public)
return &schema{
ballots: ident.NewTable(s, ballots),
candidateIds: make(map[uuid.UUID]struct{}),
candidates: ident.NewTable(s, candidates),
db: db,
enclosing: enclosing,
region: r,
totals: ident.NewTable(s, totals),
}
}

func (s *schema) create(ctx context.Context) error {
return retry.Retry(ctx, func(ctx context.Context) error {
if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
`CREATE DATABASE IF NOT EXISTS %s `, s.enclosing)); err != nil {
return errors.WithStack(err)
}

if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
candidatesSchema, s.candidates, s.region,
)); err != nil {
return errors.WithStack(err)
}

if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
ballotsSchema, s.ballots, s.candidates, s.region,
)); err != nil {
return errors.WithStack(err)
}

if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
totalsSchema, s.totals, s.candidates, s.region,
)); err != nil {
return errors.WithStack(err)
}
return nil
})
}

// doStuff selects a random selection of candidates, distributes the
// requested number of votes across them, and inserts the ballots.
func (s *schema) doStuff(ctx context.Context, votes int) error {
numCandidates := rand.Intn(votes) + 1 // Intn [0,n)

winners := make([]uuid.UUID, 0, numCandidates)
// Iteration over a map is random enough for our purposes.
for id := range s.candidateIds {
winners = append(winners, id)
if len(winners) == numCandidates {
break
}
}

voteAllocation := make(map[uuid.UUID]int)
for i := 0; i < votes; i++ {
winnerIdx := i % len(winners)
voteAllocation[winners[winnerIdx]]++
}

ballotQ := fmt.Sprintf(`INSERT INTO %s (candidate)
SELECT candidate FROM
(SELECT $1::UUID candidate, generate_series(1, $2))`, s.ballots)
totalQ := fmt.Sprintf(`INSERT INTO %s AS tbl (candidate, total)
VALUES ($1, $2)
ON CONFLICT(candidate)
DO UPDATE SET total=tbl.total+excluded.total`, s.totals)

return retry.Retry(ctx, func(ctx context.Context) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return errors.WithStack(err)
}
defer func() { _ = tx.Rollback() }()

for candidate, count := range voteAllocation {
if _, err := tx.ExecContext(ctx, totalQ, candidate, count); err != nil {
return errors.WithStack(err)
}
if _, err := tx.ExecContext(ctx, ballotQ, candidate, count); err != nil {
return errors.WithStack(err)
}
}
return errors.WithStack(tx.Commit())
})
}

func (s *schema) ensureCandidates(ctx context.Context, count int) error {
seed := int64(0)
rnd := rand.New(rand.NewSource(seed))

nextMood := func() string {
return moods[rnd.Intn(len(moods))]
}
nextName := func(deconflict int) string {
return fmt.Sprintf("%s %s %s (%d)",
names[rnd.Intn(len(names))],
connectors[rnd.Intn(len(connectors))],
epithets[rnd.Intn(len(epithets))],
deconflict)
}

// Rows are inserted with deterministic ids.
q := fmt.Sprintf(`UPSERT INTO %s (candidate, name, mood)
VALUES (uuid_generate_v5('455E049E-54B6-41C9-BBCE-1587CC394851', $1), $1, $2)
RETURNING candidate`, s.candidates)

for i := 0; i < count; i++ {
name := nextName(i)
mood := nextMood()
if err := retry.Retry(ctx, func(ctx context.Context) error {
var id uuid.UUID
if err := s.db.QueryRowContext(ctx, q, name, mood).Scan(&id); err != nil {
return errors.WithStack(err)
}
s.candidateIds[id] = struct{}{}
return nil
}); err != nil {
return err
}
}

return nil
}

func (s *schema) validate(ctx context.Context, aost bool) ([]string, error) {
asOf := ""
if aost {
asOf = "AS OF SYSTEM TIME follower_read_timestamp()"
}

q := fmt.Sprintf(`
WITH counted AS (SELECT candidate, count(*) AS count FROM %s GROUP BY candidate),
verify AS (
SELECT candidate,
IFNULL(counted.count, 0) expected,
IFNULL(totals.total, 0) actual
FROM counted FULL JOIN %s USING (candidate)
)
SELECT candidate, expected, actual, name
FROM verify
JOIN %s USING (candidate)
%s
WHERE expected != actual`, s.ballots, s.totals, s.candidates, asOf)

var ret []string
err := retry.Retry(ctx, func(ctx context.Context) error {
ret = nil // Reset if looping.

rows, err := s.db.QueryContext(ctx, q)
if err != nil {
return errors.WithStack(err)
}
defer func() { _ = rows.Close() }()

for rows.Next() {
var id uuid.UUID
var expected, actual int
var name string
if err := rows.Scan(&id, &expected, &actual, &name); err != nil {
return errors.WithStack(err)
}
ret = append(ret, fmt.Sprintf("%s: expected %d had %d (%s)",
id, expected, actual, name))
}
// Final error check.
return errors.WithStack(rows.Err())
})
return ret, err
}
Loading

0 comments on commit 0a36db7

Please sign in to comment.