From 82a3e5a935fd07c54c93a488d63aeaddf754e2b4 Mon Sep 17 00:00:00 2001 From: knaas Date: Fri, 21 Mar 2008 19:37:40 +0000 Subject: [PATCH] still trying to figure out the best way to get the data from symmetric into muleland --- .../symmetric/SimpleSymmetricEngine.java | 16 +++- .../symmetric/SymmetricMessageDispatcher.java | 3 +- .../symmetric/SymmetricMessageReceiver.java | 86 ++++++++++--------- 3 files changed, 62 insertions(+), 43 deletions(-) diff --git a/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SimpleSymmetricEngine.java b/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SimpleSymmetricEngine.java index a0376fb6eb..1fbba16381 100644 --- a/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SimpleSymmetricEngine.java +++ b/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SimpleSymmetricEngine.java @@ -20,6 +20,7 @@ package org.jumpmind.mule.transport.symmetric; +import java.util.List; import java.util.Properties; import org.apache.commons.logging.Log; @@ -32,7 +33,9 @@ import org.jumpmind.symmetric.job.PullJob; import org.jumpmind.symmetric.job.PurgeJob; import org.jumpmind.symmetric.job.PushJob; +import org.jumpmind.symmetric.model.BatchInfo; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.service.IAcknowledgeService; import org.jumpmind.symmetric.service.IBootstrapService; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IDataService; @@ -75,6 +78,8 @@ public class SimpleSymmetricEngine { private IDataExtractorService dataExtractorService; + private IAcknowledgeService acknowledgeService; + private boolean started = false; private boolean starting = false; @@ -108,6 +113,7 @@ private void init(ApplicationContext applicationContext) { .getBean(Constants.PURGE_SERVICE); dataService = (IDataService)applicationContext.getBean(Constants.DATA_SERVICE); dataExtractorService = (IDataExtractorService)applicationContext.getBean(Constants.DATAEXTRACTOR_SERVICE); + acknowledgeService = (IAcknowledgeService)applicationContext.getBean(Constants.ACKNOWLEDGE_SERVICE); dbDialect = (IDbDialect) applicationContext.getBean(Constants.DB_DIALECT); logger.info("Initialized SymmetricDS externalId=" + runtimeConfig.getExternalId() + " version=" + Version.version() + " database="+dbDialect.getName()); } @@ -238,15 +244,21 @@ public ApplicationContext getApplicationContext() { return applicationContext; } - public void extract(IExtractListener extractListener) + public boolean extract(IExtractListener extractListener) { try { - dataExtractorService.extract(nodeService.findIdentity(), extractListener); + return dataExtractorService.extract(nodeService.findIdentity(), extractListener); } + // why is extract throwing an Exception? catch (Exception e) { throw new IllegalStateException(e); } } + + public void acknowledge(List batches) + { + acknowledgeService.ack(batches); + } } diff --git a/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SymmetricMessageDispatcher.java b/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SymmetricMessageDispatcher.java index 02c1ea2e9a..12586e6559 100644 --- a/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SymmetricMessageDispatcher.java +++ b/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SymmetricMessageDispatcher.java @@ -21,11 +21,12 @@ */ public class SymmetricMessageDispatcher extends AbstractMessageDispatcher { + protected final SymmetricConnector connector; public SymmetricMessageDispatcher(OutboundEndpoint endpoint) { super(endpoint); - + this.connector = (SymmetricConnector) endpoint.getConnector(); } public void doConnect() throws Exception diff --git a/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SymmetricMessageReceiver.java b/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SymmetricMessageReceiver.java index 9f27978008..ebcbc8026f 100644 --- a/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SymmetricMessageReceiver.java +++ b/mule-transport-symmetric/src/main/java/org/jumpmind/mule/transport/symmetric/SymmetricMessageReceiver.java @@ -10,24 +10,27 @@ package org.jumpmind.mule.transport.symmetric; +import java.util.ArrayList; import java.util.List; import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.service.IExtractListener; +import org.mule.DefaultMuleMessage; +import org.mule.api.MuleMessage; import org.mule.api.endpoint.InboundEndpoint; import org.mule.api.lifecycle.CreateException; import org.mule.api.lifecycle.LifecycleException; import org.mule.api.service.Service; import org.mule.api.transport.Connector; import org.mule.config.i18n.CoreMessages; +import org.mule.transport.AbstractPollingMessageReceiver; import org.mule.transport.ConnectException; -import org.mule.transport.TransactedPollingMessageReceiver; /** * SymmetricMessageReceiver TODO document */ -public class SymmetricMessageReceiver extends TransactedPollingMessageReceiver { +public class SymmetricMessageReceiver extends AbstractPollingMessageReceiver { private SimpleSymmetricEngine engine; @@ -40,7 +43,6 @@ public SymmetricMessageReceiver(Connector connector, Service service, InboundEndpoint endpoint) throws CreateException { super(connector, service, endpoint); this.setFrequency(((SymmetricConnector) connector).getPollingFrequency()); - this.setReceiveMessagesInTransaction(false); } public void doConnect() throws ConnectException { @@ -83,45 +85,49 @@ public void doDispose() { } @Override - protected List getMessages() throws Exception + public void poll() throws Exception { - if (engine != null) { - engine.extract(new IExtractListener() { - - public void dataExtracted(Data data) throws Exception - { - - } - - public void done() throws Exception - { - - } - - public void endBatch(OutgoingBatch outgoingbatch) throws Exception - { - - } - - public void init() throws Exception - { - - } - - public void startBatch(OutgoingBatch outgoingbatch) throws Exception - { - - } - }); - } - return null; - } + engine.extract(new IExtractListener() { - @Override - protected void processMessage(Object message) throws Exception - { - // TODO Auto-generated method stub - + private List datum; + + public void dataExtracted(Data data) throws Exception + { + datum.add(data); + } + + public void done() throws Exception + { + + } + + public void endBatch(OutgoingBatch outgoingbatch) throws Exception + { + final Object payload = connector.getMessageAdapter(datum).getPayload(); + MuleMessage message = new DefaultMuleMessage(payload); + message.setProperty("symmetric.batchId", outgoingbatch.getBatchId()); + message.setProperty("symmetric.batchType", outgoingbatch.getBatchType()); + message.setProperty("symmetric.channelId", outgoingbatch.getChannelId()); + message.setProperty("symmetric.createTime", outgoingbatch.getCreateTime()); + message.setProperty("symmetric.nodeBatchId", outgoingbatch.getNodeBatchId()); + message.setProperty("symmetric.nodeId", outgoingbatch.getNodeId()); + message.setProperty("symmetric.status", outgoingbatch.getStatus()); + message.setProperty("symmetric.batchInfoList", outgoingbatch.getBatchInfoList()); + routeMessage(message, endpoint.isSynchronous()); + engine.acknowledge(outgoingbatch.getBatchInfoList()); + } + + public void init() throws Exception + { + + } + + public void startBatch(OutgoingBatch outgoingbatch) throws Exception + { + datum = new ArrayList(); + } + }); } + }