Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloudquery/destination/v0/destination.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ message Migrate {
string name = 1;
string version = 2;
bytes tables = 3;
bytes options = 4;
}
message Response {}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *client) overwrite(table *schema.Table, data []any) {
c.memoryDB[table.Name] = append(c.memoryDB[table.Name], data)
}

func (c *client) Migrate(_ context.Context, tables schema.Tables) error {
func (c *client) Migrate(_ context.Context, tables schema.Tables, _ destination.MigrateOptions) error {
for _, table := range tables {
if c.memoryDB[table.Name] == nil {
c.memoryDB[table.Name] = make([][]any, 0)
Expand Down
202 changes: 106 additions & 96 deletions internal/pb/destination/v0/destination.pb.go

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion internal/servers/destination/v0/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migr
if err := json.Unmarshal(req.Tables, &tables); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err)
}
return &pb.Migrate_Response{}, s.Plugin.Migrate(ctx, tables)
var options destination.MigrateOptions
if len(req.Options) > 0 {
if err := json.Unmarshal(req.Options, &options); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal options: %v", err)
}
}

return &pb.Migrate_Response{}, s.Plugin.Migrate(ctx, tables, options)
}

func (*Server) Write(pb.Destination_WriteServer) error {
Expand Down
10 changes: 7 additions & 3 deletions plugins/destination/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,14 @@ func (*UnimplementedUnmanagedWriter) Metrics() Metrics {
panic("Metrics not implemented")
}

type MigrateOptions struct {
Force bool
}

type Client interface {
schema.CQTypeTransformer
ReverseTransformValues(table *schema.Table, values []any) (schema.CQTypes, error)
Migrate(ctx context.Context, tables schema.Tables) error
Migrate(ctx context.Context, tables schema.Tables, options MigrateOptions) error
Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error
ManagedWriter
UnmanagedWriter
Expand Down Expand Up @@ -184,9 +188,9 @@ func (p *Plugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Des
}

// we implement all DestinationClient functions so we can hook into pre-post behavior
func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables) error {
func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables, options MigrateOptions) error {
SetDestinationManagedCqColumns(tables)
return p.client.Migrate(ctx, tables)
return p.client.Migrate(ctx, tables, options)
}

func (p *Plugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) {
Expand Down
18 changes: 11 additions & 7 deletions plugins/destination/plugin_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type PluginTestSuiteTests struct {
}

func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
var options MigrateOptions
spec.WriteMode = specs.WriteModeOverwrite
if err := p.Init(ctx, logger, spec); err != nil {
return fmt.Errorf("failed to init plugin: %w", err)
Expand All @@ -61,7 +62,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
tables := []*schema.Table{
table,
}
if err := p.Migrate(ctx, tables); err != nil {
if err := p.Migrate(ctx, tables, options); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
}

Expand Down Expand Up @@ -131,6 +132,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
}

func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
var options MigrateOptions
spec.WriteMode = specs.WriteModeOverwriteDeleteStale
if err := p.Init(ctx, logger, spec); err != nil {
return fmt.Errorf("failed to init plugin: %w", err)
Expand All @@ -144,7 +146,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
table,
incTable,
}
if err := p.Migrate(ctx, tables); err != nil {
if err := p.Migrate(ctx, tables, options); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
}

Expand Down Expand Up @@ -243,6 +245,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
}

func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
var options MigrateOptions
spec.WriteMode = specs.WriteModeAppend
if err := p.Init(ctx, logger, spec); err != nil {
return fmt.Errorf("failed to init plugin: %w", err)
Expand All @@ -253,7 +256,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
tables := []*schema.Table{
table,
}
if err := p.Migrate(ctx, tables); err != nil {
if err := p.Migrate(ctx, tables, options); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
}

Expand Down Expand Up @@ -315,13 +318,14 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
) error {
spec.WriteMode = mode
spec.BatchSize = 1
var options MigrateOptions
if err := p.Init(ctx, logger, spec); err != nil {
return fmt.Errorf("failed to init plugin: %w", err)
}
suffix := strings.ToLower(strings.ReplaceAll(mode.String(), "-", "_"))
tableName := "cq_test_migrate_" + suffix
table := testdata.TestTable(tableName)
if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
if err := p.Migrate(ctx, []*schema.Table{table}, options); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
}

Expand All @@ -339,7 +343,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
a := table.Columns.Index("uuid")
b := table.Columns.Index("float")
table.Columns[a], table.Columns[b] = table.Columns[b], table.Columns[a]
if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
if err := p.Migrate(ctx, []*schema.Table{table}, options); err != nil {
return fmt.Errorf("failed to migrate table with changed column ordering: %w", err)
}
resource2 := createTestResources(table, sourceName, syncTime, 1)[0]
Expand All @@ -360,7 +364,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
Name: "new_column",
Type: schema.TypeInt,
})
if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
if err := p.Migrate(ctx, []*schema.Table{table}, options); err != nil {
return fmt.Errorf("failed to migrate table with new column: %w", err)
}
resource3 := createTestResources(table, sourceName, syncTime, 1)[0]
Expand All @@ -378,7 +382,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
// check that migration still succeeds when there is an extra column in the destination table,
// which should be ignored
oldTable := testdata.TestTable(tableName)
if err := p.Migrate(ctx, []*schema.Table{oldTable}); err != nil {
if err := p.Migrate(ctx, []*schema.Table{oldTable}, options); err != nil {
return fmt.Errorf("failed to migrate table with extra column in destination: %w", err)
}
resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0]
Expand Down
1 change: 1 addition & 0 deletions plugins/source/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (p *Plugin) Sync(ctx context.Context, res chan<- *schema.Resource) error {
}

startTime := time.Now()

switch p.spec.Scheduler {
case specs.SchedulerDFS:
p.syncDfs(ctx, p.spec, p.client, p.sessionTables, res)
Expand Down
1 change: 1 addition & 0 deletions schema/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type ColumnResolver func(ctx context.Context, meta ClientMeta, resource *Resourc
// ColumnCreationOptions allow modification of how column is defined when table is created
type ColumnCreationOptions struct {
PrimaryKey bool `json:"primary_key,omitempty"`
NotNull bool `json:"not_null,omitempty"`
// IncrementalKey is a flag that indicates if the column is used as part of an incremental key.
// It is mainly used for documentation purposes, but may also be used as part of ensuring that
// migrations are done correctly.
Expand Down
51 changes: 51 additions & 0 deletions schema/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Table struct {

// Parent is the parent table in case this table is called via parent table (i.e. relation)
Parent *Table `json:"-"`

PkConstraintName string `json:"pk_constraint_name"`
}

var (
Expand Down Expand Up @@ -246,6 +248,55 @@ func (t *Table) ValidateName() error {
return nil
}

// GetAddedColumns returns a list of columns that are in this table but not in the other table.
func (t *Table) GetAddedColumns(other *Table) []Column {
var added []Column
for _, c := range t.Columns {
if other.Columns.Get(c.Name) == nil {
added = append(added, c)
}
}
return added
}

// GetChangedColumns returns a list of columns that are in this table but have different type in the other table.
func (t *Table) GetChangedColumns(other *Table) []Column {
var changed []Column
for _, c := range t.Columns {
otherCol := other.Columns.Get(c.Name)
if otherCol == nil {
continue
}
if c.Type != otherCol.Type {
changed = append(changed, c)
}
if c.CreationOptions.NotNull != otherCol.CreationOptions.NotNull {
changed = append(changed, c)
}
}
return changed
}

func (t *Table) IsPrimaryKeyEqual(other *Table) bool {
for _, c := range t.Columns {
if c.CreationOptions.PrimaryKey {
otherCol := other.Columns.Get(c.Name)
if otherCol == nil || !otherCol.CreationOptions.PrimaryKey {
return false
}
}
}
for _, c := range other.Columns {
if c.CreationOptions.PrimaryKey {
otherCol := t.Columns.Get(c.Name)
if otherCol == nil || !otherCol.CreationOptions.PrimaryKey {
return false
}
}
}
return true
}

func (t *Table) ValidateDuplicateColumns() error {
columns := make(map[string]bool, len(t.Columns))
for _, c := range t.Columns {
Expand Down
76 changes: 76 additions & 0 deletions schema/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,79 @@ func TestTablesFilterDFS(t *testing.T) {
})
}
}

var testTable1 = &Table{
Name: "test",
Columns: []Column{
{Name: "bool", Type: TypeBool},
},
}

var testTable2 = &Table{
Name: "test",
Columns: []Column{
{Name: "bool", Type: TypeBool},
{Name: "bool1", Type: TypeBool},
},
}

var testTable3 = &Table{
Name: "test",
Columns: []Column{
{Name: "bool", Type: TypeString},
},
}

var testTable4 = &Table{
Name: "test",
Columns: []Column{
{Name: "bool", Type: TypeBool, CreationOptions: ColumnCreationOptions{PrimaryKey: true, NotNull: true}},
},
}

func TestGetAddedColumns(t *testing.T) {
columns := testTable1.GetAddedColumns(testTable1)
if columns != nil {
t.Fatalf("got %v want nil", columns)
}

columns = testTable2.GetAddedColumns(testTable1)
if len(columns) != 1 {
t.Fatalf("got %v want 1", columns)
}
if columns[0].Name != "bool1" {
t.Fatalf("got %v want bool1", columns[0].Name)
}
}

func TestGetChangedColumns(t *testing.T) {
columns := testTable1.GetChangedColumns(testTable1)
if columns != nil {
t.Fatalf("got %v want nil", columns)
}

columns = testTable3.GetChangedColumns(testTable2)
if len(columns) != 1 {
t.Fatalf("got %v want 1", columns)
}
if columns[0].Name != "bool" {
t.Fatalf("got %v want bool", columns[0].Name)
}

columns = testTable4.GetChangedColumns(testTable2)
if len(columns) != 1 {
t.Fatalf("got %v want 1", columns)
}
if columns[0].Name != "bool" {
t.Fatalf("got %v want bool", columns[0].Name)
}
}

func TestIsPkEqual(t *testing.T) {
if !testTable1.IsPrimaryKeyEqual(testTable1) {
t.Fatalf("got false want true")
}
if testTable4.IsPrimaryKeyEqual(testTable2) {
t.Fatalf("got true want false")
}
}