From bd12dea887057075c80ad5dd43fbd59b6fbe2e82 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Tue, 30 Aug 2016 16:35:35 -0400 Subject: [PATCH] 0002759: Abstract File Parsing Router needs to remove context entries when file is deleted --- .../route/AbstractFileParsingRouter.java | 113 ++++++++++-------- .../symmetric/service/IContextService.java | 2 + .../service/impl/ContextService.java | 4 + 3 files changed, 66 insertions(+), 53 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractFileParsingRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractFileParsingRouter.java index 30a42c0a7c..d66ff415f2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractFileParsingRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractFileParsingRouter.java @@ -54,71 +54,78 @@ public Set routeToNodes(SimpleRouterContext context, DataMetaData dataMe String fileName = newData.get("FILE_NAME"); String relativeDir = newData.get("RELATIVE_DIR"); String triggerId = newData.get("TRIGGER_ID"); + String lastEventType = newData.get("LAST_EVENT_TYPE"); String routerExpression = dataMetaData.getRouter().getRouterExpression(); String channelId = "default"; - if (routerExpression != null) { - String[] keyValues = routerExpression.split(","); - if (keyValues.length > 0) { - for (int i=0; i< keyValues.length; i++) { - String[] keyValue = keyValues[i].split("="); - if (keyValue.length > 1) { - if (ROUTER_EXPRESSION_CHANNEL_KEY.equals(keyValue[0])) { - channelId = keyValue[1]; + String filePath = relativeDir + "/" + fileName; + IContextService contextService = getEngine().getContextService(); + + if (lastEventType.equals(DataEventType.DELETE.toString())) { + log.debug("File deleted (" + filePath + "), cleaning up context value."); + contextService.delete(filePath); + } + else { + if (routerExpression != null) { + String[] keyValues = routerExpression.split(","); + if (keyValues.length > 0) { + for (int i=0; i< keyValues.length; i++) { + String[] keyValue = keyValues[i].split("="); + if (keyValue.length > 1) { + if (ROUTER_EXPRESSION_CHANNEL_KEY.equals(keyValue[0])) { + channelId = keyValue[1]; + } } } } } - } - if (triggerId != null) { - String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir(); - File file = createSourceFile(baseDir, relativeDir, fileName); - - IContextService contextService = getEngine().getContextService(); - - String filePath = relativeDir + "/" + fileName; - - Integer lineNumber = contextService.getString(filePath) == null ? 0 : new Integer(contextService.getString(filePath)); - List dataRows = parse(file, lineNumber); - String columnNames = getColumnNames(); - - String nodeList = buildNodeList(nodes); - String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY) - .append("=") - .append(triggerId) - .append(",") - .append(EXTERNAL_DATA_ROUTER_KEY) - .append("=") - .append(dataMetaData.getRouter().getRouterId()) - .append(",") - .append(EXTERNAL_DATA_FILE_DATA_ID) - .append("=") - .append(dataMetaData.getData().getDataId()).toString(); - - for (String row : dataRows) { - Data data = new Data(); + if (triggerId != null) { + String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir(); + File file = createSourceFile(baseDir, relativeDir, fileName); - data.setChannelId(channelId); - data.setDataEventType(DataEventType.INSERT); - data.setRowData(row); - data.setTableName(targetTableName); - data.setNodeList(nodeList); - data.setTriggerHistory(getTriggerHistory(targetTableName, columnNames)); - data.setExternalData(externalData); - data.setDataId(getEngine().getDataService().insertData(data)); - lineNumber++; - } - if (!dataRows.isEmpty()) { - try { - contextService.save(filePath, lineNumber.toString()); - deleteFileIfNecessary(dataMetaData); + + Integer lineNumber = contextService.getString(filePath) == null ? 0 : new Integer(contextService.getString(filePath)); + + List dataRows = parse(file, lineNumber); + String columnNames = getColumnNames(); + + String nodeList = buildNodeList(nodes); + String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY) + .append("=") + .append(triggerId) + .append(",") + .append(EXTERNAL_DATA_ROUTER_KEY) + .append("=") + .append(dataMetaData.getRouter().getRouterId()) + .append(",") + .append(EXTERNAL_DATA_FILE_DATA_ID) + .append("=") + .append(dataMetaData.getData().getDataId()).toString(); + + for (String row : dataRows) { + Data data = new Data(); + + data.setChannelId(channelId); + data.setDataEventType(DataEventType.INSERT); + data.setRowData(row); + data.setTableName(targetTableName); + data.setNodeList(nodeList); + data.setTriggerHistory(getTriggerHistory(targetTableName, columnNames)); + data.setExternalData(externalData); + data.setDataId(getEngine().getDataService().insertData(data)); + lineNumber++; } - catch (Exception e) { - e.printStackTrace(); + if (!dataRows.isEmpty()) { + try { + contextService.save(filePath, lineNumber.toString()); + deleteFileIfNecessary(dataMetaData); + } + catch (Exception e) { + e.printStackTrace(); + } } } } - return new HashSet(); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IContextService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IContextService.java index 80cfc99cd4..b9a7f0d822 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IContextService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IContextService.java @@ -42,6 +42,8 @@ public interface IContextService { public int delete(ISqlTransaction transaction, String name); + public int delete(String name); + public void save(String name, String value); } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ContextService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ContextService.java index b072aa9c9b..bb96341860 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ContextService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ContextService.java @@ -75,6 +75,10 @@ public int update(ISqlTransaction transaction, String name, String value) { public int delete(ISqlTransaction transaction, String name) { return transaction.prepareAndExecute(getSql("deleteSql"), name); } + + public int delete(String name) { + return sqlTemplate.update(getSql("deleteSql"), name); + } public void save(String name, String value) { int count = sqlTemplate.update(getSql("updateSql"), value, name);