Skip to content

Commit 0e6367a

Browse files
turboFeipan3793
authored andcommitted
[KYUUBI #1946] Close the kyuubi connection on launch engine operation failure
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> For kyuubi connection, if the launch engine operation fails, we need close the kyuubi connection. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1946 from turboFei/close_conn_on_engine_failure. Closes #1946 3a4a465 [Fei Wang] remove final: 8111726 [Fei Wang] add ut 74d3b28 [Fei Wang] address commments 0c41ac5 [Fei Wang] refactor 8dcc7ec [Fei Wang] Close the kyuubi connection on launch engine operation failure Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 2ac8165 commit 0e6367a

File tree

3 files changed

+40
-6
lines changed

3 files changed

+40
-6
lines changed

kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiDatabaseConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public Connection getConnectionFromDefaultDriver(String url, Properties properti
140140
new Thread(beeLine.commands.createLogRunnable(kyuubiConnection, eventNotifier));
141141
logThread.setDaemon(true);
142142
logThread.start();
143+
kyuubiConnection.setEngineLogThread(logThread);
143144

144145
kyuubiConnection.waitLaunchEngineToComplete();
145146
logThread.interrupt();

kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
public class KyuubiConnection implements java.sql.Connection, KyuubiLoggable {
7979
public static final Logger LOG = LoggerFactory.getLogger(KyuubiConnection.class.getName());
8080
public static final String BEELINE_MODE_PROPERTY = "BEELINE_MODE";
81+
public static int DEFAULT_ENGINE_LOG_THREAD_TIMEOUT = 10 * 1000;
8182

8283
private String jdbcUriString;
8384
private String host;
@@ -100,6 +101,7 @@ public class KyuubiConnection implements java.sql.Connection, KyuubiLoggable {
100101
private boolean initFileCompleted = false;
101102

102103
private TOperationHandle launchEngineOpHandle = null;
104+
private Thread engineLogThread;
103105
private boolean engineLogInflight = true;
104106
private volatile boolean launchEngineOpCompleted = false;
105107

@@ -263,7 +265,7 @@ public List<String> getExecLog() throws SQLException, ClosedOrCancelledException
263265
private void showLaunchEngineLog() {
264266
if (launchEngineOpHandle != null) {
265267
LOG.info("Starting to get launch engine log.");
266-
Thread logThread =
268+
engineLogThread =
267269
new Thread("engine-launch-log") {
268270

269271
@Override
@@ -282,10 +284,14 @@ public void run() {
282284
LOG.info("Finished to get launch engine log.");
283285
}
284286
};
285-
logThread.start();
287+
engineLogThread.start();
286288
}
287289
}
288290

291+
public void setEngineLogThread(Thread logThread) {
292+
this.engineLogThread = logThread;
293+
}
294+
289295
public void executeInitSql() throws SQLException {
290296
if (initFileCompleted) return;
291297
if (initFile != null) {
@@ -937,6 +943,18 @@ public void close() throws SQLException {
937943
}
938944
}
939945

946+
private void closeOnLaunchEngineFailure() throws SQLException {
947+
if (engineLogThread != null && engineLogThread.isAlive()) {
948+
engineLogThread.interrupt();
949+
try {
950+
engineLogThread.join(DEFAULT_ENGINE_LOG_THREAD_TIMEOUT);
951+
} catch (Exception e) {
952+
}
953+
}
954+
engineLogThread = null;
955+
close();
956+
}
957+
940958
/*
941959
* (non-Javadoc)
942960
*
@@ -1669,12 +1687,14 @@ public void waitLaunchEngineToComplete() throws SQLException {
16691687
break;
16701688
}
16711689
}
1672-
} catch (SQLException e) {
1673-
engineLogInflight = false;
1674-
throw e;
16751690
} catch (Exception e) {
16761691
engineLogInflight = false;
1677-
throw new SQLException(e.toString(), "08S01", e);
1692+
closeOnLaunchEngineFailure();
1693+
if (e instanceof SQLException) {
1694+
throw e;
1695+
} else {
1696+
throw new SQLException(e.getMessage(), "08S01", e);
1697+
}
16781698
}
16791699
}
16801700
}

kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.kyuubi.{Utils, WithKyuubiServer}
3030
import org.apache.kyuubi.config.KyuubiConf
3131
import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
3232
import org.apache.kyuubi.jdbc.KyuubiHiveDriver
33+
import org.apache.kyuubi.jdbc.hive.KyuubiConnection
3334
import org.apache.kyuubi.plugin.SessionConfAdvisor
3435

3536
/**
@@ -203,6 +204,18 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
203204
}
204205
}
205206
}
207+
208+
test("close kyuubi connection on launch engine operation failure") {
209+
withSessionConf(Map.empty)(Map.empty)(Map(
210+
KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true",
211+
"spark.master" -> "invalid")) {
212+
val prop = new Properties()
213+
prop.setProperty(KyuubiConnection.BEELINE_MODE_PROPERTY, "true")
214+
val kyuubiConnection = new KyuubiConnection(jdbcUrlWithConf, prop)
215+
intercept[SQLException](kyuubiConnection.waitLaunchEngineToComplete())
216+
assert(kyuubiConnection.isClosed)
217+
}
218+
}
206219
}
207220

208221
class TestSessionConfAdvisor extends SessionConfAdvisor {

0 commit comments

Comments
 (0)