Skip to content

Commit

Permalink
0000032: Implement file synchronization - dev checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 10, 2013
1 parent 74b3d36 commit b1288cf
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 31 deletions.
Expand Up @@ -110,6 +110,17 @@ public void write(CsvData data) {
snapshot.setFilePath(columnData.get("FILE_PATH"));
snapshot.setLastEventType(LastEventType.fromCode(columnData.get("LAST_EVENT_TYPE")));
snapshotEvents.add(snapshot);
} else if (eventType == DataEventType.DELETE) {
Map<String, String> columnData = data.toColumnNameValuePairs(
snapshotTable.getPrimaryKeyColumnNames(), CsvData.PK_DATA);
FileSnapshot snapshot = new FileSnapshot();
snapshot.setTriggerId(columnData.get("TRIGGER_ID"));
snapshot.setRouterId(columnData.get("ROUTER_ID"));
snapshot.setFileName(columnData.get("FILE_NAME"));
snapshot.setFilePath(columnData.get("FILE_PATH"));
snapshot.setLastEventType(LastEventType.DELETE);
snapshotEvents.add(snapshot);

}
}

Expand Down
Expand Up @@ -53,6 +53,13 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
engine.getSymmetricDialect());
String triggerId = newData.get("TRIGGER_ID");
String routerId = newData.get("ROUTER_ID");

if (triggerId == null) {
Map<String, String> oldData = getOldDataAsString(null, dataMetaData,
engine.getSymmetricDialect());
triggerId = oldData.get("TRIGGER_ID");
routerId = oldData.get("ROUTER_ID");
}
FileTriggerRouter fileTriggerRouter = fileSyncService.getFileTriggerRouter(
triggerId, routerId);
if (fileTriggerRouter != null) {
Expand Down
Expand Up @@ -242,28 +242,38 @@ public void save(List<FileSnapshot> changes) {
}

public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) {
snapshot.setLastUpdateTime(new Date());
if (0 == sqlTransaction.prepareAndExecute(
getSql("updateFileSnapshotSql"),
new Object[] { snapshot.getLastEventType().getCode(), snapshot.getCrc32Checksum(),
snapshot.getFileSize(), snapshot.getFileModifiedTime(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(),
snapshot.getTriggerId(), snapshot.getRouterId(), snapshot.getFilePath(),
snapshot.getFileName() }, new int[] { Types.VARCHAR, Types.NUMERIC,
Types.NUMERIC, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) {
snapshot.setCreateTime(snapshot.getLastUpdateTime());
sqlTransaction.prepareAndExecute(
getSql("insertFileSnapshotSql"),
new Object[] { snapshot.getLastEventType().getCode(),
snapshot.getCrc32Checksum(), snapshot.getFileSize(),
snapshot.getFileModifiedTime(), snapshot.getCreateTime(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(),
snapshot.getTriggerId(), snapshot.getRouterId(),
snapshot.getFilePath(), snapshot.getFileName() }, new int[] {
Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
if (snapshot.getLastEventType() == LastEventType.DELETE) {
sqlTransaction.prepareAndExecute(getSql("deleteFileSnapshotSql"), new Object[] {
snapshot.getTriggerId(), snapshot.getRouterId(), snapshot.getFilePath(),
snapshot.getFileName() }, new int[] { Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR });
} else {
snapshot.setLastUpdateTime(new Date());
if (0 == sqlTransaction
.prepareAndExecute(
getSql("updateFileSnapshotSql"),
new Object[] { snapshot.getLastEventType().getCode(),
snapshot.getCrc32Checksum(), snapshot.getFileSize(),
snapshot.getFileModifiedTime(), snapshot.getLastUpdateTime(),
snapshot.getLastUpdateBy(), snapshot.getTriggerId(),
snapshot.getRouterId(), snapshot.getFilePath(),
snapshot.getFileName() }, new int[] { Types.VARCHAR,
Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.TIMESTAMP,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR })) {
snapshot.setCreateTime(snapshot.getLastUpdateTime());
sqlTransaction.prepareAndExecute(
getSql("insertFileSnapshotSql"),
new Object[] { snapshot.getLastEventType().getCode(),
snapshot.getCrc32Checksum(), snapshot.getFileSize(),
snapshot.getFileModifiedTime(), snapshot.getCreateTime(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(),
snapshot.getTriggerId(), snapshot.getRouterId(),
snapshot.getFilePath(), snapshot.getFileName() }, new int[] {
Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
}
}

}
Expand Down Expand Up @@ -429,7 +439,9 @@ protected void pushFilesToNode(NodeCommunication nodeCommunication, RemoteNodeSt
protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNodeStatus status,
Node identity, NodeSecurity security) {
IIncomingTransport transport = null;
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(new ProcessInfoKey(nodeCommunication.getNodeId(), identity.getNodeId(), ProcessType.FILE_SYNC_PULL_JOB));
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(nodeCommunication.getNodeId(), identity.getNodeId(),
ProcessType.FILE_SYNC_PULL_JOB));
try {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
File unzipDir = new File(parameterService.getTempDirectory(), "filesync_incoming/"
Expand Down Expand Up @@ -488,7 +500,7 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode
ex = target;
}
}

log.error("Failed to process file sync batch " + batchId, ex);

incomingBatch.setErrorFlag(true);
Expand All @@ -497,29 +509,29 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode
incomingBatchService.updateIncomingBatch(incomingBatch);
processInfo.setStatus(ProcessInfo.Status.ERROR);
break;

}
} else {
log.error("Could not find the sync.bsh script for batch {}", batchId);
}
}

}

if (batchesProcessed.size() > 0) {
processInfo.setStatus(ProcessInfo.Status.ACKING);
status.updateIncomingStatus(batchesProcessed);
sendAck(nodeCommunication.getNode(), identity, security, batchesProcessed, engine.getTransportManager());
sendAck(nodeCommunication.getNode(), identity, security, batchesProcessed,
engine.getTransportManager());
}


} catch (IOException e) {
throw new IoException(e);
} finally {
if (transport != null) {
transport.close();
}

if (processInfo.getStatus() != ProcessInfo.Status.ERROR) {
processInfo.setStatus(ProcessInfo.Status.DONE);
}
Expand Down
Expand Up @@ -49,6 +49,11 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
" last_update_by=? " +
" where " +
" trigger_id=? and router_id=? and file_path=? and file_name=? ");

putSql("deleteFileSnapshotSql",
" delete from $(file_snapshot) " +
" where " +
" trigger_id=? and router_id=? and file_path=? and file_name=? ");

putSql("insertFileSnapshotSql",
" insert into $(file_snapshot) ( " +
Expand Down
5 changes: 4 additions & 1 deletion symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java
Expand Up @@ -198,7 +198,10 @@ public static void unzip(InputStream in, File toDir) {
dir.setLastModified(entry.getTime());
} else {
File file = new File(toDir, entry.getName());
file.getParentFile().mkdirs();
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
file.getParentFile().setLastModified(entry.getTime());
}
FileOutputStream fos = new FileOutputStream(file);
try {
IOUtils.copy(is, fos);
Expand Down

0 comments on commit b1288cf

Please sign in to comment.