/
AbstractKapuaConverter.java
121 lines (111 loc) · 5.85 KB
/
AbstractKapuaConverter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*******************************************************************************
* Copyright (c) 2011, 2020 Eurotech and/or its affiliates and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eurotech - initial API and implementation
* Red Hat Inc
*******************************************************************************/
package org.eclipse.kapua.broker.core.converter;
import com.codahale.metrics.Counter;
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.impl.DefaultMessage;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.broker.core.listener.CamelConstants;
import org.eclipse.kapua.broker.core.message.CamelKapuaMessage;
import org.eclipse.kapua.broker.core.message.CamelUtil;
import org.eclipse.kapua.broker.core.message.JmsUtil;
import org.eclipse.kapua.broker.core.message.MessageConstants;
import org.eclipse.kapua.broker.core.plugin.ConnectorDescriptor;
import org.eclipse.kapua.broker.core.plugin.ConnectorDescriptor.MessageType;
import org.eclipse.kapua.commons.metric.MetricServiceFactory;
import org.eclipse.kapua.commons.metric.MetricsService;
import org.eclipse.kapua.model.id.KapuaId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import java.util.Base64;
import java.util.Date;
/**
* Kapua message converter reference implementation used to convert from Camel incoming messages ({@link JmsMessage}) to a platform specific message type.
*
* @since 1.0
*/
public abstract class AbstractKapuaConverter {
public static final Logger logger = LoggerFactory.getLogger(AbstractKapuaConverter.class);
// metrics
protected static final String METRIC_MODULE_NAME = "converter";
protected static final String METRIC_COMPONENT_NAME = "kapua";
protected static final MetricsService METRICS_SERVICE = MetricServiceFactory.getInstance();
private final Counter metricConverterJmsMessage;
private final Counter metricConverterJmsErrorMessage;
private final Counter metricConverterErrorMessage;
/**
* Constructor
*/
protected AbstractKapuaConverter() {
metricConverterJmsMessage = METRICS_SERVICE.getCounter(METRIC_MODULE_NAME, METRIC_COMPONENT_NAME, "jms", "message", "count");
metricConverterJmsErrorMessage = METRICS_SERVICE.getCounter(METRIC_MODULE_NAME, METRIC_COMPONENT_NAME, "jms", "message", "error", "count");
metricConverterErrorMessage = METRICS_SERVICE.getCounter(METRIC_MODULE_NAME, METRIC_COMPONENT_NAME, "kapua_message", "message", "error", "count");
}
/**
* Convert incoming message to a Kapua message (depending on messageType parameter)
*
* @param exchange
* @param value
* @param messageType expected incoming message type
* @return Message container that contains message of asked type
* @throws KapuaException if incoming message does not contain a javax.jms.BytesMessage or an error during conversion occurred
*/
protected CamelKapuaMessage<?> convertTo(Exchange exchange, Object value, MessageType messageType) throws KapuaException {
if (value instanceof byte[]) {
byte[] messageContent = (byte[]) value;
if (exchange.getIn() instanceof DefaultMessage) {
DefaultMessage message = (DefaultMessage) exchange.getIn();
try {
// FIX #164
Date queuedOn = new Date(message.getHeader(CamelConstants.JMS_HEADER_TIMESTAMP, Long.class));
KapuaId connectionId = (KapuaId) SerializationUtils.deserialize(Base64.getDecoder().decode(message.getHeader(MessageConstants.HEADER_KAPUA_CONNECTION_ID, String.class)));
String clientId = message.getHeader(MessageConstants.HEADER_KAPUA_CLIENT_ID, String.class);
ConnectorDescriptor connectorDescriptor = (ConnectorDescriptor) SerializationUtils
.deserialize(Base64.getDecoder().decode(message.getHeader(MessageConstants.HEADER_KAPUA_CONNECTOR_DEVICE_PROTOCOL, String.class)));
return JmsUtil.convertToCamelKapuaMessage(connectorDescriptor, messageType, messageContent, CamelUtil.getTopic(message), queuedOn, connectionId, clientId);
} catch (JMSException e) {
metricConverterErrorMessage.inc();
logger.error("Exception converting message {}", e.getMessage(), e);
throw KapuaException.internalError(e, "Cannot convert the message type " + exchange.getIn().getClass());
}
}
}
metricConverterErrorMessage.inc();
throw KapuaException.internalError("Cannot convert the message - Wrong instance type: " + exchange.getIn().getClass());
}
/**
* Convert incoming message to a javax.jms.Message
*
* @param exchange
* @param value
* @return jms Message
* @throws KapuaException if incoming message does not contain a javax.jms.BytesMessage
*/
@Converter
public Message convertToJmsMessage(Exchange exchange, Object value) throws KapuaException {
metricConverterJmsMessage.inc();
// assume that the message is a Camel Jms message
JmsMessage message = exchange.getIn(JmsMessage.class);
if (message.getJmsMessage() instanceof BytesMessage) {
return message.getJmsMessage();
}
metricConverterJmsErrorMessage.inc();
throw KapuaException.internalError("Cannot convert the message - Wrong instance type: " + exchange.getIn().getClass());
}
}