Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate Blazegraph clients for querying and indexing #2538

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci-composite-plugin.yml
Expand Up @@ -10,6 +10,8 @@ on:
- 'delta/sourcing/**'
- 'delta/testkit/**'
- 'delta/plugins/composite-views/**'
- 'delta/plugins/blazegraph/**'
- 'delta/plugins/elasticsearch/**'
- 'build.sbt'
- 'project/**'
jobs:
Expand Down
12 changes: 10 additions & 2 deletions delta/plugins/blazegraph/src/main/resources/blazegraph.conf
Expand Up @@ -23,14 +23,22 @@ plugins.blazegraph {
# username = "username"
# password = "password"
#}
# configuration of the Blazegraph client
client {
# configuration of the indexing Blazegraph client
indexing-client {
# the retry strategy for the http client
retry = ${app.defaults.constant-retry-strategy}
# the strategy to decide if it is worth retrying when an Http error occurs.
# allowed strategies are 'always', 'never' or 'onServerError'.
is-worth-retrying = "onServerError"
}
# configuration of the query Blazegraph client
query-client {
# the retry strategy for the http client
retry = ${app.defaults.never-retry-strategy}
# the strategy to decide if it is worth retrying when an Http error occurs.
# allowed strategies are 'always', 'never' or 'onServerError'.
is-worth-retrying = "never"
}
# Blazegraph query timeout
query-timeout = "1 minute"
# the storages aggregate configuration
Expand Down
Expand Up @@ -38,13 +38,23 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {

make[EventLog[Envelope[BlazegraphViewEvent]]].fromEffect { databaseEventLog[BlazegraphViewEvent](_, _) }

make[HttpClient].named("blazegraph-client").from {
make[HttpClient].named("http-indexing-client").from {
(cfg: BlazegraphViewsConfig, as: ActorSystem[Nothing], sc: Scheduler) =>
HttpClient()(cfg.client, as.classicSystem, sc)
HttpClient()(cfg.indexingClient, as.classicSystem, sc)
}

make[BlazegraphClient].from {
(cfg: BlazegraphViewsConfig, client: HttpClient @Id("blazegraph-client"), as: ActorSystem[Nothing]) =>
make[BlazegraphClient].named("blazegraph-indexing-client").from {
(cfg: BlazegraphViewsConfig, client: HttpClient @Id("http-indexing-client"), as: ActorSystem[Nothing]) =>
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem)
}

make[HttpClient].named("http-query-client").from {
(cfg: BlazegraphViewsConfig, as: ActorSystem[Nothing], sc: Scheduler) =>
HttpClient()(cfg.queryClient, as.classicSystem, sc)
}

make[BlazegraphClient].named("blazegraph-query-client").from {
(cfg: BlazegraphViewsConfig, client: HttpClient @Id("http-query-client"), as: ActorSystem[Nothing]) =>
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem)
}

Expand Down Expand Up @@ -74,7 +84,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {

make[BlazegraphIndexingStream].from {
(
client: BlazegraphClient,
client: BlazegraphClient @Id("blazegraph-indexing-client"),
projection: Projection[Unit],
indexingSource: IndexingSource @Id("blazegraph-source"),
cache: ProgressesCache @Id("blazegraph-progresses"),
Expand All @@ -91,7 +101,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
}

make[BlazegraphIndexingCleanup].from {
(client: BlazegraphClient, cache: ProgressesCache @Id("blazegraph-progresses")) =>
(client: BlazegraphClient @Id("blazegraph-indexing-client"), cache: ProgressesCache @Id("blazegraph-progresses")) =>
new BlazegraphIndexingCleanup(client, cache)
}

Expand Down Expand Up @@ -120,7 +130,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
log: EventLog[Envelope[BlazegraphViewEvent]],
contextResolution: ResolverContextResolution,
resourceIdCheck: ResourceIdCheck,
client: BlazegraphClient,
client: BlazegraphClient @Id("blazegraph-indexing-client"),
permissions: Permissions,
orgs: Organizations,
projects: Projects,
Expand All @@ -142,7 +152,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
acls: Acls,
views: BlazegraphViews,
projects: Projects,
client: BlazegraphClient,
client: BlazegraphClient @Id("blazegraph-query-client"),
cfg: BlazegraphViewsConfig
) =>
BlazegraphViewsQuery(acls, views, projects, client)(cfg.indexing)
Expand Down Expand Up @@ -202,7 +212,9 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {

many[PriorityRoute].add { (route: BlazegraphViewsRoutes) => PriorityRoute(priority, route.routes) }

many[ServiceDependency].add { new BlazegraphServiceDependency(_) }
many[ServiceDependency].add { (client: BlazegraphClient @Id("blazegraph-indexing-client")) =>
new BlazegraphServiceDependency(client)
}

many[ReferenceExchange].add { (views: BlazegraphViews) =>
BlazegraphViews.referenceExchange(views)
Expand Down
Expand Up @@ -21,21 +21,23 @@ import scala.util.Try
/**
* Configuration for the Blazegraph views module.
*
* @param base the base uri to the Blazegraph HTTP endpoint
* @param credentials the Blazegraph HTTP endpoint credentials
* @param client configuration of the Blazegraph client
* @param base the base uri to the Blazegraph HTTP endpoint
* @param credentials the Blazegraph HTTP endpoint credentials
* @param indexingClient configuration of the indexing Blazegraph client
* @param queryClient configuration of the query Blazegraph client
* @param queryTimeout the Blazegraph query timeout
* @param aggregate configuration of the underlying aggregate
* @param keyValueStore configuration of the underlying key/value store
* @param pagination configuration for how pagination should behave in listing operations
* @param cacheIndexing configuration of the cache indexing process
* @param indexing configuration of the external indexing process
* @param maxViewRefs configuration of the maximum number of view references allowed on an aggregated view
* @param aggregate configuration of the underlying aggregate
* @param keyValueStore configuration of the underlying key/value store
* @param pagination configuration for how pagination should behave in listing operations
* @param cacheIndexing configuration of the cache indexing process
* @param indexing configuration of the external indexing process
* @param maxViewRefs configuration of the maximum number of view references allowed on an aggregated view
*/
final case class BlazegraphViewsConfig(
base: Uri,
credentials: Option[Credentials],
client: HttpClientConfig,
indexingClient: HttpClientConfig,
queryClient: HttpClientConfig,
queryTimeout: Duration,
aggregate: AggregateConfig,
keyValueStore: KeyValueStoreConfig,
Expand Down
Expand Up @@ -24,6 +24,7 @@ trait BlazegraphViewsSetup extends IOValues with ConfigFixtures with IOFixedCloc
baseUri.toString,
None,
httpClientConfig,
httpClientConfig,
1.second,
aggregate,
keyValueStore,
Expand Down
Expand Up @@ -143,7 +143,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
make[CompositeIndexingStream].from {
(
esClient: ElasticSearchClient,
blazeClient: BlazegraphClient,
blazeClient: BlazegraphClient @Id("blazegraph-indexing-client"),
projection: Projection[Unit],
deltaClient: DeltaClient,
indexingController: CompositeIndexingController,
Expand Down Expand Up @@ -173,7 +173,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
make[CompositeIndexingCleanup].from {
(
esClient: ElasticSearchClient,
blazeClient: BlazegraphClient,
blazeClient: BlazegraphClient @Id("blazegraph-indexing-client"),
cache: ProgressesCache @Id("composite-progresses"),
config: CompositeViewsConfig
) =>
Expand Down Expand Up @@ -217,7 +217,12 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
)

make[BlazegraphQuery].from {
(acls: Acls, views: CompositeViews, client: BlazegraphClient, cfg: CompositeViewsConfig) =>
(
acls: Acls,
views: CompositeViews,
client: BlazegraphClient @Id("blazegraph-query-client"),
cfg: CompositeViewsConfig
) =>
BlazegraphQuery(acls, views, client)(cfg.blazegraphIndexing)

}
Expand Down
Expand Up @@ -124,7 +124,7 @@ class CompositeIndexingSpec
implicit private val httpConfig = HttpClientConfig(RetryStrategyConfig.AlwaysGiveUp, HttpClientWorthRetry.never)
private val httpClient = HttpClient()
private val esClient = new ElasticSearchClient(httpClient, elasticsearchHost.endpoint)
private val blazeClient = BlazegraphClient(httpClient, blazegraphHostConfig.endpoint, None)
private val blazeClient = BlazegraphClient(httpClient, blazegraphHostConfig.endpoint, None, 10.seconds)

private val museId = iri"http://music.com/muse"
private val museUuid = UUID.randomUUID()
Expand Down