Skip to content

Commit

Permalink
Merge branch '3.14' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.14
  • Loading branch information
joshahicks committed Jun 30, 2023
2 parents cd74b7d + 23744fb commit aac7f6b
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 62 deletions.
Expand Up @@ -160,7 +160,7 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
if (useTriggerTemplateForColumnTemplatesDuringInitialLoad(column)) {
ColumnString columnString = fillOutColumnTemplate(tableAlias,
tableAlias, "", table, column, DataEventType.INSERT, false, channel,
triggerRouter.getTrigger());
triggerRouter.getTrigger(), true);
columnExpression = columnString.columnString;
if (isNotBlank(textColumnExpression)
&& TypeMap.isTextType(column.getMappedTypeCode())) {
Expand Down Expand Up @@ -777,7 +777,7 @@ protected ColumnString buildColumnsString(String origTableAlias, String tableAli
Column column = columns[i];
if (column != null) {
ColumnString columnString = fillOutColumnTemplate(origTableAlias, tableAlias,
columnPrefix, table, column, dml, isOld, channel, trigger);
columnPrefix, table, column, dml, isOld, channel, trigger, false);
columnsText = columnsText + "\n " + columnString.columnString
+ lastCommandToken;
containsLob |= columnString.isBlobClob;
Expand All @@ -792,7 +792,7 @@ protected ColumnString buildColumnsString(String origTableAlias, String tableAli

protected ColumnString fillOutColumnTemplate(String origTableAlias, String tableAlias,
String columnPrefix, Table table, Column column, DataEventType dml, boolean isOld, Channel channel,
Trigger trigger) {
Trigger trigger, boolean ignoreStreamLobs) {
boolean isLob = symmetricDialect.getPlatform().isLob(column.getMappedTypeCode());
String templateToUse = null;
if (column.getJdbcTypeName() != null
Expand Down Expand Up @@ -925,7 +925,7 @@ protected ColumnString fillOutColumnTemplate(String origTableAlias, String table
}
if (dml == DataEventType.DELETE && isLob && requiresEmptyLobTemplateForDeletes()) {
templateToUse = emptyColumnTemplate;
} else if (isLob && trigger.isUseStreamLobs()) {
} else if (isLob && trigger.isUseStreamLobs() && !ignoreStreamLobs) {
templateToUse = emptyColumnTemplate;
}
if (templateToUse != null) {
Expand Down
Expand Up @@ -35,7 +35,7 @@ public class AbstractBatch implements Serializable {

public enum Status {
OK("Ok"), ER("Error"), RQ("Request"), NE("New"), QY("Querying"), SE("Sending"), LD("Loading"), RT("Routing"), IG("Ignored"), RS(
"Resend"), XX("Unknown");
"Resend"), XX("Unknown"), LS("LoadSetup");

private String description;

Expand Down
Expand Up @@ -27,7 +27,7 @@ public class ExtractRequest implements Serializable {
private static final long serialVersionUID = 1L;

public enum ExtractStatus {
NE, OK
NE, OK, LS
};

private long requestId;
Expand Down
Expand Up @@ -149,7 +149,7 @@ protected static String getLogDetails(MonitorEvent event) throws IOException {
protected static List<String> deserializeOfflineNodes(MonitorEvent event) throws IOException {
List<String> nodes = null;
if (event.getDetails() != null) {
new Gson().fromJson(event.getDetails(), new TypeToken<List<String>>() {
nodes = new Gson().fromJson(event.getDetails(), new TypeToken<List<String>>() {
}.getType());
}
if (nodes == null) {
Expand Down
Expand Up @@ -86,4 +86,7 @@ public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String
public int cancelExtractRequests(long loadId);

public void releaseMissedExtractRequests();

public void updateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId,
String fromStatus, String toStatus);
}
Expand Up @@ -75,6 +75,15 @@ public OutgoingBatches getOutgoingBatchByLoadRangeAndTable(long loadId, long sta

public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status, String nodeId, long startBatchId, long endBatchId);

public void updateOutgoingSetupBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long maxBatchId, String fromStatus, String toStatus);

public void updateOutgoingLoadBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long startDataBatchId, long endDataBatchId, String fromStatus, String toStatus);

public void updateOutgoingFinalizeBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long minBatchId, String fromStatus, String toStatus);

public void updateCommonBatchExtractStatistics(OutgoingBatch batch);

public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch);
Expand Down
Expand Up @@ -1725,7 +1725,7 @@ public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String
requestId = sequenceService.nextVal(transaction, Constants.SEQUENCE_EXTRACT_REQ);
}
transaction.prepareAndExecute(getSql("insertExtractRequestSql"),
new Object[] { requestId, engine.getNodeId(), nodeId, queue, ExtractStatus.NE.name(), startBatchId, endBatchId,
new Object[] { requestId, engine.getNodeId(), nodeId, queue, ExtractStatus.LS.name(), startBatchId, endBatchId,
triggerRouter.getTrigger().getTriggerId(), triggerRouter.getRouter().getRouterId(), loadId,
table, rows, parentRequestId, new Date(), new Date() },
new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT,
Expand All @@ -1735,7 +1735,7 @@ table, rows, parentRequestId, new Date(), new Date() },
request.setRequestId(requestId);
request.setNodeId(nodeId);
request.setQueue(queue);
request.setStatus(ExtractStatus.NE);
request.setStatus(ExtractStatus.LS);
request.setStartBatchId(startBatchId);
request.setEndBatchId(endBatchId);
request.setRouterId(triggerRouter.getRouterId());
Expand Down Expand Up @@ -2063,6 +2063,13 @@ public void removeBatchFromStaging(OutgoingBatch batch) {
}
}

@Override
public void updateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId,
String fromStatus, String toStatus) {
transaction.prepareAndExecute(getSql("updateExtractRequestStatuses"),
toStatus, new Date(), loadId, sourceNodeId, fromStatus);
}

static class FutureExtractStatus {
boolean shouldExtractSkip;
int batchExtractCount;
Expand Down
Expand Up @@ -80,6 +80,9 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform,
putSql("selectIncompleteTablesForExtractByLoadIdAndNodeId", "select * from $(extract_request) where load_id = ? and loaded_time is null and node_id = ? order by request_id");

putSql("selectCompletedTablesForExtractByLoadIdAndNodeId", "select * from $(extract_request) where load_id = ? and loaded_time is not null and node_id = ? order by request_id");

putSql("updateExtractRequestStatuses", "update $(extract_request) set status=?, last_update_time=? "
+ "where load_id=? and source_node_id=? and status=?");
}

}

0 comments on commit aac7f6b

Please sign in to comment.