diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java index eb8188f06d..dce7cc0a86 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.table.TableSpec; @@ -56,6 +57,9 @@ public Map generateConfig(Map config) { tableConfig.put(realKey, v); }); + // Enable host affinity + tableConfig.put(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, "true"); + logger.info("Generated configuration for table " + tableSpec.getId()); return tableConfig; diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java index 3ed29ca118..817fb9f02c 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -81,7 +81,7 @@ public void testWithTableDescriptorsProviderClass() throws Exception { String tableRewriterName = "tableRewriter"; configs.put("tables.descriptors.provider.class", MySampleTableDescriptorsProvider.class.getName()); Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs)); - Assert.assertTrue(resultConfig.size() == 17); + Assert.assertTrue(resultConfig.size() == 18); String localTableId = "local-table-1"; String remoteTableId = "remote-table-1";