Skip to content

Commit

Permalink
Added staged management job
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jan 28, 2012
1 parent dd1f3c2 commit 7a72d57
Show file tree
Hide file tree
Showing 25 changed files with 269 additions and 155 deletions.
6 changes: 5 additions & 1 deletion symmetric/symmetric-assemble/TODO.txt
Expand Up @@ -30,6 +30,7 @@ DONE = +
* verify stats capturing, outgoing batch updates
* unit test
* purge job for staging manager
* make sure memory buffer is cleared when state goes to done

* Hook up JMX

Expand All @@ -49,6 +50,8 @@ DONE = +
* SqlMap create constants for columns and table names
* SqlMap, TriggerTemplate format better

* Test timezone columns. Do other databases other than oracle and postgres have timezone columns?

Performance Improvement Opportunities
* Pluggable data loaders.
* Sync based on updated column values (timestamp or flag)
Expand All @@ -75,4 +78,5 @@ Documentation
* db.default.schema is no longer used
* No longer stop purge from running if there wasn't an initial load
* Extensions no longer have services injected into them. If they need acccess to services, they should implement ISymmetricEngineAware
* Node concurrency manager no longer allows the same node to make a second request while it already has a reservation
* Node concurrency manager no longer allows the same node to make a second request while it already has a reservation
* Added stage management job (that purges staged files)
Expand Up @@ -14,6 +14,7 @@ public OracleTriggerTemplate() {
arrayColumnTemplate = null;
numberColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||cast($(tableAlias).\"$(columnName)\" as number(30,10))||'\"')" ;
datetimeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF3')),'\"'))" ;
dateTimeWithTimeZoneTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')),'\"'))" ;
timeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS')),'\"'))" ;
dateColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS')),'\"'))" ;
clobColumnTemplate = "decode(dbms_lob.getlength($(tableAlias).\"$(columnName)\"), null, to_clob(''), '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"')" ;
Expand Down
Expand Up @@ -15,6 +15,7 @@ public PostgreSqlTriggerTemplate() {
arrayColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || replace(replace(cast($(tableAlias).\"$(columnName)\" as varchar),$$\\$$,$$\\\\$$),'\"',$$\\\"$$) || '\"' end" ;
numberColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || cast($(tableAlias).\"$(columnName)\" as varchar) || '\"' end" ;
datetimeColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US') || '\"' end" ;
dateTimeWithTimeZoneTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US ')||lpad(cast(extract(timezone_hour from $(tableAlias).\"$(columnName)\") as varchar),2,'0')||':'||lpad(cast(extract(timezone_minute from $(tableAlias).\"$(columnName)\") as varchar), 2, '0') || '\"' end" ;
timeColumnTemplate = null;
dateColumnTemplate = null;
clobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || replace(replace($(tableAlias).\"$(columnName)\",$$\\$$,$$\\\\$$),'\"',$$\\\"$$) || '\"' end" ;
Expand Down
Expand Up @@ -610,6 +610,10 @@ public ITransportManager getTransportManager() {
public IExtensionPointManager getExtensionPointManager() {
return extensionPointManger;
}

public IStagingManager getStagingManager() {
return stagingManager;
}

private void removeMeFromMap(Map<String, ISymmetricEngine> map) {
Set<String> keys = new HashSet<String>(map.keySet());
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.jumpmind.symmetric.common.DeploymentType;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.IExtensionPointManager;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.job.IJobManager;
import org.jumpmind.symmetric.job.OutgoingPurgeJob;
import org.jumpmind.symmetric.job.PullJob;
Expand Down Expand Up @@ -256,4 +257,6 @@ public interface ISymmetricEngine {

public IExtensionPointManager getExtensionPointManager();

public IStagingManager getStagingManager();

}
Expand Down
Expand Up @@ -77,6 +77,8 @@ public class TriggerTemplate {
protected String timeColumnTemplate;

protected String dateColumnTemplate;

protected String dateTimeWithTimeZoneTemplate;

protected String clobColumnTemplate;

Expand Down Expand Up @@ -625,6 +627,11 @@ protected ColumnString buildColumnString(String origTableAlias, String tableAlia
case Types.OTHER:
templateToUse = this.otherColumnTemplate;
break;
case -101:
if (StringUtils.isNotBlank(this.dateTimeWithTimeZoneTemplate)) {
templateToUse = this.dateTimeWithTimeZoneTemplate;
break;
}
case Types.JAVA_OBJECT:
case Types.DISTINCT:
case Types.STRUCT:
Expand Down
Expand Up @@ -56,6 +56,7 @@ public JobManager(ISymmetricEngine engine) {
this.jobs.add(new SyncTriggersJob(engine, taskScheduler));
this.jobs.add(new HeartbeatJob(engine, taskScheduler));
this.jobs.add(new WatchdogJob(engine, taskScheduler));
this.jobs.add(new StageManagementJob(engine, taskScheduler, engine.getStagingManager()));

}

Expand Down
@@ -0,0 +1,40 @@
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class StageManagementJob extends AbstractJob {

private IStagingManager stagingManager;

public StageManagementJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler,
IStagingManager stagingManager) {
super("job.stage.management", true, engine.getParameterService().is(
"start.stage.management.job"), engine, taskScheduler);
}

public String getClusterLockName() {
return ClusterConstants.STAGE_MANAGEMENT;
}

public boolean isClusterable() {
return true;
}

@Override
long doJob() throws Exception {
if (stagingManager != null) {
long cleanupCount = stagingManager.clean();

// TODO it would be a nice feature to be able to import from an
// upload/import directory any files that are dropped there.

return cleanupCount;
} else {
return 0;
}
}

}
Expand Up @@ -21,10 +21,11 @@
package org.jumpmind.symmetric.service;

/**
*
* Names for jobs as locked by the {@link IClusterService}
*/
public class ClusterConstants {


public static final String STAGE_MANAGEMENT = "STAGE_MANAGEMENT";
public static final String ROUTE = "ROUTE";
public static final String PUSH = "PUSH";
public static final String PULL = "PULL";
Expand Down

This file was deleted.

0 comments on commit 7a72d57

Please sign in to comment.