diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java index 67ae9bf5a3..6dc61a5884 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java @@ -36,9 +36,9 @@ public interface IIncomingBatchService { public List findIncomingBatchErrors(int maxRows); - public boolean acquireIncomingBatch(IncomingBatch status); + public boolean acquireIncomingBatch(IncomingBatch batch); - public void insertIncomingBatch(IncomingBatch status); + public void insertIncomingBatch(IncomingBatch batch); public int updateIncomingBatch(JdbcTemplate jdbcTemplate, IncomingBatch batch); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 5ef42dfd0b..d93cf1e5eb 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -111,12 +111,12 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) { outgoingBatch.getFailedDataId(), outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getLastUpdatedTime(), outgoingBatch.getBatchId() }, - new int[] { Types.CHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, - Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER, - Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER, - Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER, - Types.INTEGER, Types.INTEGER, Types.VARCHAR, Types.INTEGER, - Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP, + new int[] { Types.CHAR, Types.INTEGER, Types.INTEGER, Types.BIGINT, + Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, + Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, + Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, + Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.INTEGER, + Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER }); } diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml index 9845bf93e0..050eb466e8 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml @@ -135,8 +135,6 @@ - - --> @@ -329,25 +327,25 @@ - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + - + @@ -368,16 +366,16 @@ - - - - - - - - - - + + + + + + + + + + @@ -396,4 +394,74 @@
+ + + + + + + + + + + + + + +
+ + + + + + + + +
+ + + + + + + +
+ + + + + + + + +
+ + + + + + + + + +
+ + + + + + + + + +
+ + + + + + + +
+ \ No newline at end of file diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceTest.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceTest.java new file mode 100644 index 0000000000..1da27f38e2 --- /dev/null +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU Lesser General Public License (the + * "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.jumpmind.symmetric.service.impl; + +import java.util.List; + +import junit.framework.Assert; + +import org.jumpmind.symmetric.model.IncomingBatch; +import org.jumpmind.symmetric.model.IncomingBatch.Status; +import org.jumpmind.symmetric.test.AbstractDatabaseTest; +import org.jumpmind.symmetric.test.TestConstants; +import org.junit.Test; + +public class IncomingBatchServiceTest extends AbstractDatabaseTest { + + + public IncomingBatchServiceTest() throws Exception { + super(); + } + + @Test + public void testInsertAndUpdateIncomingBatchMaxSize() { + IncomingBatch batch = new IncomingBatch(); + batch.setStatus(Status.ER); + batch.setNodeId("XXXXX"); + batch.setChannelId(TestConstants.TEST_CHANNEL_ID); + batch.setByteCount(Long.MAX_VALUE); + batch.setDatabaseMillis(Long.MAX_VALUE); + batch.setFailedRowNumber(Long.MAX_VALUE); + batch.setFallbackInsertCount(Long.MAX_VALUE); + batch.setFilterMillis(Long.MAX_VALUE); + batch.setMissingDeleteCount(Long.MAX_VALUE); + batch.setNetworkMillis(Long.MAX_VALUE); + batch.setSkipCount(Long.MAX_VALUE); + batch.setStatementCount(Long.MAX_VALUE); + getIncomingBatchService().insertIncomingBatch(batch); + List batches = getIncomingBatchService().findIncomingBatchErrors(1); + Assert.assertEquals(1, batches.size()); + batch.setBatchId(batches.get(0).getBatchId()); + batch.setStatus(Status.OK); + getIncomingBatchService().updateIncomingBatch(batch); + } + + +} \ No newline at end of file diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceTest.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceTest.java index aed57458b4..d9f32dacdc 100644 --- a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceTest.java +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceTest.java @@ -21,32 +21,34 @@ package org.jumpmind.symmetric.service.impl; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.Set; - -import junit.framework.Assert; - -import org.jumpmind.symmetric.common.Constants; -import org.jumpmind.symmetric.model.Data; -import org.jumpmind.symmetric.model.DataEventType; -import org.jumpmind.symmetric.model.NodeChannel; -import org.jumpmind.symmetric.model.OutgoingBatch; -import org.jumpmind.symmetric.model.OutgoingBatches; -import org.jumpmind.symmetric.model.TriggerHistory; -import org.jumpmind.symmetric.service.IDataService; -import org.jumpmind.symmetric.service.IOutgoingBatchService; -import org.jumpmind.symmetric.test.AbstractDatabaseTest; -import org.jumpmind.symmetric.test.TestConstants; -import org.junit.Before; -import org.junit.Test; -import org.springframework.dao.DataAccessException; -import org.springframework.jdbc.core.ConnectionCallback; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.Set; + +import junit.framework.Assert; + +import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.model.Data; +import org.jumpmind.symmetric.model.DataEventType; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.model.OutgoingBatch.Status; +import org.jumpmind.symmetric.model.OutgoingBatches; +import org.jumpmind.symmetric.model.TriggerHistory; +import org.jumpmind.symmetric.service.IDataService; +import org.jumpmind.symmetric.service.IOutgoingBatchService; +import org.jumpmind.symmetric.test.AbstractDatabaseTest; +import org.jumpmind.symmetric.test.TestConstants; +import org.junit.Before; +import org.junit.Test; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.ConnectionCallback; /** * @@ -137,6 +139,36 @@ public void testChannelCachingLastExtracted() { Assert.assertEquals(formatter.format(halfHourAgo.getTime()), formatter.format(nodeChannel .getLastExtractedTime())); + } + + @Test + public void testInsertOutgoingBatchMaxSize() { + OutgoingBatch batch = new OutgoingBatch(); + batch.setStatus(Status.NE); + batch.setNodeId("XXXXX"); + batch.setChannelId(TestConstants.TEST_CHANNEL_ID); + batch.setByteCount(Long.MAX_VALUE); + batch.setDataEventCount(Long.MAX_VALUE); + batch.setDeleteEventCount(Long.MAX_VALUE); + batch.setFailedDataId(Long.MAX_VALUE); + batch.setExtractCount(Long.MAX_VALUE); + batch.setExtractMillis(Long.MAX_VALUE); + batch.setFilterMillis(Long.MAX_VALUE); + batch.setInsertEventCount(Long.MAX_VALUE); + batch.setLoadCount(Long.MAX_VALUE); + batch.setLoadMillis(Long.MAX_VALUE); + batch.setNetworkMillis(Long.MAX_VALUE); + batch.setOtherEventCount(Long.MAX_VALUE); + batch.setReloadEventCount(Long.MAX_VALUE); + batch.setRouterMillis(Long.MAX_VALUE); + batch.setUpdateEventCount(Long.MAX_VALUE); + batch.setSentCount(Long.MAX_VALUE); + getOutgoingBatchService().insertOutgoingBatch(batch); + OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(new Node("XXXXX", TestConstants.TEST_ROOT_NODE_GROUP)); + Assert.assertEquals(1, batches.getBatches().size()); + batch.setBatchId(batches.getBatches().get(0).getBatchId()); + batch.setStatus(Status.OK); + getOutgoingBatchService().updateOutgoingBatch(batch); } @Test @@ -262,7 +294,7 @@ protected void createDataEvent(String tableName, int triggerHistoryId, String ch history.setTriggerHistoryId(triggerHistoryId); Data data = new Data(tableName, type, "r.o.w., dat-a", "p-k d.a.t.a", history, channelId, null, null); dataService.insertDataAndDataEventAndOutgoingBatch(data, nodeId, "", false); - } + } protected int getBatchSize(final long batchId) { return (Integer) getJdbcTemplate().execute(new ConnectionCallback() { diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/AbstractDatabaseTest.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/AbstractDatabaseTest.java index ad261feb56..8ee760d1ce 100644 --- a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/AbstractDatabaseTest.java +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/AbstractDatabaseTest.java @@ -31,6 +31,7 @@ import org.jumpmind.symmetric.db.IDbDialect; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataService; +import org.jumpmind.symmetric.service.IIncomingBatchService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IOutgoingBatchService; import org.jumpmind.symmetric.service.IParameterService; @@ -120,6 +121,10 @@ protected ITriggerRouterService getTriggerRouterService() { protected IOutgoingBatchService getOutgoingBatchService() { return AppUtils.find(Constants.OUTGOING_BATCH_SERVICE, getSymmetricEngine()); + } + + protected IIncomingBatchService getIncomingBatchService() { + return AppUtils.find(Constants.INCOMING_BATCH_SERVICE, getSymmetricEngine()); } protected DataSource getDataSource() {