Skip to content

Commit

Permalink
apply: Support merge function for CockroachDB target
Browse files Browse the repository at this point in the history
This change is part of #487 to support three-way merges.

This change updates the apply package to call into the merge function when
targeting CockroachDB or PostgreSQL. The conditional-upsert SQL is extended to
return the index of the conflicting data and the contents of the blocking row.
The blocking and conflicting row data are then used to drive the merge function.

The merge API added in PR #534 is refined. The relevent types are extracted
into their own package, which contains a new "Bag" type. A Bag holds reified
properties and can represent the data in a mutation or in a database row. It
additonally classifies properties as being "mapped" or "unmapped" as to whether
or not the property maps onto a known column. Some of the bookkeeping previously
in the apply code to track missing or extra properties is simplified.

The upsert code also becomes recursive. Mutations are reified into Bags and are
applied. If a Bag generates a conflict, the merge function will be called to
produce a Bag that will be unconditionally applied. Once all conflicts have been
resolved, the accumulated Bags will be upserted by a recursive call to the
upsert method. There is only ever one level of recursion.
  • Loading branch information
bobvawter committed Oct 14, 2023
1 parent 1c00c4a commit efd16e4
Show file tree
Hide file tree
Showing 19 changed files with 921 additions and 223 deletions.
26 changes: 17 additions & 9 deletions internal/script/ident_map.go → internal/script/bag_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,42 @@ package script

import (
"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/cockroachdb/cdc-sink/internal/util/merge"
"github.com/dop251/goja"
)

// identMapWrapper adapts [ident.Map] to [goja.DynamicObject].
type identMapWrapper struct {
data *ident.Map[any] // Generally expected to be a type returned from the json package.
// bagWrapper adapts [merge.Bag] to [goja.DynamicObject].
type bagWrapper struct {
data *merge.Bag
rt *goja.Runtime
}

var _ goja.DynamicObject = (*identMapWrapper)(nil)
var _ goja.DynamicObject = (*bagWrapper)(nil)

func (m *identMapWrapper) Delete(key string) bool {
// Delete implements goja.DynamicObject. It always returns true, since
// we never reject any deletes.
func (m *bagWrapper) Delete(key string) bool {
m.data.Delete(ident.New(key))
return true
}

func (m *identMapWrapper) Get(key string) goja.Value {
// Get implements goja.DynamicObject.
func (m *bagWrapper) Get(key string) goja.Value {
v, ok := m.data.Get(ident.New(key))
if !ok {
return goja.Undefined()
}
return m.rt.ToValue(v)
}

func (m *identMapWrapper) Has(key string) bool {
// Has implements goja.DynamicObject.
func (m *bagWrapper) Has(key string) bool {
_, ok := m.data.Get(ident.New(key))
return ok
}

func (m *identMapWrapper) Keys() []string {
// Keys implements goja.DynamicObject.
func (m *bagWrapper) Keys() []string {
ret := make([]string, 0, m.data.Len())
// Ignoring error since callback returns nil.
_ = m.data.Range(func(k ident.Ident, v any) error {
Expand All @@ -57,7 +63,9 @@ func (m *identMapWrapper) Keys() []string {
return ret
}

func (m *identMapWrapper) Set(key string, val goja.Value) bool {
// Set implements goja.DynamicObject. It always returns true since we
// never reject any updates.
func (m *bagWrapper) Set(key string, val goja.Value) bool {
m.data.Put(ident.New(key), val.Export())
return true
}
6 changes: 3 additions & 3 deletions internal/script/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ type mapJS func(

// A mergeOp is the input to the user-provided merge function.
type mergeOp struct {
Before goja.Value `goja:"before"` // Backed by identMapWrapper. Nil in 2-way case.
Before goja.Value `goja:"before"` // Backed by bagWrapper. Nil in 2-way case.
Meta map[string]any `goja:"meta"` // Equivalent to dispatch() or map() meta.
Existing goja.Value `goja:"existing"` // Backed by identMapWrapper.
Proposed goja.Value `goja:"proposed"` // Backed by identMapWrapper.
Existing goja.Value `goja:"existing"` // Backed by bagWrapper.
Proposed goja.Value `goja:"proposed"` // Backed by bagWrapper.
}

// A mergeResult is returned by the user-provided merge function.
Expand Down
41 changes: 21 additions & 20 deletions internal/script/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cdc-sink/internal/util/applycfg"
"github.com/cockroachdb/cdc-sink/internal/util/diag"
"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/cockroachdb/cdc-sink/internal/util/merge"
"github.com/dop251/goja"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -344,70 +345,70 @@ func (s *UserScript) bindMap(table ident.Table, mapper mapJS) Map {
}
}

// bindMerge exports a user-provided function as a [types.MergeFunc].
func (s *UserScript) bindMerge(table ident.Table, merger mergeJS) types.MergeFunc {
return func(ctx context.Context, tx types.TargetQuerier, con *types.Conflict) (*types.Resolution, bool, error) {
// bindMerge exports a user-provided function as a [merge.Func].
func (s *UserScript) bindMerge(table ident.Table, merger mergeJS) merge.Func {
return func(ctx context.Context, con *merge.Conflict) (*merge.Resolution, error) {
// con.Before will be nil in 2-way merge.
if con.Existing == nil {
return nil, false, errors.New("nil value in Conflict.Existing")
return nil, errors.New("nil value in Conflict.Existing")
}
if con.Proposed == nil {
return nil, false, errors.New("nil value in Conflict.Proposed")
return nil, errors.New("nil value in Conflict.Proposed")
}
var jsResult *mergeResult
err := s.execJS(ctx, func() error {
op := &mergeOp{
Meta: con.Mutation.Meta,
Existing: s.rt.NewDynamicObject(&identMapWrapper{con.Existing, s.rt}),
Proposed: s.rt.NewDynamicObject(&identMapWrapper{con.Proposed, s.rt}),
Meta: con.Proposed.Meta,
Existing: s.rt.NewDynamicObject(&bagWrapper{con.Existing, s.rt}),
Proposed: s.rt.NewDynamicObject(&bagWrapper{con.Proposed, s.rt}),
}
if con.Before != nil {
op.Before = s.rt.NewDynamicObject(&identMapWrapper{con.Before, s.rt})
op.Before = s.rt.NewDynamicObject(&bagWrapper{con.Before, s.rt})
}
var err error
jsResult, err = merger(op)
return err
})
if err != nil {
return nil, false, err
return nil, err
}

if jsResult == nil {
return nil, false, errors.Errorf(
return nil, errors.Errorf(
"%s: merge function did not return a MergeResult object", table.Raw())
}
if jsResult.Drop {
return nil, false, nil
return &merge.Resolution{Drop: true}, nil
}
// Copy the proposed data out to a DLQ.
if jsResult.DLQ != "" {
return &types.Resolution{
return &merge.Resolution{
DLQ: jsResult.DLQ,
}, true, nil
}, nil
}
// By the pigeonhole principle, the user made an error.
if jsResult.Apply == nil {
return nil, false, errors.Errorf(
return nil, errors.Errorf(
"%s: merge function did not return a well-formed MergeResult", table.Raw())
}

// See goja.Object.Export for discussion about the returned type.
switch t := jsResult.Apply.Export().(type) {
case *identMapWrapper:
case *bagWrapper:
// The user returned one of the input wrappers, so we can
// just unwrap and return it.
return &types.Resolution{Apply: t.data}, true, nil
return &merge.Resolution{Apply: t.data}, nil

case map[string]any:
// The user returned a new JS object.
out := &ident.Map[any]{}
out := merge.NewBagFrom(con.Proposed)
for k, v := range t {
out.Put(ident.New(k), v)
}
return &types.Resolution{Apply: out}, true, nil
return &merge.Resolution{Apply: out}, nil

default:
return nil, false, errors.Errorf("%s: unexpected type of apply value: %T", table.Raw(), t)
return nil, errors.Errorf("%s: unexpected type of apply value: %T", table.Raw(), t)
}
}
}
Expand Down
41 changes: 15 additions & 26 deletions internal/script/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cdc-sink/internal/types"
"github.com/cockroachdb/cdc-sink/internal/util/applycfg"
"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/cockroachdb/cdc-sink/internal/util/merge"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -155,16 +156,12 @@ func TestScript(t *testing.T) {
}

if merger := cfg.Merger; a.NotNil(merger) {
result, ok, err := merger.Merge(context.Background(), nil, &types.Conflict{
Mutation: types.Mutation{
Data: []byte(`{"hello":"world!"}`),
},
Before: ident.MapOf[any]("val", 1),
Existing: ident.MapOf[any]("val", 40),
Proposed: ident.MapOf[any]("val", 3),
result, err := merger.Merge(context.Background(), &merge.Conflict{
Before: merge.NewBagOf(nil, nil, "val", 1),
Existing: merge.NewBagOf(nil, nil, "val", 40),
Proposed: merge.NewBagOf(nil, nil, "val", 3),
})
a.NoError(err)
a.True(ok)
if a.NotNil(result.Apply) {
if v, ok := result.Apply.Get(ident.New("val")); a.True(ok) {
a.Equal(int64(42), v)
Expand All @@ -185,34 +182,26 @@ func TestScript(t *testing.T) {
tbl = ident.NewTable(schema, ident.New("merge_dlq_all"))
if cfg := s.Targets.GetZero(tbl); a.NotNil(cfg) {
if merger := cfg.Merger; a.NotNil(merger) {
result, ok, err := merger.Merge(context.Background(), nil, &types.Conflict{
Mutation: types.Mutation{
Data: []byte(`{"hello":"world!"}`),
},
Before: ident.MapOf[any]("val", 1),
Existing: ident.MapOf[any]("val", 0),
Proposed: ident.MapOf[any]("val", 2),
result, err := merger.Merge(context.Background(), &merge.Conflict{
Before: merge.NewBagOf(nil, nil, "val", 1),
Existing: merge.NewBagOf(nil, nil, "val", 0),
Proposed: merge.NewBagOf(nil, nil, "val", 2),
})
a.NoError(err)
a.True(ok)
a.Nil(result.Apply)
a.Equal("dead", result.DLQ)
a.Equal(&merge.Resolution{DLQ: "dead"}, result)
}
}

tbl = ident.NewTable(schema, ident.New("merge_drop_all"))
if cfg := s.Targets.GetZero(tbl); a.NotNil(cfg) {
if merger := cfg.Merger; a.NotNil(merger) {
_, ok, err := merger.Merge(context.Background(), nil, &types.Conflict{
Mutation: types.Mutation{
Data: []byte(`{"hello":"world!"}`),
},
Before: ident.MapOf[any]("val", 1),
Existing: ident.MapOf[any]("val", 0),
Proposed: ident.MapOf[any]("val", 2),
result, err := merger.Merge(context.Background(), &merge.Conflict{
Before: merge.NewBagOf(nil, nil, "val", 1),
Existing: merge.NewBagOf(nil, nil, "val", 0),
Proposed: merge.NewBagOf(nil, nil, "val", 2),
})
a.NoError(err)
a.False(ok)
a.Equal(&merge.Resolution{Drop: true}, result)
}
}
}
Loading

0 comments on commit efd16e4

Please sign in to comment.