Skip to content

Commit

Permalink
0005387: Initial load delete or truncate with table transform
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Aug 1, 2022
1 parent 64f480e commit ab31a73
Showing 1 changed file with 30 additions and 16 deletions.
Expand Up @@ -116,7 +116,8 @@ public boolean start(Table table) {

protected boolean isTransformable(DataEventType eventType) {
return eventType != null
&& (eventType == DataEventType.INSERT || eventType == DataEventType.UPDATE || eventType == DataEventType.DELETE);
&& (eventType == DataEventType.INSERT || eventType == DataEventType.UPDATE || eventType == DataEventType.DELETE
|| eventType == DataEventType.SQL);
}

public void write(CsvData data) {
Expand All @@ -128,6 +129,15 @@ public void write(CsvData data) {
// use the last table we used
start(context.getLastParsedTable());
}
if (eventType == DataEventType.SQL) {
List<TransformTable> transformTables = activeTransforms;
for (TransformTable transformation : transformTables) {
Table transformedTable = new Table(transformation.getTargetCatalogName(),
transformation.getTargetSchemaName(), transformation.getTargetTableName());
callWriter(transformedTable, data);
}
return;
}
Map<String, String> sourceValues = data.toColumnNameValuePairs(this.sourceTable.getColumnNames(),
CsvData.ROW_DATA);
Map<String, String> oldSourceValues = null;
Expand Down Expand Up @@ -166,21 +176,7 @@ public void write(CsvData data) {
for (TransformedData transformedData : dataThatHasBeenTransformed) {
Table transformedTable = transformedData.buildTargetTable();
CsvData csvData = transformedData.buildTargetCsvData(data.getAttributes());
boolean processData = true;
if (lastTransformedTable == null || transformedTable == null || !lastTransformedTable.equalsByName(transformedTable)) {
if (lastTransformedTable != null) {
this.nestedWriter.end(lastTransformedTable);
}
processData = this.nestedWriter.start(transformedTable);
if (!processData) {
lastTransformedTable = null;
} else {
lastTransformedTable = transformedTable;
}
}
if (processData || !csvData.requiresTable()) {
this.nestedWriter.write(csvData);
}
callWriter(transformedTable, csvData);
}
}
} else {
Expand All @@ -194,6 +190,24 @@ public void write(CsvData data) {
}
}

protected void callWriter(Table transformedTable, CsvData csvData) {
boolean processData = true;
if (lastTransformedTable == null || transformedTable == null || !lastTransformedTable.equalsByName(transformedTable)) {
if (lastTransformedTable != null) {
this.nestedWriter.end(lastTransformedTable);
}
processData = this.nestedWriter.start(transformedTable);
if (!processData) {
lastTransformedTable = null;
} else {
lastTransformedTable = transformedTable;
}
}
if (processData || !csvData.requiresTable()) {
this.nestedWriter.write(csvData);
}
}

protected List<TransformedData> transform(DataEventType eventType, DataContext context,
TransformTable transformation, Map<String, String> sourceKeyValues,
Map<String, String> oldSourceValues, Map<String, String> sourceValues) {
Expand Down

0 comments on commit ab31a73

Please sign in to comment.