diff --git a/distribution/pom.xml b/distribution/pom.xml index deb8f6f4f892c..1b33e9e613fde 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -128,6 +128,12 @@ spark-iotdb-connector_2.12 ${project.version} + + org.apache.iotdb + sparkplugb-connector + ${project.version} + jar-with-dependencies + diff --git a/distribution/src/assembly/all.xml b/distribution/src/assembly/all.xml index dd95f2a4e73e7..cac0ca665daf3 100644 --- a/distribution/src/assembly/all.xml +++ b/distribution/src/assembly/all.xml @@ -106,5 +106,6 @@ common-files.xml + sparkplugb-connector.xml diff --git a/distribution/src/assembly/sparkplugb-connector.xml b/distribution/src/assembly/sparkplugb-connector.xml new file mode 100644 index 0000000000000..7cf1fc1d949ea --- /dev/null +++ b/distribution/src/assembly/sparkplugb-connector.xml @@ -0,0 +1,32 @@ + + + + + + + *:sparkplugb-connector:jar:jar-with-dependencies:* + + ${file.separator}ext${file.separator}mqtt + ${artifact.artifactId}-${artifact.classifier}.${artifact.extension} + + + diff --git a/iotdb-connector/pom.xml b/iotdb-connector/pom.xml index 14321833059e9..042018b8bbaa7 100644 --- a/iotdb-connector/pom.xml +++ b/iotdb-connector/pom.xml @@ -38,6 +38,7 @@ hive-connector spark-iotdb-connector spark-tsfile + sparkplugb-connector zeppelin-interpreter diff --git a/iotdb-connector/sparkplugb-connector/README.md b/iotdb-connector/sparkplugb-connector/README.md new file mode 100644 index 0000000000000..e42d405d5ece7 --- /dev/null +++ b/iotdb-connector/sparkplugb-connector/README.md @@ -0,0 +1,58 @@ + + +# Sparkplug B 1.0 Connector + +## Installing the plugin + +In the Apache IoTDB directory, create a directory `ext/mqtt`. + +Copy the `jar` of thit plugin into this directory. + +Edit the `conf/iotdb-common.properties` file to contain the following configuration: + + #################### + ### MQTT Broker Configuration + #################### + + # whether to enable the mqtt service. + # Datatype: boolean + enable_mqtt_service=true + + # the mqtt service binding host. + # Datatype: String + mqtt_host=127.0.0.1 + + # the mqtt service binding port. + # Datatype: int + mqtt_port=1883 + + # the handler pool size for handing the mqtt messages. + # Datatype: int + mqtt_handler_pool_size=1 + + # the mqtt message payload formatter. + # Datatype: String + mqtt_payload_formatter=spB1.0 + + # max length of mqtt message in byte + # Datatype: int + mqtt_max_message_size=1048576 diff --git a/iotdb-connector/sparkplugb-connector/pom.xml b/iotdb-connector/sparkplugb-connector/pom.xml new file mode 100644 index 0000000000000..1a9a261784f76 --- /dev/null +++ b/iotdb-connector/sparkplugb-connector/pom.xml @@ -0,0 +1,127 @@ + + + + 4.0.0 + + org.apache.iotdb + iotdb-connector + 1.3.0-SNAPSHOT + + sparkplugb-connector + IoTDB: Connector: SparkplugB + + 3.24.2 + + + + + com.googlecode.maven-download-plugin + download-maven-plugin + + + + get-unity + generate-sources + + wget + + + https://raw.githubusercontent.com/eclipse/tahu/master/sparkplug_b/sparkplug_b.proto + ${project.build.directory}/dependency + + + + + + com.github.os72 + protoc-jar-maven-plugin + 3.11.4 + + + generate-sources + + run + + + all + direct + ${proto.version} + + ${project.build.directory}/dependency + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + jar-with-dependencies + + + + + make-assembly + + package + + + single + + + + + + + + + + + + + + org.apache.iotdb + tsfile + 1.3.0-SNAPSHOT + provided + + + org.apache.iotdb + iotdb-server + 1.3.0-SNAPSHOT + provided + + + com.google.protobuf + protobuf-java + ${proto.version} + + + io.netty + netty-buffer + + + diff --git a/iotdb-connector/sparkplugb-connector/src/main/java/org/apache/iotdb/connector/sparkplugb/SparkplugBPayloadFormatter.java b/iotdb-connector/sparkplugb-connector/src/main/java/org/apache/iotdb/connector/sparkplugb/SparkplugBPayloadFormatter.java new file mode 100644 index 0000000000000..7a724425ab3e8 --- /dev/null +++ b/iotdb-connector/sparkplugb-connector/src/main/java/org/apache/iotdb/connector/sparkplugb/SparkplugBPayloadFormatter.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.connector.sparkplugb; + +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatterV2; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Binary; + +import com.google.protobuf.InvalidProtocolBufferException; +import io.netty.buffer.ByteBuf; +import org.eclipse.tahu.protobuf.SparkplugBProto; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * The SparkplugBPayloadFormatter is a special form of JSONPayloadFormatter that is able to process + * messages in the SparkplugB format. + */ +public class SparkplugBPayloadFormatter implements PayloadFormatterV2 { + + private static final BigInteger TWO_64 = BigInteger.ONE.shiftLeft(64); + + @Override + public String getName() { + return "spB1.0"; + } + + @Override + public List format(ByteBuf rawPayload) { + throw new RuntimeException("SparkplugBPayloadFormatter needs V2 API"); + } + + @Override + public List format(String topic, ByteBuf rawPayload) { + if (rawPayload == null) { + return new ArrayList<>(); + } + try { + byte[] bytes = new byte[rawPayload.readableBytes()]; + rawPayload.duplicate().readBytes(bytes); + SparkplugBProto.Payload payload = SparkplugBProto.Payload.parseFrom(bytes); + return processMessage(topic, payload); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Error parsing protobuf payload", e); + } + } + + private List processMessage(String topic, SparkplugBProto.Payload payload) { + Map messages = new TreeMap<>(); + // Replace the prefix of "spB1.0" and replace that with "root" and replace all + // segments of the MQTT topic path with escaped versions for IoTDB + String deviceId = + "root.`" + + topic.substring(topic.indexOf("/") + 1).replace("/", "`.`").replace(" ", "_") + + "`"; + long messageTimestamp = payload.getTimestamp(); + for (SparkplugBProto.Payload.Metric metric : payload.getMetricsList()) { + // Get the timestamp for the current measurement, if none exists, use that of the + // message. + long measurementTimestamp = messageTimestamp; + if (metric.hasTimestamp()) { + measurementTimestamp = metric.getTimestamp(); + } + String measurementName = metric.getName(); + int sparkplugBDataType = metric.getDatatype(); + TSDataType measurementDataType = getTSDataTypeForSparkplugBDataType(sparkplugBDataType); + Object measurementValue = getValueForMetric(metric, sparkplugBDataType); + // Some types are not yet implemented ... we need to skip them. + if (measurementName.isEmpty() || measurementValue == null) { + continue; + } + + // Group together measurements of the same time. + if (!messages.containsKey(measurementTimestamp)) { + Message newMessage = new Message(); + newMessage.setTimestamp(measurementTimestamp); + newMessage.setDevice(deviceId); + newMessage.setMeasurements(new ArrayList<>()); + newMessage.setDataTypes(new ArrayList<>()); + newMessage.setValues(new ArrayList<>()); + messages.put(measurementTimestamp, newMessage); + } + Message curMessage = messages.get(measurementTimestamp); + curMessage.getMeasurements().add(measurementName); + curMessage.getDataTypes().add(measurementDataType); + curMessage.getValues().add(measurementValue); + } + return new ArrayList<>(messages.values()); + } + + /** + * Map the data types used in Sparkplug to the ones used in IoTDB. + * + * @param sparkplugBDataType string representation of the sparkplug datatype + * @return TSDataType used in IoTDB + */ + protected TSDataType getTSDataTypeForSparkplugBDataType(int sparkplugBDataType) { + SparkplugBProto.DataType dataType = SparkplugBProto.DataType.forNumber(sparkplugBDataType); + if (dataType == null) { + return TSDataType.TEXT; + } + switch (dataType) { + case Bytes: + case File: + case String: + case Text: + case Unknown: + case UUID: + return TSDataType.TEXT; + case Int8: + case Int16: + case Int32: + case UInt8: + case UInt16: + return TSDataType.INT32; + case Int64: + case UInt32: + case UInt64: + case DateTime: + return TSDataType.INT64; + case Float: + return TSDataType.FLOAT; + case Double: + return TSDataType.DOUBLE; + case Boolean: + return TSDataType.BOOLEAN; + case DataSet: + // A metric can be thought of as an array of values for a set of fields. + // TODO: This is a pretty complex datatype ... + case Template: + // TODO: This is a pretty complex datatype ... + } + return TSDataType.TEXT; + } + + protected Object getValueForMetric( + SparkplugBProto.Payload.Metric metric, int sparkplugBDataType) { + SparkplugBProto.DataType dataType = SparkplugBProto.DataType.forNumber(sparkplugBDataType); + if (dataType == null) { + throw new RuntimeException("Datatype not provided"); + } + switch (dataType) { + case Bytes: + case File: + case Unknown: + // TODO: Implement this ... + return null; + case String: + case Text: + case UUID: + return Binary.valueOf(metric.getStringValue()); + case Int8: + case Int16: + case Int32: + return metric.getIntValue(); + case UInt8: + return metric.getIntValue() & 0x000000FF; + case UInt16: + return metric.getIntValue() & 0x0000FFFF; + case Int64: + case DateTime: + return metric.getLongValue(); + case UInt32: + return metric.getLongValue() & 0x00000000FFFFFFFFL; + case UInt64: + /*BigInteger b = BigInteger.valueOf(metric.getLongValue()); + if (b.signum() < 0) { + b = b.add(TWO_64); + }*/ + // TODO: Fix this ... this is not quite correct. + return metric.getLongValue(); + case Float: + return metric.getFloatValue(); + case Double: + return metric.getDoubleValue(); + case Boolean: + return metric.getBooleanValue(); + case DataSet: + // A metric can be thought of as an array of values for a set of fields. + // TODO: Implement this ... + return null; + case Template: + // TODO: This is a pretty complex datatype ... + return null; + } + throw new RuntimeException("No idea how to read this"); + } +} diff --git a/iotdb-connector/sparkplugb-connector/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/iotdb-connector/sparkplugb-connector/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter new file mode 100644 index 0000000000000..74bf378694072 --- /dev/null +++ b/iotdb-connector/sparkplugb-connector/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.iotdb.connector.sparkplugb.SparkplugBPayloadFormatter diff --git a/iotdb-connector/sparkplugb-connector/src/test/java/org/apache/iotdb/connector/sparkplugb/SparkplugBPayloadFormatterTest.java b/iotdb-connector/sparkplugb-connector/src/test/java/org/apache/iotdb/connector/sparkplugb/SparkplugBPayloadFormatterTest.java new file mode 100644 index 0000000000000..80c4364dbd510 --- /dev/null +++ b/iotdb-connector/sparkplugb-connector/src/test/java/org/apache/iotdb/connector/sparkplugb/SparkplugBPayloadFormatterTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.connector.sparkplugb; + +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.eclipse.tahu.protobuf.SparkplugBProto; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SparkplugBPayloadFormatterTest { + + @Test + public void formatJson() { + String topic = "spBv1.0/Group ID/Message Type/Edge Node ID/Device ID"; + SparkplugBProto.Payload payload = + SparkplugBProto.Payload.newBuilder() + .setTimestamp(1479123452194L) + .setSeq(2L) + .addMetrics( + SparkplugBProto.Payload.Metric.newBuilder() + .setName("My Metric") + .setAlias(1) + .setTimestamp(1479123452194L) + .setDatatype(SparkplugBProto.DataType.String.getNumber()) + .setStringValue("Test") + .build()) + .build(); + ByteBuf buf = Unpooled.copiedBuffer(payload.toByteArray()); + + SparkplugBPayloadFormatter formatter = new SparkplugBPayloadFormatter(); + Message message = formatter.format(topic, buf).get(0); + + assertEquals("root.`Group ID`.`Message Type`.`Edge Node ID`.`Device ID`", message.getDevice()); + assertEquals(Long.valueOf(1479123452194L), message.getTimestamp()); + assertEquals("My Metric", message.getMeasurements().get(0)); + assertEquals(TSDataType.TEXT, message.getDataTypes().get(0)); + assertEquals("Test", message.getValues().get(0)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java index 524b0a4718ded..808387a6bac57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java @@ -100,7 +100,7 @@ private List formatBatchJson(JsonObject jsonObject) { List timestamps = GSON.fromJson( jsonObject.get(JSON_KEY_TIMESTAMPS), new TypeToken>() {}.getType()); - List> values = + List> values = GSON.fromJson( jsonObject.get(JSON_KEY_VALUES), new TypeToken>>() {}.getType()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java index a23df1d55d8a7..bfaacb2f535c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java @@ -33,7 +33,6 @@ import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; -import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -65,10 +64,13 @@ public class MPPPublishHandler extends AbstractInterceptHandler { private final IPartitionFetcher partitionFetcher; private final ISchemaFetcher schemaFetcher; + private final boolean useV2Api; + public MPPPublishHandler(IoTDBConfig config) { - this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter()); + payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter()); partitionFetcher = ClusterPartitionFetcher.getInstance(); schemaFetcher = ClusterSchemaFetcher.getInstance(); + useV2Api = (payloadFormat instanceof PayloadFormatterV2); } @Override @@ -119,7 +121,10 @@ public void onPublish(InterceptPublishMessage msg) { topic, payload); - List events = payloadFormat.format(payload); + List events = + (useV2Api) + ? ((PayloadFormatterV2) payloadFormat).format(topic, payload) + : payloadFormat.format(payload); if (events == null) { return; } @@ -136,20 +141,16 @@ public void onPublish(InterceptPublishMessage msg) { DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice())); statement.setTime(event.getTimestamp()); statement.setMeasurements(event.getMeasurements().toArray(new String[0])); - if (event.getDataTypes() == null) { + // If the formatter doesn't return the types, we need to infer them. + List dataTypes = event.getDataTypes(); + if (dataTypes == null) { statement.setDataTypes(new TSDataType[event.getMeasurements().size()]); - statement.setValues(event.getValues().toArray(new Object[0])); statement.setNeedInferType(true); } else { - List dataTypes = event.getDataTypes(); - List values = event.getValues(); - Object[] inferredValues = new Object[values.size()]; - for (int i = 0; i < values.size(); ++i) { - inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i)); - } statement.setDataTypes(dataTypes.toArray(new TSDataType[0])); - statement.setValues(inferredValues); } + Object[] values = event.getValues().toArray(new Object[0]); + statement.setValues(values); statement.setAligned(false); tsStatus = AuthorityChecker.checkAuthority(statement, session); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java index f0eed5a56609e..2f5fce65ba054 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java @@ -28,7 +28,7 @@ public class Message { private Long timestamp; private List measurements; private List dataTypes; - private List values; + private List values; public String getDevice() { return device; @@ -62,11 +62,11 @@ public void setDataTypes(List dataTypes) { this.dataTypes = dataTypes; } - public List getValues() { + public List getValues() { return values; } - public void setValues(List values) { + public void setValues(List values) { this.values = values; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java index bccd1b865c482..1783293db2076 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java @@ -78,7 +78,9 @@ private static void makeMqttPluginDir() throws IOException { } private static void buildMqttPluginMap() throws IOException { - ServiceLoader payloadFormatters = ServiceLoader.load(PayloadFormatter.class); + // Initialize the list with all built-in PayloadFormatters. + ServiceLoader payloadFormatters = + ServiceLoader.load(PayloadFormatter.class, PayloadFormatManager.class.getClassLoader()); for (PayloadFormatter formatter : payloadFormatters) { if (formatter == null) { logger.error("PayloadFormatManager(), formatter is null."); @@ -90,6 +92,7 @@ private static void buildMqttPluginMap() throws IOException { logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName); } + // Add all PayloadFormatters defined in the mqtt plugin directory. URL[] jarURLs = getPluginJarURLs(mqttDir); logger.debug("MQTT Plugin jarURLs: {}", jarURLs); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatterV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatterV2.java new file mode 100644 index 0000000000000..fe5b14692bc11 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatterV2.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.protocol.mqtt; + +import io.netty.buffer.ByteBuf; + +import java.util.List; + +/** + * PayloadFormatterV2 enhanced interface for PayloadFormatters + * + *

This is a SPI interface. + * + * @see PayloadFormatterV2 + */ +public interface PayloadFormatterV2 extends PayloadFormatter { + /** + * format a payload to a list of messages + * + * @param topic + * @param payload + * @return + */ + List format(String topic, ByteBuf payload); +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java index 082225984cae8..0eb169cc640f1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java @@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class JSONPayloadFormatterTest { @@ -45,7 +46,8 @@ public void formatJson() { assertEquals("root.sg.d1", message.getDevice()); assertEquals(Long.valueOf(1586076045524L), message.getTimestamp()); assertEquals("s1", message.getMeasurements().get(0)); - assertEquals(0.530635D, Double.parseDouble(message.getValues().get(0)), 0); + assertTrue(message.getValues().get(0) instanceof String); + assertEquals(0.530635D, Double.parseDouble((String) message.getValues().get(0)), 0); } @Test @@ -66,7 +68,8 @@ public void formatBatchJson() { assertEquals("root.sg.d1", message.getDevice()); assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); assertEquals("s2", message.getMeasurements().get(1)); - assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0); + assertTrue(message.getValues().get(0) instanceof String); + assertEquals(0.530695D, Double.parseDouble((String) message.getValues().get(1)), 0); } @Test @@ -95,7 +98,8 @@ public void formatJsonArray() { assertEquals("root.sg.d2", message.getDevice()); assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); assertEquals("s3", message.getMeasurements().get(0)); - assertEquals(0.530655D, Double.parseDouble(message.getValues().get(0)), 0); + assertTrue(message.getValues().get(0) instanceof String); + assertEquals(0.530655D, Double.parseDouble((String) message.getValues().get(0)), 0); } @Test @@ -124,6 +128,7 @@ public void formatBatchJsonArray() { assertEquals("root.sg.d2", message.getDevice()); assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); assertEquals("s4", message.getMeasurements().get(1)); - assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0); + assertTrue(message.getValues().get(0) instanceof String); + assertEquals(0.530695D, Double.parseDouble((String) message.getValues().get(1)), 0); } } diff --git a/pom.xml b/pom.xml index 7479cc7ade419..0260a3b96e475 100644 --- a/pom.xml +++ b/pom.xml @@ -1123,6 +1123,7 @@ linux-x86_64 + linux-x64_64 Unix Makefiles thrift @@ -1138,6 +1139,7 @@ linux-aarch64 + linux-aarch_64 Unix Makefiles thrift @@ -1152,6 +1154,7 @@ mac-x86_64 + osx-x86_64 Unix Makefiles thrift /usr/local/include @@ -1167,6 +1170,7 @@ mac-aarch64 + osx-aarch_64 Unix Makefiles thrift /opt/homebrew/opt/boost/include @@ -1182,6 +1186,7 @@ windows-x86_64 + windows-x86_64 Visual Studio 17 2022 Release/thrift.exe @@ -1196,6 +1201,7 @@ windows-aarch64 + windows-aarch_64 Visual Studio 17 2022 Release/thrift.exe