-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This change overhauls the cdc-sink code to split it into well-defined packages and APIs. Notable functional changes: A cdc-sink endpoint now operates on an entire database schema (i.e. the second-level namespace) at once, since this is the use-case that has been most prominent in discussions around our preferred microservice architecture. For users that only ever use the "public" schema in their databases, this will handle whole-database use cases. An "immediate" mode is supported, which applies incoming data without waiting for resolved timestamps. This is intended for use when backfilling large datasets or if a high-volume changefeed must catch up after an outage. It is not expected to be the default configuration for cdc-sink. The cdc-sink code can now detect and optionally recover from a limited amount of structural schema drift between the source and target databases. The schema for each target database is held in memory and refreshed from time to time. Drift is checked during resolved-timestamp flushes, which will effectively pause a changefeed until the stored payloads are at least structurally-compatible with the target tables. The new webhook-https:// scheme in CockroachDB v21.2 is now supported. This necessitates that cdc-sink can use a TLS-enabled HTTP server. The option now exists to load a certificate and private key from disk. For testing purposes, a self-signed certifcate can be internally generated by cdc-sink. Notes to reviewers: The internal/types package defines interfaces for the major moving parts of the revised cdc-sink code base. At present, each interface is implemented by a single type, but having small APIs has been useful in identifying the independent parts of cdc-sink. Similarly, the package structure may be overly fine-grained in favor of identifying specific, small portions of code that compose easily. A comment at the top of most files indicates where code was repackaged from. Recommended package review order: types, source/cdc, target/stage, target/apply, target/timestamp
- Loading branch information
Showing
53 changed files
with
4,440 additions
and
3,740 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
// Copyright 2021 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
// Package cdc contains a http.Handler which can receive | ||
// webhook events from a CockroachDB CDC changefeed. | ||
package cdc | ||
|
||
import ( | ||
"log" | ||
"net/http" | ||
"strconv" | ||
|
||
"github.com/cockroachdb/cdc-sink/internal/types" | ||
"github.com/jackc/pgx/v4/pgxpool" | ||
) | ||
|
||
// This file contains code repackaged from main.go | ||
|
||
// ImmediateParam is the name of a query parameter that will place a request | ||
// into "immediate" mode. In this mode, mutations are written directly | ||
// to the target table and resolved timestamps are ignored. | ||
const ImmediateParam = "immediate" | ||
|
||
// Handler is an http.Handler for processing webhook requests | ||
// from a CockroachDB changefeed. | ||
type Handler struct { | ||
Appliers types.Appliers // Update tables within TargetDb. | ||
Pool *pgxpool.Pool // Access to the target cluster. | ||
Stores types.Stagers // Record incoming json blobs. | ||
Swapper types.TimeKeeper // Tracks named timestamps. | ||
Watchers types.Watchers // Schema data. | ||
} | ||
|
||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
ctx := r.Context() | ||
|
||
sendErr := func(err error) { | ||
if err == nil { | ||
http.Error(w, "OK", http.StatusOK) | ||
return | ||
} | ||
http.Error(w, err.Error(), http.StatusBadRequest) | ||
log.Printf("ERROR %s:\n%v", r.RequestURI, err) | ||
} | ||
|
||
immediate := false | ||
if found := r.URL.Query().Get(ImmediateParam); found != "" { | ||
var err error | ||
immediate, err = strconv.ParseBool(found) | ||
if err != nil { | ||
sendErr(err) | ||
return | ||
} | ||
} | ||
|
||
if ndjson, err := parseNdjsonURL(r.URL.Path); err == nil { | ||
sendErr(h.ndjson(ctx, ndjson, immediate, r.Body)) | ||
return | ||
} | ||
if resolved, err := parseResolvedURL(r.URL.Path); err == nil { | ||
sendErr(h.resolved(ctx, resolved, immediate)) | ||
return | ||
} | ||
if webhook, err := parseWebhookURL(r.URL.Path); err == nil { | ||
if r.Method == "POST" && r.Header.Get("content-type") == "application/json" { | ||
sendErr(h.webhook(ctx, webhook, immediate, r.Body)) | ||
return | ||
} | ||
} | ||
|
||
http.NotFound(w, r) | ||
} |
Oops, something went wrong.