Skip to content
Permalink
Browse files

feat(store): Return a clearer exception when a triplestore read timeo…

…ut occurs. (#1795)
  • Loading branch information
benjamingeer committed Jan 26, 2021
1 parent 69ae7fd commit 0eeb3b3b29fb5a3351e944c28ba0bc20fe7598ac
@@ -299,6 +299,20 @@ object TriplestoreConnectionException {
TriplestoreConnectionException(message, Some(ExceptionUtil.logAndWrapIfNotSerializable(e, log)))
}

/**
* Indicates that a read timeout occurred while waiting for data from the triplestore.
*
* @param message a description of the error.
* @param cause the original exception representing the cause of the error, if any.
*/
case class TriplestoreTimeoutException(message: String, cause: Option[Throwable] = None)
extends TriplestoreException(message, cause)

object TriplestoreTimeoutException {
def apply(message: String, e: Throwable, log: LoggingAdapter): TriplestoreTimeoutException =
TriplestoreTimeoutException(message, Some(ExceptionUtil.logAndWrapIfNotSerializable(e, log)))
}

/**
* Indicates that we tried using a feature which is unsuported by the selected triplestore.
*
@@ -355,6 +355,11 @@ case class CheckTriplestoreRequest() extends TriplestoreRequest
*/
case class CheckTriplestoreResponse(triplestoreStatus: TriplestoreStatus, msg: String)

/**
* Simulates a triplestore timeout. Used only in testing.
*/
case class SimulateTimeoutRequest() extends TriplestoreRequest

/**
* Requests that the repository is updated to be compatible with the running version of Knora.
*/
@@ -231,18 +231,31 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
case UploadRepositoryRequest(inputFile: Path) => try2Message(sender(), uploadRepository(inputFile), log)
case InsertGraphDataContentRequest(graphContent: String, graphName: String) =>
try2Message(sender(), insertDataGraphRequest(graphContent, graphName), log)
case SimulateTimeoutRequest() => try2Message(sender(), doSimulateTimeout(), log)
case other =>
sender ! Status.Failure(
UnexpectedMessageException(s"Unexpected message $other of type ${other.getClass.getCanonicalName}"))
}

/**
* Simulates a read timeout.
*/
private def doSimulateTimeout(): Try[SparqlSelectResult] = {
val sparql = """SELECT ?foo WHERE {
| BIND("foo" AS ?foo)
|}""".stripMargin

sparqlHttpSelect(sparql = sparql, simulateTimeout = true)
}

/**
* Given a SPARQL SELECT query string, runs the query, returning the result as a [[SparqlSelectResult]].
*
* @param sparql the SPARQL SELECT query string.
* @param simulateTimeout if `true`, simulate a read timeout.
* @return a [[SparqlSelectResult]].
*/
private def sparqlHttpSelect(sparql: String): Try[SparqlSelectResult] = {
private def sparqlHttpSelect(sparql: String, simulateTimeout: Boolean = false): Try[SparqlSelectResult] = {
def parseJsonResponse(sparql: String, resultStr: String): Try[SparqlSelectResult] = {
val parseTry = Try {
resultStr.parseJson.convertTo[SparqlSelectResult]
@@ -265,7 +278,7 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
Try(FakeTriplestore.data(sparql))
} else {
// No: get the response from the real triplestore over HTTP.
getSparqlHttpResponse(sparql, isUpdate = false)
getSparqlHttpResponse(sparql, isUpdate = false, simulateTimeout = simulateTimeout)
}

// Are we preparing a fake triplestore?
@@ -833,11 +846,13 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
* @param sparql the SPARQL request to be submitted.
* @param isUpdate `true` if this is an update request.
* @param acceptMimeType the MIME type to be provided in the HTTP Accept header.
* @param simulateTimeout if `true`, simulate a read timeout.
* @return the triplestore's response.
*/
private def getSparqlHttpResponse(sparql: String,
isUpdate: Boolean,
acceptMimeType: String = mimeTypeApplicationSparqlResultsJson): Try[String] = {
acceptMimeType: String = mimeTypeApplicationSparqlResultsJson,
simulateTimeout: Boolean = false): Try[String] = {

val httpContext: HttpClientContext = makeHttpContext

@@ -869,7 +884,8 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
client = httpClient,
request = httpPost,
context = httpContext,
processResponse = returnResponseAsString
processResponse = returnResponseAsString,
simulateTimeout = simulateTimeout
)
}

@@ -990,18 +1006,24 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
* @param request the request to be sent.
* @param context the request context to be used.
* @param processResponse a function that processes the HTTP response.
* @param simulateTimeout if `true`, simulate a read timeout.
* @tparam T the return type of `processResponse`.
* @return the return value of `processResponse`.
*/
private def doHttpRequest[T](client: CloseableHttpClient,
request: HttpRequest,
context: HttpClientContext,
processResponse: CloseableHttpResponse => T): Try[T] = {
processResponse: CloseableHttpResponse => T,
simulateTimeout: Boolean = false): Try[T] = {
// Make an Option wrapper for the response, so we can close it if we get one,
// even if an error occurs.
var maybeResponse: Option[CloseableHttpResponse] = None

val triplestoreResponseTry = Try {
if (simulateTimeout) {
throw new java.net.SocketTimeoutException("Simulated read timeout")
}

val start = System.currentTimeMillis()
val response = client.execute(targetHost, request, context)
maybeResponse = Some(response)
@@ -1028,15 +1050,21 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat

maybeResponse.foreach(_.close)

// TODO: Can we throw a more user-friendly exception if the query timed out?
// TODO: Can we make Fuseki abandon the query if it takes too long?

triplestoreResponseTry.recover {
case tre: TriplestoreResponseException => throw tre

case socketTimeoutException: java.net.SocketTimeoutException =>
val message =
"The triplestore took too long to process a request. This can happen because the triplestore needed too much time to search through the data that is currently in the triplestore. Query optimisation may help."
log.error(socketTimeoutException, message)
throw TriplestoreTimeoutException(message = message, e = socketTimeoutException, log = log)

case e: Exception =>
log.error(e, s"Failed to connect to triplestore")
throw TriplestoreConnectionException(s"Failed to connect to triplestore", e, log)
val message = "Failed to connect to triplestore"
log.error(e, message)
throw TriplestoreConnectionException(message = message, e = e, log = log)
}
}

@@ -21,6 +21,24 @@ scala_test(
] + BASE_TEST_DEPENDENCIES,
)

scala_test(
name = "HttpTriplestoreConnectorSpec",
size = "small",
srcs = [
"HttpTriplestoreConnectorSpec.scala",
],
data = [
"//knora-ontologies",
"//test_data",
],
jvm_flags = ["-Dconfig.resource=fuseki.conf"],
# unused_dependency_checker_mode = "warn",
deps = ALL_WEBAPI_MAIN_DEPENDENCIES + [
"//webapi:main_library",
"//webapi:test_library",
] + BASE_TEST_DEPENDENCIES,
)

scala_test(
name = "GraphDBConsistencyCheckingSpec",
size = "small", # 60s
@@ -0,0 +1,44 @@
/*
* Copyright © 2015-2021 the contributors (see Contributors.md).
*
* This file is part of Knora.
*
* Knora is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Knora is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public
* License along with Knora. If not, see <http://www.gnu.org/licenses/>.
*/

package org.knora.webapi.store.triplestore

import akka.testkit.ImplicitSender
import org.knora.webapi.CoreSpec
import org.knora.webapi.exceptions.TriplestoreTimeoutException
import org.knora.webapi.messages.store.triplestoremessages.SimulateTimeoutRequest

import scala.concurrent.duration._

class HttpTriplestoreConnectorSpec extends CoreSpec() with ImplicitSender {
private val timeout = 10.seconds

"The HttpTriplestoreConnector" should {
"report a connection timeout with an appropriate error message" in {
storeManager ! SimulateTimeoutRequest()

expectMsgPF(timeout) {
case msg: akka.actor.Status.Failure =>
assert(msg.cause.isInstanceOf[TriplestoreTimeoutException])
assert(
msg.cause.getMessage == "The triplestore took too long to process a request. This can happen because the triplestore needed too much time to search through the data that is currently in the triplestore. Query optimisation may help.")
}
}
}
}

0 comments on commit 0eeb3b3

Please sign in to comment.