Skip to content

Commit

Permalink
0002330: Make channels push asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 6, 2015
1 parent 0a04d4a commit df48a90
Show file tree
Hide file tree
Showing 67 changed files with 2,524 additions and 1,227 deletions.
Expand Up @@ -45,14 +45,19 @@
import org.jumpmind.symmetric.route.DataGapRouteReader;
import org.jumpmind.symmetric.route.IDataToRouteReader;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IDataLoaderService;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IPushService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.service.impl.DataLoaderService;
import org.jumpmind.symmetric.service.impl.ExtensionService;
import org.jumpmind.symmetric.service.impl.NodeCommunicationService;
import org.jumpmind.symmetric.service.impl.PushService;
import org.jumpmind.symmetric.service.impl.RouterService;
import org.jumpmind.util.CurrentThreadExecutor;

import android.content.Context;
import android.database.sqlite.SQLiteOpenHelper;
Expand Down Expand Up @@ -140,28 +145,30 @@ protected IExtensionService createExtensionService() {
protected IRouterService buildRouterService() {
return new AndroidRouterService(this);
}

class AndroidRouterService extends RouterService {

public AndroidRouterService(ISymmetricEngine engine) {
super(engine);
}

@Override
protected IDataToRouteReader startReading(ChannelRouterContext context) {
IDataToRouteReader reader = new DataGapRouteReader(context, engine);
// not going to read on a separate thread in android
reader.run();
return reader;
}


@Override
protected IPushService buildPushService() {
return new AndroidPushService(this);
}

@Override
protected IDataLoaderService buildDataLoaderService() {
return new AndroidDataLoaderService(this);
}

@Override
protected INodeCommunicationService buildNodeCommunicationService(IClusterService clusterService, INodeService nodeService,
IParameterService parameterService, ISymmetricDialect symmetricDialect) {
return new AndroidNodeCommunicationService(clusterService, nodeService, parameterService, symmetricDialect);
}

public File snapshot() {
return null;
}

public List<File> listSnapshots() {
return new ArrayList<File>(0);
}

class AndroidNodeCommunicationService extends NodeCommunicationService {

Expand Down Expand Up @@ -211,14 +218,47 @@ public int getAvailableThreads(CommunicationType communicationType) {
return 10;
}

}

class AndroidPushService extends PushService {
public AndroidPushService(ISymmetricEngine engine) {
super(engine);
}

@Override
public void start() {
nodeChannelExtractForPushWorker = new CurrentThreadExecutor();
nodeChannelTransportForPushWorker = new CurrentThreadExecutor();
}
}

public File snapshot() {
return null;
class AndroidDataLoaderService extends DataLoaderService {

public AndroidDataLoaderService(ISymmetricEngine engine) {
super(engine);
}

@Override
public void start() {
dataLoadWorkers = new CurrentThreadExecutor();
}

}

public List<File> listSnapshots() {
return new ArrayList<File>(0);
class AndroidRouterService extends RouterService {

public AndroidRouterService(ISymmetricEngine engine) {
super(engine);
}

@Override
protected IDataToRouteReader startReading(ChannelRouterContext context) {
IDataToRouteReader reader = new DataGapRouteReader(context, engine);
// not going to read on a separate thread in android
reader.run();
return reader;
}

}

}
Expand Up @@ -40,24 +40,24 @@ public PostgreSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
datetimeColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US') || '\"' end" ;
timeColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || to_char($(tableAlias).\"$(columnName)\", 'HH24:MI:SS.US') || '\"' end" ;
dateTimeWithTimeZoneColumnTemplate =
"case when $(tableAlias).\"$(columnName)\" is null then '' else " +
" case " +
" when extract(timezone_hour from $(tableAlias).\"$(columnName)\") <= 0 and " +
" extract(timezone_minute from $(tableAlias).\"$(columnName)\") <= 0 then " +
" '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US ')||'-'|| " +
" lpad(cast(abs(round(extract(timezone_hour from $(tableAlias).\"$(columnName)\"))) as varchar),2,'0')||':'|| " +
" lpad(cast(abs(round(extract(timezone_minute from $(tableAlias).\"$(columnName)\"))) as varchar), 2, '0') || '\"' " +
" when extract(timezone_hour from $(tableAlias).\"$(columnName)\") = 0 and " +
" extract(timezone_minute from $(tableAlias).\"$(columnName)\") >= 0 then " +
" '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US ')||'+'|| " +
" lpad(cast(round(extract(timezone_hour from $(tableAlias).\"$(columnName)\")) as varchar),2,'0')||':'|| " +
" lpad(cast(round(extract(timezone_minute from $(tableAlias).\"$(columnName)\")) as varchar), 2, '0') || '\"' " +
" else " +
" '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US ')||'+'|| " +
" lpad(cast(round(extract(timezone_hour from $(tableAlias).\"$(columnName)\")) as varchar),2,'0')||':'|| " +
" lpad(cast(round(extract(timezone_minute from $(tableAlias).\"$(columnName)\")) as varchar), 2, '0') || '\"' " +
" end " +
"end ";
"case when $(tableAlias).\"$(columnName)\" is null then '' else " +
" case " +
" when extract(timezone_hour from $(tableAlias).\"$(columnName)\") <= 0 and " +
" extract(timezone_minute from $(tableAlias).\"$(columnName)\") <= 0 then " +
" '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US ')||'-'|| " +
" lpad(cast(abs(round(extract(timezone_hour from $(tableAlias).\"$(columnName)\"))) as varchar),2,'0')||':'|| " +
" lpad(cast(abs(round(extract(timezone_minute from $(tableAlias).\"$(columnName)\"))) as varchar), 2, '0') || '\"' " +
" when extract(timezone_hour from $(tableAlias).\"$(columnName)\") = 0 and " +
" extract(timezone_minute from $(tableAlias).\"$(columnName)\") >= 0 then " +
" '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US ')||'+'|| " +
" lpad(cast(round(extract(timezone_hour from $(tableAlias).\"$(columnName)\")) as varchar),2,'0')||':'|| " +
" lpad(cast(round(extract(timezone_minute from $(tableAlias).\"$(columnName)\")) as varchar), 2, '0') || '\"' " +
" else " +
" '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US ')||'+'|| " +
" lpad(cast(round(extract(timezone_hour from $(tableAlias).\"$(columnName)\")) as varchar),2,'0')||':'|| " +
" lpad(cast(round(extract(timezone_minute from $(tableAlias).\"$(columnName)\")) as varchar), 2, '0') || '\"' " +
" end " +
"end ";
clobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || replace(replace(cast($(tableAlias).\"$(columnName)\" as varchar),$$\\$$,$$\\\\$$),'\"',$$\\\"$$) || '\"' end" ;
blobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || pg_catalog.encode($(tableAlias).\"$(columnName)\", 'base64') || '\"' end" ;
wrappedBlobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || $(defaultSchema)$(prefixName)_largeobject($(tableAlias).\"$(columnName)\") || '\"' end" ;
Expand Down
Expand Up @@ -37,7 +37,7 @@ public PushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
@Override
public void doJob(boolean force) throws Exception {
if (engine != null) {
engine.getPushService().pushData(force).getDataProcessedCount();
engine.getPushService().push(force).getDataProcessedCount();
}
}

Expand Down
Expand Up @@ -24,16 +24,12 @@
import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.text.DateFormat;
import java.text.NumberFormat;
import java.util.Date;
import java.util.Map;

import javax.sql.DataSource;

import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.security.SecurityConstants;
import org.jumpmind.symmetric.ISymmetricEngine;
Expand All @@ -42,7 +38,6 @@
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.transport.ConcurrentConnectionManager.NodeConnectionStatistics;
import org.jumpmind.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -132,48 +127,7 @@ public void syncTriggers() {
+ " clients and 20 concurrent pull clients will be allowed")
public int getNumfNodeConnectionsPerInstance() {
return engine.getParameterService().getInt(ParameterConstants.CONCURRENT_WORKERS);
}

@ManagedAttribute(description = "Get connection statistics about indivdual nodes")
public String getNodeConcurrencyStatisticsAsText() {
String lineFeed = "\n";
if (engine.getParameterService().getString(ParameterConstants.JMX_LINE_FEED).equals("html")) {
lineFeed = "</br>";
}
Map<String, Map<String, NodeConnectionStatistics>> stats = engine
.getConcurrentConnectionManager().getNodeConnectionStatisticsByPoolByNodeId();
StringBuilder out = new StringBuilder();
for (String pool : stats.keySet()) {
out.append("-------------------------------------------------------------------------------------------------------------------------------");
out.append(lineFeed);
out.append(" CONNECTION TYPE: ");
out.append(pool);
out.append(lineFeed);
out.append("-------------------------------------------------------------------------------------------------------------------------------");
out.append(lineFeed);
out.append(" NODE ID LAST CONNECT TIME NUMBER OF CONNECTIONS NUMBER OF REJECTIONS AVG CONNECTED TIME");
out.append(lineFeed);
out.append("-------------------------------------------------------------------------------------------------------------------------------");
out.append(lineFeed);
Map<String, NodeConnectionStatistics> nodeStats = stats.get(pool);
for (String nodeId : nodeStats.keySet()) {
NodeConnectionStatistics nodeStat = nodeStats.get(nodeId);
out.append(StringUtils.leftPad(nodeId, 20));
out.append(StringUtils.leftPad(
DateFormat.getDateTimeInstance(DateFormat.MEDIUM, DateFormat.MEDIUM)
.format(new Date(nodeStat.getLastConnectionTimeMs())), 30));
out.append(StringUtils.leftPad(Long.toString(nodeStat.getTotalConnectionCount()),
27));
out.append(StringUtils.leftPad(Integer.toString(nodeStat.getNumOfRejections()), 25));
out.append(StringUtils.leftPad(
NumberFormat.getIntegerInstance().format(
nodeStat.getTotalConnectionTimeMs()
/ nodeStat.getTotalConnectionCount()), 25));
}
out.append(lineFeed);
}
return out.toString();
}
}

@ManagedOperation(description = "Clean up both incoming and outgoing resources that are older than the passed in number of milliseconds")
@ManagedOperationParameters({ @ManagedOperationParameter(name = "timeToLiveInMS", description = "The number of milliseconds old a resource should be before it is cleaned up") })
Expand Down
Expand Up @@ -438,8 +438,7 @@ protected static void writeJobsStats(ISymmetricEngine engine, File tmpDir) {
IJobManager jobManager = engine.getJobManager();
IClusterService clusterService = engine.getClusterService();
INodeService nodeService = engine.getNodeService();
writer.write("Clustering is " + (clusterService.isClusteringEnabled() ? "" : "not ") +
"enabled and there are " + nodeService.findNodeHosts(nodeService.findIdentityNodeId()).size() +
writer.write("There are " + nodeService.findNodeHosts(nodeService.findIdentityNodeId()).size() +
" instances in the cluster\n\n");
writer.write(StringUtils.rightPad("Job Name", 30) + StringUtils.rightPad("Schedule", 20) +
StringUtils.rightPad("Status", 10) + StringUtils.rightPad("Server Id", 30) +
Expand Down
Expand Up @@ -147,7 +147,7 @@ public void exportTestDatabaseSQL() throws Exception {
export.setCompatible(Compatible.H2);
String output = export.exportTables(tables).toLowerCase();

Assert.assertEquals(output, 42, StringUtils.countMatches(output, "create table \"sym_"));
Assert.assertEquals(output, 43, StringUtils.countMatches(output, "create table \"sym_"));
final int EXPECTED_VARCHAR_MAX = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 264 : 43;
final String EXPECTED_STRING = "varchar(" + Integer.MAX_VALUE + ")";
Assert.assertEquals("Expected " + EXPECTED_VARCHAR_MAX + " " + EXPECTED_STRING
Expand Down
Expand Up @@ -317,12 +317,10 @@ protected void init() {
this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect, clusterService);
this.dataExtractorService = new DataExtractorService(this);
this.transportManager = new TransportManagerFactory(this).create();
this.dataLoaderService = new DataLoaderService(this);
this.dataLoaderService = buildDataLoaderService();
this.registrationService = new RegistrationService(this);
this.acknowledgeService = new AcknowledgeService(this);
this.pushService = new PushService(parameterService, symmetricDialect,
dataExtractorService, acknowledgeService, transportManager, nodeService,
clusterService, nodeCommunicationService, statisticManager, configurationService, extensionService);
this.pushService = buildPushService();
this.pullService = new PullService(parameterService, symmetricDialect,
nodeService, dataLoaderService, registrationService, clusterService, nodeCommunicationService,
configurationService, extensionService);
Expand All @@ -342,6 +340,14 @@ protected void init() {
}

}

protected IPushService buildPushService() {
return new PushService(this);
}

protected IDataLoaderService buildDataLoaderService() {
return new DataLoaderService(this);
}

protected IRouterService buildRouterService() {
return new RouterService(this);
Expand Down Expand Up @@ -559,6 +565,9 @@ public synchronized boolean start(boolean startJobs) {
log.info("Starting unregistered node [group={}, externalId={}]",
parameterService.getNodeGroupId(), parameterService.getExternalId());
}

pushService.start();
dataLoaderService.start();

if (startJobs && jobManager != null) {
jobManager.startJobs();
Expand Down Expand Up @@ -717,6 +726,14 @@ public synchronized void stop() {
nodeCommunicationService.stop();
}

if (pushService != null) {
pushService.stop();
}

if (dataLoaderService != null) {
dataLoaderService.stop();
}

if (statisticManager != null) {
List<ProcessInfo> infos = statisticManager.getProcessInfos();
for (ProcessInfo processInfo : infos) {
Expand Down Expand Up @@ -764,7 +781,7 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri

public RemoteNodeStatuses push() {
MDC.put("engineName", getEngineName());
return pushService.pushData(true);
return pushService.push(true);
}

public void syncTriggers() {
Expand Down
Expand Up @@ -115,7 +115,7 @@ public interface ISymmetricEngine {
/**
* Will perform a push the same way the {@link PushJob} would have.
*
* @see IPushService#pushData(boolean)
* @see IPushService#push(boolean)
* @return {@link RemoteNodeStatuses}
*/
public RemoteNodeStatuses push();
Expand Down
Expand Up @@ -168,6 +168,8 @@ private ParameterConstants() {

public final static String DATA_EXTRACTOR_ENABLED = "dataextractor.enable";
public final static String DATA_EXTRACTOR_TEXT_COLUMN_EXPRESSION = "dataextractor.text.column.expression";

@Deprecated
public final static String OUTGOING_BATCH_MAX_BATCHES_TO_SELECT = "outgoing.batches.max.to.select";

public final static String DBDIALECT_ORACLE_USE_TRANSACTION_VIEW = "oracle.use.transaction.view";
Expand Down Expand Up @@ -220,7 +222,6 @@ private ParameterConstants() {
public final static String EXTERNAL_ID_IS_UNIQUE = "external.id.is.unique.enabled";

public final static String CLUSTER_SERVER_ID = "cluster.server.id";
public final static String CLUSTER_LOCKING_ENABLED = "cluster.lock.enabled";
public final static String CLUSTER_LOCK_TIMEOUT_MS = "cluster.lock.timeout.ms";
public final static String LOCK_TIMEOUT_MS = "lock.timeout.ms";
public final static String LOCK_WAIT_RETRY_MILLIS = "lock.wait.retry.ms";
Expand Down

0 comments on commit df48a90

Please sign in to comment.