Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion plugins/destination/plugin_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func WithTestIgnoreNullsInLists() func(o *PluginTestSuiteRunnerOptions) {
}
}

func WithTestSourceTimePrecision(precision time.Duration) func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.TimePrecision = precision
}
}

func WithTestSourceSkipLists() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipLists = true
Expand Down Expand Up @@ -173,7 +179,11 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
tests: tests,
}

opts := PluginTestSuiteRunnerOptions{}
opts := PluginTestSuiteRunnerOptions{
TestSourceOptions: schema.TestSourceOptions{
TimePrecision: time.Microsecond,
},
}
for _, o := range testOptions {
o(&opts)
}
Expand Down
7 changes: 4 additions & 3 deletions plugins/destination/plugin_testing_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
}
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1,
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1,
TimePrecision: testOpts.TimePrecision,
}
resource1 := schema.GenTestData(source, opts)[0]
if err := p.writeOne(ctx, sourceSpec, syncTime, resource1); err != nil {
Expand Down
16 changes: 9 additions & 7 deletions plugins/destination/plugin_testing_overwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
}

opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
TimePrecision: testOpts.TimePrecision,
}
resources := schema.GenTestData(table, opts)
if err := p.writeAll(ctx, sourceSpec, syncTime, resources); err != nil {
Expand Down Expand Up @@ -72,10 +73,11 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
cqIDInds := resources[0].Schema().FieldIndices(schema.CqIDColumn.Name)
u := resources[0].Column(cqIDInds[0]).(*types.UUIDArray).Value(0)
opts = schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: secondSyncTime,
MaxRows: 1,
StableUUID: u,
SourceName: sourceName,
SyncTime: secondSyncTime,
MaxRows: 1,
StableUUID: u,
TimePrecision: testOpts.TimePrecision,
}
updatedResource := schema.GenTestData(table, opts)
// write second time
Expand Down
16 changes: 9 additions & 7 deletions plugins/destination/plugin_testing_overwrite_delete_stale.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
}

opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
TimePrecision: testOpts.TimePrecision,
}
resources := schema.GenTestData(table, opts)
incResources := schema.GenTestData(incTable, opts)
Expand Down Expand Up @@ -87,10 +88,11 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
cqIDInds := resources[0].Schema().FieldIndices(schema.CqIDColumn.Name)
u := resources[0].Column(cqIDInds[0]).(*types.UUIDArray).Value(0)
opts = schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: secondSyncTime,
StableUUID: u,
MaxRows: 1,
SourceName: sourceName,
SyncTime: secondSyncTime,
StableUUID: u,
MaxRows: 1,
TimePrecision: testOpts.TimePrecision,
}
updatedResources := schema.GenTestData(table, opts)
updatedIncResources := schema.GenTestData(incTable, opts)
Expand Down
7 changes: 4 additions & 3 deletions plugins/destination/plugin_testing_write_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
}

opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
TimePrecision: testOpts.TimePrecision,
}
record1 := schema.GenTestData(table, opts)
if err := p.writeAll(ctx, specSource, syncTime, record1); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion schema/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type TestSourceOptions struct {
SkipDurations bool
SkipTimes bool // time of day types
SkipLargeTypes bool // e.g. large binary, large string
TimePrecision time.Duration
}

// TestSourceColumns returns columns for all Arrow types and composites thereof. TestSourceOptions controls
Expand Down Expand Up @@ -257,7 +258,8 @@ type GenTestDataOptions struct {
// StableUUID is the UUID to use for all rows. If set to uuid.Nil, a new UUID will be generated
StableUUID uuid.UUID
// StableTime is the time to use for all rows other than sync time. If set to time.Time{}, a new time will be generated
StableTime time.Time
StableTime time.Time
TimePrecision time.Duration
}

// GenTestData generates a slice of arrow.Records with the given schema and options.
Expand Down Expand Up @@ -407,6 +409,8 @@ func getExampleJSON(colName string, dataType arrow.DataType, opts GenTestDataOpt
} else if !opts.StableTime.IsZero() {
t = opts.StableTime
}
t = t.Truncate(opts.TimePrecision)

switch timestampType {
case arrow.FixedWidthTypes.Timestamp_s:
return strconv.FormatInt(t.Unix(), 10)
Expand Down