Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-1956 Graphite Plugin for monitoring
Author: Praveen Adlakha <adlakha.praveen@gmail.com>

Reviewers: "Pavan Kumar Kolamuri <pavan.kolamuri@gmail.com>, Srikanth Sundarrajan <sriksun@hotmail.com>"

Closes #141 from PraveenAdlakha/codahale
  • Loading branch information
PraveenAdlakha authored and pavankumar526 committed May 17, 2016
1 parent a31fa54 commit c59aa1070b9445227b41ad0ba04ab23ed8885689
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 1 deletion.
@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.metrics;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import org.apache.falcon.FalconException;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.StartupProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Service for metrics notification.
*/
public class MetricNotificationService implements FalconService {
private static final Logger LOG = LoggerFactory.getLogger(MetricNotificationService.class);

public static final String SERVICE_NAME = MetricNotificationService.class.getSimpleName();
private static final MetricNotificationService METRIC_NOTIFICATION_SERVICE = new MetricNotificationService();
private final GraphiteReporter graphiteReporter;
private final MetricRegistry metricRegistry;

private Map<String, MetricGauge> metricMap = new ConcurrentHashMap<>();

public static MetricNotificationService get(){
return METRIC_NOTIFICATION_SERVICE;
}

public MetricNotificationService(){
Graphite graphite = new Graphite(new InetSocketAddress(StartupProperties
.get().getProperty("falcon.graphite.hostname"), Integer.parseInt(StartupProperties.get()
.getProperty("falcon.graphite.port"))));
metricRegistry=new MetricRegistry();
this.graphiteReporter = GraphiteReporter.forRegistry(metricRegistry)
.convertDurationsTo(TimeUnit.SECONDS)
.filter(MetricFilter.ALL)
.build(graphite);
}

@Override
public String getName() {
return SERVICE_NAME;
}

@Override
public void init() throws FalconException {
LOG.info("Starting Graphite Service");
graphiteReporter.start(Long.parseLong(StartupProperties.get().getProperty("falcon.graphite.frequency")),
TimeUnit.SECONDS);
}

@Override
public void destroy() throws FalconException {
graphiteReporter.stop();
}

private MetricGauge createMetric(final String metricName){
if (!metricMap.containsKey(metricName)) {
MetricGauge metricGauge = new MetricGauge();
metricMap.put(metricName, metricGauge);
metricRegistry.register(metricName, metricGauge);
}
return metricMap.get(metricName);
}

public void publish(String metricsName, Long value){
synchronized(this){
createMetric(metricsName).setValue(value);
}
}

private static class MetricGauge implements Gauge<Long> {

private Long value=0L;
public void setValue(Long value){
this.value=value;
}

@Override
public Long getValue() {
return value;
}
}
}
@@ -45,6 +45,8 @@
org.apache.falcon.service.ProxyUserService,\
org.apache.falcon.service.FalconJPAService,\
org.apache.falcon.extensions.ExtensionService
##Add if you want to send data to graphite
# org.apache.falcon.metrics.MetricNotificationService\
## Add if you want to use Falcon Azure integration ##
# org.apache.falcon.adfservice.ADFProviderService
## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
@@ -310,3 +312,9 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
#*.falcon.statestore.create.db.schema=true

# Graphite properties
#*.falcon.graphite.hostname=localhost
#*.falcon.graphite.port=2003
#*.falcon.graphite.frequency=1
#*.falcon.graphite.prefix=falcon
@@ -92,5 +92,15 @@
<artifactId>mail</artifactId>
<version>1.4.7</version>
</dependency>

<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>

</dependencies>
</project>
18 pom.xml
@@ -1164,6 +1164,24 @@
<version>1.1.3</version>
</dependency>

<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.2</version>
<exclusions>
<exclusion>
<groupId>org.acplt</groupId>
<artifactId>oncrpc</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
<version>3.0.2</version>
</dependency>

</dependencies>

</dependencyManagement>
@@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.plugin;

import org.apache.commons.lang.StringUtils;
import org.apache.falcon.aspect.ResourceMessage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.metrics.MetricNotificationService;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
import org.joda.time.DateTime;
import org.joda.time.Seconds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Graphite Notification Plugin.
*/
public class GraphiteNotificationPlugin implements MonitoringPlugin {

private static final Logger LOG = LoggerFactory.getLogger(GraphiteNotificationPlugin.class);

@Override
public void monitor(ResourceMessage message) {
MetricNotificationService metricNotificationService =
Services.get().getService(MetricNotificationService.SERVICE_NAME);
try {
String entityType = message.getDimensions().get("entity-type");
String entityName = message.getDimensions().get("entity-name");
String prefix = StartupProperties.get().getProperty("falcon.graphite.prefix");
if (entityType.equals(EntityType.PROCESS.name())) {
Entity entity = ConfigurationStore.get().get(EntityType.PROCESS, entityName);
Process process = (Process) entity;
String pipeline = StringUtils.isNotBlank(process.getPipelines()) ? process.getPipelines() : "default";


if ((message.getAction().equals("wf-instance-succeeded"))) {
Long timeTaken = message.getExecutionTime() / 1000000000;
String metricsName = prefix + message.getDimensions().get("cluster") + pipeline
+ ".GENERATE." + entityName + ".processing_time";
metricNotificationService.publish(metricsName, timeTaken);

DateTime nominalTime = new DateTime(message.getDimensions().get("nominal-time"));
DateTime startTime = new DateTime(message.getDimensions().get("start-time"));
metricsName = prefix + message.getDimensions().get("cluster") + pipeline
+ ".GENERATE." + entityName + ".start_delay";
metricNotificationService.publish(metricsName,
(long)Seconds.secondsBetween(nominalTime, startTime).getSeconds());
}

if (message.getAction().equals("wf-instance-failed")){
String metricName = prefix + message.getDimensions().get("cluster") + pipeline
+ ".GENERATE." + entityName + ".failure"
+ message.getDimensions().get("error-message");
metricNotificationService.publish(metricName, (long) 1);
}
}
} catch (Exception e) {
LOG.error("Exception in sending metrics to Graphite:", e);
}
}
}
@@ -42,7 +42,6 @@
*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
org.apache.falcon.service.ProcessSubscriberService,\
org.apache.falcon.service.FalconJPAService,\
org.apache.falcon.service.FeedSLAMonitoringService,\
org.apache.falcon.service.LifecyclePolicyMap,\
org.apache.falcon.entity.store.ConfigurationStore,\
@@ -53,6 +52,8 @@
org.apache.falcon.service.GroupsService,\
org.apache.falcon.service.ProxyUserService,\
org.apache.falcon.extensions.ExtensionService
##Add if you want to send data to graphite
# org.apache.falcon.metrics.MetricNotificationService\
## Add if you want to use Falcon Azure integration ##
# org.apache.falcon.adfservice.ADFProviderService
## If you wish to use Falcon native scheduler uncomment out below application services and comment out above application services ##
@@ -80,6 +81,7 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
org.apache.falcon.entity.store.ConfigurationStore



# List of Lifecycle policies configured.
*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
# List of builders for the policies.
@@ -309,3 +311,8 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
# Setting monitoring plugin, if SMTP parameters is defined
#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
# org.apache.falcon.plugin.EmailNotificationPlugin
# Graphite properties
#*.falcon.graphite.hostname=localhost
#*.falcon.graphite.port=2003
#*.falcon.graphite.frequency=1
#*.falcon.graphite.prefix=falcon

0 comments on commit c59aa10

Please sign in to comment.