Skip to content

Commit

Permalink
0003241: File sync should not sync all files when first scanning a
Browse files Browse the repository at this point in the history
directory
  • Loading branch information
klementinastojanovska committed Sep 11, 2017
1 parent b89e260 commit ce2d32c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
Expand Up @@ -48,7 +48,6 @@ public class ClusterConstants {
public static final String FILE_SYNC_PUSH = "File Sync Push";
public static final String MONITOR = "Monitor";

public static final String FILE_SYNC_SCAN = "FILE_SYNC_SCAN";
public static final String FILE_SYNC_SHARED = "FILE_SYNC_SHARED";

public static final String TYPE_CLUSTER = "CLUSTER";
Expand Down
Expand Up @@ -49,7 +49,7 @@ public class ClusterService extends AbstractService implements IClusterService {

private static final String[] actions = new String[] { ROUTE, PULL, PUSH, HEARTBEAT, PURGE_INCOMING, PURGE_OUTGOING,
PURGE_STATISTICS, SYNC_TRIGGERS, PURGE_DATA_GAPS, STAGE_MANAGEMENT, WATCHDOG, STATISTICS, FILE_SYNC_PULL,
FILE_SYNC_PUSH, FILE_SYNC_TRACKER, FILE_SYNC_SCAN, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR, SYNC_CONFIG };
FILE_SYNC_PUSH, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR, SYNC_CONFIG };

private static final String[] sharedActions = new String[] { FILE_SYNC_SHARED };

Expand Down
Expand Up @@ -160,14 +160,24 @@ public void trackChanges(boolean force) {
}

protected void trackChanges(ProcessInfo processInfo, boolean useCrc) {
long ctxTime = engine.getContextService().getLong(ContextConstants.FILE_SYNC_FAST_SCAN_TRACK_TIME);
Date ctxDate = new Date(ctxTime);
if (ctxTime == 0) {
ctxDate = null;
}
Date currentDate = new Date();

List<FileTriggerRouter> fileTriggerRouters = getFileTriggerRoutersForCurrentNode();
for (FileTriggerRouter fileTriggerRouter : fileTriggerRouters) {
if (fileTriggerRouter.isEnabled()) {
try {
FileTrigger fileTrigger = fileTriggerRouter.getFileTrigger();
boolean ignoreFiles = shouldIgnoreInitialFiles(fileTriggerRouter, fileTrigger, ctxDate);
FileTriggerTracker tracker = new FileTriggerTracker(fileTriggerRouter, getDirectorySnapshot(fileTriggerRouter),
processInfo, useCrc, engine);
DirectorySnapshot dirSnapshot = tracker.trackChanges();
saveDirectorySnapshot(fileTriggerRouter, dirSnapshot);
saveDirectorySnapshot(fileTriggerRouter, dirSnapshot,ignoreFiles);
engine.getContextService().save(ContextConstants.FILE_SYNC_FAST_SCAN_TRACK_TIME, String.valueOf(currentDate.getTime()));
} catch (Exception ex) {
log.error("Failed to track changes for file trigger router: "
+ fileTriggerRouter.getFileTrigger().getTriggerId()
Expand All @@ -185,20 +195,20 @@ protected void trackChangesFastScan(ProcessInfo processInfo, boolean useCrc) {
}
Date currentDate = new Date();

boolean isLocked = engine.getClusterService().lock(ClusterConstants.FILE_SYNC_SCAN);
log.debug("File tracker range of " + ctxDate + " to " + currentDate + ", isLocked=" + isLocked);
int maxRowsBeforeCommit = engine.getParameterService().getInt(ParameterConstants.DATA_LOADER_MAX_ROWS_BEFORE_COMMIT);

try {
List<FileTriggerRouter> fileTriggerRouters = getFileTriggerRoutersForCurrentNode();
for (final FileTriggerRouter fileTriggerRouter : fileTriggerRouters) {
if (fileTriggerRouter.isEnabled()) {
if (fileTriggerRouter.isEnabled()) {
FileTrigger fileTrigger = fileTriggerRouter.getFileTrigger();
boolean ignoreFiles = shouldIgnoreInitialFiles(fileTriggerRouter, fileTrigger, ctxDate);
FileAlterationObserver observer = new FileAlterationObserver(fileTriggerRouter.getFileTrigger().getBaseDir(),
fileTriggerRouter.getFileTrigger().createIOFileFilter());
FileTriggerFileModifiedListener listener = new FileTriggerFileModifiedListener(fileTriggerRouter, ctxDate,
currentDate, processInfo, useCrc, new FileModifiedCallback(maxRowsBeforeCommit) {
public void commit(DirectorySnapshot dirSnapshot) {
saveDirectorySnapshot(fileTriggerRouter, dirSnapshot);
saveDirectorySnapshot(fileTriggerRouter, dirSnapshot, ignoreFiles);
}

public DirectorySnapshot getLastDirectorySnapshot(String relativeDir) {
Expand All @@ -210,13 +220,21 @@ public DirectorySnapshot getLastDirectorySnapshot(String relativeDir) {
engine.getContextService().save(ContextConstants.FILE_SYNC_FAST_SCAN_TRACK_TIME, String.valueOf(currentDate.getTime()));
}
}
engine.getClusterService().unlock(ClusterConstants.FILE_SYNC_SCAN);
} catch (Exception ex) {
log.error("Failed to track changes", ex);
}
}

protected boolean shouldIgnoreInitialFiles(FileTriggerRouter router, FileTrigger trigger, Date contextDate) {
if (!router.isInitialLoadEnabled()) {
if (contextDate == null || router.getLastUpdateTime().after(contextDate) || trigger.getLastUpdateTime().after(contextDate)) {
return true;
}
}
return false;
}

protected long saveDirectorySnapshot(FileTriggerRouter fileTriggerRouter, DirectorySnapshot dirSnapshot) {
protected long saveDirectorySnapshot(FileTriggerRouter fileTriggerRouter, DirectorySnapshot dirSnapshot, boolean shouldIgnore) {
long totalBytes = 0;
for (FileSnapshot fileSnapshot : dirSnapshot) {
File file = fileTriggerRouter.getFileTrigger().createSourceFile(fileSnapshot);
Expand All @@ -235,7 +253,7 @@ protected long saveDirectorySnapshot(FileTriggerRouter fileTriggerRouter, Direct
log.debug("Captured change " + fileSnapshot);
totalBytes += fileSnapshot.getFileSize();
}
save(dirSnapshot);
save(dirSnapshot,shouldIgnore);
return totalBytes;
}

Expand Down Expand Up @@ -383,11 +401,14 @@ public DirectorySnapshot getDirectorySnapshot(FileTriggerRouter fileTriggerRoute
.getRouterId(), relativeDir));
}

public void save(List<FileSnapshot> changes) {
public void save(List<FileSnapshot> changes, boolean shouldIgnore) {
if (changes != null) {
ISqlTransaction sqlTransaction = null;
try {
sqlTransaction = sqlTemplate.startSqlTransaction();
if (shouldIgnore) {
engine.getSymmetricDialect().disableSyncTriggers(sqlTransaction, null);
}
for (FileSnapshot fileSnapshot : changes) {
save(sqlTransaction, fileSnapshot);
}
Expand All @@ -404,6 +425,9 @@ public void save(List<FileSnapshot> changes) {
}
throw ex;
} finally {
if (shouldIgnore && sqlTransaction != null) {
engine.getSymmetricDialect().enableSyncTriggers(sqlTransaction);
}
close(sqlTransaction);
}
}
Expand Down Expand Up @@ -1182,4 +1206,9 @@ public FileSnapshot mapRow(Row rs) {
return fileSnapshot;
}
}

@Override
public void save(List<FileSnapshot> changes) {
// TODO Auto-generated method stub
}
}

0 comments on commit ce2d32c

Please sign in to comment.