Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<module>rest-java-example</module>
<module>schema</module>
<module>session</module>
<module>subscription</module>
<module>trigger</module>
<module>udf</module>
</modules>
Expand Down
38 changes: 38 additions & 0 deletions example/subscription/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-examples</artifactId>
<version>1.3.7-SNAPSHOT</version>
</parent>
<artifactId>subscription-example</artifactId>
<name>IoTDB: Example: Subscription Client</name>
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-subscription</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
5 changes: 5 additions & 0 deletions integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
<artifactId>iotdb-session</artifactId>
<version>1.3.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-subscription</artifactId>
<version>1.3.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-jdbc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,14 @@ public Connection getConnection(String username, String password) throws SQLExce
this);
}

@Override
public Connection getAvailableConnection(String username, String password) throws SQLException {
return new ClusterTestConnection(
getWriteConnection(null, username, password),
getOneAvailableReadConnection(null, username, password),
this);
}

@Override
public Connection getConnection(
final DataNodeWrapper dataNodeWrapper, final String username, final String password)
Expand Down Expand Up @@ -656,6 +664,23 @@ protected List<NodeConnection> getReadConnections(
return readConnRequestDelegate.requestAll();
}

protected List<NodeConnection> getOneAvailableReadConnection(
final Constant.Version version, final String username, final String password)
throws SQLException {
final List<DataNodeWrapper> dataNodeWrapperListCopy = new ArrayList<>(dataNodeWrapperList);
Collections.shuffle(dataNodeWrapperListCopy);
SQLException lastException = null;
for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
try {
return getReadConnections(version, dataNode, username, password);
} catch (final SQLException e) {
lastException = e;
}
}
logger.error("Failed to get connection from any DataNode, last exception is ", lastException);
throw lastException;
}

// use this to avoid some runtimeExceptions when try to get jdbc connections.
// because it is hard to add retry and handle exception when getting jdbc connections in
// getWriteConnectionWithSpecifiedDataNode and getReadConnections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ public void start() {
"-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize() + "m",
"-Djdk.nio.maxCachedBufferSize=262144",
"-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" + killPoints.toString(),
"-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8",
"-cp",
server_node_lib_path));
addStartCmdParams(startCmd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public Connection getConnection(String username, String password) throws SQLExce
return connection;
}

@Override
public Connection getAvailableConnection(String username, String password) throws SQLException {
throw new UnsupportedOperationException();
}

@Override
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
DataNodeWrapper dataNode, String username, String password) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ Connection getConnection(Constant.Version version, String username, String passw
Connection getConnection(DataNodeWrapper dataNodeWrapper, String username, String password)
throws SQLException;

default Connection getAvailableConnection() throws SQLException {
return getAvailableConnection(SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
}

Connection getAvailableConnection(String username, String password) throws SQLException;

default Connection getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode)
throws SQLException {
return getWriteOnlyConnectionWithSpecifiedDataNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
Expand All @@ -41,18 +42,27 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -78,7 +88,7 @@ public void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}

private String getAuthorization(String username, String password) {
public static String getAuthorization(String username, String password) {
return Base64.getEncoder()
.encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
}
Expand Down Expand Up @@ -128,7 +138,7 @@ public void ping() {
}
}

private HttpPost getHttpPost(String url) {
public static HttpPost getHttpPost(String url) {
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
Expand Down Expand Up @@ -242,6 +252,101 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http
}
}

@Ignore // Flaky test
@Test
public void errorInsertRecords() throws SQLException, InterruptedException {
SimpleEnv simpleEnv = new SimpleEnv();
simpleEnv
.getConfig()
.getCommonConfig()
.setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
.setSchemaReplicationFactor(3)
.setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
.setDataReplicationFactor(2);
simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
simpleEnv.initClusterEnvironment(1, 3);

CloseableHttpResponse response = null;
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
try {
HttpPost httpPost =
getHttpPost(
"http://"
+ simpleEnv.getDataNodeWrapper(0).getIp()
+ ":"
+ simpleEnv.getDataNodeWrapper(0).getRestServicePort()
+ "/rest/v2/insertRecords");
String json =
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
for (int i = 0; i < 30; i++) {
try {
response = httpClient.execute(httpPost);
break;
} catch (Exception e) {
if (i == 29) {
throw e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

HttpEntity responseEntity = response.getEntity();
String message = EntityUtils.toString(responseEntity, "utf-8");
JsonObject result = JsonParser.parseString(message).getAsJsonObject();
assertEquals(507, Integer.parseInt(result.get("code").toString()));
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
TimeUnit.SECONDS.sleep(5);

try {
for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
dataNodeWrapper.stop();
try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection();
Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) {
int count = 0;
try (ResultSet resultSet =
statementAfterNodeDown.executeQuery(
"select s88, s77, s66, s55, s44, s33 from root.s1")) {
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
StringBuilder row = new StringBuilder();
for (int i = 0; i < metaData.getColumnCount(); i++) {
row.append(resultSet.getString(i + 1)).append(",");
}
System.out.println(row);
count++;
}
}
assertEquals(3, count);
}
dataNodeWrapper.start();
TimeUnit.SECONDS.sleep(1);
}
} catch (SQLException e) {
if (!e.getMessage().contains("Maybe server is down")) {
throw e;
}
} finally {
simpleEnv.cleanClusterEnvironment();
}
}

public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) {
CloseableHttpResponse response = null;
try {
Expand Down
Loading
Loading