Skip to content

Commit

Permalink
[tests] Move BuildMergeQueries tests to dialect test files (#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed May 14, 2024
1 parent 9d95906 commit 076a57f
Show file tree
Hide file tree
Showing 6 changed files with 595 additions and 654 deletions.
49 changes: 49 additions & 0 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,52 @@ func TestBuildColumnsUpdateFragment(t *testing.T) {
assert.Equal(t, _testCase.expectedString, actualQuery, _testCase.name)
}
}

func TestBigQueryDialect_BuildMergeQueries_TempTable(t *testing.T) {
var cols columns.Columns
cols.AddColumn(columns.NewColumn("order_id", typing.Integer))
cols.AddColumn(columns.NewColumn("name", typing.String))
cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("customers.orders")

statements, err := BigQueryDialect{}.BuildMergeQueries(
fakeTableID,
"customers.orders_tmp",
"",
[]columns.Column{columns.NewColumn("order_id", typing.Invalid)},
nil,
cols.ValidColumns(),
false,
nil,
)
assert.NoError(t, err)
assert.Len(t, statements, 1)
assert.Contains(t, statements[0], "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON c.`order_id` = cc.`order_id`")
}

func TestBigQueryDialect_BuildMergeQueries_JSONKey(t *testing.T) {
orderOIDCol := columns.NewColumn("order_oid", typing.Struct)
var cols columns.Columns
cols.AddColumn(orderOIDCol)
cols.AddColumn(columns.NewColumn("name", typing.String))
cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("customers.orders")

statements, err := BigQueryDialect{}.BuildMergeQueries(
fakeTableID,
"customers.orders_tmp",
"",
[]columns.Column{orderOIDCol},
nil,
cols.ValidColumns(),
false,
nil,
)
assert.Len(t, statements, 1)
assert.NoError(t, err)
assert.Contains(t, statements[0], "MERGE INTO customers.orders c USING customers.orders_tmp AS cc ON TO_JSON_STRING(c.`order_oid`) = TO_JSON_STRING(cc.`order_oid`)")
}
290 changes: 290 additions & 0 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,293 @@ func TestRedshiftDialect_BuildMergeDeleteQuery(t *testing.T) {
),
)
}

type result struct {
PrimaryKeys []columns.Column
Columns []columns.Column
}

// getBasicColumnsForTest - will return you all the columns within `result` that are needed for tests.
// * In here, we'll return if compositeKey=false - id (pk), email, first_name, last_name, created_at, toast_text (TOAST-able)
// * Else if compositeKey=true - id(pk), email (pk), first_name, last_name, created_at, toast_text (TOAST-able)
func getBasicColumnsForTest(compositeKey bool) result {
idCol := columns.NewColumn("id", typing.Float)
emailCol := columns.NewColumn("email", typing.String)
textToastCol := columns.NewColumn("toast_text", typing.String)
textToastCol.ToastColumn = true

var cols columns.Columns
cols.AddColumn(idCol)
cols.AddColumn(emailCol)
cols.AddColumn(columns.NewColumn("first_name", typing.String))
cols.AddColumn(columns.NewColumn("last_name", typing.String))
cols.AddColumn(columns.NewColumn("created_at", typing.ETime))
cols.AddColumn(textToastCol)
cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

var pks []columns.Column
pks = append(pks, idCol)

if compositeKey {
pks = append(pks, emailCol)
}

return result{
PrimaryKeys: pks,
Columns: cols.ValidColumns(),
}
}

func TestRedshiftDialect_BuildMergeQueries_SkipDelete(t *testing.T) {
// Biggest difference with this test are:
// 1. We are not saving `__artie_deleted` column
// 2. There are 3 SQL queries (INSERT, UPDATE and DELETE)
tempTableName := "public.tableName__temp"
res := getBasicColumnsForTest(false)

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("public.tableName")
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"",
res.PrimaryKeys,
nil,
res.Columns,
false,
ptr.ToBool(false),
)
assert.NoError(t, err)
assert.Equal(t, 2, len(parts))

assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`,
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])
}

func TestRedshiftDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
tempTableName := "public.tableName__temp"
res := getBasicColumnsForTest(false)

fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("public.tableName")

{
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"",
res.PrimaryKeys,
nil,
res.Columns,
true,
ptr.ToBool(false),
)
assert.NoError(t, err)
assert.Equal(t, 2, len(parts))

assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`,
parts[0])
assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id";`,
parts[1])
}
{
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"created_at",
res.PrimaryKeys,
nil,
res.Columns,
true,
ptr.ToBool(false),
)
assert.NoError(t, err)

// Parts[0] for insertion should be identical
assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`,
parts[0])
// Parts[1] where we're doing UPDATES will have idempotency key.
assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND cc.created_at >= c.created_at;`,
parts[1])
}
}

func TestRedshiftDialect_BuildMergeQueries_SoftDeleteComposite(t *testing.T) {
tempTableName := "public.tableName__temp"
res := getBasicColumnsForTest(true)
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("public.tableName")
{
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"",
res.PrimaryKeys,
nil,
res.Columns,
true,
ptr.ToBool(false),
)
assert.NoError(t, err)
assert.Equal(t, 2, len(parts))

assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`,
parts[0])
assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email";`,
parts[1])
}
{
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"created_at",
res.PrimaryKeys,
nil,
res.Columns,
true,
ptr.ToBool(false),
)
assert.NoError(t, err)

// Parts[0] for insertion should be identical
assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text",cc."__artie_delete" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`,
parts[0])
// Parts[1] where we're doing UPDATES will have idempotency key.
assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END,"__artie_delete"=cc."__artie_delete" FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND cc.created_at >= c.created_at;`,
parts[1])
}
}

func TestRedshiftDialect_BuildMergeQueries(t *testing.T) {
// Biggest difference with this test are:
// 1. We are not saving `__artie_deleted` column
// 2. There are 3 SQL queries (INSERT, UPDATE and DELETE)
tempTableName := "public.tableName__temp"
res := getBasicColumnsForTest(false)
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("public.tableName")
{
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"",
res.PrimaryKeys,
nil,
res.Columns,
false,
ptr.ToBool(true),
)
assert.NoError(t, err)
assert.Equal(t, 3, len(parts))

assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`,
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
`DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`,
parts[2])
}
{
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"created_at",
res.PrimaryKeys,
nil,
res.Columns,
false,
ptr.ToBool(true),
)
assert.NoError(t, err)
assert.Equal(t, 3, len(parts))

assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" WHERE c."id" IS NULL;`,
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND cc.created_at >= c.created_at AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
`DELETE FROM public.tableName WHERE ("id") IN (SELECT cc."id" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`,
parts[2])
}
}

func TestRedshiftDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
tempTableName := "public.tableName__temp"
res := getBasicColumnsForTest(true)
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("public.tableName")
{
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"",
res.PrimaryKeys,
nil,
res.Columns,
false,
ptr.ToBool(true),
)
assert.NoError(t, err)
assert.Equal(t, 3, len(parts))

assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`,
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
`DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`,
parts[2])
}
{
parts, err := RedshiftDialect{}.BuildMergeQueries(
fakeTableID,
tempTableName,
"created_at",
res.PrimaryKeys,
nil,
res.Columns,
false,
ptr.ToBool(true),
)
assert.NoError(t, err)
assert.Equal(t, 3, len(parts))

assert.Equal(t,
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text") SELECT cc."id",cc."email",cc."first_name",cc."last_name",cc."created_at",cc."toast_text" FROM public.tableName__temp AS cc LEFT JOIN public.tableName AS c ON c."id" = cc."id" AND c."email" = cc."email" WHERE c."id" IS NULL;`,
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS c SET "id"=cc."id","email"=cc."email","first_name"=cc."first_name","last_name"=cc."last_name","created_at"=cc."created_at","toast_text"= CASE WHEN COALESCE(cc."toast_text" != '__debezium_unavailable_value', true) THEN cc."toast_text" ELSE c."toast_text" END FROM public.tableName__temp AS cc WHERE c."id" = cc."id" AND c."email" = cc."email" AND cc.created_at >= c.created_at AND COALESCE(cc."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
`DELETE FROM public.tableName WHERE ("id","email") IN (SELECT cc."id",cc."email" FROM public.tableName__temp AS cc WHERE cc."__artie_delete" = true);`,
parts[2])
}
}

0 comments on commit 076a57f

Please sign in to comment.