Skip to content

Commit

Permalink
Merge pull request #6 from ProlificLabs/feature/handle_self_referenti…
Browse files Browse the repository at this point in the history
…al_tables

self-referential tables
  • Loading branch information
dan-pulley committed May 31, 2023
2 parents d993017 + 593ffac commit 15f7ee5
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 92 deletions.
154 changes: 64 additions & 90 deletions postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/jackc/pgx/v4"
)

const optLogPostgres = false
var OptLogPostgres = true

// NewPostgres returns a pgdb that can generate a Database for datapasta Upload and Download functions.
func NewPostgres(ctx context.Context, c Postgreser) (pgdb, error) {
Expand Down Expand Up @@ -55,18 +55,6 @@ type pgdb struct {
builder squirrel.StatementBuilderType
}

// NewClient builds a pgtx bound to this pgdb, the given context and the given
// Postgreser, with empty "found" bookkeeping maps. The resulting pgtx can be
// used as a Database for Upload and Download.
// Passing an open transaction is recommended so the caller decides whether to
// commit or roll back. The returned error is always nil.
func (db pgdb) NewClient(ctx context.Context, tx Postgreser) (pgtx, error) {
	var client pgtx
	client.pgdb = db
	client.tx = postgresQueries{tx}
	client.ctx = ctx
	// Start with fresh, non-nil maps so later writes cannot panic.
	client.found = make(map[string][]any)
	client.foundWithoutPK = make(map[any]bool)
	return client, nil
}

// ForeignKeys returns the foreign keys stored on this pgdb.
func (db pgdb) ForeignKeys() []ForeignKey {
	keys := db.fks
	return keys
}
Expand All @@ -83,62 +71,26 @@ type pgtx struct {
foundWithoutPK map[any]bool
}

func (db pgtx) Insert(fkm ForeignKeyMapper, rows ...map[string]any) error {
for _, row := range rows {
finish := fkm(row)

table := row[DumpTableKey].(string)

pk := ""
if pkg, found := db.pkGroups[table]; found {
pk = pkg.ColumnName
}

builder := db.builder.Insert(`"` + table + `"`)
if pk != "" {
builder = builder.Suffix("RETURNING " + pk)
delete(row, pk)
}

keys := make([]string, 0, len(row))
vals := make([]any, 0, len(row))
for k, v := range row {
if k == DumpTableKey {
continue
}
vals = append(vals, v)
keys = append(keys, fmt.Sprintf(`"%s"`, k))
}

sql, args, err := builder.Columns(keys...).Values(vals...).ToSql()
if err != nil {
return fmt.Errorf(`build: %w, args: %s, sql: %s`, err, args, sql)
}

if pk != "" {
newPK := any(nil)
if err := db.tx.db.QueryRow(db.ctx, sql, args...).Scan(&newPK); err != nil {
for i, k := range keys {
log.Printf(`cloning error dump: %s, %T,%#v`, k, vals[i], vals[i])
}
return fmt.Errorf(`query: %w, args: %s, sql: %s`, err, args, sql)
}
row[pk] = newPK
finish()
continue
}

if _, err := db.tx.db.Exec(db.ctx, sql, args...); err != nil {
for i, k := range keys {
log.Printf(`cloning error dump: %s, %T,%#v`, k, vals[i], vals[i])
}
return fmt.Errorf(`query: %w, args: %s, sql: %s`, err, args, sql)
}
// NewBatchClient creates a batching client that can be used as a Database for Upload and Download.
// it is recommended you pass an open transaction, so you can control committing or rolling it back.
// This client is optimized for Postgres to use a temporary table "datapasta_clone" which allows
// the entire upload to be done without any round trips.
func (db pgdb) NewBatchClient(ctx context.Context, tx Postgreser) (pgbatchtx, error) {
child := pgtx{
pgdb: db,
tx: postgresQueries{tx},
ctx: ctx,
found: map[string][]any{},
foundWithoutPK: map[any]bool{},
}
return nil
return pgbatchtx{pgtx: child}, nil
}

func (db pgtx) SelectMatchingRows(tname string, conds map[string][]any) ([]map[string]any, error) {
// pgbatchtx is the client type returned by NewBatchClient. It embeds a pgtx,
// so the pgtx fields and methods are promoted onto it.
type pgbatchtx struct {
pgtx
}

func (db pgbatchtx) SelectMatchingRows(tname string, conds map[string][]any) ([]map[string]any, error) {
// build a query to select * where each of the conditions is met
or := squirrel.Or{}
for col, vals := range conds {
Expand Down Expand Up @@ -200,25 +152,6 @@ func (db pgtx) SelectMatchingRows(tname string, conds map[string][]any) ([]map[s
return foundInThisScan, nil
}

// NewBatchClient builds a batching client (pgbatchtx) that can serve as a
// Database for Upload and Download.
// Passing an open transaction is recommended so the caller decides whether to
// commit or roll back.
// The client is optimized for Postgres: it relies on a temporary table named
// "datapasta_clone", which lets the whole upload run without any round trips.
// The returned error is always nil.
func (db pgdb) NewBatchClient(ctx context.Context, tx Postgreser) (pgbatchtx, error) {
	client := pgbatchtx{
		pgtx: pgtx{
			pgdb: db,
			tx:   postgresQueries{tx},
			ctx:  ctx,
			// Fresh, non-nil bookkeeping maps for this client.
			found:          make(map[string][]any),
			foundWithoutPK: make(map[any]bool),
		},
	}
	return client, nil
}

// pgbatchtx is the client type returned by NewBatchClient. It embeds a pgtx,
// so the pgtx fields and methods are promoted onto it.
type pgbatchtx struct {
pgtx
}

func (db pgbatchtx) Insert(fkm ForeignKeyMapper, rows ...map[string]any) error {
if _, err := db.tx.db.Exec(db.ctx, "CREATE TEMPORARY TABLE IF NOT EXISTS datapasta_clone(table_name text, original_id integer, clone_id integer)"); err != nil {
return err
Expand All @@ -227,6 +160,7 @@ func (db pgbatchtx) Insert(fkm ForeignKeyMapper, rows ...map[string]any) error {
start := time.Now()

batch := &pgx.Batch{}
followup := &pgx.Batch{}
for _, row := range rows {
table := row[DumpTableKey].(string)

Expand All @@ -247,17 +181,41 @@ func (db pgbatchtx) Insert(fkm ForeignKeyMapper, rows ...map[string]any) error {
keys := make([]string, 0, len(row))
vals := make([]any, 0, len(row))
for k, v := range row {
if v == nil {
continue
}
if k == DumpTableKey {
continue
}
val := any(v)
deferred := false
for _, fk := range db.fks {
if fk.ReferencingCol == k && fk.ReferencingTable == table {
val = squirrel.Expr("COALESCE((SELECT clone_id FROM datapasta_clone WHERE original_id = ? AND table_name = ?::text), ?)", v, fk.BaseTable, v)
findInMap := squirrel.Expr("COALESCE((SELECT clone_id FROM datapasta_clone WHERE original_id = ? AND table_name = ?::text), ?)", v, fk.BaseTable, v)

if fk.BaseTable == table {
// self-referential columns become NULL and are updated in a second pass by PK
if pk == "" {
return fmt.Errorf("can't have self-referencing tables without primary key")
}
deferred = true
builder := db.builder.Update(`"`+table+`"`).Set(k, findInMap).Where("id=(SELECT clone_id FROM datapasta_clone WHERE original_id = ? AND table_name = ?::text)", oldPK, fk.BaseTable)
sql, args, err := builder.ToSql()
if err != nil {
return fmt.Errorf(`build: %w, args: %s, sql: %s`, err, args, sql)
}
followup.Queue(sql, args...)
} else {
v = findInMap
}

break
}
}
if deferred {
continue
}
keys = append(keys, fmt.Sprintf(`"%s"`, k))
vals = append(vals, val)
vals = append(vals, v)
}

builder = builder.Columns(keys...).Values(vals...)
Expand All @@ -270,6 +228,7 @@ func (db pgbatchtx) Insert(fkm ForeignKeyMapper, rows ...map[string]any) error {
}

prepped := time.Now()
log.Printf("batchrows:%d, followups:%d", batch.Len(), followup.Len())

res := db.tx.db.SendBatch(db.ctx, batch)
for i := 0; i < batch.Len(); i++ {
Expand All @@ -278,12 +237,27 @@ func (db pgbatchtx) Insert(fkm ForeignKeyMapper, rows ...map[string]any) error {
return fmt.Errorf(`batch query %d error: %w`, i, err)
}
}
if err := res.Close(); err != nil {
return fmt.Errorf("failed to execute batch upload: %w", err)
}

if optLogPostgres {
fks := db.tx.db.SendBatch(db.ctx, followup)
for i := 0; i < followup.Len(); i++ {
_, err := fks.Exec()
if err != nil {
return fmt.Errorf(`batch foreign key %d error: %w`, i, err)
}
}
fks.Close()

if OptLogPostgres {
log.Printf("prepping: %s, batching: %s", prepped.Sub(start), time.Since(prepped))
}

return res.Close()
if err := res.Close(); err != nil {
return fmt.Errorf("failed to execute batch followup queries: %w", err)
}
return nil
}

// Postgreser does postgres things.
Expand Down
24 changes: 22 additions & 2 deletions postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package datapasta
import (
"context"
"encoding/json"
"log"
"strings"
"testing"
"time"

Expand All @@ -18,6 +20,7 @@ func TestWithLocalPostgres(t *testing.T) {
t.Skipf("test is used for development against real pulley schema")

company := testCompany
log.Println("starting to clone company", company)

ok := assert.New(t)
conn, err := pgxpool.Connect(context.Background(), `postgresql://postgres:postgres@localhost:5432/postgres`)
Expand Down Expand Up @@ -48,6 +51,7 @@ func TestWithLocalPostgres(t *testing.T) {

for _, row := range res {
CleanupRow(row)
log.Println("cloning", row[DumpTableKey], row["id"])
}

in, _ := json.Marshal(res)
Expand All @@ -56,6 +60,8 @@ func TestWithLocalPostgres(t *testing.T) {

fkm := NewForeignKeyMapper(cli)
start := time.Now()

log.Println("starting to insert company", company)
ok.NoError(cli.Insert(fkm, out...))
upload := time.Since(start)

Expand All @@ -69,9 +75,23 @@ func TestWithLocalPostgres(t *testing.T) {

t.Logf("new id: %d", newID)

newRes, _, err := Download(context.Background(), cli, "company", "id", newID, exportOpts...)
log.Println("starting to download company", newID)
newRes, deb, err := Download(context.Background(), cli, "company", "id", newID, exportOpts...)
ok.NoError(err)
ok.Len(newRes, len(res))

for _, l := range deb {
if !strings.HasSuffix(l, " 0 rows") {
t.Logf("debug: %s ... %s", l[:20], l[len(l)-20:])
}
}

for _, out := range newRes {
if out[DumpTableKey] == "company" {
t.Logf("found cloned company %v", out["id"])
}
}

ok.Equalf(len(res), len(newRes), "expected clone to have the same size export")

t.Logf("durations: download(%s), upload(%s)", download, upload)
}
Expand Down

0 comments on commit 15f7ee5

Please sign in to comment.