From 2ef279f61b4ab36099ebd7f65da9db8f3a5199f5 Mon Sep 17 00:00:00 2001 From: OneSizeFitQuorum Date: Mon, 22 Jul 2024 16:24:52 +0800 Subject: [PATCH] finish Signed-off-by: OneSizeFitQuorum --- .../metrics/IoTDBInternalLocalReporter.java | 100 +++++++++++++----- 1 file changed, 73 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java index f33624fb0b2b1..2b6478f79596c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java @@ -44,16 +44,19 @@ import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter; import org.apache.iotdb.metrics.utils.InternalReporterType; import org.apache.iotdb.metrics.utils.ReporterType; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.session.util.SessionUtils; import org.apache.thrift.TException; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +72,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; + public class IoTDBInternalLocalReporter extends IoTDBInternalReporter { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBInternalLocalReporter.class); private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); private static final Coordinator COORDINATOR = Coordinator.getInstance(); @@ -159,43 +165,83 @@ protected void writeMetricToIoTDB(Map valueMap, String prefix, l service.execute( () -> { try { - TSInsertRecordReq request = new TSInsertRecordReq(); - List measurements = new ArrayList<>(); - List types = new ArrayList<>(); - List values = new ArrayList<>(); - for (Map.Entry entry : valueMap.entrySet()) { - String measurement = entry.getKey(); - Object value = entry.getValue(); - measurements.add(measurement); - types.add(inferType(value)); - values.add(value); + TSStatus result = insertRecord(valueMap, prefix, time); + if (result.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + createTimeSeries(valueMap, prefix); + result = insertRecord(valueMap, prefix, time); } - ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); - - request.setPrefixPath(prefix); - request.setTimestamp(time); - request.setMeasurements(measurements); - request.setValues(buffer); - request.setIsAligned(false); - - InsertRowStatement s = StatementGenerator.createStatement(request); - final long queryId = SESSION_MANAGER.requestQueryId(); - ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); - if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.error("Failed to update the value of metric with status {}", result.status); + if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Failed to update the value of metric with status {}", result); } } catch (IoTDBConnectionException e1) { - LOGGER.error( + LOGGER.warn( "Failed to update the value of metric because of connection failure, because ", e1); } catch (IllegalPathException | QueryProcessException e2) { - LOGGER.error( + LOGGER.warn( "Failed to update the value of metric because of internal error, because ", e2); } }); } + private TSStatus insertRecord(Map valueMap, String prefix, long time) + throws IoTDBConnectionException, QueryProcessException, IllegalPathException { + TSInsertRecordReq request = new TSInsertRecordReq(); + List measurements = new ArrayList<>(); + List types = new ArrayList<>(); + List values = new ArrayList<>(); + for (Map.Entry entry : valueMap.entrySet()) { + String measurement = entry.getKey(); + Object value = entry.getValue(); + measurements.add(measurement); + types.add(inferType(value)); + values.add(value); + } + ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); + + request.setPrefixPath(prefix); + request.setTimestamp(time); + request.setMeasurements(measurements); + request.setValues(buffer); + request.setIsAligned(false); + + InsertRowStatement s = StatementGenerator.createStatement(request); + final long queryId = SESSION_MANAGER.requestQueryId(); + ExecutionResult result = + COORDINATOR.executeForTreeModel( + s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + return result.status; + } + + private void createTimeSeries(Map valueMap, String prefix) + throws IllegalPathException { + TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq(); + List paths = new ArrayList<>(); + List types = new ArrayList<>(); + List encodings = new ArrayList<>(); + List compressors = new ArrayList<>(); + for (Map.Entry entry : valueMap.entrySet()) { + String measurement = entry.getKey(); + paths.add(prefix + "." + measurement); + TSDataType type = inferType(entry.getValue()); + types.add(type.ordinal()); + encodings.add((int) getDefaultEncoding(type).serialize()); + compressors.add((int) TSFileDescriptor.getInstance().getConfig().getCompressor().serialize()); + } + request.setPaths(paths); + request.setDataTypes(types); + request.setEncodings(encodings); + request.setCompressors(compressors); + CreateMultiTimeSeriesStatement s = StatementGenerator.createStatement(request); + final long queryId = SESSION_MANAGER.requestQueryId(); + + ExecutionResult result = + COORDINATOR.executeForTreeModel( + s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Failed to auto create timeseries for {} with status {}", paths, result.status); + } + } + @Override protected void writeMetricsToIoTDB(Map> valueMap, long time) { for (Map.Entry> value : valueMap.entrySet()) {