Pipe: fix NPE in PipeEnrichedInsertBaseStatement#isQuery when analyzing statement#11203
Merged
SteveYurongSu merged 1 commit intoapache:masterfrom Sep 24, 2023
Merged
Conversation
SteveYurongSu
approved these changes
Sep 24, 2023
Member
SteveYurongSu
left a comment
There was a problem hiding this comment.
LGTM. Please cherry-pick it to rel/1.2 :)
VGalaxies
added a commit
to VGalaxies/iotdb
that referenced
this pull request
Sep 24, 2023
…ng statement in metadata mismatch scenarios (apache#11203) ## Background Consider the following data synchronization scenario (metadata mismatch) using Pipe engine. 1. starting two instances of IoTDB - A datanode -> 127.0.0.1:6667 - B datanode -> 127.0.0.1:6668 **NOTE**: IoTDB B should be configured with the following `iotdb-common.properties`: ```properties enable_partial_insert=false enable_auto_create_schema=false ``` 2. connecting IoTDB B (6668) by cli and send ```sql create TIMESERIES root.sg.d1.s0 with datatype=float; create TIMESERIES root.sg.d1.s1 with datatype=float; ``` 3. connecting IoTDB A (6667) by cli and send ```sql create pipe test with connector ( 'connector'='iotdb-thrift-connector', 'connector.ip'='127.0.0.1', 'connector.port'='6668' ); start pipe test; create TIMESERIES root.sg.d1.s0 with datatype=text; create TIMESERIES root.sg.d1.s1 with datatype=float; insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2); ``` --- In IoTDB B, the following errors occur: ```plain 2023-09-23 17:18:53,197 [pool-31-IoTDB-ClientRPC-Processor-3] ERROR o.a.t.ProcessFunction:47 - Internal error processing pipeTransfer java.lang.NullPointerException: null at org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement.isQuery(PipeEnrichedInsertBaseStatement.java:70) at org.apache.iotdb.db.queryengine.plan.analyze.Analyzer.analyze(Analyzer.java:49) at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.analyze(QueryExecution.java:310) at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.<init>(QueryExecution.java:170) at org.apache.iotdb.db.queryengine.plan.Coordinator.createQueryExecution(Coordinator.java:113) at org.apache.iotdb.db.queryengine.plan.Coordinator.execute(Coordinator.java:147) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.executeStatement(IoTDBThriftReceiverV1.java:498) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.handleTransferTabletBatch(IoTDBThriftReceiverV1.java:241) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.receive(IoTDBThriftReceiverV1.java:111) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent.receive(IoTDBThriftReceiverAgent.java:43) at org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.pipeTransfer(ClientRPCServiceImpl.java:2549) at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5295) at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5275) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38) at org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics.process(ProcessorWithMetrics.java:64) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) ``` By debugging and tracing back to the source of the error: - `org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)` - `org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#process` - `org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsert` - `org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)` - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidation` - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferType` we found that the error occurs in `CommonUtils.parseValue` in `transferType`, where it throws a `QueryProcessException`. This exception is caught by `validateSchema`, causing the `finishQueryAfterAnalyze` field of the `analysis` object to be set to true. This subsequently leads to the `statement` field being null, ultimately triggering a `NullPointerException`. The purpose of this PR is to fix this NPE error, **providing more accurate error information for both the sender (IoTDB A) and receiver (IoTDB B)**. ## Description Notice that in `iotdb-common.properties`, the default value of `enable_partial_insert` is true. In the default configuration, **executing the above data synchronization scenario will NOT trigger a `NullPointerException` (NPE) error**. This is because in this situation, the `transferType` method will not rethrow a **`QueryProcessException`** exception. ```java try { values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString()); } catch (Exception e) { LOGGER.warn( "data type of {}.{} is not consistent, " + "registered type {}, inserting timestamp {}, value {}", devicePath, measurements[i], dataTypes[i], time, values[i]); if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // <-- NOTE HERE throw e; } else { markFailedMeasurement(i, e); } } ``` The `markFailedMeasurement` method also leads to the analysis becoming failed, but it does not set `finishQueryAfterAnalyze` to true (why?) 🤔. ```java if (hasFailedMeasurement) { partialInsertMessage = String.format( "Fail to insert measurements %s caused by %s", insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages()); logger.warn(partialInsertMessage); analysis.setFailStatus( RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage)); } ``` In this situation, the execution state machine's transition to a failed state is reasonable: ```java if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) { stateMachine.transitionToFailed(analysis.getFailStatus()); } ``` and warning logs are recorded as below: ```java 2023-09-23 17:42:25,714 [pool-31-IoTDB-ClientRPC-Processor-3] WARN o.a.i.d.p.r.t.IoTDBThriftReceiverV1:507 - failed to execute statement, statement: org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement@2a3b3d83, result status is: TSStatus(code:507, message:Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one]) 2023-09-23 17:42:25,720 [pool-31-IoTDB-ClientRPC-Processor-3$20230923_094225_00004_1] WARN o.a.i.d.q.p.a.AnalyzeVisitor:2693 - Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one] ``` Therefore, when `enable_partial_insert` is set to false, we only need to avoid THIS NPE **directly** to achieve the expected purpose.
VGalaxies
added a commit
to VGalaxies/iotdb
that referenced
this pull request
Sep 24, 2023
… when analyzing statement in metadata mismatch scenarios (apache#11203) Consider the following data synchronization scenario (metadata mismatch) using Pipe engine. 1. starting two instances of IoTDB - A datanode -> 127.0.0.1:6667 - B datanode -> 127.0.0.1:6668 **NOTE**: IoTDB B should be configured with the following `iotdb-common.properties`: ```properties enable_partial_insert=false enable_auto_create_schema=false ``` 2. connecting IoTDB B (6668) by cli and send ```sql create TIMESERIES root.sg.d1.s0 with datatype=float; create TIMESERIES root.sg.d1.s1 with datatype=float; ``` 3. connecting IoTDB A (6667) by cli and send ```sql create pipe test with connector ( 'connector'='iotdb-thrift-connector', 'connector.ip'='127.0.0.1', 'connector.port'='6668' ); start pipe test; create TIMESERIES root.sg.d1.s0 with datatype=text; create TIMESERIES root.sg.d1.s1 with datatype=float; insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2); ``` --- In IoTDB B, the following errors occur: ```plain 2023-09-23 17:18:53,197 [pool-31-IoTDB-ClientRPC-Processor-3] ERROR o.a.t.ProcessFunction:47 - Internal error processing pipeTransfer java.lang.NullPointerException: null at org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement.isQuery(PipeEnrichedInsertBaseStatement.java:70) at org.apache.iotdb.db.queryengine.plan.analyze.Analyzer.analyze(Analyzer.java:49) at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.analyze(QueryExecution.java:310) at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.<init>(QueryExecution.java:170) at org.apache.iotdb.db.queryengine.plan.Coordinator.createQueryExecution(Coordinator.java:113) at org.apache.iotdb.db.queryengine.plan.Coordinator.execute(Coordinator.java:147) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.executeStatement(IoTDBThriftReceiverV1.java:498) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.handleTransferTabletBatch(IoTDBThriftReceiverV1.java:241) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.receive(IoTDBThriftReceiverV1.java:111) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent.receive(IoTDBThriftReceiverAgent.java:43) at org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.pipeTransfer(ClientRPCServiceImpl.java:2549) at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5295) at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5275) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38) at org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics.process(ProcessorWithMetrics.java:64) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) ``` By debugging and tracing back to the source of the error: - `org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)` - `org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#process` - `org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsert` - `org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)` - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidation` - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferType` we found that the error occurs in `CommonUtils.parseValue` in `transferType`, where it throws a `QueryProcessException`. This exception is caught by `validateSchema`, causing the `finishQueryAfterAnalyze` field of the `analysis` object to be set to true. This subsequently leads to the `statement` field being null, ultimately triggering a `NullPointerException`. The purpose of this PR is to fix this NPE error, **providing more accurate error information for both the sender (IoTDB A) and receiver (IoTDB B)**. Notice that in `iotdb-common.properties`, the default value of `enable_partial_insert` is true. In the default configuration, **executing the above data synchronization scenario will NOT trigger a `NullPointerException` (NPE) error**. This is because in this situation, the `transferType` method will not rethrow a **`QueryProcessException`** exception. ```java try { values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString()); } catch (Exception e) { LOGGER.warn( "data type of {}.{} is not consistent, " + "registered type {}, inserting timestamp {}, value {}", devicePath, measurements[i], dataTypes[i], time, values[i]); if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // <-- NOTE HERE throw e; } else { markFailedMeasurement(i, e); } } ``` The `markFailedMeasurement` method also leads to the analysis becoming failed, but it does not set `finishQueryAfterAnalyze` to true (why?) 🤔. ```java if (hasFailedMeasurement) { partialInsertMessage = String.format( "Fail to insert measurements %s caused by %s", insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages()); logger.warn(partialInsertMessage); analysis.setFailStatus( RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage)); } ``` In this situation, the execution state machine's transition to a failed state is reasonable: ```java if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) { stateMachine.transitionToFailed(analysis.getFailStatus()); } ``` and warning logs are recorded as below: ```java 2023-09-23 17:42:25,714 [pool-31-IoTDB-ClientRPC-Processor-3] WARN o.a.i.d.p.r.t.IoTDBThriftReceiverV1:507 - failed to execute statement, statement: org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement@2a3b3d83, result status is: TSStatus(code:507, message:Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one]) 2023-09-23 17:42:25,720 [pool-31-IoTDB-ClientRPC-Processor-3$20230923_094225_00004_1] WARN o.a.i.d.q.p.a.AnalyzeVisitor:2693 - Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one] ``` Therefore, when `enable_partial_insert` is set to false, we only need to avoid THIS NPE **directly** to achieve the expected purpose.
VGalaxies
added a commit
to VGalaxies/iotdb
that referenced
this pull request
Sep 24, 2023
… when analyzing statement in metadata mismatch scenarios (apache#11203) Consider the following data synchronization scenario (metadata mismatch) using Pipe engine. 1. starting two instances of IoTDB - A datanode -> 127.0.0.1:6667 - B datanode -> 127.0.0.1:6668 **NOTE**: IoTDB B should be configured with the following `iotdb-common.properties`: ```properties enable_partial_insert=false enable_auto_create_schema=false ``` 2. connecting IoTDB B (6668) by cli and send ```sql create TIMESERIES root.sg.d1.s0 with datatype=float; create TIMESERIES root.sg.d1.s1 with datatype=float; ``` 3. connecting IoTDB A (6667) by cli and send ```sql create pipe test with connector ( 'connector'='iotdb-thrift-connector', 'connector.ip'='127.0.0.1', 'connector.port'='6668' ); start pipe test; create TIMESERIES root.sg.d1.s0 with datatype=text; create TIMESERIES root.sg.d1.s1 with datatype=float; insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2); ``` --- In IoTDB B, the following errors occur: ```plain 2023-09-23 17:18:53,197 [pool-31-IoTDB-ClientRPC-Processor-3] ERROR o.a.t.ProcessFunction:47 - Internal error processing pipeTransfer java.lang.NullPointerException: null at org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement.isQuery(PipeEnrichedInsertBaseStatement.java:70) at org.apache.iotdb.db.queryengine.plan.analyze.Analyzer.analyze(Analyzer.java:49) at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.analyze(QueryExecution.java:310) at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.<init>(QueryExecution.java:170) at org.apache.iotdb.db.queryengine.plan.Coordinator.createQueryExecution(Coordinator.java:113) at org.apache.iotdb.db.queryengine.plan.Coordinator.execute(Coordinator.java:147) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.executeStatement(IoTDBThriftReceiverV1.java:498) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.handleTransferTabletBatch(IoTDBThriftReceiverV1.java:241) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.receive(IoTDBThriftReceiverV1.java:111) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent.receive(IoTDBThriftReceiverAgent.java:43) at org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.pipeTransfer(ClientRPCServiceImpl.java:2549) at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5295) at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5275) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38) at org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics.process(ProcessorWithMetrics.java:64) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) ``` By debugging and tracing back to the source of the error: - `org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)` - `org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#process` - `org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsert` - `org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)` - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidation` - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferType` we found that the error occurs in `CommonUtils.parseValue` in `transferType`, where it throws a `QueryProcessException`. This exception is caught by `validateSchema`, causing the `finishQueryAfterAnalyze` field of the `analysis` object to be set to true. This subsequently leads to the `statement` field being null, ultimately triggering a `NullPointerException`. The purpose of this PR is to fix this NPE error, **providing more accurate error information for both the sender (IoTDB A) and receiver (IoTDB B)**. Notice that in `iotdb-common.properties`, the default value of `enable_partial_insert` is true. In the default configuration, **executing the above data synchronization scenario will NOT trigger a `NullPointerException` (NPE) error**. This is because in this situation, the `transferType` method will not rethrow a **`QueryProcessException`** exception. ```java try { values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString()); } catch (Exception e) { LOGGER.warn( "data type of {}.{} is not consistent, " + "registered type {}, inserting timestamp {}, value {}", devicePath, measurements[i], dataTypes[i], time, values[i]); if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // <-- NOTE HERE throw e; } else { markFailedMeasurement(i, e); } } ``` The `markFailedMeasurement` method also leads to the analysis becoming failed, but it does not set `finishQueryAfterAnalyze` to true (why?) 🤔. ```java if (hasFailedMeasurement) { partialInsertMessage = String.format( "Fail to insert measurements %s caused by %s", insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages()); logger.warn(partialInsertMessage); analysis.setFailStatus( RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage)); } ``` In this situation, the execution state machine's transition to a failed state is reasonable: ```java if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) { stateMachine.transitionToFailed(analysis.getFailStatus()); } ``` and warning logs are recorded as below: ```java 2023-09-23 17:42:25,714 [pool-31-IoTDB-ClientRPC-Processor-3] WARN o.a.i.d.p.r.t.IoTDBThriftReceiverV1:507 - failed to execute statement, statement: org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement@2a3b3d83, result status is: TSStatus(code:507, message:Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one]) 2023-09-23 17:42:25,720 [pool-31-IoTDB-ClientRPC-Processor-3$20230923_094225_00004_1] WARN o.a.i.d.q.p.a.AnalyzeVisitor:2693 - Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one] ``` Therefore, when `enable_partial_insert` is set to false, we only need to avoid THIS NPE **directly** to achieve the expected purpose.
SteveYurongSu
pushed a commit
that referenced
this pull request
Sep 24, 2023
… when analyzing statement in metadata mismatch scenarios (#11203) (#11206) Consider the following data synchronization scenario (metadata mismatch) using Pipe engine. 1. starting two instances of IoTDB - A datanode -> 127.0.0.1:6667 - B datanode -> 127.0.0.1:6668 **NOTE**: IoTDB B should be configured with the following `iotdb-common.properties`: ```properties enable_partial_insert=false enable_auto_create_schema=false ``` 2. connecting IoTDB B (6668) by cli and send ```sql create TIMESERIES root.sg.d1.s0 with datatype=float; create TIMESERIES root.sg.d1.s1 with datatype=float; ``` 3. connecting IoTDB A (6667) by cli and send ```sql create pipe test with connector ( 'connector'='iotdb-thrift-connector', 'connector.ip'='127.0.0.1', 'connector.port'='6668' ); start pipe test; create TIMESERIES root.sg.d1.s0 with datatype=text; create TIMESERIES root.sg.d1.s1 with datatype=float; insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2); ``` --- In IoTDB B, the following errors occur: ```plain 2023-09-23 17:18:53,197 [pool-31-IoTDB-ClientRPC-Processor-3] ERROR o.a.t.ProcessFunction:47 - Internal error processing pipeTransfer java.lang.NullPointerException: null at org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement.isQuery(PipeEnrichedInsertBaseStatement.java:70) at org.apache.iotdb.db.queryengine.plan.analyze.Analyzer.analyze(Analyzer.java:49) at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.analyze(QueryExecution.java:310) at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.<init>(QueryExecution.java:170) at org.apache.iotdb.db.queryengine.plan.Coordinator.createQueryExecution(Coordinator.java:113) at org.apache.iotdb.db.queryengine.plan.Coordinator.execute(Coordinator.java:147) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.executeStatement(IoTDBThriftReceiverV1.java:498) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.handleTransferTabletBatch(IoTDBThriftReceiverV1.java:241) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.receive(IoTDBThriftReceiverV1.java:111) at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent.receive(IoTDBThriftReceiverAgent.java:43) at org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.pipeTransfer(ClientRPCServiceImpl.java:2549) at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5295) at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5275) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38) at org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics.process(ProcessorWithMetrics.java:64) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) ``` By debugging and tracing back to the source of the error: - `org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)` - `org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#process` - `org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsert` - `org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)` - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidation` - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferType` we found that the error occurs in `CommonUtils.parseValue` in `transferType`, where it throws a `QueryProcessException`. This exception is caught by `validateSchema`, causing the `finishQueryAfterAnalyze` field of the `analysis` object to be set to true. This subsequently leads to the `statement` field being null, ultimately triggering a `NullPointerException`. The purpose of this PR is to fix this NPE error, **providing more accurate error information for both the sender (IoTDB A) and receiver (IoTDB B)**. Notice that in `iotdb-common.properties`, the default value of `enable_partial_insert` is true. In the default configuration, **executing the above data synchronization scenario will NOT trigger a `NullPointerException` (NPE) error**. This is because in this situation, the `transferType` method will not rethrow a **`QueryProcessException`** exception. ```java try { values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString()); } catch (Exception e) { LOGGER.warn( "data type of {}.{} is not consistent, " + "registered type {}, inserting timestamp {}, value {}", devicePath, measurements[i], dataTypes[i], time, values[i]); if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // <-- NOTE HERE throw e; } else { markFailedMeasurement(i, e); } } ``` The `markFailedMeasurement` method also leads to the analysis becoming failed, but it does not set `finishQueryAfterAnalyze` to true (why?) 🤔. ```java if (hasFailedMeasurement) { partialInsertMessage = String.format( "Fail to insert measurements %s caused by %s", insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages()); logger.warn(partialInsertMessage); analysis.setFailStatus( RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage)); } ``` In this situation, the execution state machine's transition to a failed state is reasonable: ```java if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) { stateMachine.transitionToFailed(analysis.getFailStatus()); } ``` and warning logs are recorded as below: ```java 2023-09-23 17:42:25,714 [pool-31-IoTDB-ClientRPC-Processor-3] WARN o.a.i.d.p.r.t.IoTDBThriftReceiverV1:507 - failed to execute statement, statement: org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement@2a3b3d83, result status is: TSStatus(code:507, message:Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one]) 2023-09-23 17:42:25,720 [pool-31-IoTDB-ClientRPC-Processor-3$20230923_094225_00004_1] WARN o.a.i.d.q.p.a.AnalyzeVisitor:2693 - Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one] ``` Therefore, when `enable_partial_insert` is set to false, we only need to avoid THIS NPE **directly** to achieve the expected purpose.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Background
Consider the following data synchronization scenario (metadata mismatch) using Pipe engine.
NOTE: IoTDB B should be configured with the following
iotdb-common.properties:create pipe test with connector ( 'connector'='iotdb-thrift-connector', 'connector.ip'='127.0.0.1', 'connector.port'='6668' ); start pipe test; create TIMESERIES root.sg.d1.s0 with datatype=text; create TIMESERIES root.sg.d1.s1 with datatype=float; insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2);In IoTDB B, the following errors occur:
By debugging and tracing back to the source of the error:
org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#processorg.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsertorg.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidationorg.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferTypewe found that the error occurs in
CommonUtils.parseValueintransferType, where it throws aQueryProcessException. This exception is caught byvalidateSchema, causing thefinishQueryAfterAnalyzefield of theanalysisobject to be set to true. This subsequently leads to thestatementfield being null, ultimately triggering aNullPointerException.The purpose of this PR is to fix this NPE error, providing more accurate error information for both the sender (IoTDB A) and receiver (IoTDB B).
Description
Notice that in
iotdb-common.properties, the default value ofenable_partial_insertis true. In the default configuration, executing the above data synchronization scenario will NOT trigger aNullPointerException(NPE) error. This is because in this situation, thetransferTypemethod will not rethrow aQueryProcessExceptionexception.The
markFailedMeasurementmethod also leads to the analysis becoming failed, but it does not setfinishQueryAfterAnalyzeto true (why?) 🤔.In this situation, the execution state machine's transition to a failed state is reasonable:
and warning logs are recorded as below:
Therefore, when
enable_partial_insertis set to false, we only need to avoid THIS NPE directly to achieve the expected purpose.This PR has:
for an unfamiliar reader.
for code coverage.
Key changed/added classes (or packages if there are too many classes) in this PR