Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions internal/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ func (c *client) Write(ctx context.Context, msgs <-chan message.WriteMessage) er
c.migrate(ctx, msg.Table)
case *message.WriteDeleteStale:
c.deleteStale(ctx, msg)
case *message.WriteDeleteRecord:
c.deleteRecord(ctx, msg)
case *message.WriteInsert:
sc := msg.Record.Schema()
tableName, ok := sc.Metadata().GetValue(schema.MetadataTableName)
Expand Down Expand Up @@ -257,3 +259,53 @@ func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) {
}
c.memoryDB[tableName] = filteredTable
}

func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) {
var filteredTable []arrow.Record
tableName := msg.TableName
for i, row := range c.memoryDB[tableName] {
isMatch := true
// Groups are evaluated as AND
for _, predGroup := range msg.WhereClause {
for _, pred := range predGroup.Predicates {
predResult := evaluatePredicate(pred, row)
if predGroup.GroupingType == "AND" {
isMatch = isMatch && predResult
} else if predResult {
isMatch = true
break
}
}
// If any single predicate group is false then we can break out of the loop
if !isMatch {
break
}
}

if !isMatch {
filteredTable = append(filteredTable, c.memoryDB[tableName][i])
}
}
c.memoryDB[tableName] = filteredTable
}

func evaluatePredicate(pred message.Predicate, record arrow.Record) bool {
sc := record.Schema()
indices := sc.FieldIndices(pred.Column)
if len(indices) == 0 {
return false
}
syncColIndex := indices[0]

if record.Column(syncColIndex).DataType() != pred.Record.Column(0).DataType() {
return false
}
// dataType := record.Column(syncColIndex).DataType()
switch pred.Operator {
case "eq":
return record.Column(syncColIndex).String() == pred.Record.Column(0).String()
// return record.Column(syncColIndex).(*array.String).Value(0) == pred.Record.Column(0).(*array.String).Value(0)
default:
return false
}
}
69 changes: 69 additions & 0 deletions internal/servers/plugin/v3/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,41 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
Record: recordBytes,
},
}
case *message.SyncDeleteRecord:
whereClause := make([]*pb.PredicatesGroup, len(m.WhereClause))
for j, predicateGroup := range m.WhereClause {
whereClause[j] = &pb.PredicatesGroup{
GroupingType: pb.PredicatesGroup_GroupingType(pb.PredicatesGroup_GroupingType_value[predicateGroup.GroupingType]),
Predicates: make([]*pb.Predicate, len(predicateGroup.Predicates)),
}
for i, predicate := range predicateGroup.Predicates {
record, err := pb.RecordToBytes(predicate.Record)
if err != nil {
return status.Errorf(codes.Internal, "failed to encode record: %v", err)
}

whereClause[j].Predicates[i] = &pb.Predicate{
Record: record,
Column: predicate.Column,
Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[predicate.Operator]),
}
}
}

tableRelations := make([]*pb.TableRelation, len(m.TableRelations))
for i, tr := range m.TableRelations {
tableRelations[i] = &pb.TableRelation{
TableName: tr.TableName,
ParentTable: tr.ParentTable,
}
}
pbMsg.Message = &pb.Sync_Response_DeleteRecord{
DeleteRecord: &pb.Sync_MessageDeleteRecord{
TableName: m.TableName,
TableRelations: tableRelations,
WhereClause: whereClause,
},
}
default:
return status.Errorf(codes.Internal, "unknown message type: %T", msg)
}
Expand Down Expand Up @@ -230,6 +265,40 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error {
SourceName: pbMsg.Delete.SourceName,
SyncTime: pbMsg.Delete.SyncTime.AsTime(),
}

case *pb.Write_Request_DeleteRecord:
whereClause := make(message.PredicateGroups, len(pbMsg.DeleteRecord.WhereClause))

for j, predicateGroup := range pbMsg.DeleteRecord.WhereClause {
whereClause[j].Predicates = make(message.Predicates, len(predicateGroup.Predicates))
for i, predicate := range predicateGroup.Predicates {
record, err := pb.NewRecordFromBytes(predicate.Record)
if err != nil {
pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err)
break
}
whereClause[j].Predicates[i] = message.Predicate{
Record: record,
Column: predicate.Column,
Operator: predicate.Operator.String(),
}
}
}

tableRelations := make([]message.TableRelation, len(pbMsg.DeleteRecord.TableRelations))
for i, tr := range pbMsg.DeleteRecord.TableRelations {
tableRelations[i] = message.TableRelation{
TableName: tr.TableName,
ParentTable: tr.ParentTable,
}
}
pluginMessage = &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: pbMsg.DeleteRecord.TableName,
TableRelations: tableRelations,
WhereClause: whereClause,
},
}
}

if pbMsgConvertErr != nil {
Expand Down
10 changes: 10 additions & 0 deletions message/sync_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,13 @@ func (m SyncInserts) GetRecordsForTable(table *schema.Table) []arrow.Record {
}
return slices.Clip(res)
}

type SyncDeleteRecord struct {
syncBaseMessage
// TODO: Instead of using this struct we should derive the DeletionKeys and parent/child relation from the schema.Table itself
DeleteRecord
}

func (m SyncDeleteRecord) GetTable() *schema.Table {
return &schema.Table{Name: m.TableName}
}
41 changes: 41 additions & 0 deletions message/write_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,44 @@ func (m WriteDeleteStales) Exists(tableName string) bool {
return msg.TableName == tableName
})
}

type TableRelation struct {
TableName string
ParentTable string
}

type TableRelations []TableRelation

type Predicate struct {
Operator string
Column string
Record arrow.Record
}

type Predicates []Predicate

type PredicateGroup struct {
// This will be AND or OR
GroupingType string
Predicates Predicates
}

type PredicateGroups []PredicateGroup

type DeleteRecord struct {
TableName string
TableRelations TableRelations
WhereClause PredicateGroups
SyncTime time.Time
}

type WriteDeleteRecord struct {
writeBaseMessage
DeleteRecord
}

func (m WriteDeleteRecord) GetTable() *schema.Table {
return &schema.Table{Name: m.TableName}
}

type WriteDeleteRecords []*WriteDeleteRecord
9 changes: 9 additions & 0 deletions plugin/testing_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ func TestWriterSuiteRunner(t *testing.T, p *Plugin, tests WriterTestSuiteTests,
})
})

t.Run("TestDeleteRecord", func(t *testing.T) {
t.Run("Basic", func(t *testing.T) {
suite.testDeleteRecordBasic(ctx)
})
t.Run("DeleteAll", func(t *testing.T) {
suite.testDeleteAllRecords(ctx)
})
})

t.Run("TestMigrate", func(t *testing.T) {
if suite.tests.SkipMigrate {
t.Skip("skipping " + t.Name())
Expand Down
150 changes: 150 additions & 0 deletions plugin/testing_write_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,153 @@ func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context) {
require.EqualValuesf(s.t, rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second delete stale")
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{nullRecord}), "record differs")
}

func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
tableName := s.tableNameForTest("delete_all_rows")
syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision).
Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087
table := &schema.Table{
Name: tableName,
Columns: schema.ColumnList{
schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64, PrimaryKey: true, NotNull: true},
schema.CqSourceNameColumn,
schema.CqSyncTimeColumn,
},
}
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
const sourceName = "source-test"

bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
bldr.Field(0).(*array.Int64Builder).Append(0)
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime)
record1 := bldr.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")
record1 = s.handleNulls(record1) // we process nulls after writing

records, err := s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items")

// create value for delete statement but nothing will be deleted because ID value isn't present
bldrDeleteNoMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{
Name: tableName,
Columns: schema.ColumnList{
schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64},
},
}).ToArrowSchema())
bldrDeleteNoMatch.Field(0).(*array.Int64Builder).Append(1)
deleteValue := bldrDeleteNoMatch.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
WhereClause: message.PredicateGroups{
{
GroupingType: "AND",
Predicates: []message.Predicate{
{
Operator: "eq",
Column: "id",
Record: deleteValue,
},
},
},
},
},
}), "failed to delete record no match")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after delete with no match")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after delete with no match")
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete with no match")

// create value for delete statement will be delete One record
bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{
Name: tableName,
Columns: schema.ColumnList{
schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64},
},
}).ToArrowSchema())
bldrDeleteMatch.Field(0).(*array.Int64Builder).Append(0)
deleteValue = bldrDeleteMatch.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
WhereClause: message.PredicateGroups{
{
GroupingType: "AND",
Predicates: []message.Predicate{
{
Operator: "eq",
Column: "id",
Record: deleteValue,
},
},
},
},
},
}), "failed to delete record no match")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after delete with match")
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete with match")
}

func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) {
tableName := s.tableNameForTest("delete_all_records")
syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision).
Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087
table := &schema.Table{
Name: tableName,
Columns: schema.ColumnList{
schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64, PrimaryKey: true, NotNull: true},
schema.CqSourceNameColumn,
schema.CqSyncTimeColumn,
},
}
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
const sourceName = "source-test"

bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
bldr.Field(0).(*array.Int64Builder).Append(0)
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime)
record1 := bldr.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")

records, err := s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items")

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
},
}), "failed to delete records")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after delete all records")
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete stale")

bldr.Field(0).(*array.Int64Builder).Append(1)
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second))
record2 := bldr.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record")

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
},
}), "failed to delete records second time")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read second time")
sortRecords(table, records, "id")
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items second time")
}
Loading