Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.utils.CommonUtils;
Expand Down Expand Up @@ -108,10 +109,10 @@
}

@Override
public Response executeFastLastQueryStatement(

Check warning on line 112 in external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/impl/RestApiServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 116 to 64, Complexity from 15 to 14, Nesting Level from 4 to 2, Number of Variables from 33 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ2GA6xekud7vHWqZhhu&open=AZ2GA6xekud7vHWqZhhu&pullRequest=17465
PrefixPathList prefixPathList, SecurityContext securityContext) {
Long queryId = null;
Statement statement = null;
QueryStatement statement = null;
boolean finish = false;
long startTime = System.nanoTime();
Throwable t = null;
Expand All @@ -123,30 +124,33 @@
new PartialPath(prefixPathList.getPrefixPaths().toArray(new String[0]));
final Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>>> resultMap =
new HashMap<>();
int sensorNum = 0;

// Check permission, the cost is rather low because the req only contains one prefix path
final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
final TSLastDataQueryReq tsLastDataQueryReq =
FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList);
statement = StatementGenerator.createStatement(tsLastDataQueryReq);

final Response response = authorizationHandler.checkAuthority(securityContext, statement);
if (response != null) {
return response;
}
Comment thread
jt2594838 marked this conversation as resolved.

final String prefixString = prefixPath.toString();
for (final ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) {
if (!prefixString.startsWith(region.getDatabaseFullPath())
&& !region.getDatabaseFullPath().startsWith(prefixString)) {
continue;
}
sensorNum += region.fillLastQueryMap(prefixPath, resultMap);
region.fillLastQueryMap(prefixPath, resultMap, statement.getAuthorityScope());
}

// Check cache first
if (!TableDeviceSchemaCache.getInstance().getLastCache(resultMap)) {
IClientSession clientSession = SESSION_MANAGER.getCurrSession();
TSLastDataQueryReq tsLastDataQueryReq =
FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList);
statement = StatementGenerator.createStatement(tsLastDataQueryReq);

if (ExecuteStatementHandler.validateStatement(statement)) {
return FastLastHandler.buildErrorResponse(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}

Optional.ofNullable(authorizationHandler.checkAuthority(securityContext, statement))
.ifPresent(Response.class::cast);

queryId = SESSION_MANAGER.requestQueryId();
SessionInfo sessionInfo = SESSION_MANAGER.getSessionInfo(clientSession);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iotdb.db.it;

import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
Expand Down Expand Up @@ -54,6 +55,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -476,6 +478,7 @@ public void insertAndQuery() {
selectLast(httpClient);

queryV2(httpClient);
selectFastLast(httpClient);
queryGroupByLevelV2(httpClient);
queryRowLimitV2(httpClient);
queryShowChildPathsV2(httpClient);
Expand Down Expand Up @@ -886,6 +889,71 @@ public void queryWithWrongAuthorization() {
}
}

@Test
public void queryFastLastWithWrongAuthorization() {
CloseableHttpResponse response = null;

TestUtils.executeNonQuery("create user abcd 'strongPassword@1234'");
try {
final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
final HttpPost httpPost = new HttpPost("http://127.0.0.1:" + port + "/rest/v2/fastLastQuery");
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
final String authorization = getAuthorization("abcd", "strongPassword@1234");
httpPost.setHeader("Authorization", authorization);
final String sql = "{\"prefix_paths\":[\"root\",\"sg25\"]}";
httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
for (int i = 0; i < 30; i++) {
try {
response = httpClient.execute(httpPost);
break;
} catch (Exception e) {
if (i == 29) {
throw e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

Assert.assertEquals(200, response.getStatusLine().getStatusCode());
String message = EntityUtils.toString(response.getEntity(), "utf-8");
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(message, Map.class);
List<Long> timestampsResult = (List<Long>) map.get("timestamps");
List<Long> expressionsResult = (List<Long>) map.get("expressions");
List<List<Object>> valuesResult = (List<List<Object>>) map.get("values");
Assert.assertTrue(map.size() > 0);
List<Object> expressions =
new ArrayList<Object>() {
{
add("Timeseries");
add("Value");
add("DataType");
}
};

Assert.assertEquals(expressions, expressionsResult);
Assert.assertEquals(Collections.emptyList(), timestampsResult);
Assert.assertEquals(Collections.emptyList(), valuesResult);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}

public void query(CloseableHttpClient httpClient) {
CloseableHttpResponse response = null;
try {
Expand Down Expand Up @@ -1651,6 +1719,98 @@ public void queryV2(CloseableHttpClient httpClient) {
}
}

public void selectFastLast(CloseableHttpClient httpClient) {
// Only used in 1D scenarios
if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) {
return;
}
CloseableHttpResponse response = null;
try {
HttpPost httpPost = getHttpPost("http://127.0.0.1:" + port + "/rest/v2/fastLastQuery");
String sql = "{\"prefix_paths\":[\"root\",\"sg25\"]}";
httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
response = httpClient.execute(httpPost);
HttpEntity responseEntity = response.getEntity();
String message = EntityUtils.toString(responseEntity, "utf-8");
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(message, Map.class);
List<Long> timestampsResult = (List<Long>) map.get("timestamps");
List<Long> expressionsResult = (List<Long>) map.get("expressions");
List<List<Object>> valuesResult = (List<List<Object>>) map.get("values");
Assert.assertTrue(map.size() > 0);
List<Object> expressions =
new ArrayList<Object>() {
{
add("Timeseries");
add("Value");
add("DataType");
}
};
List<Object> timestamps =
new ArrayList<Object>() {
{
add(1635232153960l);
add(1635232153960l);
add(1635232153960l);
add(1635232143960l);
add(1635232153960l);
add(1635232153960l);
}
};
List<Object> values1 =
new ArrayList<Object>() {
{
add("root.sg25.s3");
add("root.sg25.s4");
add("root.sg25.s5");
add("root.sg25.s6");
add("root.sg25.s7");
add("root.sg25.s8");
}
};
List<Object> values2 =
new ArrayList<Object>() {
{
add("");
add("2");
add("1635000012345556");
add("1.41");
add("false");
add("3.5555");
}
};
List<Object> values3 =
new ArrayList<Object>() {
{
add("TEXT");
add("INT32");
add("INT64");
add("FLOAT");
add("BOOLEAN");
add("DOUBLE");
}
};

Assert.assertEquals(expressions, expressionsResult);
Assert.assertEquals(timestamps, timestampsResult);
Assert.assertEquals(values1, valuesResult.get(0));
Assert.assertEquals(values2, valuesResult.get(1));
Assert.assertEquals(values3, valuesResult.get(2));
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}

public void queryGroupByLevelV2(CloseableHttpClient httpClient) {
CloseableHttpResponse response = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1451,8 +1451,9 @@ public static void assertResultSetEqual(
public static void assertResultSetEqual(
SessionDataSet actualResultSet,
List<String> expectedColumnNames,
Set<String> expectedRetArray,
Set<String> expectedRetSet,
boolean ignoreTimeStamp) {
final Set<String> copiedSet = new HashSet<>(expectedRetSet);
try {
List<String> actualColumnNames = actualResultSet.getColumnNames();
if (ignoreTimeStamp) {
Expand All @@ -1462,12 +1463,11 @@ public static void assertResultSetEqual(
assertEquals(expectedColumnNames, actualColumnNames.subList(1, actualColumnNames.size()));
}

int count = 0;
while (actualResultSet.hasNext()) {
RowRecord rowRecord = actualResultSet.next();
assertTrue(expectedRetArray.remove(rowRecord.toString().replace('\t', ',')));
assertTrue(copiedSet.remove(rowRecord.toString().replace('\t', ',')));
}
assertEquals(expectedRetArray.size(), count);
assertEquals(0, copiedSet.size());
} catch (IoTDBConnectionException | StatementExecutionException e) {
e.printStackTrace();
fail(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.it.utils.AlignedWriteUtil;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;

import org.junit.AfterClass;
Expand Down Expand Up @@ -249,6 +251,81 @@ public void lastQueryForOneDeviceNoSchema() throws IoTDBConnectionException {
}
}

@Test
public void lastQueryWithPrefixTest() throws IoTDBConnectionException {
// Only used in 1D scenarios
if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) {
return;
}
final Set<String> retArray =
new HashSet<>(
Arrays.asList(
"30,root.sg1.d1.s3,30,INT64",
"30,root.sg1.d1.s4,false,BOOLEAN",
"40,root.sg1.d1.s5,aligned_test40,TEXT",
"23,root.sg1.d1.s1,230000.0,FLOAT",
"40,root.sg1.d1.s2,40,INT32"));

try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
// Push last cache first
try (final SessionDataSet resultSet =
session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
}

try (final SessionDataSet resultSet =
session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
}
} catch (StatementExecutionException | RedirectException e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void lastQueryWithoutPermissionTest() throws IoTDBConnectionException {
// Only used in 1D scenarios
if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) {
return;
}
final String[] retArray = new String[] {};
final Set<String> retArray2 =
new HashSet<>(
Arrays.asList(
"30,root.sg1.d1.s3,30,INT64",
"30,root.sg1.d1.s4,false,BOOLEAN",
"40,root.sg1.d1.s5,aligned_test40,TEXT",
"23,root.sg1.d1.s1,230000.0,FLOAT",
"40,root.sg1.d1.s2,40,INT32"));
TestUtils.executeNonQuery(EnvFactory.getEnv(), "create user abcd 'veryComplexPassword@123'");

try (final ISession session =
EnvFactory.getEnv().getSessionConnection("abcd", "veryComplexPassword@123");
final ISession rootSession = EnvFactory.getEnv().getSessionConnection()) {
// Push last cache first
try (final SessionDataSet resultSet =
rootSession.executeFastLastDataQueryForOnePrefixPath(
Arrays.asList("root", "sg1", "d1"))) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray2, true);
}
Comment thread
jt2594838 marked this conversation as resolved.

try (final SessionDataSet resultSet =
session.executeLastDataQueryForOneDevice(
"root.sg1", "root.sg1.d1", Arrays.asList("notExist", "s1"), true)) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
}

try (final SessionDataSet resultSet =
session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
}
} catch (StatementExecutionException | RedirectException e) {
e.printStackTrace();
fail(e.getMessage());
}
}

// ------------------------------ Aggregation Query ------------------------------
@Test
public void aggregationQueryTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ private void rollbackRenameColumn(final ConfigNodeProcedureEnv env) {
env.getConfigManager()
.getClusterSchemaManager()
.executePlan(
new RenameTableColumnPlan(database, tableName, newName, oldName),
this instanceof RenameViewColumnProcedure
? new RenameViewColumnPlan(database, tableName, newName, oldName)
: new RenameTableColumnPlan(database, tableName, newName, oldName),
isGeneratedByPipe);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
setFailure(new ProcedureException(new IoTDBException(status)));
Expand Down
Loading
Loading