Skip to content

Commit

Permalink
Merge pull request #10 from ProlificLabs/feature/merge_strategies
Browse files Browse the repository at this point in the history
Feature/merge strategies
  • Loading branch information
dan-pulley committed Aug 3, 2023
2 parents 0188dca + 589cdbc commit b234d53
Show file tree
Hide file tree
Showing 9 changed files with 1,094 additions and 194 deletions.
54 changes: 2 additions & 52 deletions clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package datapasta
import (
"context"
"fmt"
"log"
"strings"
)

Expand Down Expand Up @@ -147,61 +146,12 @@ func Download(ctx context.Context, db Database, startTable, startColumn string,
return nil, debugging, err
}
}

return cloneInOrder, debugging, nil
}

// Upload uploads, in naive order, every record in a dump.
// It mutates the elements of `dump`, so you can track changes (for example new primary keys).
func Upload(ctx context.Context, db Database, dump DatabaseDump) error {
fkm := NewForeignKeyMapper(db)
return db.Insert(fkm, dump...)
}

type ForeignKeyMapper func(row map[string]any) func()

// NewForeignKeyMapper returns a function that will update foreign key references in a row to their new values.
// each update returns a function that must be called after the row has been updated with new primary keys.
func NewForeignKeyMapper(db Database) ForeignKeyMapper {
changes := make(map[string]map[any]any)

for _, fk := range db.ForeignKeys() {
changes[fk.BaseTable+"."+fk.BaseCol] = map[any]any{}
}

return func(row map[string]any) func() {
table := row[DumpTableKey].(string)
for k, v := range row {
for _, fk := range db.ForeignKeys() {
if fk.ReferencingTable != table || fk.ReferencingCol != k || v == nil || changes[fk.BaseTable+`.`+fk.BaseCol] == nil {
continue
}

newID, ok := changes[fk.BaseTable+`.`+fk.BaseCol][v]
if !ok {
log.Printf("unable to find mapped id for %s[%s]=%v in %s", table, k, v, fk.BaseTable)
} else {
row[k] = newID
}
}
}

copy := make(map[string]any, len(row))
for k, v := range row {
// does anyone care about this value?
if changes[table+`.`+k] == nil {
continue
}
copy[k] = v
}

return func() {
table := row[DumpTableKey].(string)
for k, v := range row {
if changes[table+"."+k] == nil {
continue
}
changes[table+"."+k][copy[k]] = v
}
}
}
return db.Insert(dump...)
}
20 changes: 15 additions & 5 deletions clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,38 @@ func (d testDB) SelectMatchingRows(tname string, conds map[string][]any) ([]map[
return nil, fmt.Errorf("no mock for %s where %#v", tname, conds)
}

// PrimaryKeys is a no-op stub on the mock database: it always returns a nil
// map, so no table has a primary key column registered.
func (testDB) PrimaryKeys() map[string]string {
	var none map[string]string
	return none
}

// InsertRecord is a no-op stub on the mock database: the record is ignored and
// both the returned id and the error are nil.
func (testDB) InsertRecord(_ map[string]any) (any, error) {
	return nil, nil
}

// Update is a no-op stub on the mock database: it ignores both the record id
// and the column updates and always reports success.
func (testDB) Update(id datapasta.RecordID, cols map[string]any) error {
	return nil
}

// Delete is a no-op stub on the mock database: it ignores the record id and
// always reports success.
func (testDB) Delete(id datapasta.RecordID) error {
	return nil
}

// Mapping is a no-op stub on the mock database: it returns no mappings and a
// nil error.
func (testDB) Mapping() ([]datapasta.Mapping, error) {
	return nil, nil
}

// upload a batch of records
func (d testDB) Insert(fkm datapasta.ForeignKeyMapper, records ...map[string]any) error {
func (d testDB) Insert(records ...map[string]any) error {
for _, m := range records {
finish := fkm(m)
d.Logf("inserting %#v", m)

if m[datapasta.DumpTableKey] == "company" && m["id"] == 10 {
if m["api_key"] != "obfuscated" {
d.Errorf("didn't obfuscated company 9's api key, got %s", m["api_key"])
}
m["id"] = 11
finish()
continue
}
if m[datapasta.DumpTableKey] == "factory" && m["id"] == 23 {
m["id"] = 12
finish()
continue
}
if m[datapasta.DumpTableKey] == "product" && m["id"] == 5 {
m["id"] = 13
finish()
continue
}
return fmt.Errorf("unexpected insert: %#v", m)
Expand Down
4 changes: 1 addition & 3 deletions integrations/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ func TestDatabaseImplementation(t *testing.T, db datapasta.Database, startTable,
old[k] = v
}

fkm := datapasta.NewForeignKeyMapper(db)
if err := db.Insert(fkm, found[0]); err != nil {
if err := db.Insert(found[0]); err != nil {
t.Fatalf("error inserting row: %s", err.Error())
return
}
Expand Down Expand Up @@ -62,4 +61,3 @@ func TestDatabaseImplementation(t *testing.T, db datapasta.Database, startTable,
return
}
}

61 changes: 54 additions & 7 deletions interface.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,77 @@
package datapasta

import (
"fmt"
"log"
)

// Database is the abstraction between the cloning tool and the database.
// The NewPostgres.NewClient method gives you an implementation for Postgres.
type Database interface {

// SelectMatchingRows must return unseen records.
// a Database can't be reused between clones, because it must do internal deduping.
// `conds` will be a map of columns and the values they can have.
SelectMatchingRows(tname string, conds map[string][]any) ([]map[string]any, error)


// insert one record, returning the new id
InsertRecord(record map[string]any) (any, error)

// apply the updates from the cols to the row
Update(id RecordID, cols map[string]any) error

// delete the row
Delete(id RecordID) error

// Insert uploads a batch of records.
// any changes to the records (such as newly generated primary keys) should mutate the record map directly.
// a Destination can't generally be reused between clones, as it may be inside a transaction.
// it's recommended that callers use a Database that wraps a transaction.
Insert(mapper ForeignKeyMapper, records ...map[string]any) error

//
// the records will have primary keys which must be handled.
// the Database is responsible for exposing the resulting primary key mapping in some manner.
Insert(records ...map[string]any) error

// Mapping must return whatever mapping has been created by prior Inserts.
// the implementation may internally choose to track this in the database or in memory.
Mapping() ([]Mapping, error)

// get foreign key mapping
ForeignKeys() []ForeignKey

// get primary key mapping
PrimaryKeys() map[string]string
}

// ForeignKey contains every REFERENCING column and the BASE column it refers to.
// This is used to recurse the database as a graph.
// This is used to recurse the database as a graph.
// Database implementations must provide a complete list of references.
type ForeignKey struct {
BaseTable string `json:"base_table"`
BaseCol string `json:"base_col"`
ReferencingTable string `json:"referencing_table"`
ReferencingCol string `json:"referencing_col"`
}
}

// RecordID identifies a single row by the table it belongs to and the value of
// its primary key.
type RecordID struct {
	Table      string // name of the table holding the row
	PrimaryKey any    // value of the row's primary key
}

// String renders the identifier as "table(primaryKey)", with the key printed
// using fmt's default formatting.
func (r RecordID) String() string {
	return r.Table + "(" + fmt.Sprint(r.PrimaryKey) + ")"
}

// GetRowIdentifier builds the RecordID for row.
//
// pks maps a table name to the name of that table's primary key column. row
// must carry its table name (a string) under DumpTableKey, and a value under
// the primary key column registered for that table in pks.
//
// It panics if the table name is missing or is not a string, or if the row has
// no value for the primary key column. The panic value is always a string that
// starts with "unable to get row identifier" and names the offending table and
// column, so a failure deep inside a clone can be traced to a concrete row.
func GetRowIdentifier(pks map[string]string, row map[string]any) RecordID {
	// Use the comma-ok form so a malformed row produces a descriptive panic
	// instead of an opaque runtime type-assertion error.
	table, ok := row[DumpTableKey].(string)
	if !ok {
		panic(fmt.Sprintf("unable to get row identifier: row has no string table name under %q (got %T)", DumpTableKey, row[DumpTableKey]))
	}

	// A table absent from pks yields the empty column name, which then fails
	// the lookup below exactly as it did before.
	pkCol := pks[table]
	pk, ok := row[pkCol]
	if !ok {
		panic(fmt.Sprintf("unable to get row identifier: row of table %q has no value for primary key column %q", table, pkCol))
	}
	return RecordID{Table: table, PrimaryKey: pk}
}

// Mapping pairs an embedded RecordID with an OriginalID. Database.Mapping
// returns a slice of these to expose the mapping created by prior Inserts.
// NOTE(review): presumably the embedded RecordID is the row as it was inserted
// and OriginalID is the primary key it had in the source dump — confirm.
type Mapping struct {
RecordID
OriginalID any
}

// LogFunc is an exported printf-style function variable, initialised to log.Printf.
// NOTE(review): presumably the package logs through it so callers can reassign
// it to redirect output — confirm against its call sites.
var LogFunc = log.Printf

0 comments on commit b234d53

Please sign in to comment.