From f37ffe86b2453d6f261ebef6f3c44f3074aaf7ca Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 9 Oct 2023 14:44:37 -0500 Subject: [PATCH 01/15] working! --- go.mod | 1 + internal/servers/plugin/v3/plugin.go | 48 +++++++++++++++++++ message/sync_message.go | 10 ++++ message/write_message.go | 23 +++++++++ writers/mixedbatchwriter/mixedbatchwriter.go | 11 +++++ .../mixedbatchwriter/mixedbatchwriter_test.go | 9 ++++ writers/mixedbatchwriter/unimplemented.go | 6 +++ .../mixedbatchwriter/unimplemented_test.go | 1 + writers/msgtype.go | 3 ++ 9 files changed, 112 insertions(+) diff --git a/go.mod b/go.mod index 51739fc41b..e516daf045 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module github.com/cloudquery/plugin-sdk/v4 go 1.21.1 +replace github.com/cloudquery/plugin-pb-go => /Users/benbernays/Documents/GitHub/plugin-pb-go require ( github.com/apache/arrow/go/v14 v14.0.0-20230929201650-00efb06dc0de github.com/bradleyjkemp/cupaloy/v2 v2.8.0 diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index c88205c5c2..1a537024f3 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -156,6 +156,29 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error { Record: recordBytes, }, } + case *message.SyncDeleteRecord: + deletionKeys := make(map[string][]byte, len(m.DeleteKeys)) + for key, value := range m.DeleteKeys { + recordBytes, err := pb.RecordToBytes(value) + if err != nil { + return status.Errorf(codes.Internal, "failed to encode record: %v", err) + } + deletionKeys[key] = recordBytes + } + tableRelations := make([]*pb.TableRelation, len(m.TableRelations)) + for i, tr := range m.TableRelations { + tableRelations[i] = &pb.TableRelation{ + TableName: tr.TableName, + ParentTable: tr.ParentTable, + } + } + pbMsg.Message = &pb.Sync_Response_Delete{ + Delete: &pb.Sync_MessageDeleteRecord{ + TableName: m.TableName, + TableRelations: tableRelations, + DeletionKeys: deletionKeys, + }, + } default: return status.Errorf(codes.Internal, "unknown message type: %T", msg) } @@ -230,6 +253,31 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error { SourceName: pbMsg.Delete.SourceName, SyncTime: pbMsg.Delete.SyncTime.AsTime(), } + + case *pb.Write_Request_DeleteRecord: + deletionKeys := make(map[string]arrow.Record, len(pbMsg.DeleteRecord.DeletionKeys)) + for key, value := range pbMsg.DeleteRecord.DeletionKeys { + record, err := pb.NewRecordFromBytes(value) + if err != nil { + pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) + break + } + deletionKeys[key] = record + } + tableRelations := make([]message.TableRelation, len(pbMsg.DeleteRecord.TableRelations)) + for i, tr := range pbMsg.DeleteRecord.TableRelations { + tableRelations[i] = message.TableRelation{ + TableName: tr.TableName, + ParentTable: tr.ParentTable, + } + } + pluginMessage = &message.WriteDeleteRecord{ + DeleteRecord: message.DeleteRecord{ + TableName: pbMsg.DeleteRecord.TableName, + TableRelations: tableRelations, + DeleteKeys: deletionKeys, + }, + } } if pbMsgConvertErr != nil { diff --git a/message/sync_message.go b/message/sync_message.go index 87cea8ea70..e18bf43ee9 100644 --- a/message/sync_message.go +++ b/message/sync_message.go @@ -111,3 +111,13 @@ func (m SyncInserts) GetRecordsForTable(table *schema.Table) []arrow.Record { } return slices.Clip(res) } + +type SyncDeleteRecord struct { + syncBaseMessage + // TODO: Instead of using this struct we should derive the DeletionKeys and parent/child relation from the schema.Table itself + DeleteRecord +} + +func (m SyncDeleteRecord) GetTable() *schema.Table { + return &schema.Table{Name: m.TableName} +} diff --git a/message/write_message.go b/message/write_message.go index 88991e8b3b..60932492e8 100644 --- a/message/write_message.go +++ b/message/write_message.go @@ -128,3 +128,26 @@ func (m WriteDeleteStales) Exists(tableName string) bool { return msg.TableName == tableName }) } + +type TableRelation struct { + TableName string + ParentTable string +} +type TableRelations []TableRelation +type DeleteRecord struct { + TableName string + TableRelations TableRelations + DeleteKeys map[string]arrow.Record + SyncTime time.Time +} + +type WriteDeleteRecord struct { + writeBaseMessage + DeleteRecord +} + +func (m WriteDeleteRecord) GetTable() *schema.Table { + return &schema.Table{Name: m.TableName} +} + +type WriteDeleteRecords []*WriteDeleteRecord diff --git a/writers/mixedbatchwriter/mixedbatchwriter.go b/writers/mixedbatchwriter/mixedbatchwriter.go index f3eb1cc2d8..10fe2aeb29 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter.go +++ b/writers/mixedbatchwriter/mixedbatchwriter.go @@ -15,6 +15,7 @@ type Client interface { MigrateTableBatch(ctx context.Context, messages message.WriteMigrateTables) error InsertBatch(ctx context.Context, messages message.WriteInserts) error DeleteStaleBatch(ctx context.Context, messages message.WriteDeleteStales) error + DeleteRecordsBatch(ctx context.Context, messages message.WriteDeleteRecords) error } type MixedBatchWriter struct { @@ -97,6 +98,12 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri batch: make([]*message.WriteDeleteStale, 0, w.batchSize), writeFunc: w.client.DeleteStaleBatch, } + + deleteRecord := &batchManager[message.WriteDeleteRecords, *message.WriteDeleteRecord]{ + batch: make([]*message.WriteDeleteRecord, 0, w.batchSize), + writeFunc: w.client.DeleteRecordsBatch, + } + flush := func(msgType writers.MsgType) error { if msgType == writers.MsgTypeUnset { return nil @@ -108,6 +115,8 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri return insert.flush(ctx) case writers.MsgTypeDeleteStale: return deleteStale.flush(ctx) + case writers.MsgTypeDeleteRecord: + return deleteRecord.flush(ctx) default: panic("unknown message type") } @@ -138,6 +147,8 @@ loop: err = insert.append(ctx, v) case *message.WriteDeleteStale: err = deleteStale.append(ctx, v) + case *message.WriteDeleteRecord: + err = deleteRecord.append(ctx, v) default: panic("unknown message type") } diff --git a/writers/mixedbatchwriter/mixedbatchwriter_test.go b/writers/mixedbatchwriter/mixedbatchwriter_test.go index a321c08722..47d4f1eb07 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter_test.go +++ b/writers/mixedbatchwriter/mixedbatchwriter_test.go @@ -45,6 +45,15 @@ func (c *testMixedBatchClient) DeleteStaleBatch(_ context.Context, messages mess return nil } +func (c *testMixedBatchClient) DeleteRecordsBatch(_ context.Context, messages message.WriteDeleteRecords) error { + m := make([]message.WriteMessage, len(messages)) + for i, msg := range messages { + m[i] = msg + } + c.receivedBatches = append(c.receivedBatches, m) + return nil +} + var _ Client = (*testMixedBatchClient)(nil) type testMessages struct { diff --git a/writers/mixedbatchwriter/unimplemented.go b/writers/mixedbatchwriter/unimplemented.go index 1b7b3f8b7d..674afa8ccb 100644 --- a/writers/mixedbatchwriter/unimplemented.go +++ b/writers/mixedbatchwriter/unimplemented.go @@ -19,3 +19,9 @@ type UnimplementedDeleteStaleBatch struct{} func (UnimplementedDeleteStaleBatch) DeleteStaleBatch(context.Context, message.WriteDeleteStales) error { return fmt.Errorf("DeleteStaleBatch: %w", plugin.ErrNotImplemented) } + +type UnimplementedDeleteRecordsBatch struct{} + +func (UnimplementedDeleteRecordsBatch) DeleteRecordsBatch(context.Context, message.WriteDeleteRecords) error { + return fmt.Errorf("DeleteRecordsBatch: %w", plugin.ErrNotImplemented) +} diff --git a/writers/mixedbatchwriter/unimplemented_test.go b/writers/mixedbatchwriter/unimplemented_test.go index 10814fb042..64cfd05b60 100644 --- a/writers/mixedbatchwriter/unimplemented_test.go +++ b/writers/mixedbatchwriter/unimplemented_test.go @@ -10,6 +10,7 @@ import ( type testDummyClient struct { mixedbatchwriter.IgnoreMigrateTableBatch mixedbatchwriter.UnimplementedDeleteStaleBatch + mixedbatchwriter.UnimplementedDeleteRecordsBatch } func (testDummyClient) InsertBatch(context.Context, message.WriteInserts) error { diff --git a/writers/msgtype.go b/writers/msgtype.go index ebdaa71e38..360f9e8d3c 100644 --- a/writers/msgtype.go +++ b/writers/msgtype.go @@ -13,6 +13,7 @@ const ( MsgTypeMigrateTable MsgTypeInsert MsgTypeDeleteStale + MsgTypeDeleteRecord ) func MsgID(msg message.WriteMessage) MsgType { @@ -23,6 +24,8 @@ func MsgID(msg message.WriteMessage) MsgType { return MsgTypeInsert case *message.WriteDeleteStale: return MsgTypeDeleteStale + case *message.WriteDeleteRecord: + return MsgTypeDeleteRecord } panic("unknown message type: " + reflect.TypeOf(msg).Name()) } From 18e21fe38009f6a08173ca16af6cd64d0b2156da Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 10 Oct 2023 15:23:33 -0500 Subject: [PATCH 02/15] Update write_message.go --- message/write_message.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/message/write_message.go b/message/write_message.go index 60932492e8..1398b4490c 100644 --- a/message/write_message.go +++ b/message/write_message.go @@ -133,11 +133,25 @@ type TableRelation struct { TableName string ParentTable string } + +type Predicate struct { + Operator string + Column string + Record arrow.Record +} +type WhereClause struct { + And []Predicate + Or []Predicate +} + type TableRelations []TableRelation + +type WhereClauses []WhereClause + type DeleteRecord struct { TableName string TableRelations TableRelations - DeleteKeys map[string]arrow.Record + WhereClauses WhereClauses SyncTime time.Time } From c756106ab706f8e771ee854b991873fa6d098a35 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 10 Oct 2023 15:23:35 -0500 Subject: [PATCH 03/15] Update plugin.go --- internal/servers/plugin/v3/plugin.go | 74 +++++++++++++++++++++------- 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index 1a537024f3..b2b1cefc2b 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -157,14 +157,34 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error { }, } case *message.SyncDeleteRecord: - deletionKeys := make(map[string][]byte, len(m.DeleteKeys)) - for key, value := range m.DeleteKeys { - recordBytes, err := pb.RecordToBytes(value) - if err != nil { - return status.Errorf(codes.Internal, "failed to encode record: %v", err) + whereClauses := make([]*pb.WhereClause, len(m.WhereClauses)) + for i, whereClause := range m.WhereClauses { + whereClauses[i].And = make([]*pb.Predicate, len(whereClause.And)) + for j, value := range whereClause.And { + record, err := pb.RecordToBytes(value.Record) + if err != nil { + return status.Errorf(codes.Internal, "failed to encode record: %v", err) + } + whereClauses[i].And[j] = &pb.Predicate{ + Record: record, + Column: value.Column, + Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[value.Operator]), + } + } + whereClauses[i].Or = make([]*pb.Predicate, len(whereClause.Or)) + for j, value := range whereClause.Or { + record, err := pb.RecordToBytes(value.Record) + if err != nil { + return status.Errorf(codes.Internal, "failed to encode record: %v", err) + } + whereClauses[i].Or[j] = &pb.Predicate{ + Record: record, + Column: value.Column, + Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[value.Operator]), + } } - deletionKeys[key] = recordBytes } + tableRelations := make([]*pb.TableRelation, len(m.TableRelations)) for i, tr := range m.TableRelations { tableRelations[i] = &pb.TableRelation{ @@ -172,11 +192,11 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error { ParentTable: tr.ParentTable, } } - pbMsg.Message = &pb.Sync_Response_Delete{ - Delete: &pb.Sync_MessageDeleteRecord{ + pbMsg.Message = &pb.Sync_Response_DeleteRecord{ + DeleteRecord: &pb.Sync_MessageDeleteRecord{ TableName: m.TableName, TableRelations: tableRelations, - DeletionKeys: deletionKeys, + WhereClauses: whereClauses, }, } default: @@ -255,14 +275,34 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error { } case *pb.Write_Request_DeleteRecord: - deletionKeys := make(map[string]arrow.Record, len(pbMsg.DeleteRecord.DeletionKeys)) - for key, value := range pbMsg.DeleteRecord.DeletionKeys { - record, err := pb.NewRecordFromBytes(value) - if err != nil { - pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) - break + whereClauses := make(message.WhereClauses, len(pbMsg.DeleteRecord.WhereClauses)) + for i, whereClause := range pbMsg.DeleteRecord.WhereClauses { + whereClauses[i].And = make([]message.Predicate, len(whereClause.And)) + for j, value := range whereClause.And { + record, err := pb.NewRecordFromBytes(value.Record) + if err != nil { + pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) + break + } + whereClauses[i].And[j] = message.Predicate{ + Record: record, + Column: value.Column, + Operator: value.Operator.String(), + } + } + whereClauses[i].Or = make([]message.Predicate, len(whereClause.Or)) + for j, value := range whereClause.Or { + record, err := pb.NewRecordFromBytes(value.Record) + if err != nil { + pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) + break + } + whereClauses[i].Or[j] = message.Predicate{ + Record: record, + Column: value.Column, + Operator: value.Operator.String(), + } } - deletionKeys[key] = record } tableRelations := make([]message.TableRelation, len(pbMsg.DeleteRecord.TableRelations)) for i, tr := range pbMsg.DeleteRecord.TableRelations { @@ -275,7 +315,7 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error { DeleteRecord: message.DeleteRecord{ TableName: pbMsg.DeleteRecord.TableName, TableRelations: tableRelations, - DeleteKeys: deletionKeys, + WhereClauses: whereClauses, }, } } From 3888ccd90e2437c4cdd0f32680ce13bdfca4d248 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 10 Oct 2023 15:26:25 -0500 Subject: [PATCH 04/15] use v13 not v14 --- examples/simple_plugin/plugin/client.go | 2 +- examples/simple_plugin/services/test.go | 2 +- internal/clients/state/v3/state.go | 8 ++++---- internal/memdb/memdb.go | 4 ++-- internal/pk/pk.go | 2 +- internal/servers/destination/v0/destinations.go | 4 ++-- internal/servers/destination/v0/schemav2tov3.go | 6 +++--- internal/servers/destination/v1/convert.go | 4 ++-- internal/servers/destination/v1/destinations.go | 6 +++--- internal/servers/plugin/v3/plugin.go | 2 +- message/sync_message.go | 2 +- message/write_message.go | 2 +- plugin/diff.go | 6 +++--- plugin/nulls.go | 6 +++--- plugin/plugin.go | 2 +- plugin/plugin_destination.go | 2 +- plugin/plugin_read.go | 2 +- plugin/sort.go | 4 ++-- plugin/testing_write.go | 2 +- plugin/testing_write_delete.go | 6 +++--- plugin/testing_write_insert.go | 6 +++--- plugin/testing_write_migrate.go | 2 +- plugin/testing_write_upsert.go | 6 +++--- scalar/binary.go | 2 +- scalar/bool.go | 2 +- scalar/date32.go | 2 +- scalar/date64.go | 2 +- scalar/decimal.go | 6 +++--- scalar/duration.go | 2 +- scalar/errors.go | 2 +- scalar/float.go | 2 +- scalar/inet.go | 2 +- scalar/int.go | 2 +- scalar/interval.go | 2 +- scalar/json.go | 2 +- scalar/list.go | 2 +- scalar/mac.go | 2 +- scalar/scalar.go | 8 ++++---- scalar/string.go | 4 ++-- scalar/struct.go | 2 +- scalar/time.go | 2 +- scalar/timestamp.go | 2 +- scalar/uint.go | 2 +- scalar/uuid.go | 2 +- scheduler/scheduler.go | 4 ++-- schema/arrow.go | 2 +- schema/column.go | 2 +- schema/errors.go | 2 +- schema/meta.go | 2 +- schema/table.go | 2 +- schema/testdata.go | 6 +++--- schema/validators.go | 2 +- transformers/struct.go | 2 +- types/extensions.go | 2 +- types/inet.go | 4 ++-- types/json.go | 4 ++-- types/mac.go | 4 ++-- types/register.go | 2 +- types/uuid.go | 4 ++-- writers/batchwriter/batchwriter.go | 2 +- writers/mixedbatchwriter/mixedbatchwriter.go | 2 +- writers/streamingbatchwriter/streamingbatchwriter.go | 2 +- 62 files changed, 96 insertions(+), 96 deletions(-) diff --git a/examples/simple_plugin/plugin/client.go b/examples/simple_plugin/plugin/client.go index 7f2eedaba4..9a186c829e 100644 --- a/examples/simple_plugin/plugin/client.go +++ b/examples/simple_plugin/plugin/client.go @@ -5,7 +5,7 @@ import ( "encoding/json" "fmt" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/examples/simple_plugin/client" "github.com/cloudquery/plugin-sdk/examples/simple_plugin/services" "github.com/cloudquery/plugin-sdk/v4/message" diff --git a/examples/simple_plugin/services/test.go b/examples/simple_plugin/services/test.go index ba97b22097..b72cc72a85 100644 --- a/examples/simple_plugin/services/test.go +++ b/examples/simple_plugin/services/test.go @@ -3,7 +3,7 @@ package services import ( "context" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/examples/simple_plugin/client" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/internal/clients/state/v3/state.go b/internal/clients/state/v3/state.go index a8e3accece..acc1d39a02 100644 --- a/internal/clients/state/v3/state.go +++ b/internal/clients/state/v3/state.go @@ -6,10 +6,10 @@ import ( "io" "sync" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/ipc" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/ipc" + "github.com/apache/arrow/go/v13/arrow/memory" pb "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 719eafe9c1..7e37560677 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -5,8 +5,8 @@ import ( "fmt" "sync" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/cloudquery/plugin-sdk/v4/schema" diff --git a/internal/pk/pk.go b/internal/pk/pk.go index eae0f90dc6..ca8c5f2806 100644 --- a/internal/pk/pk.go +++ b/internal/pk/pk.go @@ -3,7 +3,7 @@ package pk import ( "strings" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index 461902d589..05236e8a1a 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -6,8 +6,8 @@ import ( "io" "sync" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" pbBase "github.com/cloudquery/plugin-pb-go/pb/base/v0" pb "github.com/cloudquery/plugin-pb-go/pb/destination/v0" "github.com/cloudquery/plugin-pb-go/specs" diff --git a/internal/servers/destination/v0/schemav2tov3.go b/internal/servers/destination/v0/schemav2tov3.go index 35c33dfc7f..3b63448b15 100644 --- a/internal/servers/destination/v0/schemav2tov3.go +++ b/internal/servers/destination/v0/schemav2tov3.go @@ -4,9 +4,9 @@ import ( "encoding/json" "strings" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" schemav2 "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/types" diff --git a/internal/servers/destination/v1/convert.go b/internal/servers/destination/v1/convert.go index 36cd8406fe..7fc57f2f01 100644 --- a/internal/servers/destination/v1/convert.go +++ b/internal/servers/destination/v1/convert.go @@ -3,8 +3,8 @@ package destination import ( "bytes" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/ipc" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/ipc" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/internal/servers/destination/v1/destinations.go b/internal/servers/destination/v1/destinations.go index 8b57a84b93..5a72861c24 100644 --- a/internal/servers/destination/v1/destinations.go +++ b/internal/servers/destination/v1/destinations.go @@ -7,9 +7,9 @@ import ( "io" "sync" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/ipc" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/ipc" + "github.com/apache/arrow/go/v13/arrow/memory" pb "github.com/cloudquery/plugin-pb-go/pb/destination/v1" "github.com/cloudquery/plugin-pb-go/specs" "github.com/cloudquery/plugin-sdk/v4/message" diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index b2b1cefc2b..3193b2bab8 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -5,7 +5,7 @@ import ( "fmt" "io" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" pb "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/plugin" diff --git a/message/sync_message.go b/message/sync_message.go index e18bf43ee9..fd200dbd9d 100644 --- a/message/sync_message.go +++ b/message/sync_message.go @@ -3,7 +3,7 @@ package message import ( "slices" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/message/write_message.go b/message/write_message.go index 1398b4490c..db44890d68 100644 --- a/message/write_message.go +++ b/message/write_message.go @@ -4,7 +4,7 @@ import ( "slices" "time" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/diff.go b/plugin/diff.go index 8149df6d4f..d32bb9d585 100644 --- a/plugin/diff.go +++ b/plugin/diff.go @@ -4,9 +4,9 @@ import ( "fmt" "strings" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" ) func RecordsDiff(sc *arrow.Schema, have, want []arrow.Record) string { diff --git a/plugin/nulls.go b/plugin/nulls.go index 73acdb9311..9cec98d6d1 100644 --- a/plugin/nulls.go +++ b/plugin/nulls.go @@ -1,9 +1,9 @@ package plugin import ( - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" ) func stripNullsFromLists(list array.ListLike) array.ListLike { diff --git a/plugin/plugin.go b/plugin/plugin.go index a65400bc77..5d798502f2 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -6,7 +6,7 @@ import ( "fmt" "sync" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/rs/zerolog" diff --git a/plugin/plugin_destination.go b/plugin/plugin_destination.go index 21174d6091..e8b9b8aefd 100644 --- a/plugin/plugin_destination.go +++ b/plugin/plugin_destination.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/plugin_read.go b/plugin/plugin_read.go index e09db9058d..66b5d23868 100644 --- a/plugin/plugin_read.go +++ b/plugin/plugin_read.go @@ -3,7 +3,7 @@ package plugin import ( "context" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/sort.go b/plugin/sort.go index 99602511f8..81acd39165 100644 --- a/plugin/sort.go +++ b/plugin/sort.go @@ -3,8 +3,8 @@ package plugin import ( "sort" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/testing_write.go b/plugin/testing_write.go index 6d52573167..4d2893c918 100644 --- a/plugin/testing_write.go +++ b/plugin/testing_write.go @@ -5,7 +5,7 @@ import ( "math/rand" "testing" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index baab1d666f..907e58473f 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -4,9 +4,9 @@ import ( "context" "time" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/stretchr/testify/require" diff --git a/plugin/testing_write_insert.go b/plugin/testing_write_insert.go index 79ec87708a..cff3e19f84 100644 --- a/plugin/testing_write_insert.go +++ b/plugin/testing_write_insert.go @@ -4,9 +4,9 @@ import ( "context" "fmt" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/testing_write_migrate.go b/plugin/testing_write_migrate.go index f6784fef52..5ee3811b19 100644 --- a/plugin/testing_write_migrate.go +++ b/plugin/testing_write_migrate.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/types" diff --git a/plugin/testing_write_upsert.go b/plugin/testing_write_upsert.go index a7af1a5cf4..b96d56f7c8 100644 --- a/plugin/testing_write_upsert.go +++ b/plugin/testing_write_upsert.go @@ -4,9 +4,9 @@ import ( "context" "fmt" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/scalar/binary.go b/scalar/binary.go index e675b011a8..2f14afa0a5 100644 --- a/scalar/binary.go +++ b/scalar/binary.go @@ -3,7 +3,7 @@ package scalar import ( "bytes" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Binary struct { diff --git a/scalar/bool.go b/scalar/bool.go index 38b481d7bb..ebbcc64957 100644 --- a/scalar/bool.go +++ b/scalar/bool.go @@ -3,7 +3,7 @@ package scalar import ( "strconv" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Bool struct { diff --git a/scalar/date32.go b/scalar/date32.go index f2757b7492..ca7fdee83b 100644 --- a/scalar/date32.go +++ b/scalar/date32.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Date32 struct { diff --git a/scalar/date64.go b/scalar/date64.go index b73e43dfc3..62007f0f9f 100644 --- a/scalar/date64.go +++ b/scalar/date64.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Date64 struct { diff --git a/scalar/decimal.go b/scalar/decimal.go index 49009c5de4..d31c85b6ab 100644 --- a/scalar/decimal.go +++ b/scalar/decimal.go @@ -1,9 +1,9 @@ package scalar import ( - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/decimal128" - "github.com/apache/arrow/go/v14/arrow/decimal256" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/decimal128" + "github.com/apache/arrow/go/v13/arrow/decimal256" ) type Decimal256 struct { diff --git a/scalar/duration.go b/scalar/duration.go index 17a3e1b032..770875ec72 100644 --- a/scalar/duration.go +++ b/scalar/duration.go @@ -4,7 +4,7 @@ import ( "strings" "time" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Duration struct { diff --git a/scalar/errors.go b/scalar/errors.go index 91960e2934..f64ad4b402 100644 --- a/scalar/errors.go +++ b/scalar/errors.go @@ -3,7 +3,7 @@ package scalar import ( "fmt" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) const ( diff --git a/scalar/float.go b/scalar/float.go index f3a097ed63..690ddc3a98 100644 --- a/scalar/float.go +++ b/scalar/float.go @@ -4,7 +4,7 @@ import ( "math" "strconv" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Float struct { diff --git a/scalar/inet.go b/scalar/inet.go index e587a3b8cb..3d6163cfc7 100644 --- a/scalar/inet.go +++ b/scalar/inet.go @@ -6,7 +6,7 @@ import ( "net" "strings" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/types" ) diff --git a/scalar/int.go b/scalar/int.go index 6035042b48..d702492c94 100644 --- a/scalar/int.go +++ b/scalar/int.go @@ -4,7 +4,7 @@ import ( "math" "strconv" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Int struct { diff --git a/scalar/interval.go b/scalar/interval.go index 96cfb5ce4f..d39269516f 100644 --- a/scalar/interval.go +++ b/scalar/interval.go @@ -3,7 +3,7 @@ package scalar import ( "encoding/json" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type MonthInterval struct { diff --git a/scalar/json.go b/scalar/json.go index 62c160893e..c0c5fceea3 100644 --- a/scalar/json.go +++ b/scalar/json.go @@ -5,7 +5,7 @@ import ( "encoding/json" "reflect" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/types" ) diff --git a/scalar/list.go b/scalar/list.go index b1b1cd2ba0..3cf3e26939 100644 --- a/scalar/list.go +++ b/scalar/list.go @@ -4,7 +4,7 @@ import ( "reflect" "strings" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type List struct { diff --git a/scalar/mac.go b/scalar/mac.go index 7973c2ad44..5350a64bee 100644 --- a/scalar/mac.go +++ b/scalar/mac.go @@ -3,7 +3,7 @@ package scalar import ( "net" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/types" ) diff --git a/scalar/scalar.go b/scalar/scalar.go index 501502b7e5..08718a858e 100644 --- a/scalar/scalar.go +++ b/scalar/scalar.go @@ -3,10 +3,10 @@ package scalar import ( "fmt" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/float16" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/float16" + "github.com/apache/arrow/go/v13/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/types" "golang.org/x/exp/maps" ) diff --git a/scalar/string.go b/scalar/string.go index 7b137bfca1..7997aded97 100644 --- a/scalar/string.go +++ b/scalar/string.go @@ -3,8 +3,8 @@ package scalar import ( "fmt" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" ) const nullValueStr = array.NullValueStr diff --git a/scalar/struct.go b/scalar/struct.go index 2f2a0b1de3..e53f34cc16 100644 --- a/scalar/struct.go +++ b/scalar/struct.go @@ -5,7 +5,7 @@ import ( "encoding/json" "reflect" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Struct struct { diff --git a/scalar/time.go b/scalar/time.go index 99e65daba9..617ec9ac88 100644 --- a/scalar/time.go +++ b/scalar/time.go @@ -1,7 +1,7 @@ package scalar import ( - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Time struct { diff --git a/scalar/timestamp.go b/scalar/timestamp.go index 76abd3cada..371f676572 100644 --- a/scalar/timestamp.go +++ b/scalar/timestamp.go @@ -6,7 +6,7 @@ import ( "math" "time" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) const ( diff --git a/scalar/uint.go b/scalar/uint.go index f3cfdd2adf..61daf95cc2 100644 --- a/scalar/uint.go +++ b/scalar/uint.go @@ -4,7 +4,7 @@ import ( "math" "strconv" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type Uint struct { diff --git a/scalar/uuid.go b/scalar/uuid.go index a3c61f408c..dfae523cbd 100644 --- a/scalar/uuid.go +++ b/scalar/uuid.go @@ -4,7 +4,7 @@ import ( "encoding/hex" "fmt" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/types" "github.com/google/uuid" ) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 2332500e33..67bc01bad8 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -8,8 +8,8 @@ import ( "sync/atomic" "time" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/caser" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/scalar" diff --git a/schema/arrow.go b/schema/arrow.go index d653e7c6e9..8e83d8e994 100644 --- a/schema/arrow.go +++ b/schema/arrow.go @@ -1,7 +1,7 @@ package schema import ( - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) const ( diff --git a/schema/column.go b/schema/column.go index a080f048c5..7ad2062b0a 100644 --- a/schema/column.go +++ b/schema/column.go @@ -5,7 +5,7 @@ import ( "encoding/json" "strings" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) type ColumnList []Column diff --git a/schema/errors.go b/schema/errors.go index aacb755e5a..8535000f08 100644 --- a/schema/errors.go +++ b/schema/errors.go @@ -4,7 +4,7 @@ package schema import ( "fmt" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) const ( diff --git a/schema/meta.go b/schema/meta.go index b184207078..bd5ca2de7e 100644 --- a/schema/meta.go +++ b/schema/meta.go @@ -3,7 +3,7 @@ package schema import ( "context" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/scalar" "github.com/cloudquery/plugin-sdk/v4/types" ) diff --git a/schema/table.go b/schema/table.go index 67c33df51e..6b9e6f973e 100644 --- a/schema/table.go +++ b/schema/table.go @@ -6,7 +6,7 @@ import ( "regexp" "slices" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/glob" ) diff --git a/schema/testdata.go b/schema/testdata.go index adf191e58b..de6e89bfdf 100644 --- a/schema/testdata.go +++ b/schema/testdata.go @@ -8,9 +8,9 @@ import ( "strings" "time" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" - "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/types" "github.com/google/uuid" "golang.org/x/exp/rand" diff --git a/schema/validators.go b/schema/validators.go index 7c8962d017..659237a36b 100644 --- a/schema/validators.go +++ b/schema/validators.go @@ -1,7 +1,7 @@ package schema import ( - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" ) func FindEmptyColumns(table *Table, records []arrow.Record) []string { diff --git a/transformers/struct.go b/transformers/struct.go index bb0bd75883..5e0b8c5da3 100644 --- a/transformers/struct.go +++ b/transformers/struct.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-sdk/v4/caser" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/types" diff --git a/types/extensions.go b/types/extensions.go index 384d5cc5fa..9f30f1c705 100644 --- a/types/extensions.go +++ b/types/extensions.go @@ -1,6 +1,6 @@ package types -import "github.com/apache/arrow/go/v14/arrow" +import "github.com/apache/arrow/go/v13/arrow" var ExtensionTypes = struct { UUID arrow.ExtensionType diff --git a/types/inet.go b/types/inet.go index f1a26d778f..3a2bbe78d6 100644 --- a/types/inet.go +++ b/types/inet.go @@ -7,8 +7,8 @@ import ( "reflect" "strings" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" "github.com/goccy/go-json" ) diff --git a/types/json.go b/types/json.go index ba90a3fdb8..3068e13fd2 100644 --- a/types/json.go +++ b/types/json.go @@ -8,8 +8,8 @@ import ( "github.com/goccy/go-json" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" ) type JSONBuilder struct { diff --git a/types/mac.go b/types/mac.go index b06bc9f9da..469e3dc06f 100644 --- a/types/mac.go +++ b/types/mac.go @@ -7,8 +7,8 @@ import ( "reflect" "strings" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" "github.com/goccy/go-json" ) diff --git a/types/register.go b/types/register.go index 0c614d49cb..65d9822281 100644 --- a/types/register.go +++ b/types/register.go @@ -1,6 +1,6 @@ package types -import "github.com/apache/arrow/go/v14/arrow" +import "github.com/apache/arrow/go/v13/arrow" func RegisterAllExtensions() error { if err := arrow.RegisterExtensionType(&UUIDType{}); err != nil { diff --git a/types/uuid.go b/types/uuid.go index e026e7ab91..c85d55464f 100644 --- a/types/uuid.go +++ b/types/uuid.go @@ -8,8 +8,8 @@ import ( "github.com/goccy/go-json" - "github.com/apache/arrow/go/v14/arrow" - "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" "github.com/google/uuid" ) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index 61a044f2a1..2166ea8206 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/apache/arrow/go/v14/arrow/util" + "github.com/apache/arrow/go/v13/arrow/util" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/writers" diff --git a/writers/mixedbatchwriter/mixedbatchwriter.go b/writers/mixedbatchwriter/mixedbatchwriter.go index 10fe2aeb29..37ab09f835 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter.go +++ b/writers/mixedbatchwriter/mixedbatchwriter.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/apache/arrow/go/v14/arrow/util" + "github.com/apache/arrow/go/v13/arrow/util" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/writers" "github.com/rs/zerolog" diff --git a/writers/streamingbatchwriter/streamingbatchwriter.go b/writers/streamingbatchwriter/streamingbatchwriter.go index 2273a0dbf3..50c1779070 100644 --- a/writers/streamingbatchwriter/streamingbatchwriter.go +++ b/writers/streamingbatchwriter/streamingbatchwriter.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/apache/arrow/go/v14/arrow/util" + "github.com/apache/arrow/go/v13/arrow/util" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/writers" From fe933687e208c181df4cc367e117c644d3005e99 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 10 Oct 2023 15:27:10 -0500 Subject: [PATCH 05/15] v14 --- examples/simple_plugin/plugin/client.go | 2 +- examples/simple_plugin/services/test.go | 2 +- internal/clients/state/v3/state.go | 8 ++++---- internal/memdb/memdb.go | 4 ++-- internal/pk/pk.go | 2 +- internal/servers/destination/v0/destinations.go | 4 ++-- internal/servers/destination/v0/schemav2tov3.go | 6 +++--- internal/servers/destination/v1/convert.go | 4 ++-- internal/servers/destination/v1/destinations.go | 6 +++--- internal/servers/plugin/v3/plugin.go | 2 +- message/sync_message.go | 2 +- message/write_message.go | 2 +- plugin/diff.go | 6 +++--- plugin/nulls.go | 6 +++--- plugin/plugin.go | 2 +- plugin/plugin_destination.go | 2 +- plugin/plugin_read.go | 2 +- plugin/sort.go | 4 ++-- plugin/testing_write.go | 2 +- plugin/testing_write_delete.go | 6 +++--- plugin/testing_write_insert.go | 6 +++--- plugin/testing_write_migrate.go | 2 +- plugin/testing_write_upsert.go | 6 +++--- scalar/binary.go | 2 +- scalar/bool.go | 2 +- scalar/date32.go | 2 +- scalar/date64.go | 2 +- scalar/decimal.go | 6 +++--- scalar/duration.go | 2 +- scalar/errors.go | 2 +- scalar/float.go | 2 +- scalar/inet.go | 2 +- scalar/int.go | 2 +- scalar/interval.go | 2 +- scalar/json.go | 2 +- scalar/list.go | 2 +- scalar/mac.go | 2 +- scalar/scalar.go | 8 ++++---- scalar/string.go | 4 ++-- scalar/struct.go | 2 +- scalar/time.go | 2 +- scalar/timestamp.go | 2 +- scalar/uint.go | 2 +- scalar/uuid.go | 2 +- scheduler/scheduler.go | 4 ++-- schema/arrow.go | 2 +- schema/column.go | 2 +- schema/errors.go | 2 +- schema/meta.go | 2 +- schema/table.go | 2 +- schema/testdata.go | 6 +++--- schema/validators.go | 2 +- transformers/struct.go | 2 +- types/extensions.go | 2 +- types/inet.go | 4 ++-- types/json.go | 4 ++-- types/mac.go | 4 ++-- types/register.go | 2 +- types/uuid.go | 4 ++-- writers/batchwriter/batchwriter.go | 2 +- writers/mixedbatchwriter/mixedbatchwriter.go | 2 +- writers/streamingbatchwriter/streamingbatchwriter.go | 2 +- 62 files changed, 96 insertions(+), 96 deletions(-) diff --git a/examples/simple_plugin/plugin/client.go b/examples/simple_plugin/plugin/client.go index 9a186c829e..7f2eedaba4 100644 --- a/examples/simple_plugin/plugin/client.go +++ b/examples/simple_plugin/plugin/client.go @@ -5,7 +5,7 @@ import ( "encoding/json" "fmt" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/examples/simple_plugin/client" "github.com/cloudquery/plugin-sdk/examples/simple_plugin/services" "github.com/cloudquery/plugin-sdk/v4/message" diff --git a/examples/simple_plugin/services/test.go b/examples/simple_plugin/services/test.go index b72cc72a85..ba97b22097 100644 --- a/examples/simple_plugin/services/test.go +++ b/examples/simple_plugin/services/test.go @@ -3,7 +3,7 @@ package services import ( "context" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/examples/simple_plugin/client" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/internal/clients/state/v3/state.go b/internal/clients/state/v3/state.go index acc1d39a02..a8e3accece 100644 --- a/internal/clients/state/v3/state.go +++ b/internal/clients/state/v3/state.go @@ -6,10 +6,10 @@ import ( "io" "sync" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/ipc" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/ipc" + "github.com/apache/arrow/go/v14/arrow/memory" pb "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 7e37560677..719eafe9c1 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -5,8 +5,8 @@ import ( "fmt" "sync" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/cloudquery/plugin-sdk/v4/schema" diff --git a/internal/pk/pk.go b/internal/pk/pk.go index ca8c5f2806..eae0f90dc6 100644 --- a/internal/pk/pk.go +++ b/internal/pk/pk.go @@ -3,7 +3,7 @@ package pk import ( "strings" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index 05236e8a1a..461902d589 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -6,8 +6,8 @@ import ( "io" "sync" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" pbBase "github.com/cloudquery/plugin-pb-go/pb/base/v0" pb "github.com/cloudquery/plugin-pb-go/pb/destination/v0" "github.com/cloudquery/plugin-pb-go/specs" diff --git a/internal/servers/destination/v0/schemav2tov3.go b/internal/servers/destination/v0/schemav2tov3.go index 3b63448b15..35c33dfc7f 100644 --- a/internal/servers/destination/v0/schemav2tov3.go +++ b/internal/servers/destination/v0/schemav2tov3.go @@ -4,9 +4,9 @@ import ( "encoding/json" "strings" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" schemav2 "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/types" diff --git a/internal/servers/destination/v1/convert.go b/internal/servers/destination/v1/convert.go index 7fc57f2f01..36cd8406fe 100644 --- a/internal/servers/destination/v1/convert.go +++ b/internal/servers/destination/v1/convert.go @@ -3,8 +3,8 @@ package destination import ( "bytes" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/ipc" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/ipc" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/internal/servers/destination/v1/destinations.go b/internal/servers/destination/v1/destinations.go index 5a72861c24..8b57a84b93 100644 --- a/internal/servers/destination/v1/destinations.go +++ b/internal/servers/destination/v1/destinations.go @@ -7,9 +7,9 @@ import ( "io" "sync" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/ipc" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/ipc" + "github.com/apache/arrow/go/v14/arrow/memory" pb "github.com/cloudquery/plugin-pb-go/pb/destination/v1" "github.com/cloudquery/plugin-pb-go/specs" "github.com/cloudquery/plugin-sdk/v4/message" diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index 3193b2bab8..b2b1cefc2b 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -5,7 +5,7 @@ import ( "fmt" "io" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" pb "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/plugin" diff --git a/message/sync_message.go b/message/sync_message.go index fd200dbd9d..e18bf43ee9 100644 --- a/message/sync_message.go +++ b/message/sync_message.go @@ -3,7 +3,7 @@ package message import ( "slices" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/message/write_message.go b/message/write_message.go index db44890d68..1398b4490c 100644 --- a/message/write_message.go +++ b/message/write_message.go @@ -4,7 +4,7 @@ import ( "slices" "time" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/diff.go b/plugin/diff.go index d32bb9d585..8149df6d4f 100644 --- a/plugin/diff.go +++ b/plugin/diff.go @@ -4,9 +4,9 @@ import ( "fmt" "strings" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" ) func RecordsDiff(sc *arrow.Schema, have, want []arrow.Record) string { diff --git a/plugin/nulls.go b/plugin/nulls.go index 9cec98d6d1..73acdb9311 100644 --- a/plugin/nulls.go +++ b/plugin/nulls.go @@ -1,9 +1,9 @@ package plugin import ( - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" ) func stripNullsFromLists(list array.ListLike) array.ListLike { diff --git a/plugin/plugin.go b/plugin/plugin.go index 5d798502f2..a65400bc77 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -6,7 +6,7 @@ import ( "fmt" "sync" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/rs/zerolog" diff --git a/plugin/plugin_destination.go b/plugin/plugin_destination.go index e8b9b8aefd..21174d6091 100644 --- a/plugin/plugin_destination.go +++ b/plugin/plugin_destination.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/plugin_read.go b/plugin/plugin_read.go index 66b5d23868..e09db9058d 100644 --- a/plugin/plugin_read.go +++ b/plugin/plugin_read.go @@ -3,7 +3,7 @@ package plugin import ( "context" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/sort.go b/plugin/sort.go index 81acd39165..99602511f8 100644 --- a/plugin/sort.go +++ b/plugin/sort.go @@ -3,8 +3,8 @@ package plugin import ( "sort" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/testing_write.go b/plugin/testing_write.go index 4d2893c918..6d52573167 100644 --- a/plugin/testing_write.go +++ b/plugin/testing_write.go @@ -5,7 +5,7 @@ import ( "math/rand" "testing" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index 907e58473f..baab1d666f 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -4,9 +4,9 @@ import ( "context" "time" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/stretchr/testify/require" diff --git a/plugin/testing_write_insert.go b/plugin/testing_write_insert.go index cff3e19f84..79ec87708a 100644 --- a/plugin/testing_write_insert.go +++ b/plugin/testing_write_insert.go @@ -4,9 +4,9 @@ import ( "context" "fmt" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/plugin/testing_write_migrate.go b/plugin/testing_write_migrate.go index 5ee3811b19..f6784fef52 100644 --- a/plugin/testing_write_migrate.go +++ b/plugin/testing_write_migrate.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/types" diff --git a/plugin/testing_write_upsert.go b/plugin/testing_write_upsert.go index b96d56f7c8..a7af1a5cf4 100644 --- a/plugin/testing_write_upsert.go +++ b/plugin/testing_write_upsert.go @@ -4,9 +4,9 @@ import ( "context" "fmt" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" ) diff --git a/scalar/binary.go b/scalar/binary.go index 2f14afa0a5..e675b011a8 100644 --- a/scalar/binary.go +++ b/scalar/binary.go @@ -3,7 +3,7 @@ package scalar import ( "bytes" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Binary struct { diff --git a/scalar/bool.go b/scalar/bool.go index ebbcc64957..38b481d7bb 100644 --- a/scalar/bool.go +++ b/scalar/bool.go @@ -3,7 +3,7 @@ package scalar import ( "strconv" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Bool struct { diff --git a/scalar/date32.go b/scalar/date32.go index ca7fdee83b..f2757b7492 100644 --- a/scalar/date32.go +++ b/scalar/date32.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Date32 struct { diff --git a/scalar/date64.go b/scalar/date64.go index 62007f0f9f..b73e43dfc3 100644 --- a/scalar/date64.go +++ b/scalar/date64.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Date64 struct { diff --git a/scalar/decimal.go b/scalar/decimal.go index d31c85b6ab..49009c5de4 100644 --- a/scalar/decimal.go +++ b/scalar/decimal.go @@ -1,9 +1,9 @@ package scalar import ( - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/decimal128" - "github.com/apache/arrow/go/v13/arrow/decimal256" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/decimal128" + "github.com/apache/arrow/go/v14/arrow/decimal256" ) type Decimal256 struct { diff --git a/scalar/duration.go b/scalar/duration.go index 770875ec72..17a3e1b032 100644 --- a/scalar/duration.go +++ b/scalar/duration.go @@ -4,7 +4,7 @@ import ( "strings" "time" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Duration struct { diff --git a/scalar/errors.go b/scalar/errors.go index f64ad4b402..91960e2934 100644 --- a/scalar/errors.go +++ b/scalar/errors.go @@ -3,7 +3,7 @@ package scalar import ( "fmt" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) const ( diff --git a/scalar/float.go b/scalar/float.go index 690ddc3a98..f3a097ed63 100644 --- a/scalar/float.go +++ b/scalar/float.go @@ -4,7 +4,7 @@ import ( "math" "strconv" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Float struct { diff --git a/scalar/inet.go b/scalar/inet.go index 3d6163cfc7..e587a3b8cb 100644 --- a/scalar/inet.go +++ b/scalar/inet.go @@ -6,7 +6,7 @@ import ( "net" "strings" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/types" ) diff --git a/scalar/int.go b/scalar/int.go index d702492c94..6035042b48 100644 --- a/scalar/int.go +++ b/scalar/int.go @@ -4,7 +4,7 @@ import ( "math" "strconv" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Int struct { diff --git a/scalar/interval.go b/scalar/interval.go index d39269516f..96cfb5ce4f 100644 --- a/scalar/interval.go +++ b/scalar/interval.go @@ -3,7 +3,7 @@ package scalar import ( "encoding/json" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type MonthInterval struct { diff --git a/scalar/json.go b/scalar/json.go index c0c5fceea3..62c160893e 100644 --- a/scalar/json.go +++ b/scalar/json.go @@ -5,7 +5,7 @@ import ( "encoding/json" "reflect" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/types" ) diff --git a/scalar/list.go b/scalar/list.go index 3cf3e26939..b1b1cd2ba0 100644 --- a/scalar/list.go +++ b/scalar/list.go @@ -4,7 +4,7 @@ import ( "reflect" "strings" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type List struct { diff --git a/scalar/mac.go b/scalar/mac.go index 5350a64bee..7973c2ad44 100644 --- a/scalar/mac.go +++ b/scalar/mac.go @@ -3,7 +3,7 @@ package scalar import ( "net" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/types" ) diff --git a/scalar/scalar.go b/scalar/scalar.go index 08718a858e..501502b7e5 100644 --- a/scalar/scalar.go +++ b/scalar/scalar.go @@ -3,10 +3,10 @@ package scalar import ( "fmt" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/float16" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/float16" + "github.com/apache/arrow/go/v14/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/types" "golang.org/x/exp/maps" ) diff --git a/scalar/string.go b/scalar/string.go index 7997aded97..7b137bfca1 100644 --- a/scalar/string.go +++ b/scalar/string.go @@ -3,8 +3,8 @@ package scalar import ( "fmt" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" ) const nullValueStr = array.NullValueStr diff --git a/scalar/struct.go b/scalar/struct.go index e53f34cc16..2f2a0b1de3 100644 --- a/scalar/struct.go +++ b/scalar/struct.go @@ -5,7 +5,7 @@ import ( "encoding/json" "reflect" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Struct struct { diff --git a/scalar/time.go b/scalar/time.go index 617ec9ac88..99e65daba9 100644 --- a/scalar/time.go +++ b/scalar/time.go @@ -1,7 +1,7 @@ package scalar import ( - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Time struct { diff --git a/scalar/timestamp.go b/scalar/timestamp.go index 371f676572..76abd3cada 100644 --- a/scalar/timestamp.go +++ b/scalar/timestamp.go @@ -6,7 +6,7 @@ import ( "math" "time" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) const ( diff --git a/scalar/uint.go b/scalar/uint.go index 61daf95cc2..f3cfdd2adf 100644 --- a/scalar/uint.go +++ b/scalar/uint.go @@ -4,7 +4,7 @@ import ( "math" "strconv" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type Uint struct { diff --git a/scalar/uuid.go b/scalar/uuid.go index dfae523cbd..a3c61f408c 100644 --- a/scalar/uuid.go +++ b/scalar/uuid.go @@ -4,7 +4,7 @@ import ( "encoding/hex" "fmt" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/types" "github.com/google/uuid" ) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 67bc01bad8..2332500e33 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -8,8 +8,8 @@ import ( "sync/atomic" "time" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/caser" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/scalar" diff --git a/schema/arrow.go b/schema/arrow.go index 8e83d8e994..d653e7c6e9 100644 --- a/schema/arrow.go +++ b/schema/arrow.go @@ -1,7 +1,7 @@ package schema import ( - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) const ( diff --git a/schema/column.go b/schema/column.go index 7ad2062b0a..a080f048c5 100644 --- a/schema/column.go +++ b/schema/column.go @@ -5,7 +5,7 @@ import ( "encoding/json" "strings" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) type ColumnList []Column diff --git a/schema/errors.go b/schema/errors.go index 8535000f08..aacb755e5a 100644 --- a/schema/errors.go +++ b/schema/errors.go @@ -4,7 +4,7 @@ package schema import ( "fmt" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) const ( diff --git a/schema/meta.go b/schema/meta.go index bd5ca2de7e..b184207078 100644 --- a/schema/meta.go +++ b/schema/meta.go @@ -3,7 +3,7 @@ package schema import ( "context" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/scalar" "github.com/cloudquery/plugin-sdk/v4/types" ) diff --git a/schema/table.go b/schema/table.go index 6b9e6f973e..67c33df51e 100644 --- a/schema/table.go +++ b/schema/table.go @@ -6,7 +6,7 @@ import ( "regexp" "slices" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/glob" ) diff --git a/schema/testdata.go b/schema/testdata.go index de6e89bfdf..adf191e58b 100644 --- a/schema/testdata.go +++ b/schema/testdata.go @@ -8,9 +8,9 @@ import ( "strings" "time" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/types" "github.com/google/uuid" "golang.org/x/exp/rand" diff --git a/schema/validators.go b/schema/validators.go index 659237a36b..7c8962d017 100644 --- a/schema/validators.go +++ b/schema/validators.go @@ -1,7 +1,7 @@ package schema import ( - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" ) func FindEmptyColumns(table *Table, records []arrow.Record) []string { diff --git a/transformers/struct.go b/transformers/struct.go index 5e0b8c5da3..bb0bd75883 100644 --- a/transformers/struct.go +++ b/transformers/struct.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/cloudquery/plugin-sdk/v4/caser" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/types" diff --git a/types/extensions.go b/types/extensions.go index 9f30f1c705..384d5cc5fa 100644 --- a/types/extensions.go +++ b/types/extensions.go @@ -1,6 +1,6 @@ package types -import "github.com/apache/arrow/go/v13/arrow" +import "github.com/apache/arrow/go/v14/arrow" var ExtensionTypes = struct { UUID arrow.ExtensionType diff --git a/types/inet.go b/types/inet.go index 3a2bbe78d6..f1a26d778f 100644 --- a/types/inet.go +++ b/types/inet.go @@ -7,8 +7,8 @@ import ( "reflect" "strings" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" "github.com/goccy/go-json" ) diff --git a/types/json.go b/types/json.go index 3068e13fd2..ba90a3fdb8 100644 --- a/types/json.go +++ b/types/json.go @@ -8,8 +8,8 @@ import ( "github.com/goccy/go-json" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" ) type JSONBuilder struct { diff --git a/types/mac.go b/types/mac.go index 469e3dc06f..b06bc9f9da 100644 --- a/types/mac.go +++ b/types/mac.go @@ -7,8 +7,8 @@ import ( "reflect" "strings" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" "github.com/goccy/go-json" ) diff --git a/types/register.go b/types/register.go index 65d9822281..0c614d49cb 100644 --- a/types/register.go +++ b/types/register.go @@ -1,6 +1,6 @@ package types -import "github.com/apache/arrow/go/v13/arrow" +import "github.com/apache/arrow/go/v14/arrow" func RegisterAllExtensions() error { if err := arrow.RegisterExtensionType(&UUIDType{}); err != nil { diff --git a/types/uuid.go b/types/uuid.go index c85d55464f..e026e7ab91 100644 --- a/types/uuid.go +++ b/types/uuid.go @@ -8,8 +8,8 @@ import ( "github.com/goccy/go-json" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" "github.com/google/uuid" ) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index 2166ea8206..61a044f2a1 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/apache/arrow/go/v13/arrow/util" + "github.com/apache/arrow/go/v14/arrow/util" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/writers" diff --git a/writers/mixedbatchwriter/mixedbatchwriter.go b/writers/mixedbatchwriter/mixedbatchwriter.go index 37ab09f835..10fe2aeb29 100644 --- a/writers/mixedbatchwriter/mixedbatchwriter.go +++ b/writers/mixedbatchwriter/mixedbatchwriter.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/apache/arrow/go/v13/arrow/util" + "github.com/apache/arrow/go/v14/arrow/util" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/writers" "github.com/rs/zerolog" diff --git a/writers/streamingbatchwriter/streamingbatchwriter.go b/writers/streamingbatchwriter/streamingbatchwriter.go index 50c1779070..2273a0dbf3 100644 --- a/writers/streamingbatchwriter/streamingbatchwriter.go +++ b/writers/streamingbatchwriter/streamingbatchwriter.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/apache/arrow/go/v13/arrow/util" + "github.com/apache/arrow/go/v14/arrow/util" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/writers" From e808221e8979a3884645cabce9d91b618f91349d Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 11 Oct 2023 07:44:59 -0500 Subject: [PATCH 06/15] Update testing_write_delete.go --- plugin/testing_write_delete.go | 123 +++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index baab1d666f..f4210ac028 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -142,3 +142,126 @@ func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context) { require.EqualValuesf(s.t, rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second delete stale") require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{nullRecord}), "record differs") } + +func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) { + tableName := s.tableNameForTest("delete_all_rows") + syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision). + Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087 + table := &schema.Table{ + Name: tableName, + Columns: schema.ColumnList{ + schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64, PrimaryKey: true, NotNull: true}, + schema.CqSourceNameColumn, + schema.CqSyncTimeColumn, + }, + } + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table") + const sourceName = "source-test" + + bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) + bldr.Field(0).(*array.Int64Builder).Append(0) + bldr.Field(1).(*array.StringBuilder).Append(sourceName) + bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime) + record1 := bldr.NewRecord() + + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record") + record1 = s.handleNulls(record1) // we process nulls after writing + + records, err := s.plugin.readAll(ctx, table) + require.NoErrorf(s.t, err, "failed to read") + require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items") + + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ + DeleteRecord: message.DeleteRecord{ + TableName: table.Name, + }, + }), "failed to delete stale records") + + records, err = s.plugin.readAll(ctx, table) + require.NoErrorf(s.t, err, "failed to read after delete stale") + require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after delete stale") + require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete stale") + + bldr.Field(0).(*array.Int64Builder).Append(1) + bldr.Field(1).(*array.StringBuilder).Append(sourceName) + bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second)) + record2 := bldr.NewRecord() + + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record") + record2 = s.handleNulls(record2) // we process nulls after writing + + records, err = s.plugin.readAll(ctx, table) + require.NoErrorf(s.t, err, "failed to read second time") + sortRecords(table, records, "id") + require.EqualValuesf(s.t, 2, TotalRows(records), "unexpected amount of items second time") + require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1, record2}), "record differs after delete stale") + + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteStale{ + TableName: table.Name, + SourceName: sourceName, + SyncTime: syncTime.Add(time.Second), + }), "failed to delete stale records second time") + + records, err = s.plugin.readAll(ctx, table) + require.NoErrorf(s.t, err, "failed to read after second delete stale") + require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after second delete stale") + require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record2}), "record differs after second delete stale") +} + +func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) { + tableName := s.tableNameForTest("delete_all_records") + syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision). + Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087 + table := &schema.Table{ + Name: tableName, + Columns: schema.ColumnList{ + schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64, PrimaryKey: true, NotNull: true}, + schema.CqSourceNameColumn, + schema.CqSyncTimeColumn, + }, + } + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table") + const sourceName = "source-test" + + bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) + bldr.Field(0).(*array.Int64Builder).Append(0) + bldr.Field(1).(*array.StringBuilder).Append(sourceName) + bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime) + record1 := bldr.NewRecord() + + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record") + record1 = s.handleNulls(record1) // we process nulls after writing + + records, err := s.plugin.readAll(ctx, table) + require.NoErrorf(s.t, err, "failed to read") + require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items") + + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ + DeleteRecord: message.DeleteRecord{ + TableName: table.Name, + }, + }), "failed to delete records") + + records, err = s.plugin.readAll(ctx, table) + require.NoErrorf(s.t, err, "failed to read after delete all records") + require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete stale") + + bldr.Field(0).(*array.Int64Builder).Append(1) + bldr.Field(1).(*array.StringBuilder).Append(sourceName) + bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second)) + record2 := bldr.NewRecord() + + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record") + record2 = s.handleNulls(record2) // we process nulls after writing + + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ + DeleteRecord: message.DeleteRecord{ + TableName: table.Name, + }, + }), "failed to delete records second time") + + records, err = s.plugin.readAll(ctx, table) + require.NoErrorf(s.t, err, "failed to read second time") + sortRecords(table, records, "id") + require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items second time") +} From dec3e6ef06be627091d81b2c7ebaecb6ad793674 Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 11 Oct 2023 07:45:02 -0500 Subject: [PATCH 07/15] Update testing_write.go --- plugin/testing_write.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plugin/testing_write.go b/plugin/testing_write.go index 6d52573167..1f9630e302 100644 --- a/plugin/testing_write.go +++ b/plugin/testing_write.go @@ -148,6 +148,15 @@ func TestWriterSuiteRunner(t *testing.T, p *Plugin, tests WriterTestSuiteTests, }) }) + t.Run("TestDeleteRecord", func(t *testing.T) { + // t.Run("Basic", func(t *testing.T) { + // suite.testDeleteRecordBasic(ctx) + // }) + t.Run("DeleteAll", func(t *testing.T) { + suite.testDeleteAllRecords(ctx) + }) + }) + t.Run("TestMigrate", func(t *testing.T) { if suite.tests.SkipMigrate { t.Skip("skipping " + t.Name()) From 15ce5b5ae5ce704d5ac9b123cbc8a24bb4e53b9a Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 11 Oct 2023 07:45:05 -0500 Subject: [PATCH 08/15] Update memdb.go --- internal/memdb/memdb.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 719eafe9c1..46e2b3c824 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -211,6 +211,8 @@ func (c *client) Write(ctx context.Context, msgs <-chan message.WriteMessage) er c.migrate(ctx, msg.Table) case *message.WriteDeleteStale: c.deleteStale(ctx, msg) + case *message.WriteDeleteRecord: + c.deleteRecord(ctx, msg) case *message.WriteInsert: sc := msg.Record.Schema() tableName, ok := sc.Metadata().GetValue(schema.MetadataTableName) @@ -257,3 +259,40 @@ func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) { } c.memoryDB[tableName] = filteredTable } + +func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) { + var filteredTable []arrow.Record + tableName := msg.TableName + for i, row := range c.memoryDB[tableName] { + isMatch := true + for _, whereClause := range msg.WhereClauses { + for _, pred := range whereClause.And { + isMatch = isMatch && evaluatePredicate(pred, row) + } + + for _, pred := range whereClause.Or { + isMatch = isMatch || evaluatePredicate(pred, row) + } + } + if !isMatch { + filteredTable = append(filteredTable, c.memoryDB[tableName][i]) + } + + } + c.memoryDB[tableName] = filteredTable +} + +func evaluatePredicate(pred message.Predicate, record arrow.Record) bool { + sc := record.Schema() + indices := sc.FieldIndices(pred.Column) + if len(indices) == 0 { + return false + } + syncColIndex := indices[0] + switch pred.Operator { + case "eq": + return record.Column(syncColIndex).(*array.String).Value(0) == pred.Record.Column(syncColIndex).(*array.String).Value(0) + default: + return false + } +} From dadd6cbe607d9aaa6be78f0d108830ab7dcee3d1 Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 11 Oct 2023 09:56:45 -0500 Subject: [PATCH 09/15] more tests --- internal/memdb/memdb.go | 8 +++- plugin/testing_write.go | 6 +-- plugin/testing_write_delete.go | 78 ++++++++++++++++++++++------------ 3 files changed, 62 insertions(+), 30 deletions(-) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 46e2b3c824..3419f52bba 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -289,9 +289,15 @@ func evaluatePredicate(pred message.Predicate, record arrow.Record) bool { return false } syncColIndex := indices[0] + + if record.Column(syncColIndex).DataType() != pred.Record.Column(0).DataType() { + return false + } + // dataType := record.Column(syncColIndex).DataType() switch pred.Operator { case "eq": - return record.Column(syncColIndex).(*array.String).Value(0) == pred.Record.Column(syncColIndex).(*array.String).Value(0) + return record.Column(syncColIndex).String() == pred.Record.Column(0).String() + // return record.Column(syncColIndex).(*array.String).Value(0) == pred.Record.Column(0).(*array.String).Value(0) default: return false } diff --git a/plugin/testing_write.go b/plugin/testing_write.go index 1f9630e302..658865b750 100644 --- a/plugin/testing_write.go +++ b/plugin/testing_write.go @@ -149,9 +149,9 @@ func TestWriterSuiteRunner(t *testing.T, p *Plugin, tests WriterTestSuiteTests, }) t.Run("TestDeleteRecord", func(t *testing.T) { - // t.Run("Basic", func(t *testing.T) { - // suite.testDeleteRecordBasic(ctx) - // }) + t.Run("Basic", func(t *testing.T) { + suite.testDeleteRecordBasic(ctx) + }) t.Run("DeleteAll", func(t *testing.T) { suite.testDeleteAllRecords(ctx) }) diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index f4210ac028..fc76a594d7 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -171,41 +171,69 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) { require.NoErrorf(s.t, err, "failed to read") require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items") + // create value for delete statement but nothing will be deleted because ID value isn't present + bldrDeleteNoMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{ + Name: tableName, + Columns: schema.ColumnList{ + schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64}, + }, + }).ToArrowSchema()) + bldrDeleteNoMatch.Field(0).(*array.Int64Builder).Append(1) + deleteValue := bldrDeleteNoMatch.NewRecord() + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ DeleteRecord: message.DeleteRecord{ TableName: table.Name, + WhereClauses: message.WhereClauses{ + { + And: []message.Predicate{ + { + Operator: "eq", + Column: "id", + Record: deleteValue, + }, + }, + }, + }, }, - }), "failed to delete stale records") + }), "failed to delete record no match") records, err = s.plugin.readAll(ctx, table) - require.NoErrorf(s.t, err, "failed to read after delete stale") - require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after delete stale") - require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete stale") + require.NoErrorf(s.t, err, "failed to read after delete with no match") + require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after delete with no match") + require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete with no match") - bldr.Field(0).(*array.Int64Builder).Append(1) - bldr.Field(1).(*array.StringBuilder).Append(sourceName) - bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second)) - record2 := bldr.NewRecord() + // create value for delete statement will be delete One record + bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{ + Name: tableName, + Columns: schema.ColumnList{ + schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64}, + }, + }).ToArrowSchema()) + bldrDeleteMatch.Field(0).(*array.Int64Builder).Append(0) + deleteValue = bldrDeleteMatch.NewRecord() - require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record") - record2 = s.handleNulls(record2) // we process nulls after writing + require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ + DeleteRecord: message.DeleteRecord{ + TableName: table.Name, + WhereClauses: message.WhereClauses{ + { + And: []message.Predicate{ + { + Operator: "eq", + Column: "id", + Record: deleteValue, + }, + }, + }, + }, + }, + }), "failed to delete record no match") records, err = s.plugin.readAll(ctx, table) - require.NoErrorf(s.t, err, "failed to read second time") - sortRecords(table, records, "id") - require.EqualValuesf(s.t, 2, TotalRows(records), "unexpected amount of items second time") - require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1, record2}), "record differs after delete stale") - - require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteStale{ - TableName: table.Name, - SourceName: sourceName, - SyncTime: syncTime.Add(time.Second), - }), "failed to delete stale records second time") + require.NoErrorf(s.t, err, "failed to read after delete with match") + require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete with match") - records, err = s.plugin.readAll(ctx, table) - require.NoErrorf(s.t, err, "failed to read after second delete stale") - require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after second delete stale") - require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record2}), "record differs after second delete stale") } func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) { @@ -230,7 +258,6 @@ func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) { record1 := bldr.NewRecord() require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record") - record1 = s.handleNulls(record1) // we process nulls after writing records, err := s.plugin.readAll(ctx, table) require.NoErrorf(s.t, err, "failed to read") @@ -252,7 +279,6 @@ func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) { record2 := bldr.NewRecord() require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record") - record2 = s.handleNulls(record2) // we process nulls after writing require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ DeleteRecord: message.DeleteRecord{ From 3b4bcefc7936247c4076696bee96c258a8a6809c Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 11 Oct 2023 12:03:53 -0500 Subject: [PATCH 10/15] remove complexity --- internal/memdb/memdb.go | 13 ++-- internal/servers/plugin/v3/plugin.go | 103 +++++++++++++-------------- message/write_message.go | 4 +- plugin/testing_write_delete.go | 28 ++++---- 4 files changed, 70 insertions(+), 78 deletions(-) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 3419f52bba..1219f75edc 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -265,15 +265,14 @@ func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) tableName := msg.TableName for i, row := range c.memoryDB[tableName] { isMatch := true - for _, whereClause := range msg.WhereClauses { - for _, pred := range whereClause.And { - isMatch = isMatch && evaluatePredicate(pred, row) - } + for _, pred := range msg.WhereClause.And { + isMatch = isMatch && evaluatePredicate(pred, row) + } - for _, pred := range whereClause.Or { - isMatch = isMatch || evaluatePredicate(pred, row) - } + for _, pred := range msg.WhereClause.Or { + isMatch = isMatch || evaluatePredicate(pred, row) } + if !isMatch { filteredTable = append(filteredTable, c.memoryDB[tableName][i]) } diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index b2b1cefc2b..a60ec294fa 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -157,33 +157,32 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error { }, } case *message.SyncDeleteRecord: - whereClauses := make([]*pb.WhereClause, len(m.WhereClauses)) - for i, whereClause := range m.WhereClauses { - whereClauses[i].And = make([]*pb.Predicate, len(whereClause.And)) - for j, value := range whereClause.And { - record, err := pb.RecordToBytes(value.Record) - if err != nil { - return status.Errorf(codes.Internal, "failed to encode record: %v", err) - } - whereClauses[i].And[j] = &pb.Predicate{ - Record: record, - Column: value.Column, - Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[value.Operator]), - } + var whereClause *pb.WhereClause + whereClause.And = make([]*pb.Predicate, len(whereClause.And)) + for j, value := range m.WhereClause.And { + record, err := pb.RecordToBytes(value.Record) + if err != nil { + return status.Errorf(codes.Internal, "failed to encode record: %v", err) } - whereClauses[i].Or = make([]*pb.Predicate, len(whereClause.Or)) - for j, value := range whereClause.Or { - record, err := pb.RecordToBytes(value.Record) - if err != nil { - return status.Errorf(codes.Internal, "failed to encode record: %v", err) - } - whereClauses[i].Or[j] = &pb.Predicate{ - Record: record, - Column: value.Column, - Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[value.Operator]), - } + whereClause.And[j] = &pb.Predicate{ + Record: record, + Column: value.Column, + Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[value.Operator]), } } + whereClause.Or = make([]*pb.Predicate, len(whereClause.Or)) + for j, value := range m.WhereClause.Or { + record, err := pb.RecordToBytes(value.Record) + if err != nil { + return status.Errorf(codes.Internal, "failed to encode record: %v", err) + } + whereClause.Or[j] = &pb.Predicate{ + Record: record, + Column: value.Column, + Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[value.Operator]), + } + + } tableRelations := make([]*pb.TableRelation, len(m.TableRelations)) for i, tr := range m.TableRelations { @@ -196,7 +195,7 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error { DeleteRecord: &pb.Sync_MessageDeleteRecord{ TableName: m.TableName, TableRelations: tableRelations, - WhereClauses: whereClauses, + WhereClause: whereClause, }, } default: @@ -275,35 +274,35 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error { } case *pb.Write_Request_DeleteRecord: - whereClauses := make(message.WhereClauses, len(pbMsg.DeleteRecord.WhereClauses)) - for i, whereClause := range pbMsg.DeleteRecord.WhereClauses { - whereClauses[i].And = make([]message.Predicate, len(whereClause.And)) - for j, value := range whereClause.And { - record, err := pb.NewRecordFromBytes(value.Record) - if err != nil { - pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) - break - } - whereClauses[i].And[j] = message.Predicate{ - Record: record, - Column: value.Column, - Operator: value.Operator.String(), - } + var whereClause message.WhereClause + + whereClause.And = make([]message.Predicate, len(whereClause.And)) + for j, value := range pbMsg.DeleteRecord.WhereClause.And { + record, err := pb.NewRecordFromBytes(value.Record) + if err != nil { + pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) + break } - whereClauses[i].Or = make([]message.Predicate, len(whereClause.Or)) - for j, value := range whereClause.Or { - record, err := pb.NewRecordFromBytes(value.Record) - if err != nil { - pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) - break - } - whereClauses[i].Or[j] = message.Predicate{ - Record: record, - Column: value.Column, - Operator: value.Operator.String(), - } + whereClause.And[j] = message.Predicate{ + Record: record, + Column: value.Column, + Operator: value.Operator.String(), } } + whereClause.Or = make([]message.Predicate, len(whereClause.Or)) + for j, value := range pbMsg.DeleteRecord.WhereClause.Or { + record, err := pb.NewRecordFromBytes(value.Record) + if err != nil { + pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) + break + } + whereClause.Or[j] = message.Predicate{ + Record: record, + Column: value.Column, + Operator: value.Operator.String(), + } + } + tableRelations := make([]message.TableRelation, len(pbMsg.DeleteRecord.TableRelations)) for i, tr := range pbMsg.DeleteRecord.TableRelations { tableRelations[i] = message.TableRelation{ @@ -315,7 +314,7 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error { DeleteRecord: message.DeleteRecord{ TableName: pbMsg.DeleteRecord.TableName, TableRelations: tableRelations, - WhereClauses: whereClauses, + WhereClause: whereClause, }, } } diff --git a/message/write_message.go b/message/write_message.go index 1398b4490c..ca0de24ef8 100644 --- a/message/write_message.go +++ b/message/write_message.go @@ -146,12 +146,10 @@ type WhereClause struct { type TableRelations []TableRelation -type WhereClauses []WhereClause - type DeleteRecord struct { TableName string TableRelations TableRelations - WhereClauses WhereClauses + WhereClause WhereClause SyncTime time.Time } diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index fc76a594d7..9205741115 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -184,14 +184,12 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) { require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ DeleteRecord: message.DeleteRecord{ TableName: table.Name, - WhereClauses: message.WhereClauses{ - { - And: []message.Predicate{ - { - Operator: "eq", - Column: "id", - Record: deleteValue, - }, + WhereClause: message.WhereClause{ + And: []message.Predicate{ + { + Operator: "eq", + Column: "id", + Record: deleteValue, }, }, }, @@ -216,14 +214,12 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) { require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ DeleteRecord: message.DeleteRecord{ TableName: table.Name, - WhereClauses: message.WhereClauses{ - { - And: []message.Predicate{ - { - Operator: "eq", - Column: "id", - Record: deleteValue, - }, + WhereClause: message.WhereClause{ + And: []message.Predicate{ + { + Operator: "eq", + Column: "id", + Record: deleteValue, }, }, }, From 86b95b628f69765bf6677aaae4f32e2016a0f717 Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 11 Oct 2023 16:07:19 -0500 Subject: [PATCH 11/15] use new structure --- examples/simple_plugin/go.mod | 1 + internal/memdb/memdb.go | 23 +++++--- internal/servers/plugin/v3/plugin.go | 78 +++++++++++----------------- message/write_message.go | 16 ++++-- plugin/testing_write_delete.go | 30 ++++++----- 5 files changed, 77 insertions(+), 71 deletions(-) diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index a1a0927937..cc27fc159a 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -11,6 +11,7 @@ require ( replace github.com/cloudquery/plugin-sdk/v4 => ../../ replace github.com/apache/arrow/go/v14 => github.com/cloudquery/arrow/go/v14 v14.0.0-20231009001222-d4016862d2dd +replace github.com/cloudquery/plugin-pb-go => /Users/benbernays/Documents/GitHub/plugin-pb-go require ( github.com/BurntSushi/toml v1.3.2 // indirect diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 1219f75edc..76135b1cf8 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -265,12 +265,23 @@ func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) tableName := msg.TableName for i, row := range c.memoryDB[tableName] { isMatch := true - for _, pred := range msg.WhereClause.And { - isMatch = isMatch && evaluatePredicate(pred, row) - } - - for _, pred := range msg.WhereClause.Or { - isMatch = isMatch || evaluatePredicate(pred, row) + // Groups are evaluated as AND + for _, predGroup := range msg.WhereClause { + for _, pred := range predGroup.Predicates { + predResult := evaluatePredicate(pred, row) + if predGroup.GroupingType == "AND" { + isMatch = isMatch && predResult + } else { + if predResult { + isMatch = true + break + } + } + } + // If any single predicate group is false then we can break out of the loop + if !isMatch { + break + } } if !isMatch { diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index a60ec294fa..1cf001f193 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -157,31 +157,24 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error { }, } case *message.SyncDeleteRecord: - var whereClause *pb.WhereClause - whereClause.And = make([]*pb.Predicate, len(whereClause.And)) - for j, value := range m.WhereClause.And { - record, err := pb.RecordToBytes(value.Record) - if err != nil { - return status.Errorf(codes.Internal, "failed to encode record: %v", err) - } - whereClause.And[j] = &pb.Predicate{ - Record: record, - Column: value.Column, - Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[value.Operator]), - } - } - whereClause.Or = make([]*pb.Predicate, len(whereClause.Or)) - for j, value := range m.WhereClause.Or { - record, err := pb.RecordToBytes(value.Record) - if err != nil { - return status.Errorf(codes.Internal, "failed to encode record: %v", err) - } - whereClause.Or[j] = &pb.Predicate{ - Record: record, - Column: value.Column, - Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[value.Operator]), + whereClause := make([]*pb.PredicatesGroup, len(m.WhereClause)) + for j, predicateGroup := range m.WhereClause { + whereClause[j] = &pb.PredicatesGroup{ + GroupingType: pb.PredicatesGroup_GroupingType(pb.PredicatesGroup_GroupingType_value[predicateGroup.GroupingType]), + Predicates: make([]*pb.Predicate, len(predicateGroup.Predicates)), } + for i, predicate := range predicateGroup.Predicates { + record, err := pb.RecordToBytes(predicate.Record) + if err != nil { + return status.Errorf(codes.Internal, "failed to encode record: %v", err) + } + whereClause[j].Predicates[i] = &pb.Predicate{ + Record: record, + Column: predicate.Column, + Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[predicate.Operator]), + } + } } tableRelations := make([]*pb.TableRelation, len(m.TableRelations)) @@ -274,32 +267,21 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error { } case *pb.Write_Request_DeleteRecord: - var whereClause message.WhereClause + whereClause := make(message.PredicateGroups, len(pbMsg.DeleteRecord.WhereClause)) - whereClause.And = make([]message.Predicate, len(whereClause.And)) - for j, value := range pbMsg.DeleteRecord.WhereClause.And { - record, err := pb.NewRecordFromBytes(value.Record) - if err != nil { - pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) - break - } - whereClause.And[j] = message.Predicate{ - Record: record, - Column: value.Column, - Operator: value.Operator.String(), - } - } - whereClause.Or = make([]message.Predicate, len(whereClause.Or)) - for j, value := range pbMsg.DeleteRecord.WhereClause.Or { - record, err := pb.NewRecordFromBytes(value.Record) - if err != nil { - pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) - break - } - whereClause.Or[j] = message.Predicate{ - Record: record, - Column: value.Column, - Operator: value.Operator.String(), + for j, predicateGroup := range pbMsg.DeleteRecord.WhereClause { + whereClause[j].Predicates = make(message.Predicates, len(predicateGroup.Predicates)) + for i, predicate := range predicateGroup.Predicates { + record, err := pb.NewRecordFromBytes(predicate.Record) + if err != nil { + pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err) + break + } + whereClause[j].Predicates[i] = message.Predicate{ + Record: record, + Column: predicate.Column, + Operator: predicate.Operator.String(), + } } } diff --git a/message/write_message.go b/message/write_message.go index ca0de24ef8..0e8fbad72a 100644 --- a/message/write_message.go +++ b/message/write_message.go @@ -134,22 +134,28 @@ type TableRelation struct { ParentTable string } +type TableRelations []TableRelation + type Predicate struct { Operator string Column string Record arrow.Record } -type WhereClause struct { - And []Predicate - Or []Predicate + +type Predicates []Predicate + +type PredicateGroup struct { + // This will be AND or OR + GroupingType string + Predicates Predicates } -type TableRelations []TableRelation +type PredicateGroups []PredicateGroup type DeleteRecord struct { TableName string TableRelations TableRelations - WhereClause WhereClause + WhereClause PredicateGroups SyncTime time.Time } diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index 9205741115..5079b16663 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -184,12 +184,15 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) { require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ DeleteRecord: message.DeleteRecord{ TableName: table.Name, - WhereClause: message.WhereClause{ - And: []message.Predicate{ - { - Operator: "eq", - Column: "id", - Record: deleteValue, + WhereClause: message.PredicateGroups{ + { + GroupingType: "AND", + Predicates: []message.Predicate{ + { + Operator: "eq", + Column: "id", + Record: deleteValue, + }, }, }, }, @@ -214,12 +217,15 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) { require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{ DeleteRecord: message.DeleteRecord{ TableName: table.Name, - WhereClause: message.WhereClause{ - And: []message.Predicate{ - { - Operator: "eq", - Column: "id", - Record: deleteValue, + WhereClause: message.PredicateGroups{ + { + GroupingType: "AND", + Predicates: []message.Predicate{ + { + Operator: "eq", + Column: "id", + Record: deleteValue, + }, }, }, }, From 03ca40ba31ec95601f6060a101c7517c39328e2b Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 12 Oct 2023 11:49:47 -0500 Subject: [PATCH 12/15] deps --- examples/simple_plugin/go.mod | 1 - go.mod | 5 ++--- go.sum | 8 ++++---- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index cc27fc159a..a1a0927937 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -11,7 +11,6 @@ require ( replace github.com/cloudquery/plugin-sdk/v4 => ../../ replace github.com/apache/arrow/go/v14 => github.com/cloudquery/arrow/go/v14 v14.0.0-20231009001222-d4016862d2dd -replace github.com/cloudquery/plugin-pb-go => /Users/benbernays/Documents/GitHub/plugin-pb-go require ( github.com/BurntSushi/toml v1.3.2 // indirect diff --git a/go.mod b/go.mod index dda3f9a6fa..9efb19f20e 100644 --- a/go.mod +++ b/go.mod @@ -2,12 +2,11 @@ module github.com/cloudquery/plugin-sdk/v4 go 1.21.1 -replace github.com/cloudquery/plugin-pb-go => /Users/benbernays/Documents/GitHub/plugin-pb-go require ( github.com/apache/arrow/go/v14 v14.0.0-20230929201650-00efb06dc0de github.com/bradleyjkemp/cupaloy/v2 v2.8.0 - github.com/cloudquery/cloudquery-api-go v1.2.6 - github.com/cloudquery/plugin-pb-go v1.12.1 + github.com/cloudquery/cloudquery-api-go v1.2.8 + github.com/cloudquery/plugin-pb-go v1.12.3-0.20231012162621-df16384481c5 github.com/cloudquery/plugin-sdk/v2 v2.7.0 github.com/getsentry/sentry-go v0.24.1 github.com/goccy/go-json v0.10.2 diff --git a/go.sum b/go.sum index 3462074e5c..fb8f43c0a4 100644 --- a/go.sum +++ b/go.sum @@ -86,10 +86,10 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudquery/arrow/go/v14 v14.0.0-20231009001222-d4016862d2dd h1:w0lNQ0/+my9RqY7kA26MTeD8sHXYc3epsxdCaXza6pc= github.com/cloudquery/arrow/go/v14 v14.0.0-20231009001222-d4016862d2dd/go.mod h1:/SqmdO2dsWqFHqQQeupnsr0ollL8C91n3x0I72rArY8= -github.com/cloudquery/cloudquery-api-go v1.2.6 h1:eq3qPgz813NDdCpipxdo33ko+kiqHld3+x6X2W1lVH4= -github.com/cloudquery/cloudquery-api-go v1.2.6/go.mod h1:oyNUZZ6CKjPapxMbmE/qR2vdo9/G+tuCdF7uXT1LYuM= -github.com/cloudquery/plugin-pb-go v1.12.1 h1:lxCe/ovcbmY4i0N5ko1fH5PIllosIwbXTvrRqZ/UBeg= -github.com/cloudquery/plugin-pb-go v1.12.1/go.mod h1:+20GGdx/k9ApU56ix5QrPIGfzn0WA7VJlExgaHnNGzc= +github.com/cloudquery/cloudquery-api-go v1.2.8 h1:kzTuHxA/CwNFiCeg+rUZN+pe7lIi4w6FgbG2T+abGlI= +github.com/cloudquery/cloudquery-api-go v1.2.8/go.mod h1:oyNUZZ6CKjPapxMbmE/qR2vdo9/G+tuCdF7uXT1LYuM= +github.com/cloudquery/plugin-pb-go v1.12.3-0.20231012162621-df16384481c5 h1:Zkq0O54ek3WA5Ygkxr5zEf16AqFcQSvfBxPlbSfAq5Y= +github.com/cloudquery/plugin-pb-go v1.12.3-0.20231012162621-df16384481c5/go.mod h1:CYorX3zCHF9ByoOgdBOuwLX/2vVCDH6/FoREOE3oH+w= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= From 3928a69f2ecb379d2957071e80af26a78142a012 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 12 Oct 2023 12:07:47 -0500 Subject: [PATCH 13/15] Update go.mod --- examples/simple_plugin/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index cc1c2c0221..a1d1b871b0 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -29,7 +29,7 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/cloudquery/cloudquery-api-go v1.2.8 // indirect - github.com/cloudquery/plugin-pb-go v1.12.2 // indirect + github.com/cloudquery/plugin-pb-go v1.12.3-0.20231012162621-df16384481c5 // indirect github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/deepmap/oapi-codegen v1.15.0 // indirect From b119c14280328732fb4b153757fad5f22132b234 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 12 Oct 2023 12:07:49 -0500 Subject: [PATCH 14/15] Update go.sum --- examples/simple_plugin/go.sum | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/simple_plugin/go.sum b/examples/simple_plugin/go.sum index ce17e547e8..f5b9725be5 100644 --- a/examples/simple_plugin/go.sum +++ b/examples/simple_plugin/go.sum @@ -88,8 +88,8 @@ github.com/cloudquery/arrow/go/v14 v14.0.0-20231009001222-d4016862d2dd h1:w0lNQ0 github.com/cloudquery/arrow/go/v14 v14.0.0-20231009001222-d4016862d2dd/go.mod h1:/SqmdO2dsWqFHqQQeupnsr0ollL8C91n3x0I72rArY8= github.com/cloudquery/cloudquery-api-go v1.2.8 h1:kzTuHxA/CwNFiCeg+rUZN+pe7lIi4w6FgbG2T+abGlI= github.com/cloudquery/cloudquery-api-go v1.2.8/go.mod h1:oyNUZZ6CKjPapxMbmE/qR2vdo9/G+tuCdF7uXT1LYuM= -github.com/cloudquery/plugin-pb-go v1.12.2 h1:zDu+ClcuPLDcsyjiosyGpCADipPLhXNrc/sEC1WvWAU= -github.com/cloudquery/plugin-pb-go v1.12.2/go.mod h1:CYorX3zCHF9ByoOgdBOuwLX/2vVCDH6/FoREOE3oH+w= +github.com/cloudquery/plugin-pb-go v1.12.3-0.20231012162621-df16384481c5 h1:Zkq0O54ek3WA5Ygkxr5zEf16AqFcQSvfBxPlbSfAq5Y= +github.com/cloudquery/plugin-pb-go v1.12.3-0.20231012162621-df16384481c5/go.mod h1:CYorX3zCHF9ByoOgdBOuwLX/2vVCDH6/FoREOE3oH+w= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= From b983021a65b6f647222c0e0182f33d67d51d5a00 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 12 Oct 2023 12:10:29 -0500 Subject: [PATCH 15/15] make lint --- internal/memdb/memdb.go | 9 +++------ plugin/testing_write_delete.go | 1 - 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 76135b1cf8..479596baae 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -271,11 +271,9 @@ func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) predResult := evaluatePredicate(pred, row) if predGroup.GroupingType == "AND" { isMatch = isMatch && predResult - } else { - if predResult { - isMatch = true - break - } + } else if predResult { + isMatch = true + break } } // If any single predicate group is false then we can break out of the loop @@ -287,7 +285,6 @@ func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) if !isMatch { filteredTable = append(filteredTable, c.memoryDB[tableName][i]) } - } c.memoryDB[tableName] = filteredTable } diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index 5079b16663..6709c16f72 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -235,7 +235,6 @@ func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) { records, err = s.plugin.readAll(ctx, table) require.NoErrorf(s.t, err, "failed to read after delete with match") require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete with match") - } func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) {