Skip to content

Commit

Permalink
adds delete handler to destination (#21)
Browse files Browse the repository at this point in the history
* adds delete action handling

* update log to specify delete instead of insert

Co-authored-by: Lovro Mažgon <lovro.mazgon@gmail.com>
  • Loading branch information
dylanlott and lovromazgon committed Mar 29, 2022
1 parent acf68d9 commit 53d5c54
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
29 changes: 27 additions & 2 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ func (d *Destination) handleUpdate(ctx context.Context, r sdk.Record) error {
}

func (d *Destination) handleDelete(ctx context.Context, r sdk.Record) error {
return fmt.Errorf("not impl")
if !hasKey(r) {
return fmt.Errorf("key must be provided on delete actions")
}
return d.remove(ctx, r)
}

func (d *Destination) upsert(ctx context.Context, r sdk.Record) error {
Expand Down Expand Up @@ -170,6 +173,27 @@ func (d *Destination) upsert(ctx context.Context, r sdk.Record) error {
return nil
}

func (d *Destination) remove(ctx context.Context, r sdk.Record) error {
key, err := getKey(r)
if err != nil {
return err
}
keyColumnName := getKeyColumnName(key, d.config.keyColumnName)
tableName, err := d.getTableName(r.Metadata)
if err != nil {
return fmt.Errorf("failed to get table name for write: %w", err)
}
query, args, err := psql.
Delete(tableName).
Where(sq.Eq{keyColumnName: key[keyColumnName]}).
ToSql()
if err != nil {
return fmt.Errorf("error formatting delete query: %w", err)
}
_, err = d.conn.Exec(ctx, query, args...)
return err
}

// insert is an append-only operation that doesn't care about keys, but
// can error on constraints violations so should only be used when no table
// key or unique constraints are otherwise present.
Expand Down Expand Up @@ -306,7 +330,8 @@ func (d *Destination) getTableName(metadata map[string]string) (string, error) {
return tableName, nil
}

// getKeyColumnName will return the name of the first item in the key.
// getKeyColumnName will return the name of the first item in the key or the
// connector-configured default name of the key column name.
func getKeyColumnName(key sdk.StructuredData, defaultKeyName string) string {
if len(key) > 1 {
// Go maps aren't order preserving, so anything over len 1 will have
Expand Down
21 changes: 21 additions & 0 deletions destination/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,27 @@ func TestAdapter_Write(t *testing.T) {
},
wantErr: true,
},
{
name: "delete by key",
fields: fields{
conn: getTestPostgres(t),
config: config{},
},
args: args{
ctx: context.Background(),
record: sdk.Record{
Position: sdk.Position("617237"),
Metadata: map[string]string{
"table": "keyed",
"action": "delete",
},
Key: sdk.StructuredData{
"key": "3",
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 53d5c54

Please sign in to comment.