Skip to content

Commit

Permalink
0002759: Abstract File Parsing Router needs to remove context entries
Browse files Browse the repository at this point in the history
when file is deleted
  • Loading branch information
jumpmind-josh committed Aug 30, 2016
1 parent a7b0158 commit bd12dea
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 53 deletions.
Expand Up @@ -54,71 +54,78 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
String fileName = newData.get("FILE_NAME");
String relativeDir = newData.get("RELATIVE_DIR");
String triggerId = newData.get("TRIGGER_ID");
String lastEventType = newData.get("LAST_EVENT_TYPE");
String routerExpression = dataMetaData.getRouter().getRouterExpression();
String channelId = "default";
if (routerExpression != null) {
String[] keyValues = routerExpression.split(",");
if (keyValues.length > 0) {
for (int i=0; i< keyValues.length; i++) {
String[] keyValue = keyValues[i].split("=");
if (keyValue.length > 1) {
if (ROUTER_EXPRESSION_CHANNEL_KEY.equals(keyValue[0])) {
channelId = keyValue[1];
String filePath = relativeDir + "/" + fileName;
IContextService contextService = getEngine().getContextService();

if (lastEventType.equals(DataEventType.DELETE.toString())) {
log.debug("File deleted (" + filePath + "), cleaning up context value.");
contextService.delete(filePath);
}
else {
if (routerExpression != null) {
String[] keyValues = routerExpression.split(",");
if (keyValues.length > 0) {
for (int i=0; i< keyValues.length; i++) {
String[] keyValue = keyValues[i].split("=");
if (keyValue.length > 1) {
if (ROUTER_EXPRESSION_CHANNEL_KEY.equals(keyValue[0])) {
channelId = keyValue[1];
}
}
}
}
}
}
if (triggerId != null) {
String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir();
File file = createSourceFile(baseDir, relativeDir, fileName);

IContextService contextService = getEngine().getContextService();

String filePath = relativeDir + "/" + fileName;

Integer lineNumber = contextService.getString(filePath) == null ? 0 : new Integer(contextService.getString(filePath));

List<String> dataRows = parse(file, lineNumber);
String columnNames = getColumnNames();

String nodeList = buildNodeList(nodes);
String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY)
.append("=")
.append(triggerId)
.append(",")
.append(EXTERNAL_DATA_ROUTER_KEY)
.append("=")
.append(dataMetaData.getRouter().getRouterId())
.append(",")
.append(EXTERNAL_DATA_FILE_DATA_ID)
.append("=")
.append(dataMetaData.getData().getDataId()).toString();

for (String row : dataRows) {
Data data = new Data();
if (triggerId != null) {
String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir();
File file = createSourceFile(baseDir, relativeDir, fileName);

data.setChannelId(channelId);
data.setDataEventType(DataEventType.INSERT);
data.setRowData(row);
data.setTableName(targetTableName);
data.setNodeList(nodeList);
data.setTriggerHistory(getTriggerHistory(targetTableName, columnNames));
data.setExternalData(externalData);
data.setDataId(getEngine().getDataService().insertData(data));
lineNumber++;
}
if (!dataRows.isEmpty()) {
try {
contextService.save(filePath, lineNumber.toString());
deleteFileIfNecessary(dataMetaData);

Integer lineNumber = contextService.getString(filePath) == null ? 0 : new Integer(contextService.getString(filePath));

List<String> dataRows = parse(file, lineNumber);
String columnNames = getColumnNames();

String nodeList = buildNodeList(nodes);
String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY)
.append("=")
.append(triggerId)
.append(",")
.append(EXTERNAL_DATA_ROUTER_KEY)
.append("=")
.append(dataMetaData.getRouter().getRouterId())
.append(",")
.append(EXTERNAL_DATA_FILE_DATA_ID)
.append("=")
.append(dataMetaData.getData().getDataId()).toString();

for (String row : dataRows) {
Data data = new Data();

data.setChannelId(channelId);
data.setDataEventType(DataEventType.INSERT);
data.setRowData(row);
data.setTableName(targetTableName);
data.setNodeList(nodeList);
data.setTriggerHistory(getTriggerHistory(targetTableName, columnNames));
data.setExternalData(externalData);
data.setDataId(getEngine().getDataService().insertData(data));
lineNumber++;
}
catch (Exception e) {
e.printStackTrace();
if (!dataRows.isEmpty()) {
try {
contextService.save(filePath, lineNumber.toString());
deleteFileIfNecessary(dataMetaData);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}

return new HashSet<String>();

}
Expand Down
Expand Up @@ -42,6 +42,8 @@ public interface IContextService {

public int delete(ISqlTransaction transaction, String name);

public int delete(String name);

public void save(String name, String value);

}
Expand Up @@ -75,6 +75,10 @@ public int update(ISqlTransaction transaction, String name, String value) {
public int delete(ISqlTransaction transaction, String name) {
return transaction.prepareAndExecute(getSql("deleteSql"), name);
}

public int delete(String name) {
return sqlTemplate.update(getSql("deleteSql"), name);
}

public void save(String name, String value) {
int count = sqlTemplate.update(getSql("updateSql"), value, name);
Expand Down

0 comments on commit bd12dea

Please sign in to comment.