Skip to content

Commit

Permalink
still trying to figure out the best way to get the data from symmetri…
Browse files Browse the repository at this point in the history
…c into muleland
  • Loading branch information
knaas committed Mar 21, 2008
1 parent 0f5cf2b commit 82a3e5a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 43 deletions.
Expand Up @@ -20,6 +20,7 @@

package org.jumpmind.mule.transport.symmetric;

import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.Log;
Expand All @@ -32,7 +33,9 @@
import org.jumpmind.symmetric.job.PullJob;
import org.jumpmind.symmetric.job.PurgeJob;
import org.jumpmind.symmetric.job.PushJob;
import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IBootstrapService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IDataService;
Expand Down Expand Up @@ -75,6 +78,8 @@ public class SimpleSymmetricEngine {

private IDataExtractorService dataExtractorService;

private IAcknowledgeService acknowledgeService;

private boolean started = false;

private boolean starting = false;
Expand Down Expand Up @@ -108,6 +113,7 @@ private void init(ApplicationContext applicationContext) {
.getBean(Constants.PURGE_SERVICE);
dataService = (IDataService)applicationContext.getBean(Constants.DATA_SERVICE);
dataExtractorService = (IDataExtractorService)applicationContext.getBean(Constants.DATAEXTRACTOR_SERVICE);
acknowledgeService = (IAcknowledgeService)applicationContext.getBean(Constants.ACKNOWLEDGE_SERVICE);
dbDialect = (IDbDialect) applicationContext.getBean(Constants.DB_DIALECT);
logger.info("Initialized SymmetricDS externalId=" + runtimeConfig.getExternalId() + " version=" + Version.version() + " database="+dbDialect.getName());
}
Expand Down Expand Up @@ -238,15 +244,21 @@ public ApplicationContext getApplicationContext() {
return applicationContext;
}

public void extract(IExtractListener extractListener)
public boolean extract(IExtractListener extractListener)
{
try
{
dataExtractorService.extract(nodeService.findIdentity(), extractListener);
return dataExtractorService.extract(nodeService.findIdentity(), extractListener);
}
// why is extract throwing an Exception?
catch (Exception e)
{
throw new IllegalStateException(e);
}
}

public void acknowledge(List<BatchInfo> batches)
{
acknowledgeService.ack(batches);
}
}
Expand Up @@ -21,11 +21,12 @@
*/
public class SymmetricMessageDispatcher extends AbstractMessageDispatcher
{
protected final SymmetricConnector connector;

public SymmetricMessageDispatcher(OutboundEndpoint endpoint)
{
super(endpoint);

this.connector = (SymmetricConnector) endpoint.getConnector();
}

public void doConnect() throws Exception
Expand Down
Expand Up @@ -10,24 +10,27 @@

package org.jumpmind.mule.transport.symmetric;

import java.util.ArrayList;
import java.util.List;

import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.service.IExtractListener;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleMessage;
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.lifecycle.CreateException;
import org.mule.api.lifecycle.LifecycleException;
import org.mule.api.service.Service;
import org.mule.api.transport.Connector;
import org.mule.config.i18n.CoreMessages;
import org.mule.transport.AbstractPollingMessageReceiver;
import org.mule.transport.ConnectException;
import org.mule.transport.TransactedPollingMessageReceiver;

/**
* <code>SymmetricMessageReceiver</code> TODO document
*/
public class SymmetricMessageReceiver extends TransactedPollingMessageReceiver {
public class SymmetricMessageReceiver extends AbstractPollingMessageReceiver {

private SimpleSymmetricEngine engine;

Expand All @@ -40,7 +43,6 @@ public SymmetricMessageReceiver(Connector connector, Service service,
InboundEndpoint endpoint) throws CreateException {
super(connector, service, endpoint);
this.setFrequency(((SymmetricConnector) connector).getPollingFrequency());
this.setReceiveMessagesInTransaction(false);
}

public void doConnect() throws ConnectException {
Expand Down Expand Up @@ -83,45 +85,49 @@ public void doDispose() {
}

@Override
protected List getMessages() throws Exception
public void poll() throws Exception
{
if (engine != null) {
engine.extract(new IExtractListener() {

public void dataExtracted(Data data) throws Exception
{

}

public void done() throws Exception
{

}

public void endBatch(OutgoingBatch outgoingbatch) throws Exception
{

}

public void init() throws Exception
{

}

public void startBatch(OutgoingBatch outgoingbatch) throws Exception
{

}
});
}
return null;
}
engine.extract(new IExtractListener() {

@Override
protected void processMessage(Object message) throws Exception
{
// TODO Auto-generated method stub

private List<Data> datum;

public void dataExtracted(Data data) throws Exception
{
datum.add(data);
}

public void done() throws Exception
{

}

public void endBatch(OutgoingBatch outgoingbatch) throws Exception
{
final Object payload = connector.getMessageAdapter(datum).getPayload();
MuleMessage message = new DefaultMuleMessage(payload);
message.setProperty("symmetric.batchId", outgoingbatch.getBatchId());
message.setProperty("symmetric.batchType", outgoingbatch.getBatchType());
message.setProperty("symmetric.channelId", outgoingbatch.getChannelId());
message.setProperty("symmetric.createTime", outgoingbatch.getCreateTime());
message.setProperty("symmetric.nodeBatchId", outgoingbatch.getNodeBatchId());
message.setProperty("symmetric.nodeId", outgoingbatch.getNodeId());
message.setProperty("symmetric.status", outgoingbatch.getStatus());
message.setProperty("symmetric.batchInfoList", outgoingbatch.getBatchInfoList());
routeMessage(message, endpoint.isSynchronous());
engine.acknowledge(outgoingbatch.getBatchInfoList());
}

public void init() throws Exception
{

}

public void startBatch(OutgoingBatch outgoingbatch) throws Exception
{
datum = new ArrayList<Data>();
}
});
}


}

0 comments on commit 82a3e5a

Please sign in to comment.