From 83fc9880135d5e32dd81a68b32be423732cf4837 Mon Sep 17 00:00:00 2001 From: Brennon York Date: Thu, 15 Dec 2016 20:16:18 -0800 Subject: [PATCH 1/3] updated catch blocks for async reader benchmark --- .../benchmark/stream/AsyncReaderBenchmark.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java index 6fc7e03a8..5042c76c5 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java @@ -44,6 +44,8 @@ public class AsyncReaderBenchmark extends AbstractReaderBenchmark { @Override protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) { DistributedLogManager dlm = null; + Integer zkSessionTimeoutMilliseconds = conf.getZKSessionTimeoutMilliseconds(); + while (null == dlm) { try { dlm = namespace.openLog(streamName); @@ -52,8 +54,10 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } if (null == dlm) { try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); + TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); } catch (InterruptedException e) { + logger.warn("Failed to create dlm for stream {} after {} ms timeout : ", + new Object[] { streamName, zkSessionTimeoutMilliseconds, e }); } } } @@ -118,8 +122,10 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } if (null == reader) { try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); + TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); } catch (InterruptedException e) { + logger.warn("Failed to create reader for stream {} after {} ms timeout : ", + new Object[] { streamName, zkSessionTimeoutMilliseconds, e }); } continue; } @@ -145,8 +151,10 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } } try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); + TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); } catch (InterruptedException e) { + logger.warn("Reader stream {} interrupted after {} ms timeout : ", + new Object[] { streamName, zkSessionTimeoutMilliseconds, e }); } } } From 7decca2d173d10a6bfefc13a64a5fe40b1f0ebe2 Mon Sep 17 00:00:00 2001 From: Brennon York Date: Thu, 15 Dec 2016 20:28:13 -0800 Subject: [PATCH 2/3] updated warn text for async reader, added catch blocks for ledger read benchmark --- .../benchmark/stream/AsyncReaderBenchmark.java | 12 ++++++------ .../benchmark/stream/LedgerReadBenchmark.java | 10 ++++++++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java index 5042c76c5..8b2aecc32 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java @@ -56,8 +56,8 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat try { TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); } catch (InterruptedException e) { - logger.warn("Failed to create dlm for stream {} after {} ms timeout : ", - new Object[] { streamName, zkSessionTimeoutMilliseconds, e }); + logger.warn("Interrupted from sleep while creating dlm for stream {} : ", + streamName, e); } } } @@ -124,8 +124,8 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat try { TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); } catch (InterruptedException e) { - logger.warn("Failed to create reader for stream {} after {} ms timeout : ", - new Object[] { streamName, zkSessionTimeoutMilliseconds, e }); + logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ", + streamName, e); } continue; } @@ -153,8 +153,8 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat try { TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); } catch (InterruptedException e) { - logger.warn("Reader stream {} interrupted after {} ms timeout : ", - new Object[] { streamName, zkSessionTimeoutMilliseconds, e }); + logger.warn("Interrupted from sleep while creating reader for stream {} : ", + streamName, e); } } } diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java index 7d3083e99..e7ec7e364 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java @@ -50,6 +50,8 @@ public class LedgerReadBenchmark extends AbstractReaderBenchmark { @Override protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) { DistributedLogManager dlm = null; + Integer zkSessionTimeoutMilliseconds = conf.getZKSessionTimeoutMilliseconds(); + while (null == dlm) { try { dlm = namespace.openLog(streamName); @@ -58,8 +60,10 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } if (null == dlm) { try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); + TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); } catch (InterruptedException e) { + logger.warn("Interrupted from sleep while creating dlm for stream {} : ", + streamName, e); } } } @@ -74,8 +78,10 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } if (null == segments) { try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); + TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); } catch (InterruptedException e) { + logger.warn("Interrupted from sleep while geting log segments for stream {} : ", + streamName, e); } } } From 3a2b5533f683f5d51cc865fbe8d6a25cc850ff1b Mon Sep 17 00:00:00 2001 From: Brennon York Date: Thu, 15 Dec 2016 20:34:40 -0800 Subject: [PATCH 3/3] updated logging verbage, added catch blocks for sync reader and zkmetadata resolver --- .../benchmark/stream/AsyncReaderBenchmark.java | 8 +++----- .../benchmark/stream/LedgerReadBenchmark.java | 6 ++---- .../benchmark/stream/SyncReaderBenchmark.java | 6 ++++++ .../distributedlog/metadata/ZkMetadataResolver.java | 1 + 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java index 8b2aecc32..5c18705fd 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java @@ -44,8 +44,6 @@ public class AsyncReaderBenchmark extends AbstractReaderBenchmark { @Override protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) { DistributedLogManager dlm = null; - Integer zkSessionTimeoutMilliseconds = conf.getZKSessionTimeoutMilliseconds(); - while (null == dlm) { try { dlm = namespace.openLog(streamName); @@ -54,7 +52,7 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } if (null == dlm) { try { - TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); + TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); } catch (InterruptedException e) { logger.warn("Interrupted from sleep while creating dlm for stream {} : ", streamName, e); @@ -122,7 +120,7 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } if (null == reader) { try { - TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); + TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); } catch (InterruptedException e) { logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ", streamName, e); @@ -151,7 +149,7 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } } try { - TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); + TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); } catch (InterruptedException e) { logger.warn("Interrupted from sleep while creating reader for stream {} : ", streamName, e); diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java index e7ec7e364..0daffd515 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java @@ -50,8 +50,6 @@ public class LedgerReadBenchmark extends AbstractReaderBenchmark { @Override protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) { DistributedLogManager dlm = null; - Integer zkSessionTimeoutMilliseconds = conf.getZKSessionTimeoutMilliseconds(); - while (null == dlm) { try { dlm = namespace.openLog(streamName); @@ -60,7 +58,7 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } if (null == dlm) { try { - TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); + TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); } catch (InterruptedException e) { logger.warn("Interrupted from sleep while creating dlm for stream {} : ", streamName, e); @@ -78,7 +76,7 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat } if (null == segments) { try { - TimeUnit.MILLISECONDS.sleep(zkSessionTimeoutMilliseconds); + TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); } catch (InterruptedException e) { logger.warn("Interrupted from sleep while geting log segments for stream {} : ", streamName, e); diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java index b21fd5005..88755e2ee 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java @@ -49,6 +49,8 @@ protected void benchmark(DistributedLogNamespace namespace, String streamName, S try { TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); } catch (InterruptedException e) { + logger.warn("Interrupted from sleep while creating dlm for stream {} : ", + streamName, e); } } } @@ -103,6 +105,8 @@ protected void benchmark(DistributedLogNamespace namespace, String streamName, S try { TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); } catch (InterruptedException e) { + logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ", + streamName, e); } continue; } @@ -140,6 +144,8 @@ record = reader.readNext(nonBlocking); try { TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); } catch (InterruptedException e) { + logger.warn("Interrupted from sleep while creating reader for stream {} : ", + streamName, e); } } } diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java index f2be95c0b..303fbe68b 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java @@ -62,6 +62,7 @@ public DLMetadata resolve(URI uri) throws IOException { try { return DLMetadata.deserialize(uri, data); } catch (IOException ie) { + throw new IOException("Failed to deserialize uri : " + uri); } } throw new IOException("No bkdl config bound under dl path : " + dlPath);