Skip to content

Commit

Permalink
Merge pull request #73 from TREEcg/development
Browse files Browse the repository at this point in the history
v4.0.6
  • Loading branch information
julianrojas87 committed May 16, 2023
2 parents ba08ddc + ebd18b3 commit 295022f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 49 deletions.
4 changes: 0 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 45 additions & 43 deletions packages/actor-init-ldes-client/lib/EventStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { Frame } from "jsonld/jsonld-spec";
import { inspect } from 'util';
import { DataFactory } from 'rdf-data-factory';
import { JsonLdDocument } from "jsonld";
import { Store } from "n3";
import { Quad, Store } from "n3";
import LRU from 'lru-cache';
import * as ContentType from 'content-type';
import * as CachePolicy from 'http-cache-semantics';
Expand All @@ -30,6 +30,8 @@ import { stream2Array, stream2String } from "./Utils";
export class EventStream extends Readable {
protected readonly mediators: IEventStreamMediators;

protected readonly factory: RDF.DataFactory;

protected readonly pollingInterval?: number;
protected readonly representation?: OutputRepresentation;
protected readonly requestHeaders?: { [key: string]: number | string | string[] };
Expand Down Expand Up @@ -91,6 +93,8 @@ export class EventStream extends Readable {
} else {
this.bookkeeper.addFragment(this.accessUrl, 0);
}
this.factory = new DataFactory()!;

}

private async fetchNextPage() {
Expand Down Expand Up @@ -310,9 +314,9 @@ export class EventStream extends Readable {

// Process TREE relations towards other nodes
for (const [_, relation] of treeMetadata.metadata.treeMetadata.relations) {
if (relation.value && this.fromTime && relation["@type"][0] && moment(relation.value[0]["@value"]).isValid()) {
const value = relation.value[0]["@value"];

const value = relation.value[0]["@value"];
if (this.fromTime && relation["@type"][0] && moment(value).isValid()) {
// To be enhanced when more TREE filtering capabilities are available
const valueDate = new Date(value);

Expand Down Expand Up @@ -371,14 +375,20 @@ ${inspect(e)}`);
// Extract response headers
const resHeaders: Array<[string, string]> = [];
res.headers.forEach((v, k) => resHeaders.push([k, v]));

const trailingCharacterRegex = /;$/;
let contentType = <string>res.headers.get('content-type');
if (contentType) {
contentType = contentType.trim().replace(trailingCharacterRegex, ''); // Just removing some clutter
} else {
contentType = "";
}
return <PageMetadata>{
url: res.url,
request: req,
response: { status: res.status, headers: Object.fromEntries(resHeaders) },
statusCode: res.status,
data: StreamReadable.fromWeb(<any>res.body?.pipeThrough(<any>new TextDecoderStream())),
contentType: ContentType.parse(<string>res.headers.get('content-type')).type
contentType: ContentType.parse(contentType).type
};
} catch (err) {
this.logger.error(inspect(err));
Expand Down Expand Up @@ -429,8 +439,13 @@ ${inspect(e)}`);
this.processedURIs.set(memberUri, {});

if (!this.dereferenceMembers) {
const done = new Set(memberUris);
yield this.extractMember(memberUri, store, done);
let processedSubjects: Set <string> = new Set();
const memberQuads = this.extractMember(store, this.factory.namedNode(memberUri), processedSubjects, memberUris);
const memberQuadStream = StreamReadable.from(memberQuads);
yield {
uri: memberUri,
quads: new ArrayIterator(memberQuads)
};
} else {
const quads = new MemberIterator(memberUri, this.rateLimiter);
quads.on('error', (msg, e) => {
Expand Down Expand Up @@ -471,44 +486,31 @@ ${inspect(e)}`);
}
}

protected extractMember(memberUri: string, store: Store, done: Set<string>): IMember {
const queue: string[] = [memberUri];
const result: RDF.Quad[] = [];

while (queue.length > 0) {
const subject = queue.pop();

if (!subject) {
// Type coercion, should never happen
break;
}
const subjectQuads = store.getQuads(subject, null, null, null);

if (!subjectQuads.length) {
// Nothing is known about this resource
continue;
}

for (const quad of subjectQuads) {
result.push(quad);

if (quad.object.termType === 'NamedNode' || quad.object.termType === 'BlankNode') {
if (!done.has(quad.object.value)) {
done.add(quad.object.value);
queue.push(quad.object.value);
}
protected extractMember(store: Store, id: RDF.Term, processedSubjects: Set<string>, memberUris: string[]): Quad[] {
let member: Quad[] = [];
processedSubjects.add(id.value);
const forwardQuads = store.getQuads(id,null,null,null);
//const inverseQuads = store.getQuads(null,null,id,null);
//console.log(id, forwardQuads,inverseQuads);

for (const q of forwardQuads) {
if (!member.includes(q) && !processedSubjects.has(q.object.value)) {
member.push(q);
if (q.object.termType !== 'Literal' && !memberUris.includes(q.object.value)) {
member = member.concat(this.extractMember(store, q.object, processedSubjects, memberUris));
}
}
}

return {
uri: memberUri,
quads: new ArrayIterator(result),
};
/*for (let q of inverseQuads) {
if (q.predicate.value !== 'https://w3id.org/tree#member' && !memberUris.includes(q.subject.value) && !processedSubjects.has(q.subject.value)) {
member.push(q); //also relevant triple that needs to be added
member = member.concat(this.extractMember(store, q.subject, processedSubjects, memberUris));
}
}*/
return member
}

protected async processMembers(members: Generator<IMember>) {
const factory = new DataFactory();

for (const member of members) {
const id = member.uri;
Expand Down Expand Up @@ -544,12 +546,12 @@ ${inspect(e)}`);
// Build an array from the quads iterator
await new Promise<void>((resolve, reject) => {
const quadArray: Array<RDF.Quad> = [];
quadStream.forEach((item) => {
quadStream.on('data', (item) => {
quadArray.push(item);
});
quadStream.on('end', () => {
let _member: Member = {
id: factory.namedNode(member.uri),
id: this.factory.namedNode(member.uri),
quads: quadArray
};
this.push(_member);
Expand Down Expand Up @@ -605,7 +607,7 @@ ${inspect(e)}`);

interface IMember {
uri: string,
quads: RDF.Stream<RDF.Quad> & AsyncIterator<RDF.Quad>,
quads: RDF.Stream<RDF.Quad> //& AsyncIterator<RDF.Quad> & ArrayIterator<Quad>,
}

interface PageMetadata {
Expand Down
2 changes: 0 additions & 2 deletions packages/actor-init-ldes-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
"@comunica/query-sparql": "^2.6.8",
"@comunica/runner": "^2.6.8",
"@comunica/runner-cli": "^2.6.8",
"@treecg/actor-rdf-filter-object-with-framing": "^4.0.0",
"@treecg/actor-rdf-filter-objects-with-quadstore": "^4.0.0",
"@treecg/actor-rdf-frame-with-json-ld-js": "^4.0.0",
"@treecg/actor-rdf-metadata-extract-tree": "^2.0.0",
"@treecg/bus-rdf-filter-object": "^4.0.0",
Expand Down

0 comments on commit 295022f

Please sign in to comment.