Skip to content

Commit

Permalink
Spark: backport apache#8656 and update docs (apache#9512)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored and adnanhemani committed Jan 30, 2024
1 parent 4a0ce49 commit 589e822
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 187 deletions.
1 change: 0 additions & 1 deletion docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,6 @@ Creates a view that contains the changes from a given table.
| `net_changes` | | boolean | Whether to output net changes (see below for more information). Defaults to false. |
| `compute_updates` | | boolean | Whether to compute pre/post update images (see below for more information). Defaults to false. |
| `identifier_columns` | | array<string> | The list of identifier columns to compute updates. If the argument `compute_updates` is set to true and `identifier_columns` are not provided, the table’s current identifier fields will be used. |
| `remove_carryovers` | | boolean | Whether to remove carry-over rows (see below for more information). Defaults to true. Deprecated since 1.4.0, will be removed in 1.5.0; Please query `SparkChangelogTable` to view carry-over rows. |

Here is a list of commonly used Spark read options:
* `start-snapshot-id`: the exclusive start snapshot ID. If not provided, it reads from the table’s first snapshot inclusively.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() {
sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
}

@Test
public void testWithCarryovers() {
  createTableWithTwoColumns();
  sql("INSERT INTO %s VALUES (1, 'a')", tableName);
  Table table = validationCatalog.loadTable(tableIdent);
  Snapshot snap0 = table.currentSnapshot();

  sql("INSERT INTO %s VALUES (2, 'b')", tableName);
  table.refresh();
  Snapshot snap1 = table.currentSnapshot();

  // The overwrite re-inserts (2, 'b') twice; with remove_carryovers => false the
  // changelog view must surface those carry-over DELETE/INSERT pairs verbatim.
  sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
  table.refresh();
  Snapshot snap2 = table.currentSnapshot();

  // Fix: the format string has only two %s placeholders; the original call also
  // passed a dead "cdc_view" argument that String.format silently ignored.
  List<Object[]> returns =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);

  String viewName = (String) returns.get(0)[0];
  assertEquals(
      "Rows should match",
      ImmutableList.of(
          row(1, "a", INSERT, 0, snap0.snapshotId()),
          row(2, "b", INSERT, 1, snap1.snapshotId()),
          row(-2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", DELETE, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId())),
      sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
}

@Test
public void testUpdate() {
createTableWithTwoColumns();
Expand Down Expand Up @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() {
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}

@Test
public void testNotRemoveCarryOvers() {
  createTableWithThreeColumns();

  sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
  Table icebergTable = validationCatalog.loadTable(tableIdent);
  Snapshot firstSnapshot = icebergTable.currentSnapshot();

  // carry-over row (2, 'e', 12)
  sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
  icebergTable.refresh();
  Snapshot secondSnapshot = icebergTable.currentSnapshot();

  List<Object[]> procedureOutput =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);

  String changelogView = (String) procedureOutput.get(0)[0];

  List<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "a", 12, INSERT, 0, firstSnapshot.snapshotId()),
          row(2, "b", 11, INSERT, 0, firstSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 0, firstSnapshot.snapshotId()),
          row(2, "b", 11, DELETE, 1, secondSnapshot.snapshotId()),
          row(2, "d", 11, INSERT, 1, secondSnapshot.snapshotId()),
          // the following two rows are carry-over rows
          row(2, "e", 12, DELETE, 1, secondSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 1, secondSnapshot.snapshotId()),
          row(3, "c", 13, INSERT, 1, secondSnapshot.snapshotId()));

  assertEquals(
      "Rows should match",
      expectedRows,
      sql("select * from %s order by _change_ordinal, id, data, _change_type", changelogView));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
/**
* A procedure that creates a view for changed rows.
*
* <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
* "remove_carryovers" to be false in the options.
* <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
* instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.
Expand Down Expand Up @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);

/**
 * Enable or disable the remove carry-over rows.
 *
 * @deprecated since 1.4.0, will be removed in 1.5.0; the procedure will always remove
 *     carry-over rows. Please query {@link SparkChangelogTable} instead for use cases that
 *     require carry-over rows to be kept.
 */
@Deprecated
private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
    ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);

private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
private static final ProcedureParameter NET_CHANGES =
Expand All @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
CHANGELOG_VIEW_PARAM,
OPTIONS_PARAM,
COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
};
Expand Down Expand Up @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) {
if (shouldComputeUpdateImages(input)) {
Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
} else if (shouldRemoveCarryoverRows(input)) {
} else {
df = removeCarryoverRows(df, netChanges);
}

Expand Down Expand Up @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) {
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}

/** Returns whether carry-over rows should be removed; true when the option is unset. */
private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
  final boolean removeByDefault = true;
  return input.asBoolean(REMOVE_CARRYOVERS_PARAM, removeByDefault);
}

private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
Predicate<String> columnsToKeep;
if (netChanges) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() {
sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
}

@Test
public void testWithCarryovers() {
  createTableWithTwoColumns();
  sql("INSERT INTO %s VALUES (1, 'a')", tableName);
  Table table = validationCatalog.loadTable(tableIdent);
  Snapshot snap0 = table.currentSnapshot();

  sql("INSERT INTO %s VALUES (2, 'b')", tableName);
  table.refresh();
  Snapshot snap1 = table.currentSnapshot();

  // The overwrite re-inserts (2, 'b') twice; with remove_carryovers => false the
  // changelog view must surface those carry-over DELETE/INSERT pairs verbatim.
  sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
  table.refresh();
  Snapshot snap2 = table.currentSnapshot();

  // Fix: the format string has only two %s placeholders; the original call also
  // passed a dead "cdc_view" argument that String.format silently ignored.
  List<Object[]> returns =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);

  String viewName = (String) returns.get(0)[0];
  assertEquals(
      "Rows should match",
      ImmutableList.of(
          row(1, "a", INSERT, 0, snap0.snapshotId()),
          row(2, "b", INSERT, 1, snap1.snapshotId()),
          row(-2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", DELETE, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId())),
      sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
}

@Test
public void testUpdate() {
createTableWithTwoColumns();
Expand Down Expand Up @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() {
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}

@Test
public void testNotRemoveCarryOvers() {
  createTableWithThreeColumns();

  sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
  Table icebergTable = validationCatalog.loadTable(tableIdent);
  Snapshot firstSnapshot = icebergTable.currentSnapshot();

  // carry-over row (2, 'e', 12)
  sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
  icebergTable.refresh();
  Snapshot secondSnapshot = icebergTable.currentSnapshot();

  List<Object[]> procedureOutput =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);

  String changelogView = (String) procedureOutput.get(0)[0];

  List<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "a", 12, INSERT, 0, firstSnapshot.snapshotId()),
          row(2, "b", 11, INSERT, 0, firstSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 0, firstSnapshot.snapshotId()),
          row(2, "b", 11, DELETE, 1, secondSnapshot.snapshotId()),
          row(2, "d", 11, INSERT, 1, secondSnapshot.snapshotId()),
          // the following two rows are carry-over rows
          row(2, "e", 12, DELETE, 1, secondSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 1, secondSnapshot.snapshotId()),
          row(3, "c", 13, INSERT, 1, secondSnapshot.snapshotId()));

  assertEquals(
      "Rows should match",
      expectedRows,
      sql("select * from %s order by _change_ordinal, id, data, _change_type", changelogView));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
/**
* A procedure that creates a view for changed rows.
*
* <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
* "remove_carryovers" to be false in the options.
* <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
* instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.
Expand Down Expand Up @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);

/**
 * Enable or disable the remove carry-over rows.
 *
 * @deprecated since 1.4.0, will be removed in 1.5.0; the procedure will always remove
 *     carry-over rows. Please query {@link SparkChangelogTable} instead for use cases that
 *     require carry-over rows to be kept.
 */
@Deprecated
private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
    ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);

private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
private static final ProcedureParameter NET_CHANGES =
Expand All @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
CHANGELOG_VIEW_PARAM,
OPTIONS_PARAM,
COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
};
Expand Down Expand Up @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) {
if (shouldComputeUpdateImages(input)) {
Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
} else if (shouldRemoveCarryoverRows(input)) {
} else {
df = removeCarryoverRows(df, netChanges);
}

Expand Down Expand Up @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) {
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}

/** Returns whether carry-over rows should be removed; true when the option is unset. */
private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
  final boolean removeByDefault = true;
  return input.asBoolean(REMOVE_CARRYOVERS_PARAM, removeByDefault);
}

private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
Predicate<String> columnsToKeep;
if (netChanges) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
/**
* A procedure that creates a view for changed rows.
*
* <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
* "remove_carryovers" to be false in the options.
* <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
* instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.
Expand Down

0 comments on commit 589e822

Please sign in to comment.