-
Notifications
You must be signed in to change notification settings - Fork 3
/
GraphStoreRequestFlowBuilder.scala
253 lines (219 loc) · 8.69 KB
/
GraphStoreRequestFlowBuilder.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package ai.agnos.sparql.stream.client
import ai.agnos.sparql._
import java.io.{StringReader, StringWriter}
import java.net.URL
import java.nio.file.Path
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{HttpEntity, _}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Framing, Source}
import akka.util.ByteString
import ai.agnos.sparql.api._
import ai.agnos.sparql.util.HttpEndpoint
import org.eclipse.rdf4j.model.{IRI, Model}
import org.eclipse.rdf4j.rio.{RDFFormat, Rio}
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
object GraphStoreRequestFlowBuilder {

  /**
    * A Set of status codes on which the response will always show success = true.
    */
  val successfulHttpResponseStatusCodes: Set[StatusCode] =
    Set(
      StatusCodes.OK,
      StatusCodes.Created,
      StatusCodes.Accepted,
      StatusCodes.NoContent,
      StatusCodes.AlreadyReported
    )

  /**
    * A Set of status codes which the flow can handle gracefully, even though
    * these all mean that the operation has failed. In all these cases, however,
    * the stream remains open. Any codes not shown on the success or failure
    * list will cause the stream to fail and complete prematurely.
    */
  val failingHttpResponseStatusCodes: Set[StatusCode] =
    Set(
      StatusCodes.NotFound,
      StatusCodes.Unauthorized,
      StatusCodes.PaymentRequired,
      StatusCodes.Forbidden,
      StatusCodes.ProxyAuthenticationRequired,
      StatusCodes.RequestTimeout,
      StatusCodes.Conflict,
      StatusCodes.Gone
    )
}
trait GraphStoreRequestFlowBuilder extends SparqlClientHelpers with HttpClientFlowBuilder with ErrorHandlerSupport {

  import SparqlClientConstants._
  import GraphStoreRequestFlowBuilder._

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer

  /**
    * If this is set to true then the response entity is "strictified", i.e. all chunks are
    * loaded into memory in one go. When false, the result is processed as a proper stream,
    * but there is the risk that the user might get more than a single response per request
    * (each scan() emission below is parsed independently).
    */
  val useStrictByteStringStrategy = true

  /**
    * Specifies for how long to wait for a "strict" http entity.
    */
  val strictEntityReadTimeout: FiniteDuration = 60.seconds

  /**
    * Limit the response entity size to 100MB by default.
    */
  val strictEntityMaximumLengthInBytes: Int = 100 * 1024 * 1024

  /**
    * Builds a flow that maps each [[GraphStoreRequest]] to an HTTP request, runs it through
    * the endpoint's HTTP flow and converts the result back into a [[GraphStoreResponse]],
    * optionally carrying a parsed RDF [[Model]].
    *
    * @param endpointFlow the HTTP endpoint plus the client flow to route requests through
    * @return a flow of graph-store operations to their responses
    */
  def graphStoreRequestFlow(
    endpointFlow: HttpEndpointFlow[GraphStoreRequest]
  ): Flow[GraphStoreRequest, GraphStoreResponse, NotUsed] = {
    Flow
      .fromFunction(graphStoreOpToRequest(endpointFlow.endpoint))
      .log("beforeHttpRequest")
      .via(endpointFlow.flow)
      .log("afterHttpRequest")
      .flatMapConcat {
        case (Success(response), request) =>
          val gsr = GraphStoreResponse(
            request,
            success = calculateSuccess(response.status),
            statusCode = response.status.intValue,
            statusText = response.status.reason
          )
          makeModelSource(response.entity).map(model => gsr.copy(model = model))
        case (Failure(error), _) =>
          // the handler can choose to throw, which will collapse the stream, or to ignore
          // the error, in which case no response will be returned
          errorHandler.handleError(error)
          Source.empty
      }
  }

  /**
    * Converts an HTTP response entity into a source of optionally-parsed RDF models.
    *
    * Emits `None` when the entity is known to be empty (no graph was returned). Otherwise
    * the entity bytes are parsed with Rio using the RDF format derived from the response
    * content type; a parse failure yields `None` rather than failing the stream.
    *
    * @param entity the HTTP response entity to read the graph from
    * @return a source of `Some(model)` / `None` elements
    */
  def makeModelSource(entity: HttpEntity): Source[Option[Model], Any] = {
    if (!entity.isChunked() && (entity.isKnownEmpty() || entity.contentLengthOption.getOrElse(0L) == 0L)) {
      // if we know there are no bytes in the entity (no graph has been returned)
      // then no model is emitted; discard the (empty) byte stream to free the connection.
      entity.discardBytes()
      Source.single(None)
    } else if (!useStrictByteStringStrategy && entity.isChunked()) {
      // NB: mapping over the data bytes stream won't work because the stream will never emit
      // for empty entities; the trick is to introduce a scan() call, which will emit an empty
      // string even if nothing comes through.
      entity.withoutSizeLimit().dataBytes
        .via(Framing.delimiter(ByteString.fromString("\n"), maximumFrameLength = strictEntityMaximumLengthInBytes, allowTruncation = true))
        // accumulate frames in arrival order (the original `b ++ a` prepended each new frame,
        // reversing the document); Framing.delimiter strips the newline delimiter, so it is
        // re-appended to keep line-oriented formats such as N-Triples parseable.
        .scan(ByteString.empty)((acc, frame) => acc ++ frame ++ ByteString("\n"))
        .filter(_.nonEmpty)
        .map { bytes =>
          Try(Rio.parse(new StringReader(bytes.utf8String), "", mapContentTypeToRdfFormat(entity.contentType))).toOption
        }
    } else { // i.e. useStrictByteStringStrategy is true
      // this workaround does seem to be alright for smaller graphs that can be converted to a
      // strict in-memory entity - this strategy is currently ON by default
      // (useStrictByteStringStrategy = true), bounded by strictEntityMaximumLengthInBytes.
      Source.single(entity.withSizeLimit(strictEntityMaximumLengthInBytes))
        .mapAsync(numberOfCpuCores)(_.toStrict(strictEntityReadTimeout))
        .map { strict =>
          Try(Rio.parse(new StringReader(strict.data.utf8String), "", mapContentTypeToRdfFormat(entity.contentType))).toOption
        }
    }
  }

  /**
    * Returns true or false if a supported success or failure code is given. For unsupported
    * codes, a SparqlClientRequestFailed is thrown.
    *
    * @param statusCode the HTTP status code of the response
    * @return true for codes in [[successfulHttpResponseStatusCodes]], false for codes in
    *         [[failingHttpResponseStatusCodes]]
    * @throws SparqlClientRequestFailed for any other status code
    */
  def calculateSuccess(statusCode: StatusCode): Boolean = {
    if (successfulHttpResponseStatusCodes.contains(statusCode)) true
    else if (failingHttpResponseStatusCodes.contains(statusCode)) false
    else throw SparqlClientRequestFailed(s"request failed with status code: $statusCode")
  }

  /**
    * Curried adapter pairing the generated HTTP request with the originating graph-store
    * request, as expected by the endpoint flow.
    */
  def graphStoreOpToRequest(endpoint: HttpEndpoint)
                           (graphStoreRequest: GraphStoreRequest): (HttpRequest, GraphStoreRequest) = {
    (makeHttpRequest(endpoint, graphStoreRequest), graphStoreRequest)
  }

  /**
    * Translates a graph-store operation into the corresponding SPARQL 1.1 Graph Store
    * Protocol HTTP request against the given endpoint.
    */
  def makeHttpRequest(endpoint: HttpEndpoint, request: GraphStoreRequest): HttpRequest = {
    request match {
      case GetGraphM(graphUri, method) =>
        // GET requests ask for N-Triples explicitly via the Accept header
        HttpRequest(
          method, uri = s"${endpoint.path}${mapGraphOptionToPath(graphUri)}"
        ).withHeaders(
          Accept(`application/n-triples`.mediaType)
            :: makeRequestHeaders(endpoint)
        )
      case DropGraphM(graphUri, method) =>
        HttpRequest(
          method, uri = s"${endpoint.path}${mapGraphOptionToPath(graphUri)}"
        ).withHeaders(makeRequestHeaders(endpoint))
      case InsertGraphFromModelM(model, format, graphUri, method) =>
        makeInsertGraphHttpRequest(endpoint, method, graphUri, mapRdfFormatToContentType(format)) {
          () => makeGraphSource(model, format)
        }
      case InsertGraphFromURLM(url, format, graphUri, method) =>
        makeInsertGraphHttpRequest(endpoint, method, graphUri, mapRdfFormatToContentType(format)) {
          () => makeGraphSource(url, format)
        }
      case InsertGraphFromPathM(path, format, graphUri, method) =>
        makeInsertGraphHttpRequest(endpoint, method, graphUri, mapRdfFormatToContentType(format)) {
          () => makeGraphSource(path, format)
        }
    }
  }

  /**
    * Assembles an insert (PUT/POST) request whose entity is streamed lazily from the
    * supplied source creator.
    *
    * @param endpoint            the target graph-store endpoint
    * @param method              HTTP method to use (e.g. PUT or POST)
    * @param graphIri            target named graph, or None for the default graph
    * @param contentType         content type matching the serialized RDF payload
    * @param entitySourceCreator deferred creator of the RDF payload byte stream
    */
  private def makeInsertGraphHttpRequest
  (
    endpoint: HttpEndpoint,
    method: HttpMethod,
    graphIri: Option[IRI],
    contentType: ContentType
  )
  (
    entitySourceCreator: () => Source[ByteString, Any]
  ): HttpRequest = {
    HttpRequest(
      method, uri = s"${endpoint.path}${mapGraphOptionToPath(graphIri)}"
    )
      .withHeaders(makeRequestHeaders(endpoint))
      .withEntity(
        entity = HttpEntity(
          contentType = contentType,
          data = entitySourceCreator()
        )
      )
  }

  /**
    * Maps an optional graph IRI to the Graph Store Protocol query string:
    * `?graph=<encoded-iri>` for a named graph, `?default` otherwise.
    */
  private def mapGraphOptionToPath(graphIri: Option[IRI]): String = graphIri match {
    case Some(uri) => s"?$GRAPH_PARAM_NAME=${urlEncode(uri.toString)}"
    case None => s"?$DEFAULT_PARAM_NAME"
  }

  /**
    * Serializes an in-memory RDF model with Rio in the given format and emits it as a
    * single UTF-8 ByteString element.
    */
  private def makeGraphSource(model: Model, format: RDFFormat): Source[ByteString, Any] = {
    Source.single(model)
      .map { model =>
        val writer = new StringWriter()
        Rio.write(model, writer, format)
        ByteString(writer.getBuffer.toString, "UTF-8")
      }
  }

  /**
    * Fetches the RDF document behind the URL via a one-off HTTP request (asking for the
    * given format in the Accept header) and streams the raw response bytes.
    */
  private def makeGraphSource(fileUrl: URL, format: RDFFormat): Source[ByteString, Any] = {
    Source.single(Uri(fileUrl.toURI.toString))
      .mapAsync(1)(uri => Http().singleRequest {
        HttpRequest(uri = uri).withHeaders(Accept(mapRdfFormatToContentType(format).mediaType))
      })
      .flatMapConcat(res => res.entity.dataBytes)
  }

  /**
    * Streams a file from the local filesystem. The `format` parameter is unused here
    * (the file is sent verbatim; the request's content type is derived by the caller)
    * but kept so all three makeGraphSource overloads share the same shape.
    */
  private def makeGraphSource(filePath: Path, format: RDFFormat): Source[ByteString, Any] = {
    FileIO.fromPath(filePath)
  }
}