Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions plugin/testing_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
)

type WriterTestSuite struct {
t *testing.T
tests WriterTestSuiteTests

plugin *Plugin
Expand Down Expand Up @@ -95,7 +94,6 @@ func WithRandomSeed(seed int64) func(o *WriterTestSuite) {

func TestWriterSuiteRunner(t *testing.T, p *Plugin, tests WriterTestSuiteTests, opts ...func(o *WriterTestSuite)) {
suite := &WriterTestSuite{
t: t,
tests: tests,
plugin: p,
}
Expand Down Expand Up @@ -145,10 +143,10 @@ func TestWriterSuiteRunner(t *testing.T, p *Plugin, tests WriterTestSuiteTests,
t.Skip("skipping " + t.Name())
}
t.Run("Basic", func(t *testing.T) {
suite.testDeleteStaleBasic(ctx)
suite.testDeleteStaleBasic(ctx, t)
})
t.Run("All", func(t *testing.T) {
suite.testDeleteStaleAll(ctx)
suite.testDeleteStaleAll(ctx, t)
})
})

Expand All @@ -157,10 +155,10 @@ func TestWriterSuiteRunner(t *testing.T, p *Plugin, tests WriterTestSuiteTests,
t.Skip("skipping " + t.Name())
}
t.Run("Basic", func(t *testing.T) {
suite.testDeleteRecordBasic(ctx)
suite.testDeleteRecordBasic(ctx, t)
})
t.Run("DeleteAll", func(t *testing.T) {
suite.testDeleteAllRecords(ctx)
suite.testDeleteAllRecords(ctx, t)
})
})

Expand Down
119 changes: 62 additions & 57 deletions plugin/testing_write_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugin

import (
"context"
"testing"
"time"

"github.com/apache/arrow/go/v15/arrow"
Expand All @@ -12,7 +13,8 @@ import (
"github.com/stretchr/testify/require"
)

func (s *WriterTestSuite) testDeleteStaleBasic(ctx context.Context) {
func (s *WriterTestSuite) testDeleteStaleBasic(ctx context.Context, t *testing.T) {
r := require.New(t)
tableName := s.tableNameForTest("delete_basic")
syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision).
Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087
Expand All @@ -24,7 +26,7 @@ func (s *WriterTestSuite) testDeleteStaleBasic(ctx context.Context) {
schema.CqSyncTimeColumn,
},
}
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
const sourceName = "source-test"

bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
Expand All @@ -33,60 +35,61 @@ func (s *WriterTestSuite) testDeleteStaleBasic(ctx context.Context) {
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime)
record1 := bldr.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")
record1 = s.handleNulls(record1) // we process nulls after writing

records, err := s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items")
r.NoErrorf(err, "failed to read")
r.EqualValuesf(1, TotalRows(records), "unexpected amount of items")

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteStale{
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteStale{
TableName: table.Name,
SourceName: sourceName,
SyncTime: syncTime,
}), "failed to delete stale records")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after delete stale")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after delete stale")
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete stale")
r.NoErrorf(err, "failed to read after delete stale")
r.EqualValuesf(1, TotalRows(records), "unexpected amount of items after delete stale")
r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete stale")

bldr.Field(0).(*array.Int64Builder).Append(1)
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second))
record2 := bldr.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record")
record2 = s.handleNulls(record2) // we process nulls after writing

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read second time")
r.NoErrorf(err, "failed to read second time")
sortRecords(table, records, "id")
require.EqualValuesf(s.t, 2, TotalRows(records), "unexpected amount of items second time")
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1, record2}), "record differs after delete stale")
r.EqualValuesf(2, TotalRows(records), "unexpected amount of items second time")
r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1, record2}), "record differs after delete stale")

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteStale{
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteStale{
TableName: table.Name,
SourceName: sourceName,
SyncTime: syncTime.Add(time.Second),
}), "failed to delete stale records second time")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after second delete stale")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after second delete stale")
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record2}), "record differs after second delete stale")
r.NoErrorf(err, "failed to read after second delete stale")
r.EqualValuesf(1, TotalRows(records), "unexpected amount of items after second delete stale")
r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record2}), "record differs after second delete stale")
}

func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context) {
func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context, t *testing.T) {
const rowsPerRecord = 10

r := require.New(t)
tableName := s.tableNameForTest("delete_all")
// https://github.com/golang/go/issues/41087
syncTime := time.Now().UTC().Truncate(time.Microsecond)
table := schema.TestTable(tableName, s.genDatOptions)
table.Columns = append(schema.ColumnList{schema.CqSourceNameColumn, schema.CqSyncTimeColumn}, table.Columns...)
table.Columns[table.Columns.Index("id")].PrimaryKey = true
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")

tg := schema.NewTestDataGenerator()
normalRecord := tg.Generate(table, schema.GenTestDataOptions{
Expand All @@ -95,22 +98,22 @@ func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context) {
SourceName: "test",
SyncTime: syncTime, // Generate call may truncate the value further based on the options
})
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: normalRecord}), "failed to insert record")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: normalRecord}), "failed to insert record")
normalRecord = s.handleNulls(normalRecord) // we process nulls after writing

readRecords, err := s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read")
require.EqualValuesf(s.t, rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after read")
r.NoErrorf(err, "failed to read")
r.EqualValuesf(rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after read")

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteStale{
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteStale{
TableName: table.Name,
SourceName: "test",
SyncTime: syncTime, // Generate call may truncate the value further based on the options
}), "failed to delete stale records")

readRecords, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after delete stale")
require.EqualValuesf(s.t, rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after delete stale")
r.NoErrorf(err, "failed to read after delete stale")
r.EqualValuesf(rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after delete stale")

// https://github.com/golang/go/issues/41087
syncTime = time.Now().UTC().Truncate(time.Microsecond)
Expand All @@ -121,29 +124,30 @@ func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context) {
SourceName: "test",
SyncTime: syncTime, // Generate call may truncate the value further based on the options
})
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: nullRecord}), "failed to insert record second time")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: nullRecord}), "failed to insert record second time")
nullRecord = s.handleNulls(nullRecord) // we process nulls after writing

readRecords, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read second time")
r.NoErrorf(err, "failed to read second time")
sortRecords(table, readRecords, "id")
require.EqualValuesf(s.t, 2*rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second read")
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{normalRecord, nullRecord}), "record differs")
r.EqualValuesf(2*rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second read")
r.Emptyf(RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{normalRecord, nullRecord}), "record differs")

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteStale{
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteStale{
TableName: table.Name,
SourceName: "test",
SyncTime: syncTime, // Generate call may truncate the value further based on the options
}), "failed to delete stale records second time")

readRecords, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after second delete stale")
r.NoErrorf(err, "failed to read after second delete stale")
sortRecords(table, readRecords, "id")
require.EqualValuesf(s.t, rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second delete stale")
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{nullRecord}), "record differs")
r.EqualValuesf(rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second delete stale")
r.Emptyf(RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{nullRecord}), "record differs")
}

func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context, t *testing.T) {
r := require.New(t)
tableName := s.tableNameForTest("delete_all_rows")
syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision).
Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087
Expand All @@ -155,7 +159,7 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
schema.CqSyncTimeColumn,
},
}
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
const sourceName = "source-test"

bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
Expand All @@ -164,12 +168,12 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime)
record1 := bldr.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")
record1 = s.handleNulls(record1) // we process nulls after writing

records, err := s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items")
r.NoErrorf(err, "failed to read")
r.EqualValuesf(1, TotalRows(records), "unexpected amount of items")

// create value for delete statement but nothing will be deleted because ID value isn't present
bldrDeleteNoMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{
Expand All @@ -181,7 +185,7 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
bldrDeleteNoMatch.Field(0).(*array.Int64Builder).Append(1)
deleteValue := bldrDeleteNoMatch.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
WhereClause: message.PredicateGroups{
Expand All @@ -200,9 +204,9 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
}), "failed to delete record no match")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after delete with no match")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after delete with no match")
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete with no match")
r.NoErrorf(err, "failed to read after delete with no match")
r.EqualValuesf(1, TotalRows(records), "unexpected amount of items after delete with no match")
r.Emptyf(RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete with no match")

// create value for delete statement will be delete One record
bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{
Expand All @@ -214,7 +218,7 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
bldrDeleteMatch.Field(0).(*array.Int64Builder).Append(0)
deleteValue = bldrDeleteMatch.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
WhereClause: message.PredicateGroups{
Expand All @@ -233,11 +237,12 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
}), "failed to delete record no match")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after delete with match")
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete with match")
r.NoErrorf(err, "failed to read after delete with match")
r.EqualValuesf(0, TotalRows(records), "unexpected amount of items after delete with match")
}

func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) {
func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context, t *testing.T) {
r := require.New(t)
tableName := s.tableNameForTest("delete_all_records")
syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision).
Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087
Expand All @@ -249,7 +254,7 @@ func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) {
schema.CqSyncTimeColumn,
},
}
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
const sourceName = "source-test"

bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
Expand All @@ -258,37 +263,37 @@ func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) {
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime)
record1 := bldr.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")

records, err := s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read")
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items")
r.NoErrorf(err, "failed to read")
r.EqualValuesf(1, TotalRows(records), "unexpected amount of items")

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
},
}), "failed to delete records")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read after delete all records")
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete stale")
r.NoErrorf(err, "failed to read after delete all records")
r.EqualValuesf(0, TotalRows(records), "unexpected amount of items after delete stale")

bldr.Field(0).(*array.Int64Builder).Append(1)
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second))
record2 := bldr.NewRecord()

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record")
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record")

require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
r.NoErrorf(s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
},
}), "failed to delete records second time")

records, err = s.plugin.readAll(ctx, table)
require.NoErrorf(s.t, err, "failed to read second time")
r.NoErrorf(err, "failed to read second time")
sortRecords(table, records, "id")
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items second time")
r.EqualValuesf(0, TotalRows(records), "unexpected amount of items second time")
}