Skip to content

Commit

Permalink
fix bug in transactional routing. allow jobs to be force run
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 25, 2012
1 parent 49fbca2 commit 74be750
Show file tree
Hide file tree
Showing 31 changed files with 179 additions and 170 deletions.
Expand Up @@ -328,13 +328,13 @@ private void runPurge(CommandLine line, List<String> args) {
IPurgeService purgeService = getSymmetricEngine().getPurgeService();
boolean all = args.contains("all") || args.size() == 0;
if (args.contains("outgoing") || all) {
purgeService.purgeOutgoing();
purgeService.purgeOutgoing(true);
}
if (args.contains("incoming") || all) {
purgeService.purgeIncoming();
purgeService.purgeIncoming(true);
}
if (args.contains("data-gaps") || all) {
purgeService.purgeDataGaps();
purgeService.purgeDataGaps(true);
}
}

Expand Down
Expand Up @@ -159,7 +159,7 @@ public boolean invoke(boolean force) {
.getRegistrationService()
.isRegisteredWithServer())) {
hasNotRegisteredMessageBeenLogged = false;
doJob();
doJob(force);
} else {
if (!hasNotRegisteredMessageBeenLogged) {
log.warn(
Expand Down Expand Up @@ -202,7 +202,7 @@ public void run() {
invoke(false);
}

abstract void doJob() throws Exception;
abstract void doJob(boolean force) throws Exception;

@ManagedOperation(description = "Pause this job")
public void pause() {
Expand Down
Expand Up @@ -36,8 +36,8 @@ public DataGapPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSche
}

@Override
public void doJob() throws Exception {
engine.getPurgeService().purgeDataGaps();
public void doJob(boolean force) throws Exception {
engine.getPurgeService().purgeDataGaps(force);
}

public String getClusterLockName() {
Expand Down
Expand Up @@ -36,7 +36,7 @@ public HeartbeatJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSchedul
}

@Override
public void doJob() throws Exception {
public void doJob(boolean force) throws Exception {
if (engine.getClusterService().lock(getClusterLockName())) {
try {
engine.getDataService().heartbeat(false);
Expand Down
Expand Up @@ -36,8 +36,8 @@ public IncomingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSch
}

@Override
public void doJob() throws Exception {
engine.getPurgeService().purgeIncoming();
public void doJob(boolean force) throws Exception {
engine.getPurgeService().purgeIncoming(force);
}

public String getClusterLockName() {
Expand Down
Expand Up @@ -35,8 +35,8 @@ public OutgoingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSch
engine, taskScheduler);
}
@Override
public void doJob() throws Exception {
engine.getPurgeService().purgeOutgoing();
public void doJob(boolean force) throws Exception {
engine.getPurgeService().purgeOutgoing(force);
}

public String getClusterLockName() {
Expand Down
Expand Up @@ -34,8 +34,8 @@ public PullJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
}

@Override
public void doJob() throws Exception {
engine.getPullService().pullData();
public void doJob(boolean force) throws Exception {
engine.getPullService().pullData(force);
}

public String getClusterLockName() {
Expand Down
Expand Up @@ -36,9 +36,9 @@ public PushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
}

@Override
public void doJob() throws Exception {
public void doJob(boolean force) throws Exception {
if (engine != null) {
engine.getPushService().pushData().getDataProcessedCount();
engine.getPushService().pushData(force).getDataProcessedCount();
}
}

Expand Down
Expand Up @@ -36,8 +36,8 @@ public RouterJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler)
}

@Override
void doJob() throws Exception {
engine.getRouterService().routeData();
void doJob(boolean force) throws Exception {
engine.getRouterService().routeData(force);
}

public String getClusterLockName() {
Expand Down
Expand Up @@ -24,7 +24,7 @@ public boolean isClusterable() {
}

@Override
void doJob() throws Exception {
void doJob(boolean force) throws Exception {
if (stagingManager != null) {
stagingManager.clean();
}
Expand Down
Expand Up @@ -36,7 +36,7 @@ public StatisticFlushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSc
}

@Override
public void doJob() throws Exception {
public void doJob(boolean force) throws Exception {
engine.getStatisticManager().flush();
}

Expand Down
Expand Up @@ -36,7 +36,7 @@ public SyncTriggersJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSche
}

@Override
public void doJob() throws Exception {
public void doJob(boolean force) throws Exception {
engine.getTriggerRouterService().syncTriggers();
}

Expand Down
Expand Up @@ -37,7 +37,7 @@ public WatchdogJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSchedule
}

@Override
public void doJob() throws Exception {
public void doJob(boolean force) throws Exception {
if (engine.getClusterService().lock(ClusterConstants.WATCHDOG)) {
synchronized (this) {
try {
Expand Down
Expand Up @@ -100,7 +100,7 @@ public void stop() {

@ManagedOperation(description = "Run the purge process")
public void purge() {
engine.getPurgeService().purgeOutgoing();
engine.getPurgeService().purgeOutgoing(true);
}

@ManagedOperation(description = "Force the channel settings to be read from the database")
Expand Down
Expand Up @@ -519,7 +519,7 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri

public RemoteNodeStatuses push() {
MDC.put("engineName", getEngineName());
return pushService.pushData();
return pushService.pushData(true);
}

public void syncTriggers() {
Expand All @@ -533,19 +533,19 @@ public NodeStatus getNodeStatus() {

public RemoteNodeStatuses pull() {
MDC.put("engineName", getEngineName());
return pullService.pullData();
return pullService.pullData(true);
}

public void route() {
MDC.put("engineName", getEngineName());
routerService.routeData();
routerService.routeData(true);
}

public void purge() {
MDC.put("engineName", getEngineName());
purgeService.purgeOutgoing();
purgeService.purgeIncoming();
purgeService.purgeDataGaps();
purgeService.purgeOutgoing(true);
purgeService.purgeIncoming(true);
purgeService.purgeDataGaps(true);
}

public boolean isConfigured() {
Expand Down Expand Up @@ -621,7 +621,7 @@ public boolean isConfigured() {

public void heartbeat(boolean force) {
MDC.put("engineName", getEngineName());
dataService.heartbeat(force);
dataService.heartbeat(true);
}

public void openRegistration(String nodeGroupId, String externalId) {
Expand Down
Expand Up @@ -104,7 +104,7 @@ public interface ISymmetricEngine {
/**
* Will perform a push the same way the {@link PushJob} would have.
*
* @see IPushService#pushData()
* @see IPushService#pushData(boolean)
* @return {@link RemoteNodeStatuses}
*/
public RemoteNodeStatuses push();
Expand All @@ -126,7 +126,7 @@ public interface ISymmetricEngine {
/**
* Will perform a pull the same way the {@link PullJob} would have.
*
* @see IPullService#pullData()
* @see IPullService#pullData(boolean)
* @return {@link RemoteNodeStatuses}
*/
public RemoteNodeStatuses pull();
Expand All @@ -140,7 +140,7 @@ public interface ISymmetricEngine {
* This can be called to do a purge. It may be called only if the
* {@link OutgoingPurgeJob} has not been enabled.
*
* @see IPurgeService#purgeOutgoing()
* @see IPurgeService#purgeOutgoing(boolean)
*/
public void purge();

Expand Down
Expand Up @@ -57,8 +57,7 @@ public class ChannelRouterContext extends SimpleRouterContext {
private ISqlTransaction sqlTransaction;
private boolean needsCommitted = false;
private long createdTimeInMs = System.currentTimeMillis();
private long lastDataIdProcessed;
private Map<String, Long> transactionIdDataIds = new HashMap<String, Long>();
private Data lastDataProcessed;
private List<DataEvent> dataEventsToSend = new ArrayList<DataEvent>();
private boolean produceCommonBatches = false;

Expand Down Expand Up @@ -109,7 +108,7 @@ public void rollback() {
try {
sqlTransaction.rollback();
} catch (SqlException e) {
log.warn(e.getMessage(),e);
log.warn(e.getMessage(), e);
} finally {
clearState();
}
Expand All @@ -119,7 +118,7 @@ public void cleanup() {
try {
this.sqlTransaction.commit();
} catch (Exception ex) {
log.warn(ex.getMessage(),ex);
log.warn(ex.getMessage(), ex);
} finally {
this.sqlTransaction.close();
}
Expand Down Expand Up @@ -149,35 +148,24 @@ public long getCreatedTimeInMs() {
return createdTimeInMs;
}

public void setLastDataIdForTransactionId(Data data) {
if (data.getTransactionId() != null) {
this.transactionIdDataIds.put(data.getTransactionId(), data.getDataId());
}
}

public void recordTransactionBoundaryEncountered(Data data) {
Long dataId = transactionIdDataIds.get(data.getTransactionId());
setEncountedTransactionBoundary(dataId == null ? true : dataId == data.getDataId());
public void setLastDataProcessed(Data lastDataProcessed) {
this.lastDataProcessed = lastDataProcessed;
}

public void setLastDataIdProcessed(long lastDataIdProcessed) {
this.lastDataIdProcessed = lastDataIdProcessed;
public Data getLastDataProcessed() {
return lastDataProcessed;
}

public long getLastDataIdProcessed() {
return lastDataIdProcessed;
}

public ISqlTransaction getSqlTransaction() {
return sqlTransaction;
}

public void setProduceCommonBatches(boolean defaultRoutersOnly) {
this.produceCommonBatches = defaultRoutersOnly;
}

public boolean isProduceCommonBatches() {
return produceCommonBatches;
}

}
Expand Up @@ -279,7 +279,6 @@ protected boolean fillPeekAheadQueue(List<Data> peekAheadQueue, int peekAheadCou
Data data = cursor.next();
if (data != null) {
if (process(data)) {
context.setLastDataIdForTransactionId(data);
peekAheadQueue.add(data);
dataCount++;
context.incrementStat(System.currentTimeMillis() - ts,
Expand Down
Expand Up @@ -31,6 +31,6 @@
*/
public interface IPullService extends IOfflineDetectorService {

public RemoteNodeStatuses pullData();
public RemoteNodeStatuses pullData(boolean force);

}
Expand Up @@ -30,17 +30,17 @@
*/
public interface IPurgeService {

public long purgeOutgoing();
public long purgeOutgoing(boolean force);

public long purgeIncoming();
public long purgeIncoming(boolean force);

public long purgeDataGaps();
public long purgeDataGaps(boolean force);

public long purgeDataGaps(Calendar retentionCutoff);
public long purgeDataGaps(Calendar retentionCutoff, boolean force);

public long purgeOutgoing(Calendar retentionCutoff);
public long purgeOutgoing(Calendar retentionCutoff, boolean force);

public long purgeIncoming(Calendar retentionCutoff);
public long purgeIncoming(Calendar retentionCutoff, boolean force);

public void purgeAllIncomingEventsForNode(String nodeId);

Expand Down
Expand Up @@ -37,10 +37,11 @@ public interface IPushService extends IOfflineDetectorService {
/**
* Attempt to push data, if any has been captured, to nodes that the
* captured data is targeted for.
* @param force TODO
*
* @return RemoteNodeStatuses the status of the push attempt(s)
*/
public RemoteNodeStatuses pushData();
public RemoteNodeStatuses pushData(boolean force);

public Map<String, Date> getStartTimesOfNodesBeingPushedTo();

Expand Down
Expand Up @@ -38,7 +38,7 @@
*/
public interface IRouterService extends IService {

public long routeData();
public long routeData(boolean force);

public long getUnroutedDataCount();

Expand Down
Expand Up @@ -285,7 +285,7 @@ public List<OutgoingBatch> extract(Node targetNode, IOutgoingTransport targetTra
// make sure that data is routed before extracting if the route
// job is not configured to start automatically
if (!parameterService.is(ParameterConstants.START_ROUTE_JOB)) {
routerService.routeData();
routerService.routeData(true);
}

OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(targetNode, false);
Expand Down
Expand Up @@ -229,6 +229,7 @@ public void run() {
boolean failed = false;
try {
executor.execute(nodeCommunication, status);
failed = status.failed();
} catch (Throwable ex) {
failed = true;
log.error(String.format("Failed to execute %s for node %s",
Expand Down

0 comments on commit 74be750

Please sign in to comment.