Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,35 +318,35 @@ public Timer getOrCreateTimerWithInternalReport(
}

/** Count with internal report. */
public void countWithInternalReport(
public void countWithInternalReportAsync(
long delta, String metric, MetricLevel metricLevel, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.count(delta, metric, metricLevel, tags), metric, tags);
}

/** Gauge value with internal report */
public void gaugeWithInternalReport(
public void gaugeWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.gauge(value, metric, metricLevel, tags), metric, tags);
}

/** Rate with internal report. */
public void rateWithInternalReport(
public void rateWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.rate(value, metric, metricLevel, tags), metric, tags);
}

/** Histogram with internal report. */
public void histogramWithInternalReport(
public void histogramWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.histogram(value, metric, metricLevel, tags), metric, tags);
}

/** Timer with internal report. */
public void timerWithInternalReport(
public void timerWithInternalReportAsync(
long delta, TimeUnit timeUnit, String metric, MetricLevel metricLevel, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, tags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void run() {
lastIndex = storageGroup.length();
}
MetricService.getInstance()
.gaugeWithInternalReport(
.gaugeWithInternalReportAsync(
memTable.getTotalPointsNum(),
Metric.POINTS.toString(),
MetricLevel.CORE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,39 +150,43 @@ public ReporterType getReporterType() {

@Override
protected void writeMetricToIoTDB(Map<String, Object> valueMap, String prefix, long time) {
try {
TSInsertRecordReq request = new TSInsertRecordReq();
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
String measurement = entry.getKey();
Object value = entry.getValue();
measurements.add(measurement);
types.add(inferType(value));
values.add(value);
}
ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
service.execute(
() -> {
try {
TSInsertRecordReq request = new TSInsertRecordReq();
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
String measurement = entry.getKey();
Object value = entry.getValue();
measurements.add(measurement);
types.add(inferType(value));
values.add(value);
}
ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);

request.setPrefixPath(prefix);
request.setTimestamp(time);
request.setMeasurements(measurements);
request.setValues(buffer);
request.setIsAligned(false);
request.setPrefixPath(prefix);
request.setTimestamp(time);
request.setMeasurements(measurements);
request.setValues(buffer);
request.setIsAligned(false);

InsertRowStatement s = StatementGenerator.createStatement(request);
final long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
COORDINATOR.execute(s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error("Failed to update the value of metric with status {}", result.status);
}
} catch (IoTDBConnectionException e1) {
LOGGER.error(
"Failed to update the value of metric because of connection failure, because ", e1);
} catch (IllegalPathException | QueryProcessException e2) {
LOGGER.error("Failed to update the value of metric because of internal error, because ", e2);
}
InsertRowStatement s = StatementGenerator.createStatement(request);
final long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
COORDINATOR.execute(s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error("Failed to update the value of metric with status {}", result.status);
}
} catch (IoTDBConnectionException e1) {
LOGGER.error(
"Failed to update the value of metric because of connection failure, because ", e1);
} catch (IllegalPathException | QueryProcessException e2) {
LOGGER.error(
"Failed to update the value of metric because of internal error, because ", e2);
}
});
}

@Override
Expand Down