diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 0ffb613839..c9b3e9842e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -88,8 +88,12 @@ public String reloadTableImmediate(String nodeId, String catalogName, String sch public void reloadMissingForeignKeyRows(String nodeId, long dataId); - public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, CsvData data, String channelId, boolean sendCorrectionToPeers); + public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, long batchId, long rowNumber, Table table, CsvData data, + String channelId, boolean sendCorrectionToPeers); + public void sendMissingRowsForInitialLoad(String nodeId, long batchId, long rowNumber, String catalogName, String schemaName, String tableName, + String whereSql, String channelId); + public void sendNewerDataToNode(ISqlTransaction transaction, String targetNodeId, String tableName, String pkCsvData, Date minCreateTime, String winningNodeId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 900902f51d..4622d7358a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -649,15 +649,16 @@ protected IDataWriter chooseDataWriter(Batch batch) { && listener.getCurrentBatch() != null && listener.isNewErrorForCurrentBatch() && listener.getCurrentBatch().isLoadFlag() && listener.getCurrentBatch().getSqlCode() == ErrorConstants.FK_VIOLATION_CODE) { - engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getTable(), - ctx.getData(), Constants.CHANNEL_CONFIG, false); + engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getBatch().getBatchId(), + listener.getCurrentBatch().getFailedLineNumber(), ctx.getTable(), ctx.getData(), Constants.CHANNEL_CONFIG, false); } if (parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE) && listener.getCurrentBatch() != null && listener.isNewErrorForCurrentBatch() && !listener.getCurrentBatch().isLoadFlag() && listener.getCurrentBatch().getSqlCode() == ErrorConstants.FK_VIOLATION_CODE) { - engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getTable(), - ctx.getData(), null, parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_PEERS)); + engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getBatch().getBatchId(), + listener.getCurrentBatch().getFailedLineNumber(), ctx.getTable(), ctx.getData(), null, + parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_PEERS)); } logOrRethrow(ex); } finally { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 15f8413088..5229601ddb 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -2444,7 +2444,8 @@ protected String reloadTable(String nodeId, String catalogName, String schemaNam } - public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, CsvData data, String channelId, boolean sendCorrectionToPeers) { + public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, long batchId, long rowNumber, Table table, CsvData data, String channelId, + boolean sendCorrectionToPeers) { try { IDatabasePlatform platform = engine.getTargetDialect().getPlatform(); Map dataMap = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA); @@ -2497,12 +2498,19 @@ public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, foreignTableRow.getWhereSql(), localTable.getName(), foreignTableRow.getReferenceColumnName()); for (Node targetNode : targetNodes) { - script.append("engine.getDataService().reloadTableImmediate(\"" + targetNode.getNodeId() + "\", " + - (catalog == null ? catalog : "\"" + catalog + "\"") + ", " + + script.append("try { engine.getDataService().sendMissingRowsForInitialLoad(\"" + targetNode.getNodeId() + "\", " + + batchId + ", " + rowNumber + ", " + (catalog == null ? catalog : "\"" + catalog + "\"") + ", " + (schema == null ? schema : "\"" + schema + "\"") + ", \"" + foreignTable.getName().replace("\"", "\\\"") + "\", \"" + foreignTableRow.getWhereSql().replace("\"", "\\\"") + "\", " + (channelId == null ? channelId : "\"" + channelId + "\"") + ");\n"); + + script.append("} catch (Exception e) { engine.getDataService().reloadTableImmediate(\"" + targetNode.getNodeId() + "\", " + + (catalog == null ? catalog : "\"" + catalog + "\"") + ", " + + (schema == null ? schema : "\"" + schema + "\"") + ", \"" + + foreignTable.getName().replace("\"", "\\\"") + "\", \"" + + foreignTableRow.getWhereSql().replace("\"", "\\\"") + "\", " + + (channelId == null ? channelId : "\"" + channelId + "\"") + "); }\n"); } } } @@ -2514,6 +2522,32 @@ public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, log.error("Unknown exception while processing foreign key for node id: " + sourceNodeId, e); } } + + /** + * Check if source has the foreign key parent rows that the target is reporting as missing. + * If they exist, send a reload batch for each row. Otherwise, send a SQL batch to ignore the row since we can't resolve it. + * + * TODO: should reloadMissingForeignKeyRows() also call this to handle similar case with backlog of changes and a busy system? + */ + public void sendMissingRowsForInitialLoad(String nodeId, long batchId, long rowNumber, String catalogName, String schemaName, String tableName, + String whereSql, String channelId) { + IDatabasePlatform platform = engine.getTargetDialect().getPlatform(); + DatabaseInfo info = platform.getDatabaseInfo(); + String quote = info.getDelimiterToken() == null || !platform.getDdlBuilder().isDelimitedIdentifierModeOn() ? "" : info.getDelimiterToken(); + String fullTableName = Table.getFullyQualifiedTableName(catalogName, schemaName, tableName, quote, + info.getCatalogSeparator(), info.getSchemaSeparator()); + // TODO: still a race condition here before push/pull, maybe use createData() instead + long count = platform.getSqlTemplateDirty().queryForLong("select count(*) from " + fullTableName + " where " + whereSql); + if (count > 0) { + reloadTableImmediate(nodeId, catalogName, schemaName, tableName, whereSql, channelId); + } else { + log.info("Resolving initial load batch {}-{} to ignore the row because no rows found for {} where {}", nodeId, batchId, + fullTableName, whereSql); + sendSQL(nodeId, "update " + engine.getParameterService().getTablePrefix() + + "_incoming_error set resolve_ignore = 1 where batch_id = " + batchId + " and node_id = '" + engine.getNodeId() + + "' and failed_row_number = " + rowNumber); + } + } public void reloadMissingForeignKeyRows(String nodeId, long dataId) { try {