From d0394038710975cc4153c803c4ab780b14aeb721 Mon Sep 17 00:00:00 2001 From: Daniel Chen Date: Fri, 23 Aug 2019 11:11:10 -0700 Subject: [PATCH 1/2] SAMZA-2309: Remove readFn requirement for remote tables --- .../apache/samza/table/descriptors/RemoteTableDescriptor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java index b0590c5f15..6461639614 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java @@ -77,7 +77,7 @@ public class RemoteTableDescriptor extends BaseTableDescriptor readFn; // Output support for a specific remote store (optional) @@ -86,6 +86,7 @@ public class RemoteTableDescriptor extends BaseTableDescriptor toConfig(Config jobConfig) { @Override protected void validate() { - Preconditions.checkNotNull(readFn, "TableReadFunction is required."); Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(), "Only one of rateLimiter instance or read/write limits can be specified"); // Assume callback executor pool should have no more than 20 threads From f416bb89358b499a732f6ff21abfe7f9f9a463b3 Mon Sep 17 00:00:00 2001 From: Daniel Chen Date: Fri, 23 Aug 2019 13:27:22 -0700 Subject: [PATCH 2/2] updated remotetable and asyncremotetable --- .../descriptors/RemoteTableDescriptor.java | 8 +++-- .../samza/table/remote/AsyncRemoteTable.java | 10 ++++-- .../samza/table/remote/RemoteTable.java | 7 ++-- .../table/remote/TestAsyncRemoteTable.java | 4 +-- .../samza/table/remote/TestRemoteTable.java | 15 ++++++++- .../TestRemoteTableDescriptor.java | 33 ++++++++++++++++++- 6 files changed, 67 insertions(+), 10 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java index 6461639614..3eed914b6a 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java @@ -328,8 +328,10 @@ public Map toConfig(Config jobConfig) { addTableConfig(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize), tableConfig); // Handle table reader function - addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig); - addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig); + if (readFn != null) { + addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig); + addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig); + } // Handle table write function if (writeFn != null) { @@ -346,6 +348,8 @@ public Map toConfig(Config jobConfig) { @Override protected void validate() { + Preconditions.checkArgument(writeFn != null || readFn != null, + "Must have one of TableReadFunction or TableWriteFunction"); Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(), "Only one of rateLimiter instance or read/write limits can be specified"); // Assume callback executor pool should have no more than 20 threads diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java index 4b1851b9db..ebe48584c1 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java @@ -42,13 +42,15 @@ public class AsyncRemoteTable implements AsyncReadWriteTable { private final TableWriteFunction writeFn; public AsyncRemoteTable(TableReadFunction readFn, TableWriteFunction writeFn) { - Preconditions.checkNotNull(readFn, "null readFn"); + Preconditions.checkArgument(writeFn != null || readFn != null, + "Must have one of TableReadFunction or TableWriteFunction"); this.readFn = readFn; this.writeFn = writeFn; } @Override public CompletableFuture getAsync(K key, Object ... args) { + Preconditions.checkNotNull(readFn, "null readFn"); return args.length > 0 ? readFn.getAsync(key, args) : readFn.getAsync(key); @@ -56,6 +58,7 @@ public CompletableFuture getAsync(K key, Object ... args) { @Override public CompletableFuture> getAllAsync(List keys, Object ... args) { + Preconditions.checkNotNull(readFn, "null readFn"); return args.length > 0 ? readFn.getAllAsync(keys, args) : readFn.getAllAsync(keys); @@ -63,6 +66,7 @@ public CompletableFuture> getAllAsync(List keys, Object ... args) { @Override public CompletableFuture readAsync(int opId, Object... args) { + Preconditions.checkNotNull(readFn, "null readFn"); return readFn.readAsync(opId, args); } @@ -119,7 +123,9 @@ public void flush() { @Override public void close() { - readFn.close(); + if (readFn != null) { + readFn.close(); + } if (writeFn != null) { writeFn.close(); } diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java index 85f612c12b..6d6c23adca 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java @@ -130,7 +130,8 @@ public RemoteTable( ExecutorService callbackExecutor) { super(tableId); - Preconditions.checkNotNull(readFn, "null readFn"); + Preconditions.checkArgument(writeFn != null || readFn != null, + "Must have one of TableReadFunction or TableWriteFunction"); this.readFn = readFn; this.writeFn = writeFn; @@ -348,7 +349,9 @@ public CompletableFuture writeAsync(int opId, Object... args) { public void init(Context context) { super.init(context); asyncTable.init(context); - readFn.init(context, this); + if (readFn != null) { + readFn.init(context, this); + } if (writeFn != null) { writeFn.init(context, this); } diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java index d557c31086..20706dc4e8 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java @@ -176,8 +176,8 @@ public void testFlush() { verify(writeFn, times(1)).flush(); } - @Test(expected = NullPointerException.class) - public void testFailOnNullReadFn() { + @Test(expected = IllegalArgumentException.class) + public void testFailOnNullReadFnAndWriteFn() { new AsyncRemoteTable(null, null); } diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java index 7a98504dfe..718aa2cf47 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java @@ -88,7 +88,9 @@ private > T getTable(String tableId, TableRead readRateLimiter, writeRateLimiter, rateLimitingExecutor, readPolicy, writePolicy, retryExecutor, null, null, cbExecutor); table.init(getMockContext()); - verify(readFn, times(1)).init(any(), any()); + if (readFn != null) { + verify(readFn, times(1)).init(any(), any()); + } if (writeFn != null) { verify(writeFn, times(1)).init(any(), any()); } @@ -122,6 +124,17 @@ private void doTestGet(boolean sync, boolean error, boolean retry) { verify(table.readRateLimiter, times(error && retry ? 2 : 1)).throttle(anyString()); } + @Test(expected = IllegalArgumentException.class) + public void testFailOnNullReadFnAndWriteFn() { + getTable("id", null, null, false); + } + + @Test + public void testSucceedValidationOnNullReadFn() { + RemoteTable table = getTable("tableId", null, mock(TableWriteFunction.class), false); + Assert.assertNotNull(table); + } + @Test public void testInit() { String tableId = "testInit"; diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java index ce89c5a016..cf035990c4 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java @@ -99,6 +99,37 @@ private void doTestSerialize(RateLimiter rateLimiter, CreditFunction readCredFn, tableConfig.containsKey(JavaTableConfig.buildKey(tableId, RemoteTableDescriptor.WRITE_CREDIT_FN))); } + @Test + public void testValidateOnlyReadOrWriteFn() { + // Only read defined + String tableId = "1"; + RemoteTableDescriptor desc = new RemoteTableDescriptor(tableId) + .withReadFunction(createMockTableReadFunction()) + .withReadRateLimiterDisabled(); + Map tableConfig = desc.toConfig(new MapConfig()); + Assert.assertNotNull(tableConfig); + + // Only write defined + String tableId2 = "2"; + RemoteTableDescriptor desc2 = new RemoteTableDescriptor(tableId2) + .withWriteFunction(createMockTableWriteFunction()) + .withWriteRateLimiterDisabled(); + tableConfig = desc2.toConfig(new MapConfig()); + Assert.assertNotNull(tableConfig); + + // Neither read or write defined (Failure case) + String tableId3 = "3"; + RemoteTableDescriptor desc3 = new RemoteTableDescriptor(tableId3); + try { + desc3.toConfig(new MapConfig()); + Assert.fail("Should not allow neither readFn or writeFn defined"); + } catch (Exception e) { + Assert.assertTrue(e instanceof IllegalArgumentException); + Assert.assertTrue(e.getMessage().contains("Must have one of TableReadFunction or TableWriteFunction")); + } + } + + @Test public void testSerializeSimple() { doTestSerialize(null, null, null); @@ -135,7 +166,7 @@ public void testSerializeNullWriteFunction() { assertEquals(null, RemoteTableDescriptor.WRITE_FN, tableId, tableConfig); } - @Test(expected = NullPointerException.class) + @Test(expected = IllegalArgumentException.class) public void testSerializeNullReadFunction() { RemoteTableDescriptor desc = new RemoteTableDescriptor("1"); Map tableConfig = desc.toConfig(new MapConfig());