Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2561: Add job features to MetricsHeader #1402

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions samza-core/src/main/java/org/apache/samza/metrics/ApiType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.metrics;

/**
 * The Samza API flavour a job is written against, as reported in the metrics header.
 *
 * <p>Review nit from the PR discussion: would {@code SamzaAPI} be a better name?
 */
public enum ApiType {
  /** Low-level (task) API. */
  SAMZA_LOW_LEVEL,

  /** High-level (stream application) API. */
  SAMZA_HIGH_LEVEL,

  /** Samza SQL. */
  SAMZA_SQL,

  /** Apache Beam running on Samza. */
  SAMZA_BEAM
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.metrics;

/**
 * The deployment model of a job, as reported in the metrics header.
 */
public enum DeploymentType {
  /** The job is deployed through the YARN job factory. */
  YARN,

  /** The job is deployed through any other job factory. */
  STANDALONE
}
Comment on lines +1 to +24
Copy link
Contributor

@rmatharu-zz rmatharu-zz Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single-line enums like these can likely be added to the metrics-header class, since that seems to be the only place they're used, no?
The same goes for ApiType above.

Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ public static void writeMetadataFile(String jobName, String jobId, String contai
MetricsHeader metricsHeader =
new MetricsHeader(jobName, jobId, "samza-container-" + containerId, execEnvContainerId.orElse(""),
LocalContainerRunner.class.getName(), Util.getTaskClassVersion(config), Util.getSamzaVersion(),
Util.getLocalHost().getHostName(), System.currentTimeMillis(), System.currentTimeMillis());
Util.getLocalHost().getHostName(), System.currentTimeMillis(), System.currentTimeMillis(), Util.getDeploymentType(config),
Util.getApiType(config), Util.getContainerCount(config), Util.getContainerMemoryMb(config), Util.getNumCores(config),
Util.getThreadPoolSize(config), Util.getHostAffinityEnabled(config), Util.getSspGrouperFactory(config),
Util.getContainerRetryCount(config), Util.getContainerRetryWindowMs(config), Util.getMaxConcurrency(config), Util.getMaxJvmHeapMb());

class MetadataFileContents {
public final String version;
Expand Down Expand Up @@ -127,6 +130,14 @@ public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildD
String hostName = Util.getLocalHost().getHostName();
Optional<String> diagnosticsReporterStreamName =
metricsConfig.getMetricsSnapshotReporterStream(diagnosticsReporterName);
String deploymentType = Util.getDeploymentType(config);
String apiType = Util.getApiType(config);
int numContainers = Util.getContainerCount(config);
boolean hostAffinityEnabled = Util.getHostAffinityEnabled(config);
String sspGrouperFactory = Util.getSspGrouperFactory(config);
int containerRetryCount = Util.getContainerRetryCount(config);
long containerRetryWindowMs = Util.getContainerRetryWindowMs(config);
int maxConcurrency = Util.getMaxConcurrency(config);

if (!diagnosticsReporterStreamName.isPresent()) {
throw new ConfigException(
Expand All @@ -151,7 +162,9 @@ public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildD
new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize,
containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName,
diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled());
Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled(),
deploymentType, apiType, numContainers, hostAffinityEnabled, sspGrouperFactory, containerRetryCount,
containerRetryWindowMs, maxConcurrency);

diagnosticsManagerReporterPair = Optional.of(new ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
}
Expand Down
89 changes: 89 additions & 0 deletions samza-core/src/main/java/org/apache/samza/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,23 @@
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.metrics.ApiType;
import org.apache.samza.metrics.DeploymentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Util {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to your change, but JobInfoUtil would be a better name for this class than just Util.

private static final Logger LOG = LoggerFactory.getLogger(Util.class);
private static final String YARN_JOB_FACTORY_CLASS = "org.apache.samza.job.yarn.YarnJobFactory";
private static final String BEAM_RUNNER_CLASS = "org.apache.beam.runners.samza.SamzaRunner";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually needed given
#1185 ?

private static final String SQL_RUNNER_CLASS = "org.apache.samza.sql.runner.SamzaSqlApplication";

static final String FALLBACK_VERSION = "0.0.1";

Expand Down Expand Up @@ -123,4 +131,85 @@ private static InetAddress doGetLocalHost() throws UnknownHostException, SocketE
}
return localHost;
}

/**
 * Derives the deployment type of a job from its configured stream job factory class.
 *
 * @param config the job configuration
 * @return {@code "YARN"} when the configured factory is the YARN job factory,
 *         {@code "STANDALONE"} for any other configured factory, and
 *         {@code "NOT_DEFINED"} when no factory is configured
 */
public static String getDeploymentType(Config config) {
  Optional<String> configuredFactory = new JobConfig(config).getStreamJobFactoryClass();
  if (!configuredFactory.isPresent()) {
    return "NOT_DEFINED";
  }

  // Every factory other than the YARN one is reported as standalone.
  DeploymentType deploymentType = YARN_JOB_FACTORY_CLASS.equals(configuredFactory.get())
      ? DeploymentType.YARN
      : DeploymentType.STANDALONE;
  return deploymentType.name();
}

public static String getApiType(Config config) {
ApplicationConfig appConfig = new ApplicationConfig(config);
String appClass = appConfig.getAppClass();
if (appClass == null || appClass.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ever true? for legacy task class applications?

return ApiType.SAMZA_LOW_LEVEL.name();
}
if (appClass.equals(BEAM_RUNNER_CLASS)) {
return ApiType.SAMZA_BEAM.name();
}
if (appClass.equals(SQL_RUNNER_CLASS)) {
return ApiType.SAMZA_SQL.name();
}
if (appClass.getClass().isInstance(StreamApplication.class)) {
return ApiType.SAMZA_HIGH_LEVEL.name();
}
return ApiType.SAMZA_LOW_LEVEL.name();
}

/**
 * Looks up the container count through {@link JobConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link JobConfig#getContainerCount()}
 */
public static int getContainerCount(Config config) {
  return new JobConfig(config).getContainerCount();
}

Comment on lines +166 to +170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why even have it as a util?
Could the caller not cast the "config" object into JobConfig, TaskConfig and ClusterManagerConfig once and invoke the respective function, rather than do a cast in each separate util.

Same for getContainerMemoryMb, getNumCores, getThreadPoolSize, getSspGrouperFactory, getHostAffinityEnabled, getContainerRetryCount, getContainerRetryWindowMs, getMaxConcurrency.
Then none of these "util" methods need to be there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

/**
 * Looks up the container memory setting through {@link ClusterManagerConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link ClusterManagerConfig#getContainerMemoryMb()}
 */
public static int getContainerMemoryMb(Config config) {
  return new ClusterManagerConfig(config).getContainerMemoryMb();
}

/**
 * Looks up the number of cores through {@link ClusterManagerConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link ClusterManagerConfig#getNumCores()}
 */
public static int getNumCores(Config config) {
  return new ClusterManagerConfig(config).getNumCores();
}

/**
 * Looks up the thread pool size through {@link JobConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link JobConfig#getThreadPoolSize()}
 */
public static int getThreadPoolSize(Config config) {
  return new JobConfig(config).getThreadPoolSize();
}

/**
 * Looks up the system-stream-partition grouper factory through {@link JobConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link JobConfig#getSystemStreamPartitionGrouperFactory()}
 */
public static String getSspGrouperFactory(Config config) {
  return new JobConfig(config).getSystemStreamPartitionGrouperFactory();
}

/**
 * Looks up the host-affinity flag through {@link ClusterManagerConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link ClusterManagerConfig#getHostAffinityEnabled()}
 */
public static boolean getHostAffinityEnabled(Config config) {
  return new ClusterManagerConfig(config).getHostAffinityEnabled();
}

/**
 * Looks up the container retry count through {@link ClusterManagerConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link ClusterManagerConfig#getContainerRetryCount()}
 */
public static int getContainerRetryCount(Config config) {
  return new ClusterManagerConfig(config).getContainerRetryCount();
}

/**
 * Looks up the container retry window through {@link ClusterManagerConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link ClusterManagerConfig#getContainerRetryWindowMs()}
 */
public static int getContainerRetryWindowMs(Config config) {
  return new ClusterManagerConfig(config).getContainerRetryWindowMs();
}

/**
 * Looks up the task max concurrency through {@link TaskConfig}.
 *
 * @param config the job configuration
 * @return the value of {@link TaskConfig#getMaxConcurrency()}
 */
public static int getMaxConcurrency(Config config) {
  return new TaskConfig(config).getMaxConcurrency();
}

/**
 * Reports the maximum amount of memory the JVM will attempt to use, in whole megabytes.
 *
 * <p>{@link Runtime#maxMemory()} returns {@link Long#MAX_VALUE} when there is no inherent limit.
 * The result is clamped to {@link Integer#MAX_VALUE} so that such values do not wrap around to a
 * negative number when narrowed to {@code int}.
 *
 * @return the max JVM heap size in MB, at most {@link Integer#MAX_VALUE}
 */
public static int getMaxJvmHeapMb() {
  long maxHeapMb = Runtime.getRuntime().maxMemory() / (1024L * 1024L);
  return (int) Math.min(maxHeapMb, (long) Integer.MAX_VALUE);
}
Comment on lines +211 to +214
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can DiagnosticsUtil.buildDiagnosticsManager also invoke this util method, instead of calling Runtime.getRuntime().maxMemory directly ?

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ public class DiagnosticsManager {
private final int containerThreadPoolSize;
private final Map<String, ContainerModel> containerModels;
private final boolean autosizingEnabled;
private final String deploymentType;
private final String apiType;
private final int numContainers;
private final boolean hostAffinityEnabled;
private final String sspGrouperFactory;
private final int containerRetryCount;
private final long containerRetryWindowMs;
private final int maxConcurrency;

private boolean jobParamsEmitted = false;

private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data
Expand All @@ -93,12 +102,23 @@ public DiagnosticsManager(String jobName,
String hostname,
SystemStream diagnosticSystemStream,
SystemProducer systemProducer,
Duration terminationDuration, boolean autosizingEnabled) {
Duration terminationDuration,
boolean autosizingEnabled,
String deploymentType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should deploymentType and apiType be enums, since we defined enums for them above?

String apiType,
int numContainers,
boolean hostAffinityEnabled,
String sspGrouperFactory,
int containerRetryCount,
long containerRetryWindowMs,
int maxConcurrency) {

this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize,
containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer,
terminationDuration, Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled);
new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled,
deploymentType, apiType, numContainers, hostAffinityEnabled, sspGrouperFactory, containerRetryCount,
containerRetryWindowMs, maxConcurrency);
}

@VisibleForTesting
Expand All @@ -118,7 +138,16 @@ public DiagnosticsManager(String jobName,
SystemStream diagnosticSystemStream,
SystemProducer systemProducer,
Duration terminationDuration,
ScheduledExecutorService executorService, boolean autosizingEnabled) {
ScheduledExecutorService executorService,
boolean autosizingEnabled,
String deploymentType,
String apiType,
int numContainers,
boolean hostAffinityEnabled,
String sspGrouperFactory,
int containerRetryCount,
long containerRetryWindowMs,
int maxConcurrency) {
this.jobName = jobName;
this.jobId = jobId;
this.containerModels = containerModels;
Expand All @@ -140,6 +169,14 @@ public DiagnosticsManager(String jobName,
this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters
this.scheduler = executorService;
this.autosizingEnabled = autosizingEnabled;
this.deploymentType = deploymentType;
this.apiType = apiType;
this.numContainers = numContainers;
this.hostAffinityEnabled = hostAffinityEnabled;
this.sspGrouperFactory = sspGrouperFactory;
this.containerRetryCount = containerRetryCount;
this.containerRetryWindowMs = containerRetryWindowMs;
this.maxConcurrency = maxConcurrency;

resetTime = Instant.now();
this.systemProducer.register(getClass().getSimpleName());
Expand Down Expand Up @@ -195,9 +232,12 @@ private class DiagnosticsStreamPublisher implements Runnable {
@Override
public void run() {
try {
Long maxJvmHeapMb = maxHeapSizeBytes / (1024 * 1024);
DiagnosticsStreamMessage diagnosticsStreamMessage =
new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId,
taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli());
taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli(), deploymentType,
apiType, numContainers, containerMemoryMb, containerNumCores, containerThreadPoolSize, hostAffinityEnabled,
sspGrouperFactory, containerRetryCount, containerRetryWindowMs, maxConcurrency, maxJvmHeapMb.intValue());

// Add job-related params to the message (if not already published)
if (!jobParamsEmitted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ public class DiagnosticsStreamMessage {
private final Map<String, Map<String, Object>> metricsMessage;

public DiagnosticsStreamMessage(String jobName, String jobId, String containerName, String executionEnvContainerId,
String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp) {
String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp,
String deploymentType, String apiType, int numContainers, int containerMemoryMb, int numCores, int threadPoolSize,
boolean hostAffinityEnabled, String sspGrouperFactory, int containerRetryCount,
long containerRetryWindowMs, int maxConcurrency, int maxJvmHeapMb) {

// Create the metricHeader
metricsHeader =
new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, DiagnosticsManager.class.getName(),
taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp);
taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp, deploymentType, apiType, numContainers,
containerMemoryMb, numCores, threadPoolSize, hostAffinityEnabled, sspGrouperFactory,
containerRetryCount, containerRetryWindowMs, maxConcurrency, maxJvmHeapMb);

this.metricsMessage = new HashMap<>();
}
Expand Down Expand Up @@ -237,7 +242,13 @@ public static DiagnosticsStreamMessage convertToDiagnosticsStreamMessage(Metrics
metricsSnapshot.getHeader().getContainerName(), metricsSnapshot.getHeader().getExecEnvironmentContainerId(),
metricsSnapshot.getHeader().getVersion(), metricsSnapshot.getHeader().getSamzaVersion(),
metricsSnapshot.getHeader().getHost(), metricsSnapshot.getHeader().getTime(),
metricsSnapshot.getHeader().getResetTime());
metricsSnapshot.getHeader().getResetTime(), metricsSnapshot.getHeader().getDeploymentType(),
metricsSnapshot.getHeader().getApiType(), metricsSnapshot.getHeader().getNumContainers(),
metricsSnapshot.getHeader().getContainerMemoryMb(), metricsSnapshot.getHeader().getContainerCpuCores(),
metricsSnapshot.getHeader().getContainerThreadPoolSize(), metricsSnapshot.getHeader().getHostAffinity(),
metricsSnapshot.getHeader().getSspGrouper(), metricsSnapshot.getHeader().getMaxContainerRetryCount(),
metricsSnapshot.getHeader().getContainerRetryWindowMs(), metricsSnapshot.getHeader().getTaskMaxConcurrency(),
metricsSnapshot.getHeader().getMaxJvmHeapMb());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a new separate constructor for DiagnosticsStreamMessage that takes a metricsHeader as input?


Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap();
Map<String, Object> diagnosticsManagerGroupMap = metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,19 @@ object MetricsHeader {
map.get("samza-version").toString,
map.get("host").toString,
map.get("time").asInstanceOf[Number].longValue,
map.get("reset-time").asInstanceOf[Number].longValue)
map.get("reset-time").asInstanceOf[Number].longValue,
map.get("deployment-type").toString,
map.get("api-type").toString,
map.get("num-containers").asInstanceOf[Number].intValue(),
map.get("container-memory-mb").asInstanceOf[Number].intValue(),
map.get("container-cpu-cores").asInstanceOf[Number].intValue(),
map.get("container-thread-pool-size").asInstanceOf[Number].intValue(),
map.get("host-affinity").asInstanceOf[Boolean].booleanValue(),
map.get("ssp-grouper").toString,
map.get("max-container-retry-count").asInstanceOf[Number].intValue(),
map.get("container-retry-window-ms").asInstanceOf[Number].longValue(),
map.get("task-max-concurrency").asInstanceOf[Number].intValue(),
map.get("max-jvm-heap-mb").asInstanceOf[Number].intValue())
}
}

Expand All @@ -52,7 +64,19 @@ class MetricsHeader(
@BeanProperty val samzaVersion: String,
@BeanProperty val host: String,
@BeanProperty val time: Long,
@BeanProperty val resetTime: Long) {
@BeanProperty val resetTime: Long,
@BeanProperty val deploymentType: String,
@BeanProperty val apiType: String,
@BeanProperty val numContainers: Int,
@BeanProperty val containerMemoryMb: Int,
@BeanProperty val containerCpuCores: Int,
@BeanProperty val containerThreadPoolSize: Int,
@BeanProperty val hostAffinity: Boolean,
@BeanProperty val sspGrouper: String,
@BeanProperty val maxContainerRetryCount: Int,
@BeanProperty val containerRetryWindowMs: Long,
@BeanProperty val taskMaxConcurrency: Int,
@BeanProperty val maxJvmHeapMb: Int) {

def getAsMap: Map[String, Object] = {
val map = new HashMap[String, Object]
Expand All @@ -66,6 +90,18 @@ class MetricsHeader(
map.put("host", host)
map.put("time", time: java.lang.Long)
map.put("reset-time", resetTime: java.lang.Long)
map.put("deployment-type", deploymentType)
map.put("api-type", apiType)
map.put("num-containers", numContainers: java.lang.Integer)
map.put("container-memory-mb", containerMemoryMb: java.lang.Integer)
map.put("container-cpu-cores", containerCpuCores: java.lang.Integer)
map.put("container-thread-pool-size", containerThreadPoolSize: java.lang.Integer)
map.put("host-affinity", hostAffinity: java.lang.Boolean)
map.put("ssp-grouper", sspGrouper)
map.put("max-container-retry-count", maxContainerRetryCount: java.lang.Integer)
map.put("container-retry-window-ms", containerRetryWindowMs: java.lang.Long)
map.put("task-max-concurrency", taskMaxConcurrency: java.lang.Integer)
map.put("max-jvm-heap-mb", maxJvmHeapMb: java.lang.Integer)
map
}
}
Loading