Skip to content

Commit

Permalink
0002501: File sync performance on large file system
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Feb 22, 2016
1 parent fa62470 commit 15fc410
Show file tree
Hide file tree
Showing 13 changed files with 437 additions and 90 deletions.
Expand Up @@ -280,6 +280,12 @@ private ParameterConstants() {
public final static String SYNCHRONIZE_ALL_JOBS = "jobs.synchronized.enable";

public final static String FILE_SYNC_ENABLE = "file.sync.enable";

public final static String FILE_SYNC_FAST_SCAN = "file.sync.fast.scan";

public final static String FILE_SYNC_USE_CRC = "file.sync.use.crc";

public final static String FILE_SYNC_PREVENT_PING_BACK = "file.sync.prevent.ping.back";

public final static String FILE_SYNC_LOCK_WAIT_MS = "file.sync.lock.wait.ms";

Expand Down
Expand Up @@ -68,7 +68,7 @@ protected void merge(DirectorySnapshot snapshot) {
this.addAll(toAdd);
}

protected DirectorySnapshot diff(DirectorySnapshot anotherSnapshot) {
public DirectorySnapshot diff(DirectorySnapshot anotherSnapshot) {
DirectorySnapshot differences = new DirectorySnapshot(anotherSnapshot.getFileTriggerRouter());
for (FileSnapshot anotherFile : anotherSnapshot) {
boolean found = false;
Expand Down
@@ -0,0 +1,204 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.file;

import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.jumpmind.symmetric.model.FileSnapshot;
import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.FileTriggerRouter;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileTriggerFileModifiedListener extends FileAlterationListenerAdaptor {

final protected Logger log = LoggerFactory.getLogger(getClass());

protected FileTriggerRouter fileTriggerRouter;
protected DirectorySnapshot snapshot;
protected Date fromDate;
protected Date toDate;
protected FileModifiedCallback fileModifiedCallback;
protected ProcessInfo processInfo;
protected boolean isSyncOnCtlFile;
protected boolean useCrc;
protected Map<String, DirectorySnapshot> modifiedDirs = new HashMap<String, DirectorySnapshot>();

protected long startTime = System.currentTimeMillis();
protected long ts = startTime;
protected long fileCount = 0;
protected long changeCount = 0;

public FileTriggerFileModifiedListener(FileTriggerRouter fileTriggerRouter, Date fromDate, Date toDate, ProcessInfo processInfo,
boolean useCrc, FileModifiedCallback fileModifiedCallback) {
this.fileTriggerRouter = fileTriggerRouter;
this.snapshot = new DirectorySnapshot(fileTriggerRouter);
this.fromDate = fromDate;
this.toDate = toDate;
this.fileModifiedCallback = fileModifiedCallback;
this.processInfo = processInfo;
this.isSyncOnCtlFile = fileTriggerRouter.getFileTrigger().isSyncOnCtlFile();
this.useCrc = useCrc;
this.processInfo.setStatus(ProcessInfo.Status.PROCESSING);
}

public void onStart(final FileAlterationObserver observer) {
long lastModified = observer.getDirectory().lastModified();
if ((fromDate != null && lastModified > fromDate.getTime()) && lastModified <= toDate.getTime()) {
modifiedDirs.put(".", new DirectorySnapshot(fileTriggerRouter));
}
}

public void onFileCreate(File file) {
if (isSyncOnCtlFile) {
File ctlFile = new File(file.getAbsolutePath() + ".ctl");
if (ctlFile.exists()) {
log.debug("Control file detected: {}", file.getAbsolutePath());
addSnapshot(file, LastEventType.CREATE, false);
}
} else {
log.debug("File create detected: {}", file.getAbsolutePath());
addSnapshot(file, LastEventType.CREATE, false);
}
}

public void onDirectoryCreate(File directory) {
log.debug("File create detected: {}", directory.getAbsolutePath());
addSnapshot(directory, LastEventType.CREATE, true);
}

public void onStop(final FileAlterationObserver observer) {
if (snapshot.size() > 0) {
commit();
}

long scanTime = (System.currentTimeMillis() - startTime) / 1000;
long modifiedDirStartTime = System.currentTimeMillis();
long ts = modifiedDirStartTime;
int modifiedDirCount = 0;
int modifiedDirFileCount = 0;
int modifiedDirChangeCount = 0;

processInfo.setStatus(Status.QUERYING);
for (String relativeDir : modifiedDirs.keySet()) {
DirectorySnapshot lastSnapshot = fileModifiedCallback.getLastDirectorySnapshot(relativeDir);
DirectorySnapshot currentSnapshot = modifiedDirs.get(relativeDir);
modifiedDirFileCount += currentSnapshot.size();
DirectorySnapshot changesSinceLastSnapshot = lastSnapshot.diff(currentSnapshot);
processInfo.setCurrentDataCount(processInfo.getCurrentDataCount() + lastSnapshot.size() + currentSnapshot.size());
if (changesSinceLastSnapshot.size() > 0) {
modifiedDirCount++;
modifiedDirChangeCount += changesSinceLastSnapshot.size();
fileModifiedCallback.commit(changesSinceLastSnapshot);
}
if (System.currentTimeMillis() - ts > 60000) {
log.info("File tracker has been processing modified directories for {} seconds. The following stats have been gathered: {}",
new Object[] { (System.currentTimeMillis() - modifiedDirStartTime) / 1000,
"{ modifiedDirCount=" + modifiedDirCount + " of " + modifiedDirs.size() +
", modifiedDirFileCount=" + modifiedDirFileCount + ", modifiedDirChangeCount= " + modifiedDirChangeCount + " }" });
ts = System.currentTimeMillis();
}
}

if (changeCount > 0 || modifiedDirChangeCount > 0) {
String extra = ".";
if (modifiedDirChangeCount > 0) {
extra = String.format(", slow scan %d directories in %d seconds for %d changes.",
modifiedDirCount, (System.currentTimeMillis() - modifiedDirStartTime) / 1000, modifiedDirChangeCount);
}

log.info("File tracker fast scan {} files in {} seconds for {} changes" + extra,
new Object[] { fileCount, scanTime, changeCount });
}
}

protected void addSnapshot(File file, LastEventType lastEventType, boolean isDir) {
fileCount++;
processInfo.incrementCurrentDataCount();
FileSnapshot fileSnapshot = new FileSnapshot(fileTriggerRouter, file, lastEventType, useCrc);
DirectorySnapshot modifiedDir = modifiedDirs.get(fileSnapshot.getRelativeDir());

if (!isDir && modifiedDir != null) {
// This file belongs to a directory that had a file add/delete, so we will process the directory later
modifiedDir.add(fileSnapshot);
} else {
long lastModified = fileSnapshot.getFileModifiedTime();
if ((fromDate != null && lastModified > fromDate.getTime()) && lastModified <= toDate.getTime()) {
if (isDir) {
// This is a directory that had a file add/delete, so we'll need to look for deletes later
modifiedDirs.put(fileSnapshot.getRelativeDir() + "/" + fileSnapshot.getFileName(),
new DirectorySnapshot(fileTriggerRouter));
} else {
snapshot.add(fileSnapshot);
changeCount++;

if (snapshot.size() >= fileModifiedCallback.getCommitSize()) {
commit();
}
}
}
}

if (System.currentTimeMillis() - ts > 60000) {
log.info("File tracker has been processing for {} seconds. The following stats have been gathered: {}", new Object[] {
(System.currentTimeMillis() - startTime) / 1000,
"{ fileCount=" + fileCount + ", fileChangeCount=" + changeCount + " }" });
ts = System.currentTimeMillis();
}
}

protected void commit() {
fileModifiedCallback.commit(snapshot);
snapshot.clear();
}

public Map<String, DirectorySnapshot> getModifiedDirs() {
return modifiedDirs;
}

static public class FileModifiedCallback {

int commitSize;

public FileModifiedCallback(int commitSize) {
this.commitSize = commitSize;
}

public void commit(DirectorySnapshot dirSnapshot) {
}

public DirectorySnapshot getLastDirectorySnapshot(String relativeDir) {
return null;
}

public int getCommitSize() {
return commitSize;
}
}

}
Expand Up @@ -29,6 +29,7 @@
import org.jumpmind.symmetric.model.FileSnapshot;
import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
import org.jumpmind.symmetric.model.FileTriggerRouter;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,13 +42,21 @@ public class FileTriggerTracker {
private DirectorySnapshot lastSnapshot;
private DirectorySnapshot changesSinceLastSnapshot;
private SnapshotUpdater currentListener;
private ProcessInfo processInfo;
protected boolean useCrc;

public FileTriggerTracker(FileTriggerRouter fileTriggerRouter, DirectorySnapshot lastSnapshot) {
long startTime = System.currentTimeMillis();
long ts = startTime;

public FileTriggerTracker(FileTriggerRouter fileTriggerRouter, DirectorySnapshot lastSnapshot, ProcessInfo processInfo,
boolean useCrc) {
this.fileTriggerRouter = fileTriggerRouter;
this.processInfo = processInfo;
this.useCrc = useCrc;

changesSinceLastSnapshot = new DirectorySnapshot(fileTriggerRouter);
fileObserver = new FileAlterationObserver(fileTriggerRouter.getFileTrigger().getBaseDir(),
fileTriggerRouter.getFileTrigger().createIOFileFilter());
fileObserver = new FileAlterationObserver(fileTriggerRouter.getFileTrigger().getBaseDir(), fileTriggerRouter.getFileTrigger()
.createIOFileFilter());
currentListener = new SnapshotUpdater(changesSinceLastSnapshot);
fileObserver.addListener(currentListener);
try {
Expand All @@ -66,7 +75,6 @@ public FileTriggerTracker(FileTriggerRouter fileTriggerRouter, DirectorySnapshot
} catch (Exception e) {
throw new RuntimeException(e);
}

}

protected void pollForChanges() {
Expand All @@ -84,14 +92,17 @@ synchronized public DirectorySnapshot trackChanges() {
fileObserver.removeListener(currentListener);
currentListener = newListener;
lastSnapshot.merge(changes);
long runTime = (System.currentTimeMillis() - startTime) / 1000;
if (changes.size() > 0) {
log.info("Tracked {} files in {} seconds. Found {} files changed.", new Object[] { lastSnapshot.size(), runTime, changes.size() });
}
return changes;
}

synchronized protected void takeFullSnapshot(DirectorySnapshot snapshot) {
// update the snapshot with every file in the directory spec
FileAlterationObserver observer = new FileAlterationObserver(fileTriggerRouter
.getFileTrigger().getBaseDir(), fileTriggerRouter.getFileTrigger()
.createIOFileFilter());
FileAlterationObserver observer = new FileAlterationObserver(fileTriggerRouter.getFileTrigger().getBaseDir(), fileTriggerRouter
.getFileTrigger().createIOFileFilter());
observer.addListener(new SnapshotUpdater(snapshot));
observer.checkAndNotify();
}
Expand All @@ -105,53 +116,59 @@ class SnapshotUpdater extends FileAlterationListenerAdaptor {
}

public void onFileDelete(File file) {
log.debug("File delete detected: {}", file.getAbsolutePath());
this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
LastEventType.DELETE));
log.debug("File delete detected: {}", file.getAbsolutePath());
addSnapshot(file, LastEventType.DELETE);
}

public void onFileCreate(File file) {
if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){
if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()) {
onCtlFile(file);
} else {
log.debug("File create detected: {}", file.getAbsolutePath());
this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
LastEventType.CREATE));
addSnapshot(file, LastEventType.CREATE);
}
}

public void onCtlFile(File file) {
if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){
File ctlFile = new File(file.getAbsolutePath() + ".ctl");
if (ctlFile.exists()) {
log.debug("Control file detected: {}", file.getAbsolutePath());
this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
LastEventType.CREATE));
}
}
}
if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()) {
File ctlFile = new File(file.getAbsolutePath() + ".ctl");
if (ctlFile.exists()) {
log.debug("Control file detected: {}", file.getAbsolutePath());
addSnapshot(file, LastEventType.CREATE);
}
}
}

public void onFileChange(File file) {
log.debug("File change detected: {}", file.getAbsolutePath());
this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
LastEventType.MODIFY));
log.debug("File change detected: {}", file.getAbsolutePath());
addSnapshot(file, LastEventType.MODIFY);
}

public void onDirectoryDelete(File directory) {
log.debug("File delete detected: {}", directory.getAbsolutePath());
this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), directory,
LastEventType.DELETE));
log.debug("File delete detected: {}", directory.getAbsolutePath());
addSnapshot(directory, LastEventType.DELETE);
}

public void onDirectoryCreate(File directory) {
log.debug("File create detected: {}", directory.getAbsolutePath());
this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), directory,
LastEventType.CREATE));
log.debug("File create detected: {}", directory.getAbsolutePath());
addSnapshot(directory, LastEventType.CREATE);
}

public void onDirectoryChange(File directory) {
}

protected void addSnapshot(File file, LastEventType lastEventType) {
if (processInfo != null) {
processInfo.incrementCurrentDataCount();
}
snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file, lastEventType, useCrc));

if (System.currentTimeMillis() - ts > 60000) {
log.info("File tracker has been processing for {} seconds. The following stats have been gathered: {}", new Object[] {
(System.currentTimeMillis() - startTime) / 1000, "{ fileCount=" + snapshot.size() + " }" });
ts = System.currentTimeMillis();
}
}
}

}
Expand Up @@ -95,6 +95,10 @@ public FileSnapshot(FileSnapshot copy) {
}

public FileSnapshot(FileTriggerRouter fileTriggerRouter, File file, LastEventType lastEventType) {
this(fileTriggerRouter, file, lastEventType, false);
}

public FileSnapshot(FileTriggerRouter fileTriggerRouter, File file, LastEventType lastEventType, boolean useCrc) {
boolean isDelete = lastEventType == LastEventType.DELETE;
this.triggerId = fileTriggerRouter.getFileTrigger().getTriggerId();
this.channelId = fileTriggerRouter.getFileTrigger().getChannelId();
Expand Down Expand Up @@ -128,7 +132,7 @@ public FileSnapshot(FileTriggerRouter fileTriggerRouter, File file, LastEventTyp
this.fileSize = isDelete ? 0 : file.length();
this.fileModifiedTime = isDelete ? 0 : file.lastModified();

if (file.isFile() && !isDelete) {
if (useCrc && file.isFile() && !isDelete) {
try {
this.crc32Checksum = FileUtils.checksumCRC32(file);
} catch (FileNotFoundException ex) {
Expand Down

0 comments on commit 15fc410

Please sign in to comment.