Skip to content

Commit

Permalink
0003234: Ability to store additional statistics by channel
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Aug 30, 2017
1 parent 7c4d6bf commit eb02543
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 0 deletions.
Expand Up @@ -24,6 +24,7 @@

import java.io.File;
import java.io.StringReader;
import java.lang.reflect.Constructor;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -57,10 +58,17 @@
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.job.IJobManager;
import org.jumpmind.symmetric.job.JobManager;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.IMonitorService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IStatisticService;
import org.jumpmind.symmetric.service.impl.ClientExtensionService;
import org.jumpmind.symmetric.service.impl.MonitorService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.jumpmind.symmetric.statistic.StatisticManager;
import org.jumpmind.symmetric.util.LogSummaryAppenderUtils;
import org.jumpmind.symmetric.util.SnapshotUtil;
import org.jumpmind.symmetric.util.TypedPropertiesFactory;
Expand Down Expand Up @@ -351,6 +359,24 @@ protected IStagingManager createStagingManager() {
String directory = parameterService.getTempDirectory();
return new BatchStagingManager(this, directory);
}

@Override
protected IStatisticManager createStatisticManager() {
String statisticManagerClassName = parameterService.getString(ParameterConstants.STATISTIC_MANAGER_CLASS);
if (statisticManagerClassName != null) {
try {
Constructor<?> cons = Class.forName(statisticManagerClassName).getConstructor(IParameterService.class,
INodeService.class, IConfigurationService.class, IStatisticService.class, IClusterService.class);
return (IStatisticManager) cons.newInstance(parameterService, nodeService,
configurationService, statisticService, clusterService);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

return new StatisticManager(parameterService, nodeService,
configurationService, statisticService, clusterService);
}

protected static void waitForAvailableDatabase(DataSource dataSource) {
boolean success = false;
Expand Down
Expand Up @@ -328,6 +328,7 @@ protected void init() {
this.statisticService = new StatisticService(parameterService, symmetricDialect);
this.statisticManager = new StatisticManager(parameterService, nodeService,
configurationService, statisticService, clusterService);
this.statisticManager = createStatisticManager();
this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService,
statisticManager);
this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService,
Expand Down Expand Up @@ -398,6 +399,8 @@ protected INodeCommunicationService buildNodeCommunicationService(IClusterServic

abstract protected IStagingManager createStagingManager();

abstract protected IStatisticManager createStatisticManager();

abstract protected ISymmetricDialect createSymmetricDialect();

abstract protected IExtensionService createExtensionService();
Expand Down
Expand Up @@ -393,6 +393,8 @@ private ParameterConstants() {

public final static String LOG_CONFLICT_RESOLUTION = "log.conflict.resolution";

public final static String STATISTIC_MANAGER_CLASS = "statistic.manager.class";

public static Map<String, ParameterMetaData> getParameterMetaData() {
return parameterMetaData;
}
Expand Down
Expand Up @@ -1073,6 +1073,9 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
SymmetricUtils.copyFile(extractedBatch.getFile(), targetResource.getFile());
targetResource.setState(State.DONE);
isRetry = true;

statisticManager.incrementDataSent(currentBatch.getChannelId(), currentBatch.getDataEventCount());
statisticManager.incrementDataBytesSent(currentBatch.getChannelId(), extractedBatch.getFile().length());
} catch (Exception e) {
FileUtils.deleteQuietly(targetResource.getFile());
throw new RuntimeException(e);
Expand Down

0 comments on commit eb02543

Please sign in to comment.