Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class StorageConfig extends MapConfig {
static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX + "%s.side.inputs.processor.factory";
static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE =
STORE_PREFIX + "%s.side.inputs.processor.serialized.instance";
static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";

public StorageConfig(Config config) {
super(config);
Expand Down Expand Up @@ -225,10 +227,13 @@ public boolean hasDurableStores() {
}

/**
* Helper method to get the number of stores configured with a changelog.
* Helper method to get the number of persistent stores.
*/
public int getNumStoresWithChangelog() {
Config subConfig = subset(STORE_PREFIX, true);
return new Long(subConfig.keySet().stream().filter(key -> key.endsWith(CHANGELOG_SUFFIX)).count()).intValue();
public int getNumPersistentStores() {
return (int) getStoreNames().stream()
.map(storeName -> getStorageFactoryClassName(storeName))
.filter(factoryName -> factoryName.isPresent())
.filter(factoryName -> !factoryName.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY))
.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildD
systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap());
DiagnosticsManager diagnosticsManager =
new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), containerMemoryMb, containerNumCores,
new StorageConfig(config).getNumStoresWithChangelog(), maxHeapSizeBytes, containerThreadPoolSize, containerId, execEnvContainerId.orElse(""),
new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize, containerId, execEnvContainerId.orElse(""),
taskClassVersion, samzaVersion, hostName, diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class DiagnosticsManager {
// Job-related params
private final int containerMemoryMb;
private final int containerNumCores;
private final int numStoresWithChangelog;
private final int numPersistentStores;
private final long maxHeapSizeBytes;
private final int containerThreadPoolSize;
private final Map<String, ContainerModel> containerModels;
Expand All @@ -86,7 +86,7 @@ public DiagnosticsManager(String jobName,
Map<String, ContainerModel> containerModels,
int containerMemoryMb,
int containerNumCores,
int numStoresWithChangelog,
int numPersistentStores,
long maxHeapSizeBytes,
int containerThreadPoolSize,
String containerId,
Expand All @@ -98,7 +98,7 @@ public DiagnosticsManager(String jobName,
SystemProducer systemProducer,
Duration terminationDuration) {

this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numStoresWithChangelog, maxHeapSizeBytes, containerThreadPoolSize,
this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize,
containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer,
terminationDuration, Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()));
Expand All @@ -110,7 +110,7 @@ public DiagnosticsManager(String jobName,
Map<String, ContainerModel> containerModels,
int containerMemoryMb,
int containerNumCores,
int numStoresWithChangelog,
int numPersistentStores,
long maxHeapSizeBytes,
int containerThreadPoolSize,
String containerId,
Expand All @@ -127,7 +127,7 @@ public DiagnosticsManager(String jobName,
this.containerModels = containerModels;
this.containerMemoryMb = containerMemoryMb;
this.containerNumCores = containerNumCores;
this.numStoresWithChangelog = numStoresWithChangelog;
this.numPersistentStores = numPersistentStores;
this.maxHeapSizeBytes = maxHeapSizeBytes;
this.containerThreadPoolSize = containerThreadPoolSize;
this.containerId = containerId;
Expand Down Expand Up @@ -211,7 +211,7 @@ public void run() {
if (!jobParamsEmitted) {
diagnosticsStreamMessage.addContainerMb(containerMemoryMb);
diagnosticsStreamMessage.addContainerNumCores(containerNumCores);
diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog);
diagnosticsStreamMessage.addNumPersistentStores(numPersistentStores);
diagnosticsStreamMessage.addContainerModels(containerModels);
diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes);
diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class DiagnosticsStreamMessage {
private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents";
private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb";
private static final String CONTAINER_NUM_CORES_METRIC_NAME = "containerNumCores";
private static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME = "numStoresWithChangelog";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can all readers of this data handle a missing entry for this? Just double checking on compatibility.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, only the asc prototype uses this currently.

private static final String CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME = "numPersistentStores";
private static final String CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME = "maxHeap";
private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME = "containerThreadPoolSize";
private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
Expand Down Expand Up @@ -92,11 +92,11 @@ public void addContainerNumCores(Integer containerNumCores) {

/**
* Add the num stores with changelog parameter to the message.
* @param numStoresWithChangelog the parameter value.
* @param numPersistentStores the parameter value.
*/
public void addNumStoresWithChangelog(Integer numStoresWithChangelog) {
addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME,
numStoresWithChangelog);
public void addNumPersistentStores(Integer numPersistentStores) {
addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME,
numPersistentStores);
}

/**
Expand Down Expand Up @@ -198,9 +198,9 @@ public Integer getContainerNumCores() {
return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_CORES_METRIC_NAME);
}

public Integer getNumStoresWithChangelog() {
public Integer getNumPersistentStores() {
return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME);
CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME);
}

public Long getMaxHeapSize() {
Expand Down Expand Up @@ -234,7 +234,8 @@ public static DiagnosticsStreamMessage convertToDiagnosticsStreamMessage(Metrics

diagnosticsStreamMessage.addContainerNumCores((Integer) diagnosticsManagerGroupMap.get(CONTAINER_NUM_CORES_METRIC_NAME));
diagnosticsStreamMessage.addContainerMb((Integer) diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME));
diagnosticsStreamMessage.addNumStoresWithChangelog((Integer) diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME));
diagnosticsStreamMessage.addNumPersistentStores((Integer) diagnosticsManagerGroupMap.get(
CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME));
diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String) diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME)));
diagnosticsStreamMessage.addMaxHeapSize((Long) diagnosticsManagerGroupMap.get(CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME));
diagnosticsStreamMessage.addContainerThreadPoolSize((Integer) diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class TestDiagnosticsManager {
private int containerMb = 1024;
private int containerThreadPoolSize = 2;
private long maxHeapSize = 900;
private int numStoresWithChangelog = 2;
private int numPersistentStores = 2;
private int containerNumCores = 2;
private Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels();
private Collection<DiagnosticsExceptionEvent> exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList();
Expand All @@ -75,7 +75,7 @@ public void setup() {
});

this.diagnosticsManager =
new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numStoresWithChangelog, maxHeapSize, containerThreadPoolSize,
new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize,
"0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream,
mockSystemProducer, Duration.ofSeconds(1), mockExecutorService);

Expand Down Expand Up @@ -136,7 +136,7 @@ public void testSecondPublishWithProcessorStopInSecondMessage() {
Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 102)));
Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores());
}

@Test
Expand Down Expand Up @@ -169,7 +169,7 @@ public void testSecondPublishWithExceptionInSecondMessage() {
Assert.assertNull(diagnosticsStreamMessage.getProcessorStopEvents());
Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores());
}

@After
Expand Down Expand Up @@ -210,7 +210,7 @@ private void validateOutgoingMessageEnvelope(OutgoingMessageEnvelope outgoingMes
Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101)));
Assert.assertEquals(containerModels, diagnosticsStreamMessage.getContainerModels());
Assert.assertEquals(containerNumCores, diagnosticsStreamMessage.getContainerNumCores().intValue());
Assert.assertEquals(numStoresWithChangelog, diagnosticsStreamMessage.getNumStoresWithChangelog().intValue());
Assert.assertEquals(numPersistentStores, diagnosticsStreamMessage.getNumPersistentStores().intValue());
}

private class MockSystemProducer implements SystemProducer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {

diagnosticsStreamMessage.addContainerMb(1024);
diagnosticsStreamMessage.addContainerNumCores(2);
diagnosticsStreamMessage.addNumStoresWithChangelog(3);
diagnosticsStreamMessage.addNumPersistentStores(3);

diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
return diagnosticsStreamMessage;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void basicTest() {

Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
Assert.assertEquals(2, (int) diagnosticsStreamMessage.getContainerNumCores());
Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumStoresWithChangelog());
Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumPersistentStores());
Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents());
Assert.assertEquals(getSampleContainerModels(), diagnosticsStreamMessage.getContainerModels());
Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), getProcessorStopEventList());
Expand Down Expand Up @@ -135,7 +135,7 @@ public void serdeTest() {
Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap();
Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerModels"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numStoresWithChangelog"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numPersistentStores"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents"));
Expand Down