diff --git a/internal/cmd/votr/schema.go b/internal/cmd/votr/schema.go new file mode 100644 index 000000000..2e0264647 --- /dev/null +++ b/internal/cmd/votr/schema.go @@ -0,0 +1,277 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package votr + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/cockroachdb/cdc-sink/internal/util/retry" + "github.com/google/uuid" + "github.com/pkg/errors" +) + +var ( + ballots = ident.New("ballots") + candidates = ident.New("candidates") + totals = ident.New("totals") + + names = [...]string{ + "Alice", "Bob", "Carol", "David", "Eve", "Frank", "Gil", + "Hillary", "Indira", "Jill", "Kyle", "Louis", "Mike", "Nancy", + "Oscar", "Paul", "Queen", "Romeo", "Sierra", "Toni", "Ursula", + "Vik", "Walter", "Xerxes", "Yolanda", "Zola", + } + connectors = [...]string{"le", "the"} + epithets = [...]string{ + "Awesome", "Boor", "Concerned", "Dependable", "Elated", "Fancy", + "Grouch", "Hapless", "Indecisive", "Joyful", "Kleptocrat", + "Lesser", "Mannered", "Nice", "Opulent", "Purposeful", "Quiet", + "Remote", "Sulky", "Truthful", "Unfortunate", "Victorious", + "Wastrel", "XIVth", "Yankee", "Zoologist", + } + moods = [...]string{ + "Active", "Bad", "Cheerful", "Down", "Elated", "Frightened", + "Good", "Happy", "Introspective", "Justified", "Kind", "Liked", + "Mad", "Naughty", "Open", "Puzzled", "Questioning", "Romantic", + "Sleepy", "Trusting", "Watchful", "XOXO", "Zen", + } +) + +const ( + // ballots are append-only. + ballotsSchema = `CREATE TABLE IF NOT EXISTS %[1]s ( +candidate UUID NOT NULL REFERENCES %[2]s ON DELETE CASCADE, +ballot UUID NOT NULL DEFAULT gen_random_uuid(), +whence JSONB NOT NULL +DEFAULT jsonb_build_object(%[3]d, cluster_logical_timestamp()::string) +ON UPDATE jsonb_build_object(%[3]d, cluster_logical_timestamp()::string), +src INT NOT NULL DEFAULT %[3]d ON UPDATE %[3]d, +xyzzy INT NOT NULL DEFAULT 0, +PRIMARY KEY (candidate, ballot) +)` + + // candidates might be updated occasionally in a last-one-wins manner. + candidatesSchema = `CREATE TABLE IF NOT EXISTS %[1]s ( +candidate UUID PRIMARY KEY, +whence JSONB NOT NULL +DEFAULT jsonb_build_object(%[2]d, cluster_logical_timestamp()::string) +ON UPDATE jsonb_build_object(%[2]d, cluster_logical_timestamp()::string), +src INT NOT NULL DEFAULT %[2]d ON UPDATE %[2]d, +name TEXT NOT NULL, +mood TEXT NOT NULL, +xyzzy INT NOT NULL DEFAULT 0 +)` + + // totals will show a high-conflict table with custom merge logic. + totalsSchema = `CREATE TABLE IF NOT EXISTS %[1]s ( +candidate UUID PRIMARY KEY REFERENCES %[2]s ON DELETE CASCADE, +whence JSONB NOT NULL +DEFAULT jsonb_build_object(%[3]d, cluster_logical_timestamp()::string) +ON UPDATE jsonb_build_object(%[3]d, cluster_logical_timestamp()::string), +src INT NOT NULL DEFAULT %[3]d ON UPDATE %[3]d, +total INT NOT NULL DEFAULT 0, +xyzzy INT NOT NULL DEFAULT 0 +)` +) + +type schema struct { + ballots ident.Table + candidates ident.Table + enclosing ident.Ident + totals ident.Table + + candidateIds map[uuid.UUID]struct{} + db *sql.DB + region region +} + +func newSchema(db *sql.DB, enclosing ident.Ident, r region) *schema { + enclosing = ident.New(enclosing.Raw() + "_" + r.String()) + s := ident.MustSchema(enclosing, ident.Public) + return &schema{ + ballots: ident.NewTable(s, ballots), + candidateIds: make(map[uuid.UUID]struct{}), + candidates: ident.NewTable(s, candidates), + db: db, + enclosing: enclosing, + region: r, + totals: ident.NewTable(s, totals), + } +} + +func (s *schema) create(ctx context.Context) error { + return retry.Retry(ctx, func(ctx context.Context) error { + if _, err := s.db.ExecContext(ctx, fmt.Sprintf( + `CREATE DATABASE IF NOT EXISTS %s `, s.enclosing)); err != nil { + return errors.WithStack(err) + } + + if _, err := s.db.ExecContext(ctx, fmt.Sprintf( + candidatesSchema, s.candidates, s.region, + )); err != nil { + return errors.WithStack(err) + } + + if _, err := s.db.ExecContext(ctx, fmt.Sprintf( + ballotsSchema, s.ballots, s.candidates, s.region, + )); err != nil { + return errors.WithStack(err) + } + + if _, err := s.db.ExecContext(ctx, fmt.Sprintf( + totalsSchema, s.totals, s.candidates, s.region, + )); err != nil { + return errors.WithStack(err) + } + return nil + }) +} + +// doStuff selects a random selection of candidates, distributes the +// requested number of votes across them, and inserts the ballots. +func (s *schema) doStuff(ctx context.Context, votes int) error { + numCandidates := rand.Intn(votes) + 1 // Intn [0,n) + + winners := make([]uuid.UUID, 0, numCandidates) + // Iteration over a map is random enough for our purposes. + for id := range s.candidateIds { + winners = append(winners, id) + if len(winners) == numCandidates { + break + } + } + + voteAllocation := make(map[uuid.UUID]int) + for i := 0; i < votes; i++ { + winnerIdx := i % len(winners) + voteAllocation[winners[winnerIdx]]++ + } + + ballotQ := fmt.Sprintf(`INSERT INTO %s (candidate) +SELECT candidate FROM +(SELECT $1::UUID candidate, generate_series(1, $2))`, s.ballots) + totalQ := fmt.Sprintf(`INSERT INTO %s AS tbl (candidate, total) +VALUES ($1, $2) +ON CONFLICT(candidate) +DO UPDATE SET total=tbl.total+excluded.total`, s.totals) + + return retry.Retry(ctx, func(ctx context.Context) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return errors.WithStack(err) + } + defer func() { _ = tx.Rollback() }() + + for candidate, count := range voteAllocation { + if _, err := tx.ExecContext(ctx, totalQ, candidate, count); err != nil { + return errors.WithStack(err) + } + if _, err := tx.ExecContext(ctx, ballotQ, candidate, count); err != nil { + return errors.WithStack(err) + } + } + return errors.WithStack(tx.Commit()) + }) +} + +func (s *schema) ensureCandidates(ctx context.Context, count int) error { + seed := int64(0) + rnd := rand.New(rand.NewSource(seed)) + + nextMood := func() string { + return moods[rnd.Intn(len(moods))] + } + nextName := func(deconflict int) string { + return fmt.Sprintf("%s %s %s (%d)", + names[rnd.Intn(len(names))], + connectors[rnd.Intn(len(connectors))], + epithets[rnd.Intn(len(epithets))], + deconflict) + } + + // Rows are inserted with deterministic ids. + q := fmt.Sprintf(`UPSERT INTO %s (candidate, name, mood) +VALUES (uuid_generate_v5('455E049E-54B6-41C9-BBCE-1587CC394851', $1), $1, $2) +RETURNING candidate`, s.candidates) + + for i := 0; i < count; i++ { + name := nextName(i) + mood := nextMood() + if err := retry.Retry(ctx, func(ctx context.Context) error { + var id uuid.UUID + if err := s.db.QueryRowContext(ctx, q, name, mood).Scan(&id); err != nil { + return errors.WithStack(err) + } + s.candidateIds[id] = struct{}{} + return nil + }); err != nil { + return err + } + } + + return nil +} + +func (s *schema) validate(ctx context.Context, aost bool) ([]string, error) { + asOf := "" + if aost { + asOf = "AS OF SYSTEM TIME follower_read_timestamp()" + } + + q := fmt.Sprintf(` + WITH counted AS (SELECT candidate, count(*) AS count FROM %s GROUP BY candidate), + verify AS ( + SELECT candidate, + IFNULL(counted.count, 0) expected, + IFNULL(totals.total, 0) actual + FROM counted FULL JOIN %s USING (candidate) + ) +SELECT candidate, expected, actual, name + FROM verify + JOIN %s USING (candidate) + %s + WHERE expected != actual`, s.ballots, s.totals, s.candidates, asOf) + + var ret []string + err := retry.Retry(ctx, func(ctx context.Context) error { + ret = nil // Reset if looping. + + rows, err := s.db.QueryContext(ctx, q) + if err != nil { + return errors.WithStack(err) + } + defer func() { _ = rows.Close() }() + + for rows.Next() { + var id uuid.UUID + var expected, actual int + var name string + if err := rows.Scan(&id, &expected, &actual, &name); err != nil { + return errors.WithStack(err) + } + ret = append(ret, fmt.Sprintf("%s: expected %d had %d (%s)", + id, expected, actual, name)) + } + // Final error check. + return errors.WithStack(rows.Err()) + }) + return ret, err +} diff --git a/internal/cmd/votr/schema_test.go b/internal/cmd/votr/schema_test.go new file mode 100644 index 000000000..5e0610604 --- /dev/null +++ b/internal/cmd/votr/schema_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package votr + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/cdc-sink/internal/sinktest/base" + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/stretchr/testify/require" +) + +func TestSchema(t *testing.T) { + const numCandidates = 128 + r := require.New(t) + + fixture, cancel, err := base.NewFixture() + r.NoError(err) + defer cancel() + + ctx := fixture.Context + + if fixture.TargetPool.Product != types.ProductCockroachDB { + return + } + + // Steal the enclosing DATABASE name, since we're basically running + // the votr init command. + enclosingDB := fixture.SourceSchema.Schema().Idents(nil)[0] + sch := newSchema(fixture.SourcePool.DB, enclosingDB, 0) + + // Set up the schema, insert some votes, and ensure that everything + // is consistent. + r.NoError(sch.create(ctx)) + r.NoError(sch.ensureCandidates(ctx, numCandidates)) + for i := 0; i < 10; i++ { + r.NoError(sch.doStuff(ctx, 10)) + } + failures, err := sch.validate(ctx, false) + r.NoError(err) + r.Empty(failures) + + // Break the totals table. + _, err = fixture.SourcePool.ExecContext(ctx, fmt.Sprintf(`UPDATE %s SET total=total+1 WHERE true`, sch.totals)) + r.NoError(err) + failures, err = sch.validate(ctx, false) + r.NoError(err) + r.NotEmpty(failures) +} diff --git a/internal/cmd/votr/script/votr.ts b/internal/cmd/votr/script/votr.ts new file mode 100644 index 000000000..cc76e6d8a --- /dev/null +++ b/internal/cmd/votr/script/votr.ts @@ -0,0 +1,88 @@ +/* + * Copyright 2023 The Cockroach Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import * as api from "cdc-sink@v1"; + +const destination: number = DESTINATION_INDEX; + +// If the document has already passed through the destination, we don't +// want it to continue looping around. Since this decision can be made +// without loading the row in the destination, we can use the per-table +// map function, rather than filtering in the merge function. +const filterWhence = (doc: api.Document) => doc.src !== destination ? doc : null; + +// vectorMerge is a helper function to admit an incoming mutation based +// on its vector clock versus the destination's knowledge of the clock. +const vectorMerge = + (fallback?: api.MergeFunction): api.MergeFunction => + (op: api.MergeOperation): api.MergeResult => { + // Replay prevention: Don't process messages from older + // clock values in the src column. + let src: number = op.proposed.src; + if (src === undefined) { + throw new Error("src column missing"); + } + let incomingClock: string = op.proposed.whence[src] ?? 0; + let existingClock: string = op.target.whence[src] ?? 0; + if (incomingClock <= existingClock) { + return {drop: true}; + } + + // Use the target's view of the vector clock, but update it + // with the mutation source's value. + op.proposed.whence = op.target.whence; + op.proposed.whence[src] = incomingClock; + + // Absent a fallback, act like last-one-wins. + if (!fallback) { + return {apply: op.proposed}; + } + + // Delegate to specialty logic. + return fallback(op); + }; + + +api.configureTable(`votr_${destination}.public.ballots`, { + cas: ["xyzzy"], + map: filterWhence, + merge: vectorMerge(), +}); + +api.configureTable(`votr_${destination}.public.candidates`, { + cas: ["xyzzy"], + map: filterWhence, + merge: vectorMerge(), +}); + +api.configureTable(`votr_${destination}.public.totals`, { + cas: ["xyzzy"], + map: filterWhence, + merge: vectorMerge((op: api.MergeOperation): api.MergeResult => { + // Apply a delta based on before and proposed values. + let a: number = op.proposed.total ?? 0; + let b: number = op.before?.total ?? 0; + let delta = a - b; + if (delta === 0) { + return {drop: true}; + } + + op.proposed.total = op.target.total + delta; + return {apply: op.proposed}; + }), +}) \ No newline at end of file diff --git a/internal/cmd/votr/votr.go b/internal/cmd/votr/votr.go new file mode 100644 index 000000000..0f22b14b4 --- /dev/null +++ b/internal/cmd/votr/votr.go @@ -0,0 +1,499 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +// Package votr contains a demonstration workload. +package votr + +import ( + "context" + "embed" + "fmt" + "math" + "math/rand" + "net" + "net/url" + "os" + "os/signal" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/source/cdc" + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/source/server" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/cockroachdb/cdc-sink/internal/util/retry" + "github.com/cockroachdb/cdc-sink/internal/util/stdpool" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/cockroachdb/cdc-sink/internal/util/subfs" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/pflag" +) + +type region int + +func (r region) String() string { + return strconv.Itoa(int(r)) +} + +type config struct { + BallotBatch int + Candidates int + Connect []string + DrainDelay time.Duration + Enclosing ident.Ident + ReportInterval time.Duration + SinkHost string + StopAfter time.Duration + ValidationDelay time.Duration + WorkerDelay time.Duration + Workers int +} + +func (c *config) Bind(f *pflag.FlagSet) { + f.IntVar(&c.BallotBatch, "ballotBatch", 10, + "the number of ballots to record in a single batch") + f.IntVar(&c.Candidates, "candidates", 16, + "the number of candidate rows") + f.StringSliceVar(&c.Connect, "connect", + []string{ + "postgresql://root@localhost:26257/?sslmode=disable", + "postgresql://root@localhost:26258/?sslmode=disable", + }, + "two or more CockroachDB connection strings") + f.DurationVar(&c.DrainDelay, "drainDelay", 10*time.Second, + "pause between stopping workload and stopping cdc-sink processes") + f.Var(ident.NewValue("votr", &c.Enclosing), "schema", + "the enclosing database schema") + f.DurationVar(&c.ReportInterval, "reportAfter", 5*time.Second, + "report number of ballots inserted") + f.StringVar(&c.SinkHost, "sinkHost", "127.0.0.1", + "the hostname to use when creating changefeeds") + f.DurationVar(&c.StopAfter, "stopAfter", 0, + "if non-zero, exit after running for this long") + f.DurationVar(&c.ValidationDelay, "validationDelay", 15*time.Second, + "sleep time between validation cycles") + f.DurationVar(&c.WorkerDelay, "workerDelay", 100*time.Millisecond, + "sleep time between ballot stuffing") + f.IntVar(&c.Workers, "workers", 8, + "the number of concurrent ballot stuffers") +} + +// Command returns the VOTR workload. +func Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "votr", + Short: "a workload to demonstrate async, two-way replication", + } + cmd.AddCommand(commandInit(), commandRun()) + return cmd +} + +func commandInit() *cobra.Command { + cfg := &config{} + cmd := &cobra.Command{ + Use: "init", + Short: "initialize the VOTR schema", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + if len(cfg.Connect) < 2 { + return errors.New("at least two connection strings are required") + } + + schemas := make([]*schema, 0, len(cfg.Connect)) + cancels := make([]func(), 0, len(cfg.Connect)) + defer func() { + for _, cancel := range cancels { + cancel() + } + }() + + for idx, conn := range cfg.Connect { + sch, cancel, err := openSchema(ctx, cfg, region(idx)) + if err != nil { + return errors.Wrapf(err, "could not connect to region %d at %s", idx, conn) + } + schemas = append(schemas, sch) + cancels = append(cancels, cancel) + } + + for _, sch := range schemas { + if err := sch.create(ctx); err != nil { + return errors.Wrapf(err, "could not create VOTR schema in %s", sch.region) + } + } + + return nil + }, + } + cfg.Bind(cmd.Flags()) + return cmd +} + +func commandRun() *cobra.Command { + cfg := &config{} + cmd := &cobra.Command{ + Use: "run", + Short: "run the VOTR workload", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + if len(cfg.Connect) < 2 { + return errors.New("at least two connection strings are required") + } + + // Create a detached stopper so we can control shutdown. + workerStopper := stopper.WithContext(context.Background()) + + // This goroutine will stop the workload in response to + // being interrupted or if the test duration has elapsed. + workerStopper.Go(func() error { + delay := cfg.StopAfter + if delay == 0 { + delay = math.MaxInt64 + } + select { + case <-workerStopper.Stopping(): + case <-cmd.Context().Done(): + log.Infof("shutdown signal received, interrupt again to quit") + signal.Reset(os.Interrupt) + case <-time.After(delay): + log.Info("stopping workload after configured time") + } + workerStopper.Stop(30 * time.Second) + return nil + }) + + // Run the requested number of workers. + log.Infof("inserting with %d workers across %d candidates", cfg.Workers, cfg.Candidates) + if cfg.WorkerDelay > 0 { + log.Infof("theoretical max regional ballots per reporting interval: %d", + int64(cfg.Workers)*int64(cfg.BallotBatch)* + cfg.ReportInterval.Nanoseconds()/cfg.WorkerDelay.Nanoseconds()) + log.Info("low performance may indicate contention due to too few candidates") + } + + schemas := make([]*schema, 0, len(cfg.Connect)) + cancels := make([]func(), 0, len(cfg.Connect)) + defer func() { + // Cancel in reverse order. + for i := len(cancels) - 1; i >= 0; i-- { + cancels[i]() + } + }() + + for idx := range cfg.Connect { + sch, cancel, err := worker(workerStopper, cfg, region(idx)) + if err != nil { + return err + } + schemas = append(schemas, sch) + cancels = append(cancels, cancel) + } + + // Create a connection between the hub and each of the + // spokes. We could offer additional topologies, such as a + // uni- or bi-directional ring or a fully-connected graph. + hubSch := schemas[0] + schemas = schemas[1:] + + // We want to run the servers with their own lifecycle in + // order to allow in-flight mutations a chance to drain. + svrStopper := stopper.WithContext(context.Background()) + + for idx, leafSch := range schemas { + toLeaf, cancel, err := startServer(svrStopper, cfg, hubSch, leafSch) + if err != nil { + return errors.Wrapf(err, "starting server %s -> %s", hubSch.region, leafSch.region) + } + cancels = append(cancels, cancel) + + // Each leaf needs its own server that will write + // updates to the hub until such time as cdc-sink can + // support multiple different changefeeds writing to the + // same destination schema. See + // https://github.com/cockroachdb/cockroach/issues/112880 + // for a way that this might become trivially easy. + toHub, cancel, err := startServer(svrStopper, cfg, leafSch, hubSch) + if err != nil { + return errors.Wrapf(err, "starting server %s -> %s", leafSch.region, hubSch.region) + } + cancels = append(cancels, cancel) + + if err := createFeed(workerStopper, hubSch, toLeaf); err != nil { + return errors.Wrapf(err, "feed %s -> %s", hubSch.region, toLeaf) + } + if err := createFeed(workerStopper, schemas[idx], toHub); err != nil { + return errors.Wrapf(err, "feed %s -> %s", hubSch.region, toHub) + } + } + + // Wait for the workers to be shut down. + _ = workerStopper.Wait() + + log.Infof("workload stopped, pausing for %s to drain", cfg.DrainDelay) + time.Sleep(cfg.DrainDelay) + svrStopper.Stop(5 * time.Second) + _ = svrStopper.Wait() + return nil + }, + } + cfg.Bind(cmd.Flags()) + return cmd +} + +// createFeed creates a changefeed from the given source to the given +// server. +func createFeed(ctx *stopper.Context, from *schema, to *url.URL) error { + // Set the cluster settings once, if we need to. + var enabled bool + if err := retry.Retry(ctx, func(ctx context.Context) error { + return from.db.QueryRowContext(ctx, "SHOW CLUSTER SETTING kv.rangefeed.enabled").Scan(&enabled) + }); err != nil { + return errors.Wrap(err, "could not check cluster setting") + } + if !enabled { + if err := retry.Retry(ctx, func(ctx context.Context) error { + _, err := from.db.ExecContext(ctx, "SET CLUSTER SETTING kv.rangefeed.enabled = true") + return errors.Wrapf(err, "%s: could not enable rangefeeds", from.region) + }); err != nil { + return err + } + } + + lic, hasLic := os.LookupEnv("COCKROACH_DEV_LICENSE") + org, hasOrg := os.LookupEnv("COCKROACH_DEV_ORGANIZATION") + if hasLic && hasOrg { + if err := retry.Retry(ctx, func(ctx context.Context) error { + _, err := from.db.ExecContext(ctx, "SET CLUSTER SETTING enterprise.license = $1", lic) + return errors.Wrapf(err, "%s: could not set cluster license", from.region) + }); err != nil { + return err + } + + if err := retry.Retry(ctx, func(ctx context.Context) error { + _, err := from.db.ExecContext(ctx, "SET CLUSTER SETTING cluster.organization = $1", org) + return errors.Wrapf(err, "%s: could not set cluster organization", from.region) + }); err != nil { + return err + } + } + + q := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE %s, %s, %s INTO '%s' +WITH diff, updated, resolved='1s', min_checkpoint_frequency='1s', +webhook_sink_config='{"Flush":{"Messages":1000,"Frequency":"1s"}}'`, + from.ballots, from.candidates, from.totals, to.String(), + ) + + if err := retry.Retry(ctx, func(ctx context.Context) error { + _, err := from.db.ExecContext(ctx, q) + return errors.Wrapf(err, "%s: could not create changefeed", from.region) + }); err != nil { + return err + } + + log.Infof("created feed from %s into %s", from.region, to) + return nil +} + +func openSchema(ctx context.Context, cfg *config, r region) (*schema, func(), error) { + conn := cfg.Connect[r] + + pool, cancel, err := stdpool.OpenPgxAsTarget(ctx, conn, + stdpool.WithConnectionLifetime(5*time.Minute), + stdpool.WithPoolSize(cfg.Workers+1), + stdpool.WithTransactionTimeout(time.Minute), + ) + if err != nil { + return nil, nil, errors.Wrapf(err, "could not connect to %s database", r) + } + + return newSchema(pool.DB, cfg.Enclosing, r), cancel, nil +} + +//go:embed script/* +var scriptFS embed.FS + +// startServer runs an instance of cdc-sink which will feed into +// the given destination. It returns the base URL for delivering +// messages to the sink. +func startServer(ctx *stopper.Context, cfg *config, src, dest *schema) (*url.URL, func(), error) { + targetConn := cfg.Connect[dest.region] + + stagingName := ident.New(fmt.Sprintf("cdc_sink_%d_%s_%s", os.Getpid(), src.region, dest.region)) + stagingSchema := ident.MustSchema(dest.enclosing, stagingName) + + if _, err := dest.db.ExecContext(ctx, fmt.Sprintf( + `CREATE SCHEMA IF NOT EXISTS %s`, stagingSchema)); err != nil { + return nil, nil, errors.Wrap(err, dest.region.String()) + } + + srvConfig := &server.Config{ + CDC: cdc.Config{ + BaseConfig: logical.BaseConfig{ + BackfillWindow: 0, + ForeignKeysEnabled: true, + RetryDelay: 10 * time.Second, + ScriptConfig: script.Config{ + FS: &subfs.SubstitutingFS{ + FS: scriptFS, + Replacer: strings.NewReplacer( + "DESTINATION_INDEX", dest.region.String(), + "SOURCE_INDEX", src.region.String(), + ), + }, + MainPath: "/script/votr.ts", + }, + StagingConn: targetConn, + StagingSchema: stagingSchema, + TargetConn: targetConn, + }, + FlushEveryTimestamp: true, // Needed for delta behavior. + MetaTableName: ident.New("resolved_timestamps"), + RetireOffset: 24 * time.Hour, // For debugging. + }, + BindAddr: ":0", + DisableAuth: true, + GenerateSelfSigned: true, + } + if err := srvConfig.Preflight(); err != nil { + return nil, nil, errors.Wrap(err, dest.region.String()) + } + srv, cancel, err := server.NewServer(ctx, srvConfig) + if err != nil { + return nil, nil, errors.Wrap(err, dest.region.String()) + } + _, port, err := net.SplitHostPort(srv.GetAddr().String()) + if err != nil { + cancel() + return nil, nil, errors.Wrap(err, dest.region.String()) + } + + sink := &url.URL{ + Scheme: "webhook-https", + Host: fmt.Sprintf("%s:%s", cfg.SinkHost, port), + Path: ident.Join(dest.candidates.Schema(), ident.Raw, '/'), + RawQuery: "insecure_tls_skip_verify=true", + } + + return sink, cancel, nil +} + +// worker will launch a number of goroutines into the context to insert +// ballots and to verify the consistency of the dataset. +func worker(ctx *stopper.Context, cfg *config, r region) (*schema, func(), error) { + sch, closeDB, err := openSchema(ctx, cfg, r) + if err != nil { + return nil, nil, err + } + + if err := sch.ensureCandidates(ctx, cfg.Candidates); err != nil { + closeDB() + return nil, nil, errors.Wrapf(err, "%s: could not create candidate entries", r) + } + + warnings, err := sch.validate(ctx, false) + if err != nil { + closeDB() + return nil, nil, errors.Wrapf(err, "%s: could not perform initial validation", r) + } + if len(warnings) > 0 { + log.WithField( + "inconsistent", warnings, + ).Warnf("%s: workload starting from inconsistent state", r) + } + + // Don't run the workload on the hub if we're in a hub-and-spoke + // model. The ON UPDATE clause in the table definitions isn't able + // to refer to the existing value in the column. Thus, we cannot + // transparently patch the vector clock without making the workload + // itself aware of the vector clock column. + // + // In a two-region setup, we only need to be able to prevent a + // mutation from cycling between the regions, so the vector clock is + // an over-complication. + if len(cfg.Connect) == 2 || r > 0 { + ballotsInserted := &atomic.Int64{} + for i := 0; i < cfg.Workers; i++ { + ctx.Go(func() error { + // Stagger start by a random amount. + sleep := time.Duration(rand.Int63n(cfg.WorkerDelay.Nanoseconds())) + + for { + select { + case <-time.After(sleep): + sleep = cfg.WorkerDelay + case <-ctx.Stopping(): + return nil + case <-ctx.Done(): + return ctx.Err() + } + + if err := sch.doStuff(ctx, cfg.BallotBatch); err != nil { + return errors.Wrap(err, "could not stuff ballots") + } + ballotsInserted.Add(int64(cfg.BallotBatch)) + } + }) + } + + // Print status. + ctx.Go(func() error { + for { + select { + case <-time.After(cfg.ReportInterval): + log.Infof("%s: inserted %d ballots", r, ballotsInserted.Swap(0)) + case <-ctx.Stopping(): + return nil + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + // Start a background validation loop. + ctx.Go(func() error { + for { + select { + case <-time.After(cfg.ValidationDelay): + case <-ctx.Stopping(): + return nil + case <-ctx.Done(): + return ctx.Err() + } + + warnings, err := sch.validate(ctx, true) + if err != nil { + return errors.Wrap(err, "could not validate results") + } + if len(warnings) == 0 { + log.Infof("%s: workload is consistent", r) + continue + } + log.WithField( + "inconsistent", warnings, + ).Warnf("%s: workload in inconsistent state", r) + } + }) + + return sch, closeDB, nil +} diff --git a/internal/target/apply/apply.go b/internal/target/apply/apply.go index b2188ddd9..35f9c1498 100644 --- a/internal/target/apply/apply.go +++ b/internal/target/apply/apply.go @@ -188,7 +188,7 @@ func (a *apply) Apply(ctx context.Context, tx types.TargetQuerier, muts []types. if err != nil { a.errors.Inc() } - return err + return errors.Wrap(err, a.target.Raw()) } a.mu.RLock() @@ -543,7 +543,15 @@ func (a *apply) upsertBagsLocked( // Copy the conflicting data from the table into the Conflict. for idx, col := range a.mu.templates.Columns { - c.Target.Put(col.Name, blockingData[idx]) + v := blockingData[idx] + // Allow runtime type fixups + if col.Parse != nil { + v, err = col.Parse(v) + if err != nil { + return errors.Wrapf(err, "could not invoke type helper on column %s", col.Name) + } + } + c.Target.Put(col.Name, v) } // Supply before data if we received it from upstream. diff --git a/internal/target/schemawatch/parse_helpers.go b/internal/target/schemawatch/parse_helpers.go index 8ab74f379..39272eeda 100644 --- a/internal/target/schemawatch/parse_helpers.go +++ b/internal/target/schemawatch/parse_helpers.go @@ -121,10 +121,26 @@ func coerceJSON(a any) (any, error) { return json.Marshal(a) } +// reifyJSON converts an incoming byte array to a reified type. +func reifyJSON(v any) (any, error) { + if buf, ok := v.([]byte); ok { + if err := json.Unmarshal(buf, &v); err != nil { + return nil, err + } + } + return v, nil +} + func parseHelper(product types.Product, typeName string) func(any) (any, error) { switch product { case types.ProductCockroachDB, types.ProductPostgreSQL: - // Just pass through, since we have similar representations. + switch typeName { + case "JSON", "JSONB": + // Ensure that data bound for a JSON column is reified. + return reifyJSON + default: + return nil + } case types.ProductMariaDB, types.ProductMySQL: // Coerce types to the what the mysql driver expects. switch typeName { diff --git a/main.go b/main.go index c26c2bf09..19146052f 100644 --- a/main.go +++ b/main.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/cmd/preflight" "github.com/cockroachdb/cdc-sink/internal/cmd/start" "github.com/cockroachdb/cdc-sink/internal/cmd/version" + "github.com/cockroachdb/cdc-sink/internal/cmd/votr" "github.com/cockroachdb/cdc-sink/internal/script" "github.com/cockroachdb/cdc-sink/internal/util/logfmt" joonix "github.com/joonix/log" @@ -112,6 +113,7 @@ func main() { script.HelpCommand(), start.Command(), version.Command(), + votr.Command(), ) ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)