From 85c21c0a95db14e663794a5ee87bd87f78ca9c81 Mon Sep 17 00:00:00 2001 From: Ray Matharu Date: Fri, 23 Aug 2019 14:55:17 -0700 Subject: [PATCH 1/3] Emitting numPersistentStores instead of num stores with changelog --- .../org/apache/samza/config/StorageConfig.java | 12 ++++++++---- .../org/apache/samza/util/DiagnosticsUtil.java | 2 +- .../samza/diagnostics/DiagnosticsManager.java | 12 ++++++------ .../diagnostics/DiagnosticsStreamMessage.java | 17 +++++++++-------- .../diagnostics/TestDiagnosticsManager.java | 10 +++++----- .../TestDiagnosticsStreamMessage.java | 6 +++--- 6 files changed, 32 insertions(+), 27 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index 0bb9b99709..9df77c427a 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -63,6 +63,7 @@ public class StorageConfig extends MapConfig { static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX + "%s.side.inputs.processor.factory"; static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE = STORE_PREFIX + "%s.side.inputs.processor.serialized.instance"; + static final String PERSISTENT_STORE_FACTORY = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"; public StorageConfig(Config config) { super(config); @@ -225,10 +226,13 @@ public boolean hasDurableStores() { } /** - * Helper method to get the number of stores configured with a changelog. + * Helper method to get the number of persistent stores. */ - public int getNumStoresWithChangelog() { - Config subConfig = subset(STORE_PREFIX, true); - return new Long(subConfig.keySet().stream().filter(key -> key.endsWith(CHANGELOG_SUFFIX)).count()).intValue(); + public int getNumPersistentStores() { + return (int) getStoreNames().stream() + .map(storeName -> getStorageFactoryClassName(storeName)) + .filter(factoryName -> factoryName.isPresent()) + .filter(factoryName -> factoryName.get().equals(PERSISTENT_STORE_FACTORY)) + .count(); } } diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java index a3245a17bf..2870153502 100644 --- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java @@ -132,7 +132,7 @@ public static Optional> buildD systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap()); DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), containerMemoryMb, containerNumCores, - new StorageConfig(config).getNumStoresWithChangelog(), maxHeapSizeBytes, containerThreadPoolSize, containerId, execEnvContainerId.orElse(""), + new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize, containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName, diagnosticsSystemStream, systemProducer, Duration.ofMillis(new TaskConfig(config).getShutdownMs())); diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java index ed5179e5a7..80ddf0dc5f 100644 --- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java @@ -67,7 +67,7 @@ public class DiagnosticsManager { // Job-related params private final int containerMemoryMb; private final int containerNumCores; - private final int numStoresWithChangelog; + private final int numPersistentStores; private final long maxHeapSizeBytes; private final int containerThreadPoolSize; private final Map containerModels; @@ -86,7 +86,7 @@ public DiagnosticsManager(String jobName, Map containerModels, int containerMemoryMb, int containerNumCores, - int numStoresWithChangelog, + int numPersistentStores, long maxHeapSizeBytes, int containerThreadPoolSize, String containerId, @@ -98,7 +98,7 @@ public DiagnosticsManager(String jobName, SystemProducer systemProducer, Duration terminationDuration) { - this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numStoresWithChangelog, maxHeapSizeBytes, containerThreadPoolSize, + this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize, containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer, terminationDuration, Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build())); @@ -110,7 +110,7 @@ public DiagnosticsManager(String jobName, Map containerModels, int containerMemoryMb, int containerNumCores, - int numStoresWithChangelog, + int numPersistentStores, long maxHeapSizeBytes, int containerThreadPoolSize, String containerId, @@ -127,7 +127,7 @@ public DiagnosticsManager(String jobName, this.containerModels = containerModels; this.containerMemoryMb = containerMemoryMb; this.containerNumCores = containerNumCores; - this.numStoresWithChangelog = numStoresWithChangelog; + this.numPersistentStores = numPersistentStores; this.maxHeapSizeBytes = maxHeapSizeBytes; this.containerThreadPoolSize = containerThreadPoolSize; this.containerId = containerId; @@ -211,7 +211,7 @@ public void run() { if (!jobParamsEmitted) { diagnosticsStreamMessage.addContainerMb(containerMemoryMb); diagnosticsStreamMessage.addContainerNumCores(containerNumCores); - diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog); + diagnosticsStreamMessage.addNumPersistentStores(numPersistentStores); diagnosticsStreamMessage.addContainerModels(containerModels); diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes); diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize); diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java index 81642d5545..de12bde90f 100644 --- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java @@ -55,7 +55,7 @@ public class DiagnosticsStreamMessage { private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents"; private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb"; private static final String CONTAINER_NUM_CORES_METRIC_NAME = "containerNumCores"; - private static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME = "numStoresWithChangelog"; + private static final String CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME = "numPersistentStores"; private static final String CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME = "maxHeap"; private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME = "containerThreadPoolSize"; private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels"; @@ -92,11 +92,11 @@ public void addContainerNumCores(Integer containerNumCores) { /** * Add the num stores with changelog parameter to the message. - * @param numStoresWithChangelog the parameter value. + * @param numPersistentStores the parameter value. */ - public void addNumStoresWithChangelog(Integer numStoresWithChangelog) { - addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME, - numStoresWithChangelog); + public void addNumPersistentStores(Integer numPersistentStores) { + addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME, + numPersistentStores); } /** @@ -198,9 +198,9 @@ public Integer getContainerNumCores() { return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_CORES_METRIC_NAME); } - public Integer getNumStoresWithChangelog() { + public Integer getNumPersistentStores() { return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, - CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME); + CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME); } public Long getMaxHeapSize() { @@ -234,7 +234,8 @@ public static DiagnosticsStreamMessage convertToDiagnosticsStreamMessage(Metrics diagnosticsStreamMessage.addContainerNumCores((Integer) diagnosticsManagerGroupMap.get(CONTAINER_NUM_CORES_METRIC_NAME)); diagnosticsStreamMessage.addContainerMb((Integer) diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME)); - diagnosticsStreamMessage.addNumStoresWithChangelog((Integer) diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME)); + diagnosticsStreamMessage.addNumPersistentStores((Integer) diagnosticsManagerGroupMap.get( + CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME)); diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String) diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME))); diagnosticsStreamMessage.addMaxHeapSize((Long) diagnosticsManagerGroupMap.get(CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME)); diagnosticsStreamMessage.addContainerThreadPoolSize((Integer) diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME)); diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java index 33d16e3300..d1acdd2c07 100644 --- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java +++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java @@ -55,7 +55,7 @@ public class TestDiagnosticsManager { private int containerMb = 1024; private int containerThreadPoolSize = 2; private long maxHeapSize = 900; - private int numStoresWithChangelog = 2; + private int numPersistentStores = 2; private int containerNumCores = 2; private Map containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels(); private Collection exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList(); @@ -75,7 +75,7 @@ public void setup() { }); this.diagnosticsManager = - new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numStoresWithChangelog, maxHeapSize, containerThreadPoolSize, + new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream, mockSystemProducer, Duration.ofSeconds(1), mockExecutorService); @@ -136,7 +136,7 @@ public void testSecondPublishWithProcessorStopInSecondMessage() { Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 102))); Assert.assertNull(diagnosticsStreamMessage.getContainerModels()); Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores()); - Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog()); + Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores()); } @Test @@ -169,7 +169,7 @@ public void testSecondPublishWithExceptionInSecondMessage() { Assert.assertNull(diagnosticsStreamMessage.getProcessorStopEvents()); Assert.assertNull(diagnosticsStreamMessage.getContainerModels()); Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores()); - Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog()); + Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores()); } @After @@ -210,7 +210,7 @@ private void validateOutgoingMessageEnvelope(OutgoingMessageEnvelope outgoingMes Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101))); Assert.assertEquals(containerModels, diagnosticsStreamMessage.getContainerModels()); Assert.assertEquals(containerNumCores, diagnosticsStreamMessage.getContainerNumCores().intValue()); - Assert.assertEquals(numStoresWithChangelog, diagnosticsStreamMessage.getNumStoresWithChangelog().intValue()); + Assert.assertEquals(numPersistentStores, diagnosticsStreamMessage.getNumPersistentStores().intValue()); } private class MockSystemProducer implements SystemProducer { diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java index 81bc57747e..cd506b25e0 100644 --- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java +++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java @@ -54,7 +54,7 @@ private DiagnosticsStreamMessage getDiagnosticsStreamMessage() { diagnosticsStreamMessage.addContainerMb(1024); diagnosticsStreamMessage.addContainerNumCores(2); - diagnosticsStreamMessage.addNumStoresWithChangelog(3); + diagnosticsStreamMessage.addNumPersistentStores(3); diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList()); return diagnosticsStreamMessage; @@ -106,7 +106,7 @@ public void basicTest() { Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb()); Assert.assertEquals(2, (int) diagnosticsStreamMessage.getContainerNumCores()); - Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumStoresWithChangelog()); + Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumPersistentStores()); Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents()); Assert.assertEquals(getSampleContainerModels(), diagnosticsStreamMessage.getContainerModels()); Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), getProcessorStopEventList()); @@ -135,7 +135,7 @@ public void serdeTest() { Map> metricsMap = metricsSnapshot.getMetrics().getAsMap(); Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions")); Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerModels")); - Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numStoresWithChangelog")); + Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numPersistentStores")); Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores")); Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb")); Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents")); From eb66a27a0f31482c857c0f7bcbe6280510e58dc4 Mon Sep 17 00:00:00 2001 From: Ray Matharu Date: Sat, 24 Aug 2019 11:28:43 -0700 Subject: [PATCH 2/3] Using inmemory factory name instead --- .../src/main/java/org/apache/samza/config/StorageConfig.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index 9df77c427a..7bc6cb4c72 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -63,7 +63,8 @@ public class StorageConfig extends MapConfig { static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX + "%s.side.inputs.processor.factory"; static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE = STORE_PREFIX + "%s.side.inputs.processor.serialized.instance"; - static final String PERSISTENT_STORE_FACTORY = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"; + static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY = + "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"; public StorageConfig(Config config) { super(config); @@ -232,7 +233,7 @@ public int getNumPersistentStores() { return (int) getStoreNames().stream() .map(storeName -> getStorageFactoryClassName(storeName)) .filter(factoryName -> factoryName.isPresent()) - .filter(factoryName -> factoryName.get().equals(PERSISTENT_STORE_FACTORY)) + .filter(factoryName -> !factoryName.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) .count(); } } From 04937ccf9006cdadb740fc6b3079dc21943ed4d8 Mon Sep 17 00:00:00 2001 From: Ray Matharu Date: Mon, 26 Aug 2019 11:13:01 -0700 Subject: [PATCH 3/3] Trigger notification