Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
Merge pull request #612 from tsegismont/jira/HAWKULAR-751
Browse files Browse the repository at this point in the history
HAWKULAR-751 Avail Creator: server runs out of memory under heavy load
  • Loading branch information
pilhuhn committed Oct 27, 2015
2 parents 4bc85ba + 05afc5f commit d20c8ab
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package org.hawkular.component.availcreator;

import static javax.ejb.TransactionAttributeType.NOT_SUPPORTED;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import javax.ejb.Asynchronous;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
Expand All @@ -40,6 +42,7 @@
* @author Heiko W. Rupp
*/
@Stateless
@TransactionAttribute(NOT_SUPPORTED)
public class AvailPublisher {

private static final String METRICS_BASE_URI;
Expand All @@ -49,11 +52,7 @@ public class AvailPublisher {
METRICS_BASE_URI = "http://"+ host + ":"+ port + "/hawkular/metrics";
}

// Avoid concurrent Asynchronous calls to REST services. There seems to be a serious issue with undertow and
// concurrent async calls, which hangs the thread. (note - this is a pooled MDB, not a singleton)
//@Asynchronous
// I don't think we need to propagate the Tx here, just make the rest call outside of a Tx.
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
@Asynchronous
public void sendToMetricsViaRest(List<SingleAvail> availabilities) {
// Send it to metrics via rest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@
*/
package org.hawkular.component.availcreator;

import static javax.ejb.TransactionAttributeType.NOT_SUPPORTED;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.ConnectionFactory;
import javax.ejb.TransactionAttribute;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
Expand All @@ -51,13 +54,10 @@
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "HawkularMetricData")
})
@TransactionAttribute(value = NOT_SUPPORTED)
public class MetricReceiver implements MessageListener {

@javax.annotation.Resource(lookup = "java:/topic/HawkularAvailData")
javax.jms.Topic topic;

@javax.annotation.Resource(lookup = "java:/HawkularBusConnectionFactory")
ConnectionFactory connectionFactory;
private final ObjectMapper objectMapper = new ObjectMapper();

@EJB
AvailPublisher availPublisher;
Expand All @@ -67,31 +67,34 @@ public void onMessage(Message message) {

try {
String payload = ((TextMessage) message).getText();
Map map = new ObjectMapper().readValue(payload, Map.class);
JsonNode rootNode = objectMapper.readTree(payload);

Map metricDataMap = (Map) map.get("metricData");
JsonNode metricData = rootNode.get("metricData");
// Get <rid>.status.code metrics
String tenant = (String) metricDataMap.get("tenantId");
List<Map<String, Object>> inputList = (List<Map<String, Object>>) metricDataMap.get("data");
String tenant = metricData.get("tenantId").textValue();
JsonNode data = metricData.get("data");
List<SingleAvail> outer = new ArrayList<>();

for (Map<String, Object> item : inputList) {
Iterator<JsonNode> items = data.elements();
while (items.hasNext()) {
JsonNode item = items.next();

String source = (String) item.get("source");
String source = item.get("source").textValue();
if (source.endsWith(".status.code")) {
double codeD = (double) item.get("value");
int code = (int) codeD;
int code = item.get("value").intValue();

String id = source.substring(0, source.indexOf("."));
long timestamp = (long) item.get("timestamp");
long timestamp = item.get("timestamp").longValue();

String avail = computeAvail(code);

SingleAvail ar = new SingleAvail(tenant, id, timestamp, avail);
outer.add(ar);
}
}
availPublisher.sendToMetricsViaRest(outer);
if (!outer.isEmpty()) {
availPublisher.sendToMetricsViaRest(outer);
}

} catch (Exception e) {
Log.LOG.eCouldNotHandleBusMessage(e);
Expand Down

0 comments on commit d20c8ab

Please sign in to comment.