-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
344 additions
and
521 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,85 +1,123 @@ | ||
import type { WriteStream } from 'node:fs'; | ||
import { createWriteStream } from 'node:fs'; | ||
import * as fs from 'node:fs/promises'; | ||
import * as Path from 'node:path'; | ||
import * as readline from 'node:readline'; | ||
import type * as RDF from '@rdfjs/types'; | ||
import { unlink } from 'node:fs/promises'; | ||
import { basename, resolve, dirname } from 'node:path'; | ||
import * as Docker from 'dockerode'; | ||
import type { IQuadSinkFileOptions } from './QuadSinkFile'; | ||
import { QuadSinkFile } from './QuadSinkFile'; | ||
import { transformToHdt, pullHdtCppDockerImage } from './rfdhdtDockerUtil'; | ||
import { QuadSinkFile, type IQuadSinkFileOptions } from './QuadSinkFile'; | ||
|
||
/**
 * Path inside the conversion container onto which the host directory
 * containing the RDF file is bind-mounted.
 */
const HDTCPP_MOUNT_PATH = '/tmp/convert';

/**
 * Docker image that is pulled and run to execute rdf2hdt.
 */
const HDTCPP_DOCKER_IMAGE = 'rdfhdt/hdt-cpp:latest';

/**
 * Media types accepted as output format of the HDT sink.
 * NOTE(review): the values look like the matching rdf2hdt format names, but the
 * visible code only checks the keys - confirm before relying on the values.
 * Typed as read-only because this is a module-level lookup table that must never be mutated.
 */
const HDTCPP_FORMATS: ReadonlyMap<string, string> = new Map<string, string>([
  [ 'application/n-quads', 'nquad' ],
  [ 'application/n-triples', 'ntriples' ],
  [ 'text/turtle', 'turtle' ],
  [ 'application/rdf+xml', 'rdfxml' ],
  [ 'text/n3', 'n3' ],
]);
|
||
export class QuadSinkHdt extends QuadSinkFile { | ||
private readonly files: Set<string> = new Set(); | ||
private readonly deleteSourceFiles: boolean; | ||
private readonly errorFileDockerRfdhdt: WriteStream; | ||
private readonly poolSize: number; | ||
private readonly generateIndexes: boolean; | ||
private readonly conversionProgress: boolean; | ||
private readonly removeSourceFiles: boolean; | ||
private readonly conversionConcurrency: number; | ||
|
||
private readonly docker: Docker; | ||
private readonly filesToConvert: Set<string>; | ||
|
||
public constructor( | ||
options: IQuadSinkFileOptions, | ||
poolSize = 1, | ||
deleteSourceFiles = false, | ||
errorFileDockerRfdhdt = './error_log_docker_rfdhdt.txt', | ||
) { | ||
public constructor(options: IQuadSinkHdtOptions) { | ||
super(options); | ||
this.deleteSourceFiles = deleteSourceFiles; | ||
this.errorFileDockerRfdhdt = createWriteStream(errorFileDockerRfdhdt); | ||
this.poolSize = poolSize; | ||
if (!HDTCPP_FORMATS.has(options.outputFormat)) { | ||
throw new Error(`Unsupported HDT output format ${options.outputFormat}`); | ||
} | ||
this.generateIndexes = options.generateIndexes ?? false; | ||
this.conversionProgress = options.conversionProgress ?? false; | ||
this.removeSourceFiles = options.removeSourceFiles ?? true; | ||
this.conversionConcurrency = options.conversionConcurrency ?? 1; | ||
this.docker = new Docker(); | ||
this.filesToConvert = new Set(); | ||
} | ||
|
||
public async push(iri: string, quad: RDF.Quad): Promise<void> { | ||
const path = Path.join('./', this.getFilePath(iri)); | ||
await super.push(iri, quad); | ||
/**
 * Determine the file path for an IRI via the parent implementation and record
 * its absolute form in the set of files that will be converted to HDT.
 * @param iri - The IRI for which the target file path is requested.
 * @returns The path returned by the parent implementation, unmodified.
 */
protected getFilePath(iri: string): string {
  const filePath = super.getFilePath(iri);
  // The set stores absolute paths, so repeated lookups of the same file collapse into one entry
  const absoluteFilePath = resolve(filePath);
  this.filesToConvert.add(absoluteFilePath);
  return filePath;
}
|
||
/**
 * Pull the HDT conversion Docker image and wait until the pull has finished.
 * @throws The error reported by the pull request or by the progress stream.
 */
protected async pullDockerImage(): Promise<void> {
  await new Promise<void>((fulfil, reject) => {
    this.docker.pull(HDTCPP_DOCKER_IMAGE, {}, (pullError, stream) => {
      // Stop here on failure: without a stream there is nothing to follow
      if (pullError || !stream) {
        reject(pullError ?? new Error(`No progress stream received while pulling ${HDTCPP_DOCKER_IMAGE}`));
        return;
      }
      this.docker.modem.followProgress(stream, followError => followError ? reject(followError) : fulfil());
    });
  });
}
|
||
// Add files with the defined extension to the list to be transformed | ||
if (this.fileExtension !== undefined && path.includes(this.fileExtension)) { | ||
this.files.add(path); | ||
/**
 * Convert one RDF file to HDT by running rdf2hdt in a Docker container.
 * The directory of the file is bind-mounted into the container, and the HDT file
 * is written next to the source file with the configured extension replaced by `.hdt`.
 * The source file is only removed (when enabled) after a successful conversion.
 * @param rdfFilePath - Path of the RDF file on the host.
 * @throws When the container reports a non-zero exit status.
 */
protected async convertSingleFile(rdfFilePath: string): Promise<void> {
  const rdfFileName = basename(rdfFilePath);
  const extension = this.fileExtension ?? '';
  // Only strip the extension when it is an actual suffix, never an occurrence in the middle of the name
  const hdtFileStem = rdfFileName.endsWith(extension) ?
    rdfFileName.slice(0, rdfFileName.length - extension.length) :
    rdfFileName;
  const cmd = [
    'rdf2hdt',
    ...this.generateIndexes ? [ '-i' ] : [],
    ...this.conversionProgress ? [ '-p' ] : [],
    `${HDTCPP_MOUNT_PATH}/${rdfFileName}`,
    `${HDTCPP_MOUNT_PATH}/${hdtFileStem}.hdt`,
  ];
  const createOptions: Docker.ContainerCreateOptions = {
    // eslint-disable-next-line ts/naming-convention
    HostConfig: {
      // eslint-disable-next-line ts/naming-convention
      AutoRemove: true,
      // eslint-disable-next-line ts/naming-convention
      Binds: [ `${dirname(rdfFilePath)}:${HDTCPP_MOUNT_PATH}:rw` ],
    },
  };
  const runResult: unknown = await this.docker.run(HDTCPP_DOCKER_IMAGE, cmd, process.stdout, createOptions);
  // NOTE(review): dockerode is expected to resolve with [ waitData, container ]; the check is
  // skipped when the result has another shape, which keeps the previous behaviour in that case
  const statusCode: unknown = Array.isArray(runResult) ? runResult[0]?.StatusCode : undefined;
  if (typeof statusCode === 'number' && statusCode !== 0) {
    // Keep the source file in place so a failed conversion never loses data
    throw new Error(`Conversion of ${rdfFilePath} to HDT failed with exit code ${statusCode}`);
  }
  if (this.removeSourceFiles) {
    await unlink(rdfFilePath);
  }
}
|
||
/** | ||
* Log the number of files transformed into HDT | ||
* @param {number} counter - counter of the number of files transformed | ||
* @param {boolean} newLine - add a new line after the logging | ||
*/ | ||
private attemptLogHdtTransformation(counter: number, newLine = false): void { | ||
if (this.log) { | ||
readline.clearLine(process.stdout, 0); | ||
readline.cursorTo(process.stdout, 0); | ||
process.stdout.write(`\rfiles transformed to HDT:${counter} out of ${this.files.size}`); | ||
if (newLine) { | ||
process.stdout.write(`\n`); | ||
/**
 * Convert all collected RDF files to HDT.
 * The Docker image is pulled first, after which the files are converted in
 * batches of at most `conversionConcurrency` simultaneous conversions.
 * Progress is written to stdout only when logging is enabled.
 */
protected async convertToHdt(): Promise<void> {
  await this.pullDockerImage();
  const totalFiles = this.filesToConvert.size;
  // A zero, negative, fractional or NaN setting would never match the batch size exactly
  // and start every conversion at once, so it is normalised to a whole number of at least 1
  const batchSize = Math.max(1, Math.floor(this.conversionConcurrency) || 1);
  const batch: Promise<void>[] = [];
  let startedFiles = 0;
  for (const rdfFilePath of this.filesToConvert) {
    batch.push(this.convertSingleFile(rdfFilePath));
    startedFiles++;
    // Wait when the batch is full, or when the last file has been started
    if (batch.length >= batchSize || startedFiles === totalFiles) {
      await Promise.all(batch);
      batch.length = 0;
      if (this.log) {
        process.stdout.write(`\rConverted files: ${startedFiles}/${totalFiles}${this.conversionProgress ? '\n' : ''}`);
      }
    }
  }
  // Terminate the carriage-return progress line, but only when such a line was actually written
  if (this.log && totalFiles > 0 && !this.conversionProgress) {
    process.stdout.write('\n');
  }
}
|
||
/**
 * Close the sink and convert the written RDF files to HDT.
 * The conversion is delegated entirely to `convertToHdt`, which pulls the Docker
 * image and applies the configured concurrency, so the files are converted exactly once.
 */
public async close(): Promise<void> {
  // Close the streaming of files first, so every RDF file is complete before it is converted
  await super.close();
  await this.convertToHdt();
}
} | ||
|
||
/**
 * Transform a single file through the imported `transformToHdt` helper, handing it
 * the Docker client and the error log stream, and remove the source file afterwards
 * when source deletion is enabled.
 * @param docker - Docker client passed on to the helper.
 * @param file - Path of the file to transform.
 * @returns The path of the processed file, as received.
 */
private async transformToHdt(docker: Docker, file: string): Promise<string> {
  await transformToHdt(docker, file, this.errorFileDockerRfdhdt);
  if (!this.deleteSourceFiles) {
    return file;
  }
  await fs.rm(file);
  return file;
}
/**
 * Options accepted by the QuadSinkHdt constructor: the file sink options
 * extended with settings for the RDF to HDT conversion.
 */
export interface IQuadSinkHdtOptions extends IQuadSinkFileOptions { | ||
/**
 * Whether to generate indexes using rdf2hdt.
 * When enabled, the `-i` flag is added to the rdf2hdt command.
 * The constructor falls back to `false` when this is omitted.
*/ | ||
generateIndexes?: boolean; | ||
/**
 * Whether to show the conversion progress from rdf2hdt.
 * When enabled, the `-p` flag is added to the rdf2hdt command.
 * The constructor falls back to `false` when this is omitted.
*/ | ||
conversionProgress?: boolean; | ||
/**
 * Whether to clean up the RDF files after HDT generation.
 * The constructor falls back to `true` when this is omitted.
*/ | ||
removeSourceFiles?: boolean; | ||
/**
 * The level of parallelism in the conversion, which will affect the number of
 * concurrently running Docker containers performing the conversion.
 * The constructor falls back to `1` when this is omitted.
*/ | ||
conversionConcurrency?: number; | ||
} |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.