Skip to content

Commit

Permalink
0002876: FileSync initial load batches should respect max batch size of
Browse files Browse the repository at this point in the history
the filesync_reload channel
  • Loading branch information
mmichalek committed Oct 22, 2016
1 parent 28cb5ad commit 9b1e5c5
Show file tree
Hide file tree
Showing 14 changed files with 729 additions and 399 deletions.
Expand Up @@ -43,6 +43,7 @@ public String getClusterLockName() {

@Override
void doJob(boolean force) throws Exception {
engine.getFileSyncExtractorService().queueWork(force);
engine.getDataExtractorService().queueWork(force);
}

Expand Down
Expand Up @@ -100,6 +100,7 @@
import org.jumpmind.symmetric.service.impl.DataExtractorService;
import org.jumpmind.symmetric.service.impl.DataLoaderService;
import org.jumpmind.symmetric.service.impl.DataService;
import org.jumpmind.symmetric.service.impl.FileSyncExtractorService;
import org.jumpmind.symmetric.service.impl.FileSyncService;
import org.jumpmind.symmetric.service.impl.GroupletService;
import org.jumpmind.symmetric.service.impl.IncomingBatchService;
Expand Down Expand Up @@ -190,6 +191,8 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {
protected IRouterService routerService;

protected IDataExtractorService dataExtractorService;

protected IDataExtractorService fileSyncExtractorService;

protected IRegistrationService registrationService;

Expand Down Expand Up @@ -348,6 +351,7 @@ protected void init() {
nodeService, dataLoaderService, clusterService, nodeCommunicationService,
configurationService, extensionService, offlineTransportManager);
this.fileSyncService = buildFileSyncService();
this.fileSyncExtractorService = new FileSyncExtractorService(this);
this.mailService = new MailService(parameterService, symmetricDialect);
this.contextService = new ContextService(parameterService, symmetricDialect);

Expand Down Expand Up @@ -961,6 +965,10 @@ public IBandwidthService getBandwidthService() {
public IDataExtractorService getDataExtractorService() {
return this.dataExtractorService;
}

public IDataExtractorService getFileSyncExtractorService() {
return this.fileSyncExtractorService;
}

public IDataLoaderService getDataLoaderService() {
return this.dataLoaderService;
Expand Down
Expand Up @@ -250,6 +250,8 @@ public interface ISymmetricEngine {

public IDataExtractorService getDataExtractorService();

public IDataExtractorService getFileSyncExtractorService();

public IDataLoaderService getDataLoaderService();

public IIncomingBatchService getIncomingBatchService();
Expand Down
Expand Up @@ -81,7 +81,9 @@ public void open(DataContext context) {
this.context = context;
}

@Override
public void close() {
// no-op as this is called at batch boundaries, but this writer can handle multiple batches.
}

public Map<Batch, Statistics> getStatistics() {
Expand Down
Expand Up @@ -28,7 +28,7 @@ public class NodeCommunication implements Serializable {
private static final long serialVersionUID = 1L;

public enum CommunicationType {
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT;
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_EXTRACT;

public static boolean isPullType(CommunicationType communicationType) {
return communicationType == PULL || communicationType == CommunicationType.FILE_PULL || communicationType == OFFLN_PULL;
Expand Down
Expand Up @@ -27,7 +27,7 @@ public class ProcessInfoKey implements Serializable {
private static final long serialVersionUID = 1L;

public enum ProcessType {
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB;
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB, FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB;

public String toString() {
switch (this) {
Expand Down Expand Up @@ -69,6 +69,8 @@ public String toString() {
return "Inserting Load Events";
case INITIAL_LOAD_EXTRACT_JOB:
return "Initial Load Extractor";
case FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB:
return "File Sync Initial Load Extractor";
default:
return name();
}
Expand Down
Expand Up @@ -80,5 +80,7 @@ public interface IFileSyncService {
public boolean refreshFromDatabase();

public File getControleFile(File file);

public Object[] getStagingPathComponents(OutgoingBatch fileSyncBatch);

}

0 comments on commit 9b1e5c5

Please sign in to comment.