Skip to content

Commit

Permalink
Enable incremental metadata updates
Browse files Browse the repository at this point in the history
This refactors internal metadata handling so that it can be incrementally updated.

Metadata objects now have a `state` field that represents their validation state;
this state can be invalidated, and listeners can be attached to it.
This allows actors to listen to metadata invalidations and act upon changes.

This commit also adds a metadata accumulation bus that
abstracts the logic for merging metadata from multiple sources, which
previously lived in the federated actor.
This will enable this logic to be used in other places, and for other
actors to manage how metadata is merged.

Closes #1156
Closes #1180

May be related to comunica/comunica-feature-link-traversal#102
  • Loading branch information
rubensworks committed May 22, 2023
1 parent f655f5a commit f906548
Show file tree
Hide file tree
Showing 118 changed files with 3,204 additions and 334 deletions.
1 change: 1 addition & 0 deletions .componentsjs-generator-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"IQueryableResultVoid",
"PathVariableObjectIterator",
"ClosableTransformIterator",
"MetadataValidationState",

"RDF.Stream",
"Omit",
Expand Down
2 changes: 2 additions & 0 deletions engines/config-query-sparql/config/config-default.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
"ccqs:config/rdf-join-selectivity/mediators.json",
"ccqs:config/rdf-metadata/actors.json",
"ccqs:config/rdf-metadata/mediators.json",
"ccqs:config/rdf-metadata-accumulate/actors.json",
"ccqs:config/rdf-metadata-accumulate/mediators.json",
"ccqs:config/rdf-metadata-extract/actors.json",
"ccqs:config/rdf-metadata-extract/mediators.json",
"ccqs:config/rdf-parse/actors.json",
Expand Down
3 changes: 3 additions & 0 deletions engines/config-query-sparql/config/config-rdfjs.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ccqs:config/rdf-join-selectivity/actors.json",
"ccqs:config/rdf-join-selectivity/mediators.json",

"ccqs:config/rdf-metadata-accumulate/actors.json",
"ccqs:config/rdf-metadata-accumulate/mediators.json",

"ccqs:config/rdf-resolve-quad-pattern/actors-rdfjs.json",
"ccqs:config/rdf-resolve-quad-pattern/mediators.json",
"ccqs:config/rdf-serialize/actors.json",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"@context": [
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/runner/^2.0.0/components/context.jsonld",

"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/actor-rdf-metadata-accumulate-cancontainundefs/^2.0.0/components/context.jsonld",
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/actor-rdf-metadata-accumulate-cardinality/^2.0.0/components/context.jsonld",
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/actor-rdf-metadata-accumulate-pagesize/^2.0.0/components/context.jsonld",
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/actor-rdf-metadata-accumulate-requesttime/^2.0.0/components/context.jsonld"
],
"@id": "urn:comunica:default:Runner",
"@type": "Runner",
"actors": [
{
"@id": "urn:comunica:default:rdf-metadata-accumulate/actors#cancontainundefs",
"@type": "ActorRdfMetadataAccumulateCanContainUndefs"
},
{
"@id": "urn:comunica:default:rdf-metadata-accumulate/actors#cardinality",
"@type": "ActorRdfMetadataAccumulateCardinality"
},
{
"@id": "urn:comunica:default:rdf-metadata-accumulate/actors#pagesize",
"@type": "ActorRdfMetadataAccumulatePageSize"
},
{
"@id": "urn:comunica:default:rdf-metadata-accumulate/actors#requesttime",
"@type": "ActorRdfMetadataAccumulateRequestTime"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"@context": [
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/bus-rdf-metadata-accumulate/^2.0.0/components/context.jsonld",
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/mediator-combine-union/^2.0.0/components/context.jsonld"
],
"@id": "urn:comunica:default:rdf-metadata-accumulate/mediators#main",
"@type": "MediatorCombineUnion",
"bus": { "@id": "ActorRdfMetadataAccumulate:_default_bus" },
"field": "metadata"
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
{
"@id": "urn:comunica:default:rdf-resolve-quad-pattern/actors#federated",
"@type": "ActorRdfResolveQuadPatternFederated",
"mediatorResolveQuadPattern": { "@id": "urn:comunica:default:rdf-resolve-quad-pattern/mediators#main" }
"mediatorResolveQuadPattern": { "@id": "urn:comunica:default:rdf-resolve-quad-pattern/mediators#main" },
"mediatorRdfMetadataAccumulate": { "@id": "urn:comunica:default:rdf-metadata-accumulate/mediators#main" }
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"mediatorDereferenceRdf": { "@id": "urn:comunica:default:dereference-rdf/mediators#main" },
"mediatorMetadata": { "@id": "urn:comunica:default:rdf-metadata/mediators#main" },
"mediatorMetadataExtract": { "@id": "urn:comunica:default:rdf-metadata-extract/mediators#main" },
"mediatorMetadataAccumulate": { "@id": "urn:comunica:default:rdf-metadata-accumulate/mediators#main" },
"mediatorRdfResolveHypermedia": { "@id": "urn:comunica:default:rdf-resolve-hypermedia/mediators#main" },
"mediatorRdfResolveHypermediaLinks": { "@id": "urn:comunica:default:rdf-resolve-hypermedia-links/mediators#main" },
"mediatorRdfResolveHypermediaLinksQueue": { "@id": "urn:comunica:default:rdf-resolve-hypermedia-links-queue/mediators#main" }
Expand Down
4 changes: 4 additions & 0 deletions engines/query-sparql-file/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@
"@comunica/actor-rdf-join-optional-bind": "^2.6.8",
"@comunica/actor-rdf-join-optional-nestedloop": "^2.6.8",
"@comunica/actor-rdf-join-selectivity-variable-counting": "^2.6.8",
"@comunica/actor-rdf-metadata-accumulate-cancontainundefs": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-cardinality": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-pagesize": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-requesttime": "^2.5.0",
"@comunica/actor-rdf-metadata-all": "^2.6.8",
"@comunica/actor-rdf-metadata-extract-allow-http-methods": "^2.6.8",
"@comunica/actor-rdf-metadata-extract-hydra-controls": "^2.6.8",
Expand Down
4 changes: 4 additions & 0 deletions engines/query-sparql-rdfjs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@
"@comunica/actor-rdf-join-optional-bind": "^2.6.8",
"@comunica/actor-rdf-join-optional-nestedloop": "^2.6.8",
"@comunica/actor-rdf-join-selectivity-variable-counting": "^2.6.8",
"@comunica/actor-rdf-metadata-accumulate-cancontainundefs": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-cardinality": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-pagesize": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-requesttime": "^2.5.0",
"@comunica/actor-rdf-resolve-quad-pattern-federated": "^2.6.8",
"@comunica/actor-rdf-resolve-quad-pattern-rdfjs-source": "^2.6.8",
"@comunica/actor-rdf-serialize-jsonld": "^2.6.8",
Expand Down
4 changes: 4 additions & 0 deletions engines/query-sparql/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@
"@comunica/actor-rdf-join-optional-bind": "^2.6.8",
"@comunica/actor-rdf-join-optional-nestedloop": "^2.6.8",
"@comunica/actor-rdf-join-selectivity-variable-counting": "^2.6.8",
"@comunica/actor-rdf-metadata-accumulate-cancontainundefs": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-cardinality": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-pagesize": "^2.5.0",
"@comunica/actor-rdf-metadata-accumulate-requesttime": "^2.5.0",
"@comunica/actor-rdf-metadata-all": "^2.6.8",
"@comunica/actor-rdf-metadata-extract-allow-http-methods": "^2.6.8",
"@comunica/actor-rdf-metadata-extract-hydra-controls": "^2.6.8",
Expand Down
3 changes: 2 additions & 1 deletion lerna.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"message": "Bump to release version %s",
"allowBranch": [
"master",
"next/major"
"next/major",
"feature/adaptive-join"
],
"npmClient": "npm"
}
Expand Down
5 changes: 5 additions & 0 deletions packages/actor-init-query/test/QueryEngineBase-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Readable, Transform } from 'stream';
import { BindingsFactory } from '@comunica/bindings-factory';
import { KeysInitQuery } from '@comunica/context-entries';
import { Bus, ActionContext, ActionContextKey } from '@comunica/core';
import { MetadataValidationState } from '@comunica/metadata';
import type {
IPhysicalQueryPlanLogger,
IActionContext, QueryStringContext, IQueryBindingsEnhanced, IQueryQuadsEnhanced,
Expand Down Expand Up @@ -551,6 +552,7 @@ describe('QueryEngineBase', () => {
]),
]),
metadata: async() => ({
state: new MetadataValidationState(),
cardinality: { type: 'estimate', value: 1 },
canContainUndefs: false,
variables: [ DF.variable('a') ],
Expand All @@ -565,6 +567,7 @@ describe('QueryEngineBase', () => {
]),
]);
expect(await final.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 1 },
canContainUndefs: false,
variables: [ DF.variable('a') ],
Expand All @@ -579,6 +582,7 @@ describe('QueryEngineBase', () => {
DF.quad(DF.namedNode('ex:a'), DF.namedNode('ex:a'), DF.namedNode('ex:a')),
]),
metadata: async() => ({
state: new MetadataValidationState(),
cardinality: { type: 'estimate', value: 1 },
canContainUndefs: false,
variables: [ DF.variable('a') ],
Expand All @@ -591,6 +595,7 @@ describe('QueryEngineBase', () => {
DF.quad(DF.namedNode('ex:a'), DF.namedNode('ex:a'), DF.namedNode('ex:a')),
]);
expect(await final.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 1 },
canContainUndefs: false,
variables: [ DF.variable('a') ],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ActorQueryOperation } from '@comunica/bus-query-operation';
import { ActionContext, Bus } from '@comunica/core';
import { MetadataValidationState } from '@comunica/metadata';
import type { IQueryOperationResultQuads } from '@comunica/types';
import type * as RDF from '@rdfjs/types';
import arrayifyStream from 'arrayify-stream';
Expand All @@ -21,6 +22,7 @@ describe('ActorQueryOperationDescribeSubject', () => {
const patterns = [ ...arg.operation.input.input[0].patterns, ...arg.operation.input.input[1].patterns ];
return {
metadata: () => Promise.resolve({
state: new MetadataValidationState(),
cardinality: {
type: 'estimate',
value: arg.operation.input.input[0].patterns.length + arg.operation.input.input[1].patterns.length,
Expand All @@ -36,6 +38,7 @@ describe('ActorQueryOperationDescribeSubject', () => {
}
return {
metadata: () => Promise.resolve({
state: new MetadataValidationState(),
cardinality: { type: 'estimate', value: arg.operation.input.patterns.length },
}),
quadStream: new ArrayIterator(arg.operation.input.patterns.map(
Expand Down Expand Up @@ -94,7 +97,9 @@ describe('ActorQueryOperationDescribeSubject', () => {
};
return actor.run(op).then(async(output: IQueryOperationResultQuads) => {
expect(await output.metadata())
.toEqual({ cardinality: { type: 'estimate', value: 2 }, canContainUndefs: false });
.toEqual({ state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 2 },
canContainUndefs: false });
expect(output.type).toEqual('quads');
expect(await arrayifyStream(output.quadStream)).toEqual([
DF.quad(DF.namedNode('a'), DF.namedNode('__predicate'), DF.namedNode('__object')),
Expand All @@ -114,7 +119,9 @@ describe('ActorQueryOperationDescribeSubject', () => {
};
return actor.run(op).then(async(output: IQueryOperationResultQuads) => {
expect(await output.metadata())
.toEqual({ cardinality: { type: 'estimate', value: 3 }, canContainUndefs: false });
.toEqual({ state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 3 },
canContainUndefs: false });
expect(output.type).toEqual('quads');
expect(await arrayifyStream(output.quadStream)).toEqual([
DF.quad(DF.namedNode('a'), DF.namedNode('b'), DF.namedNode('dummy')),
Expand All @@ -135,7 +142,9 @@ describe('ActorQueryOperationDescribeSubject', () => {
};
return actor.run(op).then(async(output: IQueryOperationResultQuads) => {
expect(await output.metadata())
.toEqual({ cardinality: { type: 'estimate', value: 4 }, canContainUndefs: false });
.toEqual({ state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 4 },
canContainUndefs: false });
expect(output.type).toEqual('quads');
expect(await arrayifyStream(output.quadStream)).toEqual([
DF.quad(DF.namedNode('c'), DF.namedNode('__predicate'), DF.namedNode('__object')),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { BindingsFactory } from '@comunica/bindings-factory';
import type { IActorQueryOperationTypedMediatedArgs } from '@comunica/bus-query-operation';
import { ActorQueryOperationTypedMediated } from '@comunica/bus-query-operation';
import type { IActorTest } from '@comunica/core';
import { MetadataValidationState } from '@comunica/metadata';
import type { IActionContext, IQueryOperationResult } from '@comunica/types';
import { SingletonIterator } from 'asynciterator';
import type { Algebra } from 'sparqlalgebrajs';
Expand All @@ -24,6 +25,7 @@ export class ActorQueryOperationNop extends ActorQueryOperationTypedMediated<Alg
return {
bindingsStream: new SingletonIterator(BF.bindings()),
metadata: () => Promise.resolve({
state: new MetadataValidationState(),
cardinality: { type: 'exact', value: 1 },
canContainUndefs: false,
variables: [],
Expand Down
1 change: 1 addition & 0 deletions packages/actor-query-operation-nop/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"@comunica/bindings-factory": "^2.5.1",
"@comunica/bus-query-operation": "^2.6.8",
"@comunica/core": "^2.6.8",
"@comunica/metadata": "^2.5.0",
"@comunica/types": "^2.6.8",
"asynciterator": "^3.8.0",
"sparqlalgebrajs": "^4.0.5"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BindingsFactory } from '@comunica/bindings-factory';
import { ActorQueryOperation } from '@comunica/bus-query-operation';
import { ActionContext, Bus } from '@comunica/core';
import { MetadataValidationState } from '@comunica/metadata';
import { ArrayIterator } from 'asynciterator';
import { DataFactory } from 'rdf-data-factory';
import { Algebra, Factory } from 'sparqlalgebrajs';
Expand All @@ -25,6 +26,7 @@ describe('ActorQueryOperationPathAlt', () => {
BF.bindings([[ DF.variable('x'), DF.literal('3') ]]),
]),
metadata: () => Promise.resolve({
state: new MetadataValidationState(),
cardinality: { type: 'estimate', value: 3 },
canContainUndefs: false,
variables: [ DF.variable('a') ],
Expand Down Expand Up @@ -88,6 +90,7 @@ describe('ActorQueryOperationPathAlt', () => {
context: new ActionContext() };
const output = ActorQueryOperation.getSafeBindings(await actor.run(op));
expect(await output.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 6 },
canContainUndefs: false,
variables: [ DF.variable('a') ],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ActorAbstractPath } from '@comunica/actor-abstract-path';
import { BindingsFactory } from '@comunica/bindings-factory';
import type { IActorQueryOperationTypedMediatedArgs } from '@comunica/bus-query-operation';
import { ActorQueryOperation } from '@comunica/bus-query-operation';
import { MetadataValidationState } from '@comunica/metadata';
import type { Bindings, IQueryOperationResult, IActionContext } from '@comunica/types';
import { SingletonIterator } from 'asynciterator';
import { Algebra } from 'sparqlalgebrajs';
Expand Down Expand Up @@ -33,6 +34,7 @@ export class ActorQueryOperationPathZeroOrOne extends ActorAbstractPath {
type: 'bindings',
bindingsStream: new SingletonIterator(BF.bindings()),
metadata: () => Promise.resolve({
state: new MetadataValidationState(),
cardinality: { type: 'exact', value: 1 },
canContainUndefs: false,
variables: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"@comunica/actor-abstract-path": "^2.6.8",
"@comunica/bindings-factory": "^2.5.1",
"@comunica/bus-query-operation": "^2.6.8",
"@comunica/metadata": "^2.5.0",
"@comunica/types": "^2.6.8",
"asynciterator": "^3.8.0",
"sparqlalgebrajs": "^4.0.5"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { BindingsFactory } from '@comunica/bindings-factory';
import { ActorQueryOperation } from '@comunica/bus-query-operation';
import { KeysQueryOperation } from '@comunica/context-entries';
import { Bus, ActionContext } from '@comunica/core';
import { MetadataValidationState } from '@comunica/metadata';
import type * as RDF from '@rdfjs/types';
import { ArrayIterator } from 'asynciterator';
import { DataFactory } from 'rdf-data-factory';
Expand Down Expand Up @@ -51,6 +52,7 @@ describe('ActorQueryOperationPathZeroOrOne', () => {
return Promise.resolve({
bindingsStream: new ArrayIterator(distinct ? [ bindings[0] ] : bindings),
metadata: () => Promise.resolve({
state: new MetadataValidationState(),
cardinality: { type: 'estimate', value: distinct ? 1 : 3 },
canContainUndefs: false,
variables: vars,
Expand Down Expand Up @@ -107,6 +109,7 @@ describe('ActorQueryOperationPathZeroOrOne', () => {
context: new ActionContext() };
const output = ActorQueryOperation.getSafeBindings(await actor.run(op));
expect(await output.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 1 },
canContainUndefs: false,
variables: [ DF.variable('x') ],
Expand All @@ -125,6 +128,7 @@ describe('ActorQueryOperationPathZeroOrOne', () => {
context: new ActionContext({ [KeysQueryOperation.isPathArbitraryLengthDistinctKey.name]: false }) };
const output = ActorQueryOperation.getSafeBindings(await actor.run(op));
expect(await output.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 1 },
canContainUndefs: false,
variables: [ DF.variable('x') ],
Expand All @@ -143,6 +147,7 @@ describe('ActorQueryOperationPathZeroOrOne', () => {
context: new ActionContext({ [KeysQueryOperation.isPathArbitraryLengthDistinctKey.name]: true }) };
const output = ActorQueryOperation.getSafeBindings(await actor.run(op));
expect(await output.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 3 },
canContainUndefs: false,
variables: [ DF.variable('x') ],
Expand All @@ -164,6 +169,7 @@ describe('ActorQueryOperationPathZeroOrOne', () => {
context: new ActionContext({ [KeysQueryOperation.isPathArbitraryLengthDistinctKey.name]: true }) };
const output = ActorQueryOperation.getSafeBindings(await actor.run(op));
expect(await output.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 3 },
canContainUndefs: false,
variables: [ DF.variable('x') ],
Expand All @@ -185,6 +191,7 @@ describe('ActorQueryOperationPathZeroOrOne', () => {
context: new ActionContext({ [KeysQueryOperation.isPathArbitraryLengthDistinctKey.name]: true }) };
const output = ActorQueryOperation.getSafeBindings(await actor.run(op));
expect(await output.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'estimate', value: 3 },
canContainUndefs: false,
variables: [],
Expand All @@ -203,6 +210,7 @@ describe('ActorQueryOperationPathZeroOrOne', () => {
context: new ActionContext({ [KeysQueryOperation.isPathArbitraryLengthDistinctKey.name]: true }) };
const output = ActorQueryOperation.getSafeBindings(await actor.run(op));
expect(await output.metadata()).toEqual({
state: expect.any(MetadataValidationState),
cardinality: { type: 'exact', value: 1 },
canContainUndefs: false,
variables: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { IActorQueryOperationTypedMediatedArgs } from '@comunica/bus-query-
import { ActorQueryOperation, ActorQueryOperationTypedMediated } from '@comunica/bus-query-operation';
import { KeysInitQuery, KeysRdfResolveQuadPattern } from '@comunica/context-entries';
import type { IActorTest } from '@comunica/core';
import { MetadataValidationState } from '@comunica/metadata';
import type { IActionContext, IQueryOperationResult, IQueryOperationResultBindings } from '@comunica/types';
import { SingletonIterator } from 'asynciterator';
import type { Algebra } from 'sparqlalgebrajs';
Expand Down Expand Up @@ -50,7 +51,12 @@ export class ActorQueryOperationService extends ActorQueryOperationTypedMediated
output = {
bindingsStream: new SingletonIterator(BF.bindings()),
type: 'bindings',
metadata: async() => ({ cardinality: { type: 'exact', value: 1 }, canContainUndefs: false, variables: []}),
metadata: async() => ({
state: new MetadataValidationState(),
cardinality: { type: 'exact', value: 1 },
canContainUndefs: false,
variables: [],
}),
};
} else {
throw error;
Expand Down

0 comments on commit f906548

Please sign in to comment.