Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,19 @@
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter;
import org.apache.iotdb.metrics.utils.InternalReporterType;
import org.apache.iotdb.metrics.utils.ReporterType;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.session.util.SessionUtils;

import org.apache.thrift.TException;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -69,7 +72,10 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;

public class IoTDBInternalLocalReporter extends IoTDBInternalReporter {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBInternalLocalReporter.class);
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
private static final Coordinator COORDINATOR = Coordinator.getInstance();
Expand Down Expand Up @@ -159,43 +165,83 @@ protected void writeMetricToIoTDB(Map<String, Object> valueMap, String prefix, l
service.execute(
() -> {
try {
TSInsertRecordReq request = new TSInsertRecordReq();
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
String measurement = entry.getKey();
Object value = entry.getValue();
measurements.add(measurement);
types.add(inferType(value));
values.add(value);
TSStatus result = insertRecord(valueMap, prefix, time);
if (result.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
createTimeSeries(valueMap, prefix);
result = insertRecord(valueMap, prefix, time);
}
ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);

request.setPrefixPath(prefix);
request.setTimestamp(time);
request.setMeasurements(measurements);
request.setValues(buffer);
request.setIsAligned(false);

InsertRowStatement s = StatementGenerator.createStatement(request);
final long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
COORDINATOR.executeForTreeModel(
s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error("Failed to update the value of metric with status {}", result.status);
if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to update the value of metric with status {}", result);
}
} catch (IoTDBConnectionException e1) {
LOGGER.error(
LOGGER.warn(
"Failed to update the value of metric because of connection failure, because ", e1);
} catch (IllegalPathException | QueryProcessException e2) {
LOGGER.error(
LOGGER.warn(
"Failed to update the value of metric because of internal error, because ", e2);
}
});
}

private TSStatus insertRecord(Map<String, Object> valueMap, String prefix, long time)
throws IoTDBConnectionException, QueryProcessException, IllegalPathException {
TSInsertRecordReq request = new TSInsertRecordReq();
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
String measurement = entry.getKey();
Object value = entry.getValue();
measurements.add(measurement);
types.add(inferType(value));
values.add(value);
}
ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);

request.setPrefixPath(prefix);
request.setTimestamp(time);
request.setMeasurements(measurements);
request.setValues(buffer);
request.setIsAligned(false);

InsertRowStatement s = StatementGenerator.createStatement(request);
final long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
COORDINATOR.executeForTreeModel(
s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
return result.status;
}

private void createTimeSeries(Map<String, Object> valueMap, String prefix)
throws IllegalPathException {
TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
List<String> paths = new ArrayList<>();
List<Integer> types = new ArrayList<>();
List<Integer> encodings = new ArrayList<>();
List<Integer> compressors = new ArrayList<>();
for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
String measurement = entry.getKey();
paths.add(prefix + "." + measurement);
TSDataType type = inferType(entry.getValue());
types.add(type.ordinal());
encodings.add((int) getDefaultEncoding(type).serialize());
compressors.add((int) TSFileDescriptor.getInstance().getConfig().getCompressor().serialize());
}
request.setPaths(paths);
request.setDataTypes(types);
request.setEncodings(encodings);
request.setCompressors(compressors);
CreateMultiTimeSeriesStatement s = StatementGenerator.createStatement(request);
final long queryId = SESSION_MANAGER.requestQueryId();

ExecutionResult result =
COORDINATOR.executeForTreeModel(
s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to auto create timeseries for {} with status {}", paths, result.status);
}
}

@Override
protected void writeMetricsToIoTDB(Map<String, Map<String, Object>> valueMap, long time) {
for (Map.Entry<String, Map<String, Object>> value : valueMap.entrySet()) {
Expand Down