Skip to content

Commit 10affbf

Browse files
committed
[KYUUBI #2962] [SUB-TASK][KPIP-4] Throw exception if the metadata update count is zero
### _Why are the changes needed?_ Throw exception if the metadata update count is zero, in case that the pre insert metadata operation failed. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2962 from turboFei/retry_on_created. Closes #2962 3610be9 [Fei Wang] If update count is zero, throw exception 25d957c [Fei Wang] Revert "If the metadata does not be created successfully, put into retry queue" 24cdc7e [Fei Wang] If the metadata does not be created successfully, put into retry queue Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Fei Wang <fwang12@ebay.com>
1 parent 6e4b558 commit 10affbf

File tree

2 files changed

+35
-3
lines changed

2 files changed

+35
-3
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,14 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
269269
queryBuilder.append(" WHERE identifier = ? ")
270270
params += metadata.identifier
271271

272+
val query = queryBuilder.toString()
272273
withConnection() { connection =>
273-
execute(connection, queryBuilder.toString(), params: _*)
274-
274+
withUpdateCount(connection, query, params: _*) { updateCount =>
275+
if (updateCount == 0) {
276+
throw new KyuubiException(
277+
s"Error updating metadata for ${metadata.identifier} with $query")
278+
}
279+
}
275280
}
276281
}
277282

@@ -397,6 +402,26 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
397402
}
398403
}
399404

405+
private def withUpdateCount[T](
406+
conn: Connection,
407+
sql: String,
408+
params: Any*)(f: Int => T): T = {
409+
debug(s"executing sql $sql with update count")
410+
var statement: PreparedStatement = null
411+
try {
412+
statement = conn.prepareStatement(sql)
413+
setStatementParams(statement, params: _*)
414+
f(statement.executeUpdate())
415+
} catch {
416+
case e: SQLException =>
417+
throw new KyuubiException(e.getMessage, e)
418+
} finally {
419+
if (statement != null) {
420+
Utils.tryLogNonFatalError(statement.close())
421+
}
422+
}
423+
}
424+
400425
private def setStatementParams(statement: PreparedStatement, params: Any*): Unit = {
401426
params.zipWithIndex.foreach { case (param, index) =>
402427
param match {

kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.UUID
2222
import org.scalatest.concurrent.PatienceConfiguration.Timeout
2323
import org.scalatest.time.SpanSugar._
2424

25-
import org.apache.kyuubi.KyuubiFunSuite
25+
import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
2626
import org.apache.kyuubi.config.KyuubiConf
2727
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
2828
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf._
@@ -266,4 +266,11 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
266266
assert(jdbcMetadataStore.getMetadata(batchId, true) == null)
267267
}
268268
}
269+
270+
test("throw exception if update count is 0") {
271+
val metadata = Metadata(identifier = UUID.randomUUID().toString, state = "RUNNING")
272+
intercept[KyuubiException] {
273+
jdbcMetadataStore.updateMetadata(metadata)
274+
}
275+
}
269276
}

0 commit comments

Comments
 (0)