diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 13543e84..88ab4908 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -288,3 +288,52 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name) } } + +func TestBigQueryDialect_BuildMergeQueries_TempTable(t *testing.T) { + var cols columns.Columns + cols.AddColumn(columns.NewColumn("order_id", typing.Integer)) + cols.AddColumn(columns.NewColumn("name", typing.String)) + cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("customers.orders") + + statements, err := BigQueryDialect{}.BuildMergeQueries( + fakeTableID, + "customers.orders_tmp", + "", + []columns.Column{columns.NewColumn("order_id", typing.Invalid)}, + nil, + cols.ValidColumns(), + false, + nil, + ) + assert.NoError(t, err) + assert.Len(t, statements, 1) + assert.Contains(t, statements[0], "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON c.`order_id` = cc.`order_id`") +} + +func TestBigQueryDialect_BuildMergeQueries_JSONKey(t *testing.T) { + orderOIDCol := columns.NewColumn("order_oid", typing.Struct) + var cols columns.Columns + cols.AddColumn(orderOIDCol) + cols.AddColumn(columns.NewColumn("name", typing.String)) + cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("customers.orders") + + statements, err := BigQueryDialect{}.BuildMergeQueries( + fakeTableID, + "customers.orders_tmp", + "", + []columns.Column{orderOIDCol}, + nil, + cols.ValidColumns(), + false, + nil, + ) + assert.Len(t, statements, 1) + assert.NoError(t, err) + assert.Contains(t, statements[0], "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON TO_JSON_STRING(c.`order_oid`) = TO_JSON_STRING(cc.`order_oid`)") +} diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index 771ddb66..a89f7e17 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -362,3 +362,293 @@ func TestRedshiftDialect_BuildMergeDeleteQuery(t *testing.T) { ), ) } + +type result struct { + PrimaryKeys []columns.Column + Columns []columns.Column +} + +// getBasicColumnsForTest - will return you all the columns within `result` that are needed for tests. +// * In here, we'll return if compositeKey=false - id (pk), email, first_name, last_name, created_at, toast_text (TOAST-able) +// * Else if compositeKey=true - id(pk), email (pk), first_name, last_name, created_at, toast_text (TOAST-able) +func getBasicColumnsForTest(compositeKey bool) result { + idCol := columns.NewColumn("id", typing.Float) + emailCol := columns.NewColumn("email", typing.String) + textToastCol := columns.NewColumn("toast_text", typing.String) + textToastCol.ToastColumn = true + + var cols columns.Columns + cols.AddColumn(idCol) + cols.AddColumn(emailCol) + cols.AddColumn(columns.NewColumn("first_name", typing.String)) + cols.AddColumn(columns.NewColumn("last_name", typing.String)) + cols.AddColumn(columns.NewColumn("created_at", typing.ETime)) + cols.AddColumn(textToastCol) + cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + var pks []columns.Column + pks = append(pks, idCol) + + if compositeKey { + pks = append(pks, emailCol) + } + + return result{ + PrimaryKeys: pks, + Columns: cols.ValidColumns(), + } +} + +func TestRedshiftDialect_BuildMergeQueries_SkipDelete(t *testing.T) { + // Biggest difference with this test are: + // 1. We are not saving `__artie_deleted` column + // 2. There are 3 SQL queries (INSERT, UPDATE and DELETE) + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(false) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("public.tableName") + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "", + res.PrimaryKeys, + nil, + res.Columns, + false, + ptr.ToBool(false), + ) + assert.NoError(t, err) + assert.Equal(t, 2, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND COALESCE(cc."__artie_delete", false) = false;`, + parts[1]) +} + +func TestRedshiftDialect_BuildMergeQueries_SoftDelete(t *testing.T) { + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(false) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("public.tableName") + + { + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "", + res.PrimaryKeys, + nil, + res.Columns, + true, + ptr.ToBool(false), + ) + assert.NoError(t, err) + assert.Equal(t, 2, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, + parts[0]) + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id";`, + parts[1]) + } + { + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "created_at", + res.PrimaryKeys, + nil, + res.Columns, + true, + ptr.ToBool(false), + ) + assert.NoError(t, err) + + // Parts[0] for insertion should be identical + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, + parts[0]) + // Parts[1] where we're doing UPDATES will have idempotency key. + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND cc.created_at >= c.created_at;`, + parts[1]) + } +} + +func TestRedshiftDialect_BuildMergeQueries_SoftDeleteComposite(t *testing.T) { + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(true) + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("public.tableName") + { + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "", + res.PrimaryKeys, + nil, + res.Columns, + true, + ptr.ToBool(false), + ) + assert.NoError(t, err) + assert.Equal(t, 2, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`, + parts[0]) + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email";`, + parts[1]) + } + { + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "created_at", + res.PrimaryKeys, + nil, + res.Columns, + true, + ptr.ToBool(false), + ) + assert.NoError(t, err) + + // Parts[0] for insertion should be identical + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`, + parts[0]) + // Parts[1] where we're doing UPDATES will have idempotency key. + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND cc.created_at >= c.created_at;`, + parts[1]) + } +} + +func TestRedshiftDialect_BuildMergeQueries(t *testing.T) { + // Biggest difference with this test are: + // 1. We are not saving `__artie_deleted` column + // 2. There are 3 SQL queries (INSERT, UPDATE and DELETE) + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(false) + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("public.tableName") + { + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "", + res.PrimaryKeys, + nil, + res.Columns, + false, + ptr.ToBool(true), + ) + assert.NoError(t, err) + assert.Equal(t, 3, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND COALESCE(cc."__artie_delete", false) = false;`, + parts[1]) + + assert.Equal(t, + `DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`, + parts[2]) + } + { + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "created_at", + res.PrimaryKeys, + nil, + res.Columns, + false, + ptr.ToBool(true), + ) + assert.NoError(t, err) + assert.Equal(t, 3, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND cc.created_at >= c.created_at AND COALESCE(cc."__artie_delete", false) = false;`, + parts[1]) + + assert.Equal(t, + `DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`, + parts[2]) + } +} + +func TestRedshiftDialect_BuildMergeQueries_CompositeKey(t *testing.T) { + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(true) + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns("public.tableName") + { + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "", + res.PrimaryKeys, + nil, + res.Columns, + false, + ptr.ToBool(true), + ) + assert.NoError(t, err) + assert.Equal(t, 3, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND COALESCE(cc."__artie_delete", false) = false;`, + parts[1]) + + assert.Equal(t, + `DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`, + parts[2]) + } + { + parts, err := RedshiftDialect{}.BuildMergeQueries( + fakeTableID, + tempTableName, + "created_at", + res.PrimaryKeys, + nil, + res.Columns, + false, + ptr.ToBool(true), + ) + assert.NoError(t, err) + assert.Equal(t, 3, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND cc.created_at >= c.created_at AND COALESCE(cc."__artie_delete", false) = false;`, + parts[1]) + + assert.Equal(t, + `DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`, + parts[2]) + } +} diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 9994e89d..da075366 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -2,7 +2,9 @@ package dialect import ( "fmt" + "strings" "testing" + "time" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/mocks" @@ -283,3 +285,257 @@ func TestBuildColumnsUpdateFragment(t *testing.T) { actualQuery := sql.BuildColumnsUpdateFragment(stringAndToastCols, SnowflakeDialect{}) assert.Equal(t, `"FOO"= CASE WHEN COALESCE(cc."FOO" != '__debezium_unavailable_value', true) THEN cc."FOO" ELSE c."FOO" END,"BAR"=cc."BAR"`, actualQuery) } + +func TestSnowflakeDialect_BuildMergeQueries_SoftDelete(t *testing.T) { + // No idempotent key + fqTable := "database.schema.table" + cols := []string{ + "id", + "bar", + "updated_at", + constants.DeleteColumnMarker, + } + + tableValues := []string{ + fmt.Sprintf("('%s', '%s', '%v', false)", "1", "456", time.Now().Round(0).Format(time.RFC3339)), + fmt.Sprintf("('%s', '%s', '%v', true)", "2", "bb", time.Now().Round(0).Format(time.RFC3339)), // Delete row 2. + fmt.Sprintf("('%s', '%s', '%v', false)", "3", "dd", time.Now().Round(0).Format(time.RFC3339)), + } + + // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); + subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", + strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) + + var _cols columns.Columns + _cols.AddColumn(columns.NewColumn("id", typing.String)) + _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + for _, idempotentKey := range []string{"", "updated_at"} { + statements, err := SnowflakeDialect{}.BuildMergeQueries( + fakeTableID, + subQuery, + idempotentKey, + []columns.Column{columns.NewColumn("id", typing.Invalid)}, + nil, + _cols.ValidColumns(), + true, + nil, + ) + assert.Len(t, statements, 1) + mergeSQL := statements[0] + assert.NoError(t, err) + assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) + // Soft deletion flag being passed. + assert.Contains(t, mergeSQL, `"__ARTIE_DELETE"=cc."__ARTIE_DELETE"`, mergeSQL) + + assert.Equal(t, len(idempotentKey) > 0, strings.Contains(mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at"))) + } +} + +func TestSnowflakeDialect_BuildMergeQueries(t *testing.T) { + // No idempotent key + fqTable := "database.schema.table" + colToTypes := map[string]typing.KindDetails{ + "id": typing.String, + "bar": typing.String, + "updated_at": typing.String, + "start": typing.String, + constants.DeleteColumnMarker: typing.Boolean, + } + + // This feels a bit round about, but this is because iterating over a map is not deterministic. + cols := []string{"id", "bar", "updated_at", "start", constants.DeleteColumnMarker} + var _cols columns.Columns + for _, col := range cols { + _cols.AddColumn(columns.NewColumn(col, colToTypes[col])) + } + + tableValues := []string{ + fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "1", "456", "foo", time.Now().Round(0).UTC()), + fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "2", "bb", "bar", time.Now().Round(0).UTC()), + fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "3", "dd", "world", time.Now().Round(0).UTC()), + } + + // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); + subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", + strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + + statements, err := SnowflakeDialect{}.BuildMergeQueries( + fakeTableID, + subQuery, + "", + []columns.Column{columns.NewColumn("id", typing.Invalid)}, + nil, + _cols.ValidColumns(), + false, + nil, + ) + assert.Len(t, statements, 1) + mergeSQL := statements[0] + assert.NoError(t, err) + assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) + assert.NotContains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", `"UPDATED_AT"`, `"UPDATED_AT"`), fmt.Sprintf("Idempotency key: %s", mergeSQL)) + // Check primary keys clause + assert.Contains(t, mergeSQL, `AS cc ON c."ID" = cc."ID"`, mergeSQL) + + // Check setting for update + assert.Contains(t, mergeSQL, `SET "ID"=cc."ID","BAR"=cc."BAR","UPDATED_AT"=cc."UPDATED_AT","START"=cc."START"`, mergeSQL) + // Check for INSERT + assert.Contains(t, mergeSQL, `"ID","BAR","UPDATED_AT","START"`, mergeSQL) + assert.Contains(t, mergeSQL, `cc."ID",cc."BAR",cc."UPDATED_AT",cc."START"`, mergeSQL) +} + +func TestSnowflakeDialect_BuildMergeQueries_IdempotentKey(t *testing.T) { + fqTable := "database.schema.table" + cols := []string{ + "id", + "bar", + "updated_at", + constants.DeleteColumnMarker, + } + + tableValues := []string{ + fmt.Sprintf("('%s', '%s', '%v', false)", "1", "456", time.Now().Round(0).UTC()), + fmt.Sprintf("('%s', '%s', '%v', false)", "2", "bb", time.Now().Round(0).UTC()), + fmt.Sprintf("('%s', '%s', '%v', false)", "3", "dd", time.Now().Round(0).UTC()), + } + + // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); + subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", + strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) + + var _cols columns.Columns + _cols.AddColumn(columns.NewColumn("id", typing.String)) + _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + + statements, err := SnowflakeDialect{}.BuildMergeQueries( + fakeTableID, + subQuery, + "updated_at", + []columns.Column{columns.NewColumn("id", typing.Invalid)}, + nil, + _cols.ValidColumns(), + false, + nil, + ) + assert.Len(t, statements, 1) + mergeSQL := statements[0] + assert.NoError(t, err) + assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) + assert.Contains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at"), fmt.Sprintf("Idempotency key: %s", mergeSQL)) +} + +func TestSnowflakeDialect_BuildMergeQueries_CompositeKey(t *testing.T) { + fqTable := "database.schema.table" + cols := []string{ + "id", + "another_id", + "bar", + "updated_at", + constants.DeleteColumnMarker, + } + + tableValues := []string{ + fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "1", "3", "456", time.Now().Round(0).UTC()), + fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "2", "2", "bb", time.Now().Round(0).UTC()), + fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "3", "1", "dd", time.Now().Round(0).UTC()), + } + + // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); + subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", + strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) + + var _cols columns.Columns + _cols.AddColumn(columns.NewColumn("id", typing.String)) + _cols.AddColumn(columns.NewColumn("another_id", typing.String)) + _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + + statements, err := SnowflakeDialect{}.BuildMergeQueries( + fakeTableID, + subQuery, + "updated_at", + []columns.Column{ + columns.NewColumn("id", typing.Invalid), + columns.NewColumn("another_id", typing.Invalid), + }, + nil, + _cols.ValidColumns(), + false, + nil, + ) + assert.Len(t, statements, 1) + mergeSQL := statements[0] + assert.NoError(t, err) + assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) + assert.Contains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at"), fmt.Sprintf("Idempotency key: %s", mergeSQL)) + assert.Contains(t, mergeSQL, `cc ON c."ID" = cc."ID" and c."ANOTHER_ID" = cc."ANOTHER_ID"`, mergeSQL) +} + +func TestSnowflakeDialect_BuildMergeQueries_EscapePrimaryKeys(t *testing.T) { + // No idempotent key + fqTable := "database.schema.table" + colToTypes := map[string]typing.KindDetails{ + "id": typing.String, + "group": typing.String, + "updated_at": typing.String, + "start": typing.String, + constants.DeleteColumnMarker: typing.Boolean, + } + + // This feels a bit round about, but this is because iterating over a map is not deterministic. + cols := []string{"id", "group", "updated_at", "start", constants.DeleteColumnMarker} + var _cols columns.Columns + for _, col := range cols { + _cols.AddColumn(columns.NewColumn(col, colToTypes[col])) + } + + tableValues := []string{ + fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "1", "456", "foo", time.Now().Round(0).UTC()), + fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "2", "bb", "bar", time.Now().Round(0).UTC()), + fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "3", "dd", "world", time.Now().Round(0).UTC()), + } + + // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); + subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", + strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) + + fakeTableID := &mocks.FakeTableIdentifier{} + fakeTableID.FullyQualifiedNameReturns(fqTable) + + statements, err := SnowflakeDialect{}.BuildMergeQueries( + fakeTableID, + subQuery, + "", + []columns.Column{ + columns.NewColumn("id", typing.Invalid), + columns.NewColumn("group", typing.Invalid), + }, + nil, + _cols.ValidColumns(), + false, + nil, + ) + assert.Len(t, statements, 1) + mergeSQL := statements[0] + assert.NoError(t, err) + assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) + assert.NotContains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", `"UPDATED_AT"`, `"UPDATED_AT"`), fmt.Sprintf("Idempotency key: %s", mergeSQL)) + // Check primary keys clause + assert.Contains(t, mergeSQL, `AS cc ON c."ID" = cc."ID" and c."GROUP" = cc."GROUP"`, mergeSQL) + // Check setting for update + assert.Contains(t, mergeSQL, `SET "ID"=cc."ID","GROUP"=cc."GROUP","UPDATED_AT"=cc."UPDATED_AT","START"=cc."START"`, mergeSQL) + // Check for INSERT + assert.Contains(t, mergeSQL, `"ID","GROUP","UPDATED_AT","START"`, mergeSQL) + assert.Contains(t, mergeSQL, `cc."ID",cc."GROUP",cc."UPDATED_AT",cc."START"`, mergeSQL) +} diff --git a/lib/destination/dml/merge_bigquery_test.go b/lib/destination/dml/merge_bigquery_test.go deleted file mode 100644 index 1f4822f8..00000000 --- a/lib/destination/dml/merge_bigquery_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package dml - -import ( - "testing" - - bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/mocks" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/stretchr/testify/assert" -) - -func TestMergeStatement_TempTable(t *testing.T) { - var cols columns.Columns - cols.AddColumn(columns.NewColumn("order_id", typing.Integer)) - cols.AddColumn(columns.NewColumn("name", typing.String)) - cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns("customers.orders") - - mergeArg := &MergeArgument{ - TableID: fakeTableID, - SubQuery: "customers.orders_tmp", - PrimaryKeys: []columns.Column{columns.NewColumn("order_id", typing.Invalid)}, - Columns: cols.ValidColumns(), - Dialect: bigQueryDialect.BigQueryDialect{}, - SoftDelete: false, - } - - statements, err := mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Len(t, statements, 1) - assert.Contains(t, statements[0], "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON c.`order_id` = cc.`order_id`") -} - -func TestMergeStatement_JSONKey(t *testing.T) { - orderOIDCol := columns.NewColumn("order_oid", typing.Struct) - var cols columns.Columns - cols.AddColumn(orderOIDCol) - cols.AddColumn(columns.NewColumn("name", typing.String)) - cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns("customers.orders") - - mergeArg := &MergeArgument{ - TableID: fakeTableID, - SubQuery: "customers.orders_tmp", - PrimaryKeys: []columns.Column{orderOIDCol}, - Columns: cols.ValidColumns(), - Dialect: bigQueryDialect.BigQueryDialect{}, - SoftDelete: false, - } - - statements, err := mergeArg.BuildStatements() - assert.Len(t, statements, 1) - assert.NoError(t, err) - assert.Contains(t, statements[0], "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON TO_JSON_STRING(c.`order_oid`) = TO_JSON_STRING(cc.`order_oid`)") -} - -func TestMergeArgument_BuildStatements_BigQuery(t *testing.T) { - orderOIDCol := columns.NewColumn("order_oid", typing.Struct) - var cols columns.Columns - cols.AddColumn(orderOIDCol) - cols.AddColumn(columns.NewColumn("name", typing.String)) - cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - - mergeArg := &MergeArgument{ - TableID: &mocks.FakeTableIdentifier{}, - SubQuery: "{SUB_QUERY}", - PrimaryKeys: []columns.Column{orderOIDCol}, - Columns: cols.ValidColumns(), - Dialect: bigQueryDialect.BigQueryDialect{}, - SoftDelete: false, - } - - statements1, err := mergeArg.BuildStatements() - assert.Len(t, statements1, 1) - assert.NoError(t, err) - statements2, err := mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, statements1, statements2) -} diff --git a/lib/destination/dml/merge_redshift_test.go b/lib/destination/dml/merge_redshift_test.go deleted file mode 100644 index 8187489d..00000000 --- a/lib/destination/dml/merge_redshift_test.go +++ /dev/null @@ -1,307 +0,0 @@ -package dml - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/artie-labs/transfer/clients/redshift/dialect" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/mocks" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" -) - -type result struct { - PrimaryKeys []columns.Column - Columns []columns.Column -} - -// getBasicColumnsForTest - will return you all the columns within `result` that are needed for tests. -// * In here, we'll return if compositeKey=false - id (pk), email, first_name, last_name, created_at, toast_text (TOAST-able) -// * Else if compositeKey=true - id(pk), email (pk), first_name, last_name, created_at, toast_text (TOAST-able) -func getBasicColumnsForTest(compositeKey bool) result { - idCol := columns.NewColumn("id", typing.Float) - emailCol := columns.NewColumn("email", typing.String) - textToastCol := columns.NewColumn("toast_text", typing.String) - textToastCol.ToastColumn = true - - var cols columns.Columns - cols.AddColumn(idCol) - cols.AddColumn(emailCol) - cols.AddColumn(columns.NewColumn("first_name", typing.String)) - cols.AddColumn(columns.NewColumn("last_name", typing.String)) - cols.AddColumn(columns.NewColumn("created_at", typing.ETime)) - cols.AddColumn(textToastCol) - cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - - var pks []columns.Column - pks = append(pks, idCol) - - if compositeKey { - pks = append(pks, emailCol) - } - - return result{ - PrimaryKeys: pks, - Columns: cols.ValidColumns(), - } -} - -func TestMergeArgument_BuildStatements_Redshift(t *testing.T) { - res := getBasicColumnsForTest(false) - mergeArg := &MergeArgument{ - TableID: &mocks.FakeTableIdentifier{}, - SubQuery: "{SUB_QUERY}", - PrimaryKeys: res.PrimaryKeys, - Columns: res.Columns, - Dialect: dialect.RedshiftDialect{}, - ContainsHardDeletes: ptr.ToBool(true), - } - - statements, err := dialect.RedshiftDialect{}.BuildMergeQueries( - &mocks.FakeTableIdentifier{}, - "{SUB_QUERY}", - "", - res.PrimaryKeys, - nil, - res.Columns, - false, - ptr.ToBool(true), - ) - assert.NoError(t, err) - assert.Equal(t, 3, len(statements)) - statements2, err := mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, statements, statements2) -} - -func TestMergeArgument_BuildRedshiftStatements_SkipDelete(t *testing.T) { - // Biggest difference with this test are: - // 1. We are not saving `__artie_deleted` column - // 2. There are 3 SQL queries (INSERT, UPDATE and DELETE) - tempTableName := "public.tableName__temp" - res := getBasicColumnsForTest(false) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns("public.tableName") - mergeArg := &MergeArgument{ - TableID: fakeTableID, - SubQuery: tempTableName, - PrimaryKeys: res.PrimaryKeys, - Columns: res.Columns, - Dialect: dialect.RedshiftDialect{}, - ContainsHardDeletes: ptr.ToBool(false), - } - - parts, err := mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, 2, len(parts)) - - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, - parts[0]) - - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND COALESCE(cc."__artie_delete", false) = false;`, - parts[1]) -} - -func TestMergeArgument_BuildRedshiftStatements_SoftDelete(t *testing.T) { - tempTableName := "public.tableName__temp" - res := getBasicColumnsForTest(false) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns("public.tableName") - mergeArg := &MergeArgument{ - TableID: fakeTableID, - SubQuery: tempTableName, - PrimaryKeys: res.PrimaryKeys, - Columns: res.Columns, - Dialect: dialect.RedshiftDialect{}, - SoftDelete: true, - ContainsHardDeletes: ptr.ToBool(false), - } - - parts, err := mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, 2, len(parts)) - - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, - parts[0]) - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id";`, - parts[1]) - - mergeArg.IdempotentKey = "created_at" - parts, err = mergeArg.BuildStatements() - assert.NoError(t, err) - - // Parts[0] for insertion should be identical - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, - parts[0]) - // Parts[1] where we're doing UPDATES will have idempotency key. - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND cc.created_at >= c.created_at;`, - parts[1]) -} - -func TestMergeArgument_BuildRedshiftStatements_SoftDeleteComposite(t *testing.T) { - tempTableName := "public.tableName__temp" - res := getBasicColumnsForTest(true) - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns("public.tableName") - mergeArg := &MergeArgument{ - TableID: fakeTableID, - SubQuery: tempTableName, - PrimaryKeys: res.PrimaryKeys, - Columns: res.Columns, - Dialect: dialect.RedshiftDialect{}, - SoftDelete: true, - ContainsHardDeletes: ptr.ToBool(false), - } - - parts, err := mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, 2, len(parts)) - - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`, - parts[0]) - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email";`, - parts[1]) - - mergeArg.IdempotentKey = "created_at" - parts, err = mergeArg.BuildStatements() - assert.NoError(t, err) - - // Parts[0] for insertion should be identical - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`, - parts[0]) - // Parts[1] where we're doing UPDATES will have idempotency key. - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND cc.created_at >= c.created_at;`, - parts[1]) -} - -func TestMergeArgument_GetRedshiftStatements(t *testing.T) { - // Biggest difference with this test are: - // 1. We are not saving `__artie_deleted` column - // 2. There are 3 SQL queries (INSERT, UPDATE and DELETE) - tempTableName := "public.tableName__temp" - res := getBasicColumnsForTest(false) - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns("public.tableName") - mergeArg := &MergeArgument{ - TableID: fakeTableID, - SubQuery: tempTableName, - PrimaryKeys: res.PrimaryKeys, - Columns: res.Columns, - Dialect: dialect.RedshiftDialect{}, - ContainsHardDeletes: ptr.ToBool(true), - } - - parts, err := mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, 3, len(parts)) - - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, - parts[0]) - - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND COALESCE(cc."__artie_delete", false) = false;`, - parts[1]) - - assert.Equal(t, - `DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`, - parts[2]) - - mergeArg = &MergeArgument{ - TableID: fakeTableID, - SubQuery: tempTableName, - PrimaryKeys: res.PrimaryKeys, - Columns: res.Columns, - Dialect: dialect.RedshiftDialect{}, - IdempotentKey: "created_at", - ContainsHardDeletes: ptr.ToBool(true), - } - - parts, err = mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, 3, len(parts)) - - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`, - parts[0]) - - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND cc.created_at >= c.created_at AND COALESCE(cc."__artie_delete", false) = false;`, - parts[1]) - - assert.Equal(t, - `DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`, - parts[2]) -} - -func TestMergeArgument_BuildRedshiftStatements_CompositeKey(t *testing.T) { - tempTableName := "public.tableName__temp" - res := getBasicColumnsForTest(true) - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns("public.tableName") - mergeArg := &MergeArgument{ - TableID: fakeTableID, - SubQuery: tempTableName, - PrimaryKeys: res.PrimaryKeys, - Columns: res.Columns, - Dialect: dialect.RedshiftDialect{}, - ContainsHardDeletes: ptr.ToBool(true), - } - - parts, err := mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, 3, len(parts)) - - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`, - parts[0]) - - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND COALESCE(cc."__artie_delete", false) = false;`, - parts[1]) - - assert.Equal(t, - `DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`, - parts[2]) - - mergeArg = &MergeArgument{ - TableID: fakeTableID, - SubQuery: tempTableName, - PrimaryKeys: res.PrimaryKeys, - Columns: res.Columns, - Dialect: dialect.RedshiftDialect{}, - ContainsHardDeletes: ptr.ToBool(true), - IdempotentKey: "created_at", - } - - parts, err = mergeArg.BuildStatements() - assert.NoError(t, err) - assert.Equal(t, 3, len(parts)) - - assert.Equal(t, - `INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`, - parts[0]) - - assert.Equal(t, - `UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND cc.created_at >= c.created_at AND COALESCE(cc."__artie_delete", false) = false;`, - parts[1]) - - assert.Equal(t, - `DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`, - parts[2]) -} diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go index 4b6cea6d..651b0c88 100644 --- a/lib/destination/dml/merge_test.go +++ b/lib/destination/dml/merge_test.go @@ -1,276 +1,14 @@ package dml import ( - "fmt" - "strings" "testing" - "time" "github.com/stretchr/testify/assert" bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" snowflakeDialect "github.com/artie-labs/transfer/clients/snowflake/dialect" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/mocks" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" ) -func TestMergeStatementSoftDelete(t *testing.T) { - // No idempotent key - fqTable := "database.schema.table" - cols := []string{ - "id", - "bar", - "updated_at", - constants.DeleteColumnMarker, - } - - tableValues := []string{ - fmt.Sprintf("('%s', '%s', '%v', false)", "1", "456", time.Now().Round(0).Format(time.RFC3339)), - fmt.Sprintf("('%s', '%s', '%v', true)", "2", "bb", time.Now().Round(0).Format(time.RFC3339)), // Delete row 2. - fmt.Sprintf("('%s', '%s', '%v', false)", "3", "dd", time.Now().Round(0).Format(time.RFC3339)), - } - - // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); - subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", - strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) - - var _cols columns.Columns - _cols.AddColumn(columns.NewColumn("id", typing.String)) - _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns(fqTable) - for _, idempotentKey := range []string{"", "updated_at"} { - mergeArg := MergeArgument{ - TableID: fakeTableID, - SubQuery: subQuery, - IdempotentKey: idempotentKey, - PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, - Columns: _cols.ValidColumns(), - Dialect: snowflakeDialect.SnowflakeDialect{}, - SoftDelete: true, - } - - statements, err := mergeArg.BuildStatements() - assert.Len(t, statements, 1) - mergeSQL := statements[0] - assert.NoError(t, err) - assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) - // Soft deletion flag being passed. - assert.Contains(t, mergeSQL, `"__ARTIE_DELETE"=cc."__ARTIE_DELETE"`, mergeSQL) - - assert.Equal(t, len(idempotentKey) > 0, strings.Contains(mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at"))) - } -} - -func TestMergeStatement(t *testing.T) { - // No idempotent key - fqTable := "database.schema.table" - colToTypes := map[string]typing.KindDetails{ - "id": typing.String, - "bar": typing.String, - "updated_at": typing.String, - "start": typing.String, - constants.DeleteColumnMarker: typing.Boolean, - } - - // This feels a bit round about, but this is because iterating over a map is not deterministic. - cols := []string{"id", "bar", "updated_at", "start", constants.DeleteColumnMarker} - var _cols columns.Columns - for _, col := range cols { - _cols.AddColumn(columns.NewColumn(col, colToTypes[col])) - } - - tableValues := []string{ - fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "1", "456", "foo", time.Now().Round(0).UTC()), - fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "2", "bb", "bar", time.Now().Round(0).UTC()), - fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "3", "dd", "world", time.Now().Round(0).UTC()), - } - - // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); - subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", - strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns(fqTable) - mergeArg := MergeArgument{ - TableID: fakeTableID, - SubQuery: subQuery, - IdempotentKey: "", - PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, - Columns: _cols.ValidColumns(), - Dialect: snowflakeDialect.SnowflakeDialect{}, - SoftDelete: false, - } - - statements, err := mergeArg.BuildStatements() - assert.Len(t, statements, 1) - mergeSQL := statements[0] - assert.NoError(t, err) - assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) - assert.NotContains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", `"UPDATED_AT"`, `"UPDATED_AT"`), fmt.Sprintf("Idempotency key: %s", mergeSQL)) - // Check primary keys clause - assert.Contains(t, mergeSQL, `AS cc ON c."ID" = cc."ID"`, mergeSQL) - - // Check setting for update - assert.Contains(t, mergeSQL, `SET "ID"=cc."ID","BAR"=cc."BAR","UPDATED_AT"=cc."UPDATED_AT","START"=cc."START"`, mergeSQL) - // Check for INSERT - assert.Contains(t, mergeSQL, `"ID","BAR","UPDATED_AT","START"`, mergeSQL) - assert.Contains(t, mergeSQL, `cc."ID",cc."BAR",cc."UPDATED_AT",cc."START"`, mergeSQL) -} - -func TestMergeStatementIdempotentKey(t *testing.T) { - fqTable := "database.schema.table" - cols := []string{ - "id", - "bar", - "updated_at", - constants.DeleteColumnMarker, - } - - tableValues := []string{ - fmt.Sprintf("('%s', '%s', '%v', false)", "1", "456", time.Now().Round(0).UTC()), - fmt.Sprintf("('%s', '%s', '%v', false)", "2", "bb", time.Now().Round(0).UTC()), - fmt.Sprintf("('%s', '%s', '%v', false)", "3", "dd", time.Now().Round(0).UTC()), - } - - // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); - subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", - strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) - - var _cols columns.Columns - _cols.AddColumn(columns.NewColumn("id", typing.String)) - _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns(fqTable) - mergeArg := MergeArgument{ - TableID: fakeTableID, - SubQuery: subQuery, - IdempotentKey: "updated_at", - PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)}, - Columns: _cols.ValidColumns(), - Dialect: snowflakeDialect.SnowflakeDialect{}, - SoftDelete: false, - } - - statements, err := mergeArg.BuildStatements() - assert.Len(t, statements, 1) - mergeSQL := statements[0] - assert.NoError(t, err) - assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) - assert.Contains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at"), fmt.Sprintf("Idempotency key: %s", mergeSQL)) -} - -func TestMergeStatementCompositeKey(t *testing.T) { - fqTable := "database.schema.table" - cols := []string{ - "id", - "another_id", - "bar", - "updated_at", - constants.DeleteColumnMarker, - } - - tableValues := []string{ - fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "1", "3", "456", time.Now().Round(0).UTC()), - fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "2", "2", "bb", time.Now().Round(0).UTC()), - fmt.Sprintf("('%s', '%s', '%s', '%v', false)", "3", "1", "dd", time.Now().Round(0).UTC()), - } - - // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); - subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", - strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) - - var _cols columns.Columns - _cols.AddColumn(columns.NewColumn("id", typing.String)) - _cols.AddColumn(columns.NewColumn("another_id", typing.String)) - _cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns(fqTable) - mergeArg := MergeArgument{ - TableID: fakeTableID, - SubQuery: subQuery, - IdempotentKey: "updated_at", - PrimaryKeys: []columns.Column{ - columns.NewColumn("id", typing.Invalid), - columns.NewColumn("another_id", typing.Invalid), - }, - Columns: _cols.ValidColumns(), - Dialect: snowflakeDialect.SnowflakeDialect{}, - SoftDelete: false, - } - - statements, err := mergeArg.BuildStatements() - assert.Len(t, statements, 1) - mergeSQL := statements[0] - assert.NoError(t, err) - assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) - assert.Contains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", "updated_at", "updated_at"), fmt.Sprintf("Idempotency key: %s", mergeSQL)) - assert.Contains(t, mergeSQL, `cc ON c."ID" = cc."ID" and c."ANOTHER_ID" = cc."ANOTHER_ID"`, mergeSQL) -} - -func TestMergeStatementEscapePrimaryKeys(t *testing.T) { - // No idempotent key - fqTable := "database.schema.table" - colToTypes := map[string]typing.KindDetails{ - "id": typing.String, - "group": typing.String, - "updated_at": typing.String, - "start": typing.String, - constants.DeleteColumnMarker: typing.Boolean, - } - - // This feels a bit round about, but this is because iterating over a map is not deterministic. - cols := []string{"id", "group", "updated_at", "start", constants.DeleteColumnMarker} - var _cols columns.Columns - for _, col := range cols { - _cols.AddColumn(columns.NewColumn(col, colToTypes[col])) - } - - tableValues := []string{ - fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "1", "456", "foo", time.Now().Round(0).UTC()), - fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "2", "bb", "bar", time.Now().Round(0).UTC()), - fmt.Sprintf("('%s', '%s', '%v', '%v', false)", "3", "dd", "world", time.Now().Round(0).UTC()), - } - - // select cc.foo, cc.bar from (values (12, 34), (44, 55)) as cc(foo, bar); - subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", - strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) - - fakeTableID := &mocks.FakeTableIdentifier{} - fakeTableID.FullyQualifiedNameReturns(fqTable) - mergeArg := MergeArgument{ - TableID: fakeTableID, - SubQuery: subQuery, - IdempotentKey: "", - PrimaryKeys: []columns.Column{ - columns.NewColumn("id", typing.Invalid), - columns.NewColumn("group", typing.Invalid), - }, - Columns: _cols.ValidColumns(), - Dialect: snowflakeDialect.SnowflakeDialect{}, - SoftDelete: false, - } - - statements, err := mergeArg.BuildStatements() - assert.Len(t, statements, 1) - mergeSQL := statements[0] - assert.NoError(t, err) - assert.Contains(t, mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable), mergeSQL) - assert.NotContains(t, mergeSQL, fmt.Sprintf("cc.%s >= c.%s", `"UPDATED_AT"`, `"UPDATED_AT"`), fmt.Sprintf("Idempotency key: %s", mergeSQL)) - // Check primary keys clause - assert.Contains(t, mergeSQL, `AS cc ON c."ID" = cc."ID" and c."GROUP" = cc."GROUP"`, mergeSQL) - // Check setting for update - assert.Contains(t, mergeSQL, `SET "ID"=cc."ID","GROUP"=cc."GROUP","UPDATED_AT"=cc."UPDATED_AT","START"=cc."START"`, mergeSQL) - // Check for INSERT - assert.Contains(t, mergeSQL, `"ID","GROUP","UPDATED_AT","START"`, mergeSQL) - assert.Contains(t, mergeSQL, `cc."ID",cc."GROUP",cc."UPDATED_AT",cc."START"`, mergeSQL) -} - func TestMergeArgument_BuildStatements_Validation(t *testing.T) { for _, arg := range []*MergeArgument{ {Dialect: snowflakeDialect.SnowflakeDialect{}},