[INLONG-1796]DataProxy support monitor indicator with JMX. #1796 #1797

Merged
merged 11 commits into from
Nov 18, 2021

Conversation

@luchunliang luchunliang commented Nov 15, 2021

Fixes #1796

Motivation

DataProxy provides monitoring indicators based on JMX. Users can implement code that reads the metrics and reports them to a user-defined monitoring system.
The source and sink modules can add metric classes that subclass org.apache.inlong.commons.config.metrics.MetricItemSet and register them with the MBeanServer.
A user-defined plugin can read module metrics through JMX and report the metric data to different monitoring systems.
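
As a rough illustration of the register-then-read flow described above, the sketch below uses only the JDK's javax.management API. DemoMetricMBean and DemoMetric are hypothetical stand-ins, not the real MetricItemSet classes, and the ObjectName layout is an assumption made for this example.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

public class JmxMetricSketch {

    /** Hypothetical management interface; not the real MetricItemSet API. */
    public interface DemoMetricMBean {
        long getReadSuccessCount();
    }

    /** Hypothetical metric holder that a source or sink module would update. */
    public static class DemoMetric implements DemoMetricMBean {
        private final AtomicLong readSuccessCount = new AtomicLong();

        public void incrementReadSuccess() {
            readSuccessCount.incrementAndGet();
        }

        @Override
        public long getReadSuccessCount() {
            return readSuccessCount.get();
        }
    }

    /** Registers a metric under the given JMX domain, then reads it back through the MBeanServer. */
    public static long registerAndRead(String domain, String name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // The key layout "type=...,name=..." is an assumption for this sketch.
        ObjectName objectName = new ObjectName(domain + ":type=DemoMetric,name=" + name);

        DemoMetric metric = new DemoMetric();
        metric.incrementReadSuccess();
        metric.incrementReadSuccess();

        if (server.isRegistered(objectName)) {
            server.unregisterMBean(objectName);
        }
        server.registerMBean(new StandardMBean(metric, DemoMetricMBean.class), objectName);
        try {
            // A reporting plugin would read attributes like this and forward them.
            return (Long) server.getAttribute(objectName, "ReadSuccessCount");
        } finally {
            server.unregisterMBean(objectName);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(registerAndRead("DataProxy", "demo"));
    }
}
```

The reader side only needs the domain and attribute names, which is what lets a plugin stay decoupled from the module that produces the metric.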

Users can describe the configuration in the file "common.properties".
For example:
metricDomains=DataProxy
metricDomains.DataProxy.domainListeners=com.tencent.pcg.atta.dataproxy.metrics.m007.M007MetricListener org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
metricDomains.DataProxy.snapshotInterval=60000

The JMX domain name of DataProxy is "DataProxy"; it is defined by the parameter "metricDomains".
The listeners of a JMX domain are defined by the parameter "metricDomains.$domainName.domainListeners".
The listener class names are separated by a space character.
Each listener class needs to implement the interface "org.apache.inlong.dataproxy.metrics.MetricListener".
The snapshot interval of the listeners is defined by the parameter "metricDomains.$domainName.snapshotInterval", in milliseconds.
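
The parameters above could be read along the following lines. This is a minimal sketch that assumes the file is parsed with java.util.Properties; the class and method names are hypothetical, and DataProxy's own loader may work differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class MetricDomainConfigSketch {

    /** Parses properties text; stands in for reading common.properties from disk. */
    public static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    /** Splits the space-separated listener class names of one domain. */
    public static List<String> listenerClassNames(Properties props, String domain) {
        String raw = props.getProperty("metricDomains." + domain + ".domainListeners", "");
        List<String> names = new ArrayList<>();
        for (String token : raw.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                names.add(token);
            }
        }
        return names;
    }

    /** Returns the snapshot interval in milliseconds, or the given default when unset. */
    public static long snapshotIntervalMillis(Properties props, String domain, long defaultMillis) {
        String raw = props.getProperty("metricDomains." + domain + ".snapshotInterval");
        return raw == null ? defaultMillis : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) throws IOException {
        // com.example.DemoListener is a hypothetical listener class name.
        Properties props = parse(
                "metricDomains=DataProxy\n"
                + "metricDomains.DataProxy.domainListeners=com.example.DemoListener "
                + "org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener\n"
                + "metricDomains.DataProxy.snapshotInterval=60000\n");
        String domain = props.getProperty("metricDomains");
        System.out.println(listenerClassNames(props, domain));
        System.out.println(snapshotIntervalMillis(props, domain, 60000L));
    }
}
```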

The method prototype of org.apache.inlong.dataproxy.metrics.MetricListener is:
public void snapshot(String domain, List<MetricItemValue> itemValues);

MetricItemValue.dimensions has these keys (the fields of DataProxyMetricItem annotated with "@Dimension"):

  • public String clusterId;
  • public String sourceId;
  • public String sourceDataId;
  • public String inlongGroupId;
  • public String inlongStreamId;
  • public String sinkId;
  • public String sinkDataId;

MetricItemValue.metrics has these keys (the fields of DataProxyMetricItem annotated with "@CountMetric"):

  • readSuccessCount
  • readSuccessSize
  • readFailCount
  • readFailSize
  • sendCount
  • sendSize
  • sendSuccessCount
  • sendSuccessSize
  • sendFailCount
  • sendFailSize
  • sinkDuration: in milliseconds; the time between the moment the event was sent to the sink destination and the current time.
  • nodeDuration: in milliseconds; the time between the moment the event was received from the source and the current time.
  • wholeDuration: in milliseconds; the time between the moment the event was generated and the current time.
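
To make the listener contract more concrete, here is a sketch of a listener that sums sendSuccessCount per inlongGroupId on each snapshot. ItemValue and Listener are hypothetical stand-ins that only mirror the shape described above (a dimensions map and a metrics map); the real MetricItemValue and MetricListener types may expose different accessors and value types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupSendListenerSketch {

    /** Hypothetical stand-in for MetricItemValue. */
    public static class ItemValue {
        public final Map<String, String> dimensions;
        public final Map<String, Long> metrics;

        public ItemValue(Map<String, String> dimensions, Map<String, Long> metrics) {
            this.dimensions = dimensions;
            this.metrics = metrics;
        }
    }

    /** Hypothetical stand-in mirroring the snapshot method of MetricListener. */
    public interface Listener {
        void snapshot(String domain, List<ItemValue> itemValues);
    }

    /** Accumulates sendSuccessCount per inlongGroupId across snapshots. */
    public static class GroupSendListener implements Listener {
        public final Map<String, Long> sendSuccessByGroup = new LinkedHashMap<>();

        @Override
        public void snapshot(String domain, List<ItemValue> itemValues) {
            for (ItemValue value : itemValues) {
                String group = value.dimensions.getOrDefault("inlongGroupId", "unknown");
                long count = value.metrics.getOrDefault("sendSuccessCount", 0L);
                sendSuccessByGroup.merge(group, count, Long::sum);
            }
        }
    }

    /** Builds a sample item with one dimension and one metric. */
    public static ItemValue item(String groupId, long sendSuccessCount) {
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("inlongGroupId", groupId);
        Map<String, Long> metrics = new HashMap<>();
        metrics.put("sendSuccessCount", sendSuccessCount);
        return new ItemValue(dimensions, metrics);
    }

    public static void main(String[] args) {
        GroupSendListener listener = new GroupSendListener();
        listener.snapshot("DataProxy", Arrays.asList(item("g1", 3), item("g1", 4), item("g2", 5)));
        System.out.println(listener.sendSuccessByGroup);
    }
}
```

A real listener would forward the aggregated values to its monitoring backend instead of keeping them in memory.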

Modifications

Describe the modifications you've done.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

codecov-commenter commented Nov 15, 2021

Codecov Report

Merging #1797 (ec31078) into master (d0279f6) will decrease coverage by 0.24%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master    #1797      +/-   ##
============================================
- Coverage     12.18%   11.94%   -0.25%     
  Complexity     1049     1049              
============================================
  Files           396      399       +3     
  Lines         32833    33466     +633     
  Branches       5149     5276     +127     
============================================
- Hits           4002     3997       -5     
- Misses        28065    28701     +636     
- Partials        766      768       +2     
Impacted Files Coverage Δ
.../java/org/apache/flume/sink/tubemq/TubemqSink.java 51.42% <0.00%> (-4.00%) ⬇️
...tubemq/client/factory/TubeMultiSessionFactory.java 42.30% <0.00%> (-1.70%) ⬇️
...ubemq/client/factory/TubeSingleSessionFactory.java 34.21% <0.00%> (-0.93%) ⬇️
...he/inlong/tubemq/client/config/ConsumerConfig.java 22.12% <0.00%> (-0.74%) ⬇️
.../tubemq/client/factory/TubeBaseSessionFactory.java 58.13% <0.00%> (-0.69%) ⬇️
...ache/inlong/tubemq/server/master/MasterConfig.java 32.63% <0.00%> (-0.50%) ⬇️
...rg/apache/inlong/tubemq/server/master/TMaster.java 0.00% <0.00%> (ø)
...g/apache/inlong/tubemq/client/common/PeerInfo.java 0.00% <0.00%> (ø)
...ster/nodemanage/nodeconsumer/ConsumeGroupInfo.java 0.00% <0.00%> (ø)
...aster/nodemanage/nodeconsumer/TopicConfigInfo.java
... and 4 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d0279f6...ec31078.

@dockerzhang dockerzhang changed the title [Feature]DataProxy support monitor indicator with JMX. #1796 [INLONG-1796] DataProxy support monitor indicator with JMX. #1796 Nov 16, 2021
@dockerzhang dockerzhang requested review from gosonzhang and dockerzhang and removed request for gosonzhang November 16, 2021 02:19
@dockerzhang

@luchunliang please add some UTs to verify this feature.

@luchunliang luchunliang changed the title [INLONG-1796] DataProxy support monitor indicator with JMX. #1796 DataProxy support monitor indicator with JMX. #1796 Nov 16, 2021
@luchunliang luchunliang changed the title DataProxy support monitor indicator with JMX. #1796 [INLONG-1796]DataProxy support monitor indicator with JMX. #1796 Nov 16, 2021

@gosonzhang gosonzhang left a comment

+1

this.reload();
this.setReloadTimer();
} catch (Exception e) {
e.printStackTrace();

Should not print the stack trace directly.

Contributor Author

Fixed: the exception is now logged with log4j.

*/
private void setReloadTimer() {
reloadTimer = new Timer(true);
TimerTask task = new TimerTask() {

TimerTask is not recommended; use ScheduledExecutorService instead.

Contributor Author

ScheduledExecutorService is meant for thread-pool cases and suits many tasks of the same kind.
The reload task of CacheClusterConfigHolder is a single scheduled task, so a single Timer thread is a better fit.
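
For comparison, the reviewer's suggestion can also be applied with a single thread. The sketch below is not the code in this PR; it assumes a hypothetical reload Runnable and thread name, and uses Executors.newSingleThreadScheduledExecutor with a daemon thread, which is roughly comparable to new Timer(true).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ReloadSchedulerSketch {

    /** Schedules the reload task on one daemon thread. */
    public static ScheduledExecutorService scheduleReload(Runnable reload, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "config-reload"); // hypothetical thread name
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleWithFixedDelay(() -> {
            try {
                reload.run();
            } catch (RuntimeException e) {
                // An exception escaping the task would suppress all later runs,
                // so report it here and let the schedule continue.
                System.err.println("reload failed: " + e);
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    /** Returns true if the task ran at least the given number of times before the timeout. */
    public static boolean runsAtLeast(int times, long periodMillis, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(times);
        ScheduledExecutorService executor = scheduleReload(latch::countDown, periodMillis);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runsAtLeast(3, 10L, 5000L));
    }
}
```

Either approach runs the task on one thread; the main practical difference is how a failing task is handled, which is why the sketch catches exceptions inside the scheduled body.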

this.reload();
this.setReloadTimer();
} catch (Exception e) {
e.printStackTrace();

e.printStackTrace() is not suitable here.

Contributor Author

Fixed: removed this line.

*/
protected Map<String, String> loadProperties(String fileName) {
Map<String, String> result = new ConcurrentHashMap<>();
InputStream inStream = null;

Recommend using try-with-resources.

Contributor Author

Fixed: now uses try-with-resources.
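
A try-with-resources version of such a loader might look like the sketch below. It is not the code merged in this PR: it takes an InputStream rather than a file name so that it stays self-contained, and the class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class LoadPropertiesSketch {

    /** Loads key/value pairs from the stream; the stream is closed even if load() throws. */
    public static Map<String, String> loadProperties(InputStream source) throws IOException {
        Map<String, String> result = new ConcurrentHashMap<>();
        try (InputStream in = source) {
            Properties props = new Properties();
            props.load(in);
            for (String name : props.stringPropertyNames()) {
                result.put(name, props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Properties.load(InputStream) reads ISO-8859-1, so the sample is encoded that way.
        byte[] sample = "cacheClusterConfig=c1 c2\n".getBytes(StandardCharsets.ISO_8859_1);
        System.out.println(loadProperties(new ByteArrayInputStream(sample)));
    }
}
```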

*/
@Override
public List<CacheClusterConfig> load() {
String clusterNames = context.getString("cacheClusterConfig");

Extract this string to a constant.

Contributor Author

Fixed: extracted to a constant.

@Override
public List<IdTopicConfig> load() {
Map<String, String> idTopicMap = context.getSubProperties("idTopicConfig.");
List<IdTopicConfig> configList = new ArrayList<>(idTopicMap.size());

NPE check?

Contributor Author

context.getSubProperties() should not return null:
public ImmutableMap<String, String> getSubProperties(String prefix) {
    Preconditions.checkArgument(prefix.endsWith("."),
            "The given prefix does not end with a period (" + prefix + ")");
    Map<String, String> result = Maps.newHashMap();
    synchronized (parameters) {
        for (Entry<String, String> entry : parameters.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix)) {
                String name = key.substring(prefix.length());
                result.put(name, entry.getValue());
            }
        }
    }
    return ImmutableMap.copyOf(result);
}


public static final Logger LOG = LoggerFactory.getLogger(MetricObserver.class);
private static final AtomicBoolean isInited = new AtomicBoolean(false);
private static ScheduledExecutorService statExecutor = Executors.newScheduledThreadPool(5);

Should be initialized with a thread name to help online troubleshooting.

Contributor Author

statExecutor is a thread pool. A runtime exception prints its stack trace to the log file,
which is enough for online troubleshooting.
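
If named threads were wanted, the pool could be built with a custom ThreadFactory, as in the following sketch. The prefix "metric-snapshot" and the class name are hypothetical; this only illustrates the reviewer's suggestion and is not part of the PR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadPoolSketch {

    /** Creates daemon threads named prefix-1, prefix-2, and so on. */
    public static ThreadFactory namedFactory(String prefix) {
        AtomicInteger sequence = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + sequence.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    /** Runs one task on a named pool and returns the name of the worker that executed it. */
    public static String nameOfWorker(String prefix) throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5, namedFactory(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(nameOfWorker("metric-snapshot"));
    }
}
```

Named threads show up in thread dumps and in log patterns that print the thread name, which can complement the stack traces mentioned above.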

try {
List<MetricItemValue> itemValues = this.getItemValues();
LOG.info("snapshot metric:{},size:{}", domain, itemValues.size());
// LOG.info("snapshot metric:{},size:{},contents:{}",
Contributor

Remove the unused code.

Contributor Author

Fixed: removed the unused code.

} finally {
tx.close();
}
// Channel channel = getChannel();
Contributor

Remove the unused code.

Contributor Author

Fixed: removed the unused code.

@dockerzhang dockerzhang left a comment

LGTM

@gosonzhang gosonzhang left a comment

+1

@aloyszhang aloyszhang left a comment

+1

@aloyszhang aloyszhang merged commit e117b3c into apache:master Nov 18, 2021
Development

Successfully merging this pull request may close these issues.

[Feature]DataProxy support monitor indicator with JMX.
6 participants