diff --git a/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java b/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java index 923d1fc950eab..31e1e352ed11f 100644 --- a/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java +++ b/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java @@ -318,35 +318,35 @@ public Timer getOrCreateTimerWithInternalReport( } /** Count with internal report. */ - public void countWithInternalReport( + public void countWithInternalReportAsync( long delta, String metric, MetricLevel metricLevel, String... tags) { internalReporter.writeMetricToIoTDB( metricManager.count(delta, metric, metricLevel, tags), metric, tags); } /** Gauge value with internal report */ - public void gaugeWithInternalReport( + public void gaugeWithInternalReportAsync( long value, String metric, MetricLevel metricLevel, String... tags) { internalReporter.writeMetricToIoTDB( metricManager.gauge(value, metric, metricLevel, tags), metric, tags); } /** Rate with internal report. */ - public void rateWithInternalReport( + public void rateWithInternalReportAsync( long value, String metric, MetricLevel metricLevel, String... tags) { internalReporter.writeMetricToIoTDB( metricManager.rate(value, metric, metricLevel, tags), metric, tags); } /** Histogram with internal report. */ - public void histogramWithInternalReport( + public void histogramWithInternalReportAsync( long value, String metric, MetricLevel metricLevel, String... tags) { internalReporter.writeMetricToIoTDB( metricManager.histogram(value, metric, metricLevel, tags), metric, tags); } /** Timer with internal report. */ - public void timerWithInternalReport( + public void timerWithInternalReportAsync( long delta, TimeUnit timeUnit, String metric, MetricLevel metricLevel, String... tags) { internalReporter.writeMetricToIoTDB( metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, tags); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index a0400d0b33758..1db911af33f1d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -263,7 +263,7 @@ public void run() { lastIndex = storageGroup.length(); } MetricService.getInstance() - .gaugeWithInternalReport( + .gaugeWithInternalReportAsync( memTable.getTotalPointsNum(), Metric.POINTS.toString(), MetricLevel.CORE, diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java index a275bbad6ae9f..a634b678a0395 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java @@ -150,39 +150,43 @@ public ReporterType getReporterType() { @Override protected void writeMetricToIoTDB(Map valueMap, String prefix, long time) { - try { - TSInsertRecordReq request = new TSInsertRecordReq(); - List measurements = new ArrayList<>(); - List types = new ArrayList<>(); - List values = new ArrayList<>(); - for (Map.Entry entry : valueMap.entrySet()) { - String measurement = entry.getKey(); - Object value = entry.getValue(); - measurements.add(measurement); - types.add(inferType(value)); - values.add(value); - } - ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); + service.execute( + () -> { + try { + TSInsertRecordReq request = new TSInsertRecordReq(); + List measurements = new ArrayList<>(); + List types = new ArrayList<>(); + List values = new ArrayList<>(); + for (Map.Entry entry : valueMap.entrySet()) { + String measurement = entry.getKey(); + Object value = entry.getValue(); + measurements.add(measurement); + types.add(inferType(value)); + values.add(value); + } + ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); - request.setPrefixPath(prefix); - request.setTimestamp(time); - request.setMeasurements(measurements); - request.setValues(buffer); - request.setIsAligned(false); + request.setPrefixPath(prefix); + request.setTimestamp(time); + request.setMeasurements(measurements); + request.setValues(buffer); + request.setIsAligned(false); - InsertRowStatement s = StatementGenerator.createStatement(request); - final long queryId = SESSION_MANAGER.requestQueryId(); - ExecutionResult result = - COORDINATOR.execute(s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); - if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.error("Failed to update the value of metric with status {}", result.status); - } - } catch (IoTDBConnectionException e1) { - LOGGER.error( - "Failed to update the value of metric because of connection failure, because ", e1); - } catch (IllegalPathException | QueryProcessException e2) { - LOGGER.error("Failed to update the value of metric because of internal error, because ", e2); - } + InsertRowStatement s = StatementGenerator.createStatement(request); + final long queryId = SESSION_MANAGER.requestQueryId(); + ExecutionResult result = + COORDINATOR.execute(s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.error("Failed to update the value of metric with status {}", result.status); + } + } catch (IoTDBConnectionException e1) { + LOGGER.error( + "Failed to update the value of metric because of connection failure, because ", e1); + } catch (IllegalPathException | QueryProcessException e2) { + LOGGER.error( + "Failed to update the value of metric because of internal error, because ", e2); + } + }); } @Override