From 3f7ed71f8f5d6efe3567f039734ac96bb12679b6 Mon Sep 17 00:00:00 2001 From: Wei Song Date: Tue, 24 Jul 2018 18:34:06 -0700 Subject: [PATCH 1/3] Added self to committer list --- docs/community/committers.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/community/committers.md b/docs/community/committers.md index b8847ee974..222a661736 100644 --- a/docs/community/committers.md +++ b/docs/community/committers.md @@ -96,3 +96,8 @@ Committer
**Prateek Maheshwari**
Committer
+ +**Wei Song**
+Committer
+ + From a53e56281e97c8db81b07453f5c1d0177703019f Mon Sep 17 00:00:00 2001 From: Wei Song Date: Fri, 19 Oct 2018 11:57:20 -0700 Subject: [PATCH 2/3] SAMZA-1964 Make getTableSpec() in RemoteTableDescriptor reentrant --- .../remote/descriptors/RemoteTableDescriptor.java | 15 ++++++--------- .../descriptors/TestRemoteTableDescriptor.java | 1 + .../descriptors/TestInMemoryTableDescriptor.java | 7 +++++++ .../descriptors/TestRocksDbTableDescriptor.java | 6 ++++-- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java index 0187b2ebf6..1d1bca6eb8 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java @@ -65,8 +65,7 @@ public class RemoteTableDescriptor extends BaseTableDescriptor writeFn; - // Rate limiter for client-side throttling; - // can either be constructed indirectly from rates or overridden by withRateLimiter() + // Rate limiter for client-side throttling; it is set by withRateLimiter() private RateLimiter rateLimiter; // Rates for constructing the default rate limiter when they are non-zero @@ -113,21 +112,19 @@ public TableSpec getTableSpec() { tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn)); } - // Serialize the rate limiter if specified if (!tagCreditsMap.isEmpty()) { - rateLimiter = new EmbeddedTaggedRateLimiter(tagCreditsMap); - } - - if (rateLimiter != null) { + tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", + new EmbeddedTaggedRateLimiter(tagCreditsMap))); + } else if (rateLimiter != null) { tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter)); } - // Serialize the readCredit and writeCredit functions + // Serialize the readCredit functions if (readCreditFn != null) { tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize( "read credit function", readCreditFn)); } - + // Serialize the writeCredit functions if (writeCreditFn != null) { tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize( "write credit function", writeCreditFn)); diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java index c6d969ee9e..fb9d2e7dd8 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java @@ -197,6 +197,7 @@ private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean r } TableSpec spec = desc.getTableSpec(); + spec = desc.getTableSpec(); RemoteTableProvider provider = new RemoteTableProvider(spec); provider.init(createMockContext()); Table table = provider.getTable(); diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java index 33e3c3507c..5353837441 100644 --- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java +++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java @@ -28,6 +28,13 @@ public class TestInMemoryTableDescriptor { + @Test + public void testMinimal() { + InMemoryTableDescriptor tableDescriptor = new InMemoryTableDescriptor("1"); + tableDescriptor.getTableSpec(); + tableDescriptor.getTableSpec(); + } + @Test public void testTableSpec() { diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java index 86efea5548..86f6e5ceac 100644 --- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java @@ -30,8 +30,10 @@ public class TestRocksDbTableDescriptor { @Test public void testMinimal() { - new RocksDbTableDescriptor("1") - .validate(); + RocksDbTableDescriptor tableDescriptor = new RocksDbTableDescriptor("1"); + tableDescriptor.validate(); + tableDescriptor.getTableSpec(); + tableDescriptor.getTableSpec(); } @Test From 2f70f7820a40aa93e0658ebba8787f667052e777 Mon Sep 17 00:00:00 2001 From: Wei Song Date: Thu, 17 Jan 2019 13:35:57 -0800 Subject: [PATCH 3/3] Changed TTL type to long in RocksDbTableDescriptor --- .../samza/storage/kv/descriptors/RocksDbTableDescriptor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java index ce2b8f6ba6..0703317f7a 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java @@ -55,7 +55,7 @@ public class RocksDbTableDescriptor extends LocalTableDescriptor withBlockSize(int blockSize) { * @param ttl the time to live in milliseconds * @return this table descriptor instance */ - public RocksDbTableDescriptor withTtl(int ttl) { + public RocksDbTableDescriptor withTtl(long ttl) { this.ttl = ttl; return this; }