Skip to content

Commit

Permalink
Merge 85ea389 into b50d42e
Browse files Browse the repository at this point in the history
  • Loading branch information
surilindur committed Aug 5, 2024
2 parents b50d42e + 85ea389 commit dfcf3f7
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 521 deletions.
25 changes: 14 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,12 @@ Options:

#### HDT Quad Sink

A quad sink that writes to files using an IRI to local file system path mapping and then converts the files into an [HDT document](https://www.rdfhdt.org/what-is-hdt/).
The implementation uses the [docker](https://www.docker.com/) image [HDT-Docker](https://github.com/rdfhdt/hdt-docker) of the [hdt-cpp](https://github.com/rdfhdt/hdt-cpp) library.
The docker operations to acquire the image and execute the transformations into HDT are performed by the sink.
A quad sink that writes to files using an IRI to local file system path mapping,
and then converts the files into [HDT](https://www.rdfhdt.org/what-is-hdt/) documents
using the Docker image of [rdfhdt/hdt-cpp](https://github.com/rdfhdt/hdt-cpp).

**WARNING: Can be very slow for many files**
The conversion to HDT is done file by file, which can be really slow.
The conversion concurrency option can be adjusted to increase the number of simultaneously converted file.

```json
{
Expand All @@ -323,13 +324,14 @@ The docker operations to acquire the image and execute the transformations into
"log": true,
"outputFormat": "application/n-quads",
"fileExtension": "$.nq",
"generateIndexes": false,
"conversionProgress": false,
"removeSourceFiles": true,
"conversionConcurrency": 1,
"iriToPath": {
"http://example.org/base/": "output/base/",
"http://example.org/other/": "output/other/"
},
"poolSize": 1,
"deleteSourceFiles": false,
"errorFileDockerRfdhdt": "./error_log_docker_rfdhdt.txt"
}
}
}
```
Expand All @@ -339,9 +341,10 @@ Options:
* `"outputFormat"`: The desired output serialization. (Only `"application/n-quads"` is considered stable at the moment).
* `"fileExtension"`: An optional extension to add to resulting files.
* `"iriToPath"`: A collection of mappings that indicate what URL patterns should be translated into what folder structure.
* `"poolSize"`: The number of concurrent HDT conversion operations. By the default `1`.
* `"deleteSourceFiles"`: If the sink should delete the source RDF file after the conversion into HDT.
* `"errorFileDockerRfdhdt"`: File where the error of HDT-Docker will be outputed. By default `"./error_log_docker_rfdhdt.txt"`.
* `"generateIndexes"`: Whether to generate indexes alongside the HDT file. Toggles the `-i` flag of `rdf2hdt`.
* `"conversionProgress"`: Whether to show HDT generation statistics. Toggles the `-p` flag of `rdf2hdt`.
* `"removeSourceFiles"`: Whether to remove the RDF source files after they have been converted to HDT.
* `"conversionConcurrency"`: The maximum number of concurrent HDT conversions to execute.

#### Composite Quad Sink

Expand Down
2 changes: 1 addition & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ export * from './io/QuadSinkComposite';
export * from './io/QuadSinkCsv';
export * from './io/QuadSinkFile';
export * from './io/QuadSinkFiltered';
export * from './io/QuadSinkHdt';
export * from './io/QuadSourceComposite';
export * from './io/QuadSourceFile';
export * from './io/QuadSinkHdt';
export * from './quadmatcher/IQuadMatcher';
export * from './quadmatcher/QuadMatcherPredicate';
export * from './quadmatcher/QuadMatcherResourceType';
Expand Down
165 changes: 100 additions & 65 deletions lib/io/QuadSinkHdt.ts
Original file line number Diff line number Diff line change
@@ -1,85 +1,120 @@
import type { WriteStream } from 'node:fs';
import { createWriteStream } from 'node:fs';
import * as fs from 'node:fs/promises';
import * as Path from 'node:path';
import * as readline from 'node:readline';
import type * as RDF from '@rdfjs/types';
import { unlink } from 'node:fs/promises';
import { basename, resolve, dirname } from 'node:path';
import * as Docker from 'dockerode';
import type { IQuadSinkFileOptions } from './QuadSinkFile';
import { QuadSinkFile } from './QuadSinkFile';
import { transformToHdt, pullHdtCppDockerImage } from './rfdhdtDockerUtil';
import { QuadSinkFile, type IQuadSinkFileOptions } from './QuadSinkFile';

const HDTCPP_MOUNT_PATH = '/tmp/convert';
const HDTCPP_DOCKER_IMAGE = 'rdfhdt/hdt-cpp:latest';
const HDTCPP_FORMATS = new Map<string, string>([
[ 'application/n-quads', 'nquad' ],
[ 'application/n-triples', 'ntriples' ],
[ 'text/turtle', 'turtle' ],
[ 'application/rdf+xml', 'rdfxml' ],
[ 'text/n3', 'n3' ],
]);

export class QuadSinkHdt extends QuadSinkFile {
private readonly files: Set<string> = new Set();
private readonly deleteSourceFiles: boolean;
private readonly errorFileDockerRfdhdt: WriteStream;
private readonly poolSize: number;
private readonly generateIndexes: boolean;
private readonly conversionProgress: boolean;
private readonly removeSourceFiles: boolean;
private readonly conversionConcurrency: number;

private readonly docker: Docker;
private readonly filesToConvert: Set<string>;

public constructor(
options: IQuadSinkFileOptions,
poolSize = 1,
deleteSourceFiles = false,
errorFileDockerRfdhdt = './error_log_docker_rfdhdt.txt',
) {
public constructor(options: IQuadSinkHdtOptions) {
super(options);
this.deleteSourceFiles = deleteSourceFiles;
this.errorFileDockerRfdhdt = createWriteStream(errorFileDockerRfdhdt);
this.poolSize = poolSize;
if (!HDTCPP_FORMATS.has(options.outputFormat)) {
throw new Error(`Unsupported HDT output format ${options.outputFormat}`);
}
this.generateIndexes = options.generateIndexes ?? false;
this.conversionProgress = options.conversionProgress ?? false;
this.removeSourceFiles = options.removeSourceFiles ?? true;
this.conversionConcurrency = options.conversionConcurrency ?? 1;
this.docker = new Docker();
this.filesToConvert = new Set();
}

public async push(iri: string, quad: RDF.Quad): Promise<void> {
const path = Path.join('./', this.getFilePath(iri));
await super.push(iri, quad);
protected getFilePath(iri: string): string {
const path = super.getFilePath(iri);
this.filesToConvert.add(resolve(path));
return path;
}

protected async pullDockerImage(): Promise<void> {
await new Promise<void>((resolve, reject) => this.docker.pull(HDTCPP_DOCKER_IMAGE, {}, (error, result) => {
if (error) {
reject(error);
}
this.docker.modem.followProgress(result!, error => error ? reject(error) : resolve());
}));
}

// Add files with the defined extension to the list to be transformed
if (this.fileExtension !== undefined && path.includes(this.fileExtension)) {
this.files.add(path);
protected async convertSingleFile(rdfFilePath: string): Promise<void> {
const cmd = [
'rdf2hdt',
...this.generateIndexes ? [ '-i' ] : [],
...this.conversionProgress ? [ '-p' ] : [],
`${HDTCPP_MOUNT_PATH}/${basename(rdfFilePath)}`,
`${HDTCPP_MOUNT_PATH}/${basename(rdfFilePath).replace(this.fileExtension ?? '', '')}.hdt`,
];
const createOptions: Docker.ContainerCreateOptions = {
// eslint-disable-next-line ts/naming-convention
HostConfig: {
// eslint-disable-next-line ts/naming-convention
AutoRemove: true,
// eslint-disable-next-line ts/naming-convention
Binds: [ `${dirname(rdfFilePath)}:${HDTCPP_MOUNT_PATH}:rw` ],
},
};
await this.docker.run(HDTCPP_DOCKER_IMAGE, cmd, process.stdout, createOptions);
if (this.removeSourceFiles) {
await unlink(rdfFilePath);
}
}

/**
* Log the number of files transformed into HDT
* @param {number} counter - counter of the number of files transformed
* @param {boolean} newLine - add a new line after the logging
*/
private attemptLogHdtTransformation(counter: number, newLine = false): void {
if (this.log) {
readline.clearLine(process.stdout, 0);
readline.cursorTo(process.stdout, 0);
process.stdout.write(`\rfiles transformed to HDT:${counter} out of ${this.files.size}`);
if (newLine) {
process.stdout.write(`\n`);
protected async convertToHdt(): Promise<void> {
await this.pullDockerImage();
const conversionPromisePool = new Set<Promise<void>>();
let convertedFileCount = 0;
for (const rdfFilePath of this.filesToConvert) {
convertedFileCount++;
conversionPromisePool.add(this.convertSingleFile(rdfFilePath));
if (
conversionPromisePool.size === this.conversionConcurrency ||
convertedFileCount === this.filesToConvert.size
) {
await Promise.all(conversionPromisePool);
conversionPromisePool.clear();
if (this.log) {
process.stdout.write(`\rConverted files: ${convertedFileCount}/${this.filesToConvert.size}${this.conversionProgress ? '\n' : ''}`);
}
}
}
}

public async close(): Promise<void> {
// Close the streaming of files
await super.close();
const docker: Docker = new Docker();
// Pull the docker image if it is not available in the system
await pullHdtCppDockerImage(docker);
const operationPool: Map<string, Promise<string>> = new Map();
let i = 0;

for (const file of this.files) {
operationPool.set(file, this.transformToHdt(docker, file));
if (i % this.poolSize === 0) {
this.attemptLogHdtTransformation(i);
const winnerFile = await Promise.race(operationPool.values());
operationPool.delete(winnerFile);
}
i++;
}
await Promise.all(operationPool.values());
this.attemptLogHdtTransformation(i, true);
await this.convertToHdt();
}
}

private async transformToHdt(docker: Docker, file: string): Promise<string> {
await transformToHdt(docker, file, this.errorFileDockerRfdhdt);
if (this.deleteSourceFiles) {
await fs.rm(file);
}
return file;
}
export interface IQuadSinkHdtOptions extends IQuadSinkFileOptions {
/**
* Whether to generate indexes using rdf2hdt
*/
generateIndexes?: boolean;
/**
* Whether to show the conversion progress from rdf2hdt
*/
conversionProgress?: boolean;
/**
* Whether to clean up the RDF files after HDT generation
*/
removeSourceFiles?: boolean;
/**
* The level of parallelism in the conversion, which will affect the number of
* concurrently running Docker containers performing the conversion
*/
conversionConcurrency?: number;
}
105 changes: 0 additions & 105 deletions lib/io/rfdhdtDockerUtil.ts

This file was deleted.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"async-lock": "^1.4.0",
"bloem": "^0.2.0",
"componentsjs": "^6.0.0",
"dockerode": "^4.0.2",
"dockerode": "^4.0.0",
"lru-cache": "^10.3.0",
"mkdirp": "^3.0.1",
"rdf-parse": "^3.0.0",
Expand All @@ -61,7 +61,7 @@
},
"devDependencies": {
"@rubensworks/eslint-config": "^3.0.0",
"@types/dockerode": "^3.3.29",
"@types/dockerode": "^3.3.0",
"@types/jest": "^29.0.0",
"arrayify-stream": "^2.0.0",
"componentsjs-generator": "^4.0.0",
Expand Down
Loading

0 comments on commit dfcf3f7

Please sign in to comment.