Skip to content

Commit

Permalink
fix(race): Fix race in DuckDB delete-stale (#11215)
Browse files Browse the repository at this point in the history
  • Loading branch information
candiduslynx committed Jun 5, 2023
1 parent 1b7f65b commit c0b9f0a
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 86 deletions.
25 changes: 16 additions & 9 deletions plugins/destination/duckdb/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,30 @@ type Client struct {

var _ destination.Client = (*Client)(nil)

func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (destination.Client, error) {
func New(ctx context.Context, logger zerolog.Logger, dstSpec specs.Destination) (destination.Client, error) {
var err error
c := &Client{
logger: logger.With().Str("module", "duckdb-dest").Logger(),
spec: dstSpec,
}

var duckdbSpec Spec
c.spec = spec
if err := spec.UnmarshalSpec(&duckdbSpec); err != nil {
var spec Spec
if err := dstSpec.UnmarshalSpec(&spec); err != nil {
return nil, fmt.Errorf("failed to unmarshal duckdb spec: %w", err)
}
c.connector, err = duckdb.NewConnector(duckdbSpec.ConnectionString, nil)
db := sql.OpenDB(c.connector)

c.connector, err = duckdb.NewConnector(spec.ConnectionString, nil)
if err != nil {
return nil, err
}
c.db = db
_, err = c.db.ExecContext(ctx, "INSTALL 'json'; LOAD 'json';")

c.db = sql.OpenDB(c.connector)

err = c.exec(ctx, "INSTALL 'json'; LOAD 'json';")
if err != nil {
return nil, err
}
_, err = c.db.ExecContext(ctx, "INSTALL 'parquet'; LOAD 'parquet';")
err = c.exec(ctx, "INSTALL 'parquet'; LOAD 'parquet';")
if err != nil {
return nil, err
}
Expand All @@ -69,3 +71,8 @@ func (c *Client) Close(_ context.Context) error {
func (c *Client) Metrics() destination.Metrics {
return c.metrics
}

func (c *Client) exec(ctx context.Context, query string, args ...any) error {
_, err := c.db.ExecContext(ctx, query, args...)
return err
}
4 changes: 2 additions & 2 deletions plugins/destination/duckdb/client/delete_stale.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source s
sb.WriteString(" = $1 and ")
sb.WriteString(schema.CqSyncTimeColumn.Name)
sb.WriteString(" < to_timestamp($2)")
sql := sb.String()
if _, err := c.db.ExecContext(ctx, sql, source, syncTime.Unix()); err != nil {
if err := c.exec(ctx, sb.String(), source, syncTime.Unix()); err != nil {
return err
}
}

return nil
}
15 changes: 4 additions & 11 deletions plugins/destination/duckdb/client/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,15 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {

func (c *Client) recreateTable(ctx context.Context, table *schema.Table) error {
sql := "drop table if exists " + sanitizeID(table.Name)
if _, err := c.db.ExecContext(ctx, sql); err != nil {
return fmt.Errorf("failed to drop table %s: %w", table.Name, err)
if err := c.exec(ctx, sql); err != nil {
return err
}
return c.createTableIfNotExist(ctx, table.Name, table)
}

func (c *Client) addColumn(ctx context.Context, tableName string, columnName string, columnType string) error {
sql := "alter table " + sanitizeID(tableName) + " add column " + sanitizeID(columnName) + " " + columnType
if _, err := c.db.ExecContext(ctx, sql); err != nil {
return fmt.Errorf("failed to add column %s on table %s: %w", columnName, tableName, err)
}
return nil
return c.exec(ctx, sql)
}

func (c *Client) createTableIfNotExist(ctx context.Context, tableName string, table *schema.Table) error {
Expand Down Expand Up @@ -227,11 +224,7 @@ func (c *Client) createTableIfNotExist(ctx context.Context, tableName string, ta
sb.WriteString(")")
}
sb.WriteString(")")
_, err := c.db.ExecContext(ctx, sb.String())
if err != nil {
return fmt.Errorf("failed to create table with '%s': %w", sb.String(), err)
}
return nil
return c.exec(ctx, sb.String())
}

func (c *Client) isColumnUnique(ctx context.Context, tableName string, columName string) (bool, error) {
Expand Down
3 changes: 1 addition & 2 deletions plugins/destination/duckdb/client/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, _ string, res ch
}
sb.WriteString(") to '" + f.Name() + "' (FORMAT PARQUET)")

_, err = c.db.ExecContext(ctx, sb.String())
if err != nil {
if err := c.exec(ctx, sb.String()); err != nil {
return err
}
f, err = os.Open(fName)
Expand Down
131 changes: 69 additions & 62 deletions plugins/destination/duckdb/client/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/parquet"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cenkalti/backoff/v4"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/google/uuid"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -46,62 +47,61 @@ func dtContainsList(dt arrow.DataType) bool {
}
}

func (c *Client) upsert(ctx context.Context, tmpTableName string, tableName string, table *schema.Table) error {
func (c *Client) upsert(ctx context.Context, tmpTableName string, table *schema.Table) error {
var sb strings.Builder
sb.WriteString("insert into " + tableName + " select * from " + tmpTableName + " on conflict (")
sb.WriteString("insert into " + table.Name + " select * from " + tmpTableName + " on conflict (")
sb.WriteString(strings.Join(table.PrimaryKeys(), ", "))
sb.WriteString(" ) do update set ")
indices := nonPkIndices(table)
for i, indice := range indices {
col := table.Columns[indice]
for i, index := range indices {
if i > 0 {
sb.WriteString(", ")
}
col := table.Columns[index]
sb.WriteString(col.Name)
sb.WriteString(" = excluded.")
sb.WriteString(col.Name)
if i < len(indices)-1 {
sb.WriteString(", ")
}
}
if _, err := c.db.ExecContext(ctx, sb.String()); err != nil {
return err
}
return nil
query := sb.String()
// per https://duckdb.org/docs/sql/indexes#over-eager-unique-constraint-checking we might need some retries
// as the upsert for tables with PKs is transformed into delete + insert internally
return backoff.Retry(
func() error {
return c.exec(ctx, query)
},
backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(50*time.Millisecond), 3), ctx),
)
}

func (c *Client) deleteByPK(ctx context.Context, tmpTableName string, tableName string, table *schema.Table) error {
func (c *Client) deleteByPK(ctx context.Context, tmpTableName string, table *schema.Table) error {
var sb strings.Builder
sb.WriteString("delete from " + tableName + " using " + tmpTableName + " where ")
pks := table.PrimaryKeys()
for i, col := range pks {
sb.WriteString(tableName + "." + col)
sb.WriteString(" = ")
sb.WriteString(tmpTableName + "." + col)
if i < len(pks)-1 {
sb.WriteString("delete from " + table.Name + " using " + tmpTableName + " where ")
for i, col := range table.PrimaryKeys() {
if i > 0 {
sb.WriteString(" and ")
}
sb.WriteString(table.Name + "." + col)
sb.WriteString(" = ")
sb.WriteString(tmpTableName + "." + col)
}
if _, err := c.db.ExecContext(ctx, sb.String()); err != nil {
return err
}
return nil

return c.exec(ctx, sb.String())
}

func (c *Client) copyFromFile(ctx context.Context, tableName string, fileName string, sc *arrow.Schema) error {
var sb strings.Builder
sb.WriteString("copy " + tableName + "(")
for i, col := range sc.Fields() {
sb.WriteString(sanitizeID(col.Name))
if i < len(sc.Fields())-1 {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString(sanitizeID(col.Name))
}
sb.WriteString(") from '" + fileName + "' (FORMAT PARQUET)")
if _, err := c.db.ExecContext(ctx, sb.String()); err != nil {
return err
}
return nil
return c.exec(ctx, sb.String())
}

func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, records []arrow.Record) error {
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, records []arrow.Record) (err error) {
f, err := os.CreateTemp("", fmt.Sprintf("%s-*.parquet", table.Name))
if err != nil {
return err
Expand Down Expand Up @@ -134,39 +134,46 @@ func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, recor
return err
}

if c.spec.WriteMode == specs.WriteModeAppend || len(table.PrimaryKeys()) == 0 {
if err := c.copyFromFile(ctx, table.Name, f.Name(), sc); err != nil {
return err
}
} else {
tmpTableName := table.Name + strings.ReplaceAll(uuid.New().String(), "-", "_")
if err := c.createTableIfNotExist(ctx, tmpTableName, table); err != nil {
return fmt.Errorf("failed to create table %s: %w", tmpTableName, err)
}
if err := c.copyFromFile(ctx, tmpTableName, f.Name(), sc); err != nil {
return fmt.Errorf("failed to copy from file %s: %w", f.Name(), err)
}
if !c.enabledPks() || len(table.PrimaryKeys()) == 0 {
return c.copyFromFile(ctx, table.Name, f.Name(), sc)
}

// At time of writing (March 2023), duckdb does not support updating list columns.
// As a workaround, we delete the row and insert it again. This makes it non-atomic, unfortunately,
// but this is unavoidable until support is added to duckdb itself.
// See https://github.com/duckdb/duckdb/blob/c5d9afb97bbf0be12216f3b89ae3131afbbc3156/src/storage/table/list_column_data.cpp#L243-L251
if containsList(table) {
if err := c.deleteByPK(ctx, tmpTableName, table.Name, table); err != nil {
return err
}
if _, err = c.db.ExecContext(ctx, "insert into "+table.Name+" from "+tmpTableName); err != nil {
return fmt.Errorf("failed to insert into %s from %s: %w", table.Name, tmpTableName, err)
}
} else {
if err := c.upsert(ctx, tmpTableName, table.Name, table); err != nil {
return err
}
}
if _, err = c.db.ExecContext(ctx, "drop table "+tmpTableName); err != nil {
return err
tmpTableName := table.Name + strings.ReplaceAll(uuid.New().String(), "-", "_")
if err := c.createTableIfNotExist(ctx, tmpTableName, table); err != nil {
return fmt.Errorf("failed to create table %s: %w", tmpTableName, err)
}
defer func() {
e := c.exec(ctx, "drop table "+tmpTableName)
if err == nil {
// we preserve original error, so update only on nil err
err = e
}
}()
if err := c.copyFromFile(ctx, tmpTableName, f.Name(), sc); err != nil {
return fmt.Errorf("failed to copy from file %s: %w", f.Name(), err)
}

return nil
// At time of writing (March 2023), duckdb does not support updating list columns.
// As a workaround, we delete the row and insert it again. This makes it non-atomic, unfortunately,
// but this is unavoidable until support is added to duckdb itself.
// See https://github.com/duckdb/duckdb/blob/c5d9afb97bbf0be12216f3b89ae3131afbbc3156/src/storage/table/list_column_data.cpp#L243-L251
if containsList(table) {
return c.deleteInsert(ctx, tmpTableName, table)
}

return c.upsert(ctx, tmpTableName, table)
}

func (c *Client) deleteInsert(ctx context.Context, tmpTableName string, table *schema.Table) error {
if err := c.deleteByPK(ctx, tmpTableName, table); err != nil {
return err
}

// per https://duckdb.org/docs/sql/indexes#over-eager-unique-constraint-checking we might need to retry
return backoff.Retry(
func() error {
return c.exec(ctx, "insert into "+table.Name+" from "+tmpTableName)
},
backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(50*time.Millisecond), 3), ctx),
)
}
1 change: 1 addition & 0 deletions plugins/destination/duckdb/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/apache/arrow/go/v13 v13.0.0-20230601164043-3299d12efc91
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cloudquery/plugin-pb-go v1.0.8
github.com/cloudquery/plugin-sdk/v3 v3.10.3
github.com/google/uuid v1.3.0
Expand Down
2 changes: 2 additions & 0 deletions plugins/destination/duckdb/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG
github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg=
github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I=
github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down

0 comments on commit c0b9f0a

Please sign in to comment.