From 5ae58d039dfba2b9debcc8e42f4f328e34d5bdc2 Mon Sep 17 00:00:00 2001 From: Hitesh-Scorpio Date: Fri, 14 Oct 2016 20:45:38 +0530 Subject: [PATCH] APEXMALHAR-2291 Fix for Exactly-once processing of JdbcPOJOInsertOutput Operator. Added a check in endWindow() to not to commit if committed window id is greater than current window id. --- ...hruTransactionableStoreOutputOperator.java | 8 +- .../lib/db/jdbc/JdbcPojoOperatorTest.java | 92 +++++++++++++++++++ 2 files changed, 97 insertions(+), 3 deletions(-) diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java index b471a632ed..d21bd0154b 100644 --- a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java @@ -49,9 +49,11 @@ public void beginWindow(long windowId) @Override public void endWindow() { - store.storeCommittedWindowId(appId, operatorId, currentWindowId); - store.commitTransaction(); - committedWindowId = currentWindowId; + if ( committedWindowId < currentWindowId ) { + store.storeCommittedWindowId(appId, operatorId, currentWindowId); + store.commitTransaction(); + committedWindowId = currentWindowId; + } super.endWindow(); } diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java index e6d8b42fe8..91cb2f2f5b 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java @@ -334,6 +334,98 @@ public void testJdbcPojoInsertOutputOperator() Assert.assertEquals("Error tuples", 3, errorSink.collectedTuples.size()); } + /** + * This test will assume direct mapping for POJO fields to DB columns All + * fields in DB present in POJO and will test it for exactly once criteria + */ + @Test + public void testJdbcPojoInsertOutputOperatorExactlyOnce() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + CollectorTestSink errorSink = new CollectorTestSink<>(); + TestUtils.setSink(outputOperator.error, errorSink); + + outputOperator.activate(context); + + List events = Lists.newArrayList(); + for (int i = 0; i < 70; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + + outputOperator.beginWindow(0); + for (int i = 0; i < 10; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.beginWindow(1); + for (int i = 10; i < 20; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.beginWindow(2); + for (int i = 20; i < 30; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.setup(context); + outputOperator.input.setup(tpc); + outputOperator.activate(context); + + outputOperator.beginWindow(0); + for (int i = 30; i < 40; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.beginWindow(1); + for (int i = 40; i < 50; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.beginWindow(2); + for (int i = 50; i < 60; i++) { + outputOperator.input.process(events.get(i)); + } + + outputOperator.beginWindow(3); + for (int i = 60; i < 70; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.deactivate(); + outputOperator.teardown(); + + Assert.assertEquals("rows in db", 40, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); + + } + + /** * This test will assume direct mapping for POJO fields to DB columns Nullable * DB field missing in POJO name1 field, which is nullable in DB is missing