Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
public static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
public static final String BATCH_PROVIDER = "io.batch.provider";

// Input support for a specific remote store (required)
// Input support for a specific remote store (optional)
private TableReadFunction<K, V> readFn;

// Output support for a specific remote store (optional)
Expand All @@ -86,6 +86,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
// Rate limiter for client-side throttling; it is set by withRateLimiter()
private RateLimiter rateLimiter;

// Indicate whether read rate limiter is enabled or not
private boolean enableReadRateLimiter = true;

// Indicate whether write rate limiter is enabled or not
Expand Down Expand Up @@ -327,8 +328,10 @@ public Map<String, String> toConfig(Config jobConfig) {
addTableConfig(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize), tableConfig);

// Handle table reader function
addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig);
if (readFn != null) {
addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig);
}

// Handle table write function
if (writeFn != null) {
Expand All @@ -345,7 +348,8 @@ public Map<String, String> toConfig(Config jobConfig) {

@Override
protected void validate() {
Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
Comment thread
dxichen marked this conversation as resolved.
Preconditions.checkArgument(writeFn != null || readFn != null,
"Must have one of TableReadFunction or TableWriteFunction");
Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
"Only one of rateLimiter instance or read/write limits can be specified");
// Assume callback executor pool should have no more than 20 threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,31 @@ public class AsyncRemoteTable<K, V> implements AsyncReadWriteTable<K, V> {
private final TableWriteFunction<K, V> writeFn;

public AsyncRemoteTable(TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
Preconditions.checkNotNull(readFn, "null readFn");
Preconditions.checkArgument(writeFn != null || readFn != null,
"Must have one of TableReadFunction or TableWriteFunction");
this.readFn = readFn;
this.writeFn = writeFn;
}

@Override
public CompletableFuture<V> getAsync(K key, Object ... args) {
Preconditions.checkNotNull(readFn, "null readFn");
return args.length > 0
? readFn.getAsync(key, args)
: readFn.getAsync(key);
}

@Override
public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys, Object ... args) {
Preconditions.checkNotNull(readFn, "null readFn");
return args.length > 0
? readFn.getAllAsync(keys, args)
: readFn.getAllAsync(keys);
}

@Override
public <T> CompletableFuture<T> readAsync(int opId, Object... args) {
Preconditions.checkNotNull(readFn, "null readFn");
return readFn.readAsync(opId, args);
}

Expand Down Expand Up @@ -119,7 +123,9 @@ public void flush() {

@Override
public void close() {
readFn.close();
if (readFn != null) {
readFn.close();
}
if (writeFn != null) {
writeFn.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public RemoteTable(
ExecutorService callbackExecutor) {

super(tableId);
Preconditions.checkNotNull(readFn, "null readFn");
Preconditions.checkArgument(writeFn != null || readFn != null,
"Must have one of TableReadFunction or TableWriteFunction");

this.readFn = readFn;
this.writeFn = writeFn;
Expand Down Expand Up @@ -348,7 +349,9 @@ public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
public void init(Context context) {
super.init(context);
asyncTable.init(context);
readFn.init(context, this);
if (readFn != null) {
readFn.init(context, this);
}
if (writeFn != null) {
writeFn.init(context, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ public void testFlush() {
verify(writeFn, times(1)).flush();
}

@Test(expected = NullPointerException.class)
public void testFailOnNullReadFn() {
@Test(expected = IllegalArgumentException.class)
public void testFailOnNullReadFnAndWriteFn() {
new AsyncRemoteTable(null, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId, TableRead
readRateLimiter, writeRateLimiter, rateLimitingExecutor,
readPolicy, writePolicy, retryExecutor, null, null, cbExecutor);
table.init(getMockContext());
verify(readFn, times(1)).init(any(), any());
if (readFn != null) {
verify(readFn, times(1)).init(any(), any());
}
if (writeFn != null) {
verify(writeFn, times(1)).init(any(), any());
}
Expand Down Expand Up @@ -122,6 +124,17 @@ private void doTestGet(boolean sync, boolean error, boolean retry) {
verify(table.readRateLimiter, times(error && retry ? 2 : 1)).throttle(anyString());
}

@Test(expected = IllegalArgumentException.class)
public void testFailOnNullReadFnAndWriteFn() {
getTable("id", null, null, false);
}

@Test
public void testSucceedValidationOnNullReadFn() {
RemoteTable<String, String> table = getTable("tableId", null, mock(TableWriteFunction.class), false);
Assert.assertNotNull(table);
}

@Test
public void testInit() {
String tableId = "testInit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,37 @@ private void doTestSerialize(RateLimiter rateLimiter, CreditFunction readCredFn,
tableConfig.containsKey(JavaTableConfig.buildKey(tableId, RemoteTableDescriptor.WRITE_CREDIT_FN)));
}

@Test
public void testValidateOnlyReadOrWriteFn() {
// Only read defined
String tableId = "1";
RemoteTableDescriptor desc = new RemoteTableDescriptor(tableId)
.withReadFunction(createMockTableReadFunction())
.withReadRateLimiterDisabled();
Map<String, String> tableConfig = desc.toConfig(new MapConfig());
Assert.assertNotNull(tableConfig);

// Only write defined
String tableId2 = "2";
RemoteTableDescriptor desc2 = new RemoteTableDescriptor(tableId2)
.withWriteFunction(createMockTableWriteFunction())
.withWriteRateLimiterDisabled();
tableConfig = desc2.toConfig(new MapConfig());
Assert.assertNotNull(tableConfig);

// Neither read or write defined (Failure case)
String tableId3 = "3";
RemoteTableDescriptor desc3 = new RemoteTableDescriptor(tableId3);
try {
desc3.toConfig(new MapConfig());
Assert.fail("Should not allow neither readFn or writeFn defined");
} catch (Exception e) {
Assert.assertTrue(e instanceof IllegalArgumentException);
Assert.assertTrue(e.getMessage().contains("Must have one of TableReadFunction or TableWriteFunction"));
}
}


@Test
public void testSerializeSimple() {
doTestSerialize(null, null, null);
Expand Down Expand Up @@ -135,7 +166,7 @@ public void testSerializeNullWriteFunction() {
assertEquals(null, RemoteTableDescriptor.WRITE_FN, tableId, tableConfig);
}

@Test(expected = NullPointerException.class)
@Test(expected = IllegalArgumentException.class)
public void testSerializeNullReadFunction() {
RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
Map<String, String> tableConfig = desc.toConfig(new MapConfig());
Expand Down