Skip to content

Commit

Permalink
Intercept requests via Fetch to be compatible with Comunica 2
Browse files Browse the repository at this point in the history
  • Loading branch information
rubensworks committed Jan 18, 2022
1 parent fd1ad77 commit 409793e
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 103 deletions.
34 changes: 8 additions & 26 deletions bin/Runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import type { IWriteConfig, IInterceptOptions, IQueryResult } from '../lib/IReco
import { QueryExecutor } from '../lib/QueryExecutor';
import { ResultWriter } from '../lib/ResultWriter';

const http = require('http');
const https = require('https');

const usageMessage = `${C.inColor(`ldf-recorder records all http-requests and responses for a specific SPARQL- or TPF- query.
ldf-recorder is based on the comunica SPARQL query engine.`, C.CYAN)}
Expand All @@ -31,27 +28,15 @@ if (args._.length < 2) {
}

// Overwrite request method to intercept the requests
function wrapRequest(scheme: any, defaultSchemeString: string, defaultPort: number): any {
const originalRequest = scheme.request;
scheme.request = function(options: any) {
interceptOptions.push({
headers: options.headers,
method: options.method || 'GET',
path: options.path,
port: options.port || defaultPort,
protocol: options.protocol || defaultSchemeString,
hostname: options.hostname,
query: options.query,
body: options.body,
});
// eslint-disable-next-line prefer-rest-params
return originalRequest.apply(options, arguments);
function wrapFetch(originalFetch: typeof fetch): typeof fetch {
return (input, init) => {
if (typeof input !== 'string') {
throw new Error(`Unsupported non-string fetch input: ${JSON.stringify(input)}`);
}
interceptOptions.push({ input, init });
return originalFetch(input, init);
};
// eslint-disable-next-line no-return-assign
return () => scheme.request = originalRequest;
}
const unwrapHttp = wrapRequest(http, 'http:', 80);
const unwrapHttps = wrapRequest(https, 'https:', 443);

// The configuration used for the interceptor
const writeConfig: IWriteConfig = {
Expand All @@ -76,10 +61,7 @@ while (args._.length) {
// Every request's options will be stored in interceptOptions
const interceptOptions: IInterceptOptions[] = [];
const queryExecutor: QueryExecutor = new QueryExecutor();
queryExecutor.runQuery(query, dataSources).then(async(results: IQueryResult) => {
// Undo overwriting of http.request
unwrapHttp();
unwrapHttps();
queryExecutor.runQuery(query, dataSources, wrapFetch(fetch)).then(async(results: IQueryResult) => {
// Intercept and record all requests
const interceptor: HttpInterceptor = new HttpInterceptor(writeConfig);
for (const interceptOption of interceptOptions) {
Expand Down
71 changes: 28 additions & 43 deletions lib/HttpInterceptor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import * as crypto from 'crypto';
import * as fs from 'fs';
import type { ClientRequest, IncomingMessage } from 'http';
import * as http from 'http';
import * as https from 'https';
import * as Path from 'path';
import type { Readable } from 'stream';
import type { IInterceptOptions, IMockedFile, IWriteConfig } from './IRecorder';
const stringifyStream = require('stream-to-string');

/**
* A class for intercepting and recording the HTTP-response body of a TPF-request
Expand All @@ -21,45 +20,31 @@ export class HttpInterceptor {
* @param interceptOptions
*/
public async interceptResponse(interceptOptions: IInterceptOptions): Promise<void> {
return new Promise(async(resolve, reject) => {
const res: ClientRequest = (interceptOptions.protocol === 'http:' ? http : https).request(interceptOptions);
let body = '';
res.on('error', reject);
res.on('response', (incoming: IncomingMessage) => {
incoming.setEncoding('utf8');
const headers = incoming.headers;
incoming.on('error', reject);
incoming.on('data', (chunk: any) => {
if (typeof chunk !== 'string') {
throw new Error(`Chunk must be of type string, not of type: ${typeof chunk}`);
}
body += chunk;
});
incoming.on('end', async() => {
// Decode to get the pure URI
let requestIRI = `${interceptOptions.protocol}//${interceptOptions.hostname}${interceptOptions.path}`;
if (interceptOptions.method === 'POST') {
requestIRI += `@@POST:${interceptOptions.body.toString()}`;
}
const filename = crypto.createHash('sha1')
.update(decodeURIComponent(requestIRI))
.digest('hex');
const fileConfig: IMockedFile = {
body,
filename,
hashedIRI: requestIRI,
headers,
query: interceptOptions.query,
};
this.writeToFile(fileConfig);
resolve();
});
});
if (interceptOptions.method === 'POST') {
res.write(interceptOptions.body.toString());
}
res.end();
});
// Execute request
const response = await fetch(interceptOptions.input, interceptOptions.init);

// Determine request URL
let requestIRI = interceptOptions.input;
if (interceptOptions.init?.method === 'POST') {
// eslint-disable-next-line @typescript-eslint/no-base-to-string
requestIRI += `@@POST:${interceptOptions.init.body.toString()}`;
}

// Collect response body
const body = await stringifyStream(<Readable> <any> response.body);

// Save response to file
const filename = crypto.createHash('sha1')
.update(decodeURIComponent(requestIRI))
.digest('hex');
const fileConfig: IMockedFile = {
body,
filename,
hashedIRI: requestIRI,
headers: response.headers,
query: requestIRI.includes('?') ? requestIRI.slice(requestIRI.indexOf('?') + 1) : 'null',
};
this.writeToFile(fileConfig);
}

/**
Expand All @@ -85,7 +70,7 @@ export class HttpInterceptor {
private getHeaderLines(config: IMockedFile): string {
return `# Query: ${config.query}
# Hashed IRI: ${config.hashedIRI}
# Content-type: ${config.headers['content-type']}
# Content-type: ${config.headers.get('content-type')}
`;
}
}
16 changes: 5 additions & 11 deletions lib/IRecorder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type * as http from 'http';
import type { Bindings } from '@comunica/bus-query-operation';
import type { Quad } from 'rdf-js';
import type { QueryType } from './QueryExecutor';
Expand All @@ -7,14 +6,8 @@ import type { QueryType } from './QueryExecutor';
* All the options necessary for intercepting and recording the response
*/
export interface IInterceptOptions {
headers: any;
method: string;
path: string;
port: number;
protocol: string;
hostname: string;
query: string;
body?: URLSearchParams;
input: string;
init?: RequestInit;
}

/**
Expand All @@ -29,11 +22,11 @@ export interface IWriteConfig {
* The information concerning a mocked file
*/
export interface IMockedFile {
query: string;
body: string;
filename: string;
hashedIRI: string;
headers: http.IncomingHttpHeaders;
query: string;
headers: Headers;
}

/**
Expand All @@ -50,4 +43,5 @@ export interface IQuerySource {
export interface IQueryResult {
type: QueryType;
value: Bindings[] | boolean | Quad[];
variables?: string[];
}
16 changes: 10 additions & 6 deletions lib/QueryExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,36 @@ export class QueryExecutor {
* it can be recorded and saved too
* @param queryString The SPARQL-query string
* @param tpfSources A list of remote TPF endpoints
* @param fetcher The fetch function to use.
*/
public async runQuery(queryString: string, tpfSources: string[]): Promise<IQueryResult> {
public async runQuery(queryString: string, tpfSources: string[], fetcher: typeof fetch): Promise<IQueryResult> {
const queryType: QueryType = this.getQueryType(queryString);
const querySources: IQuerySource[] = this.mapSources(tpfSources);
const context = { sources: querySources, fetch: fetcher };
return new Promise(async(resolve, reject) => {
switch (queryType) {
case QueryType.SELECT:
const rss: Bindings[] = [];
const rs = await this.myEngine.query(queryString, { sources: querySources });
const rs = await this.myEngine.query(queryString, context);
await rs.bindingsStream.on('data', (data: Bindings) => {
rss.push(data);
});
rs.bindingsStream.on('error', reject);
await rs.bindingsStream.on('end', async() => {
resolve({ type: QueryType.SELECT, value: rss });
resolve({ type: QueryType.SELECT, value: rss, variables: rs.variables });
});
break;
case QueryType.ASK:
const ra = await this.myEngine.query(queryString, { sources: querySources });
const ra = await this.myEngine.query(queryString, context);
resolve({ type: QueryType.ASK, value: await ra.booleanResult });
break;
case QueryType.CONSTRUCT:
const rsc: Quad[] = [];
const rc = await this.myEngine.query(queryString, { sources: querySources });
const rc = await this.myEngine.query(queryString, context);
await rc.quadStream.on('data', (data: Quad) => {
rsc.push(data);
});
rc.quadStream.on('error', reject);
await rc.quadStream.on('end', async() => {
resolve({ type: QueryType.CONSTRUCT, value: rsc });
});
Expand All @@ -61,7 +65,7 @@ export class QueryExecutor {
private getQueryType(queryString: string): QueryType {
let content = queryString.split('\n');
let fln = content[0];
while (fln.startsWith('PREFIX') || fln.trim() === '') {
while (fln.startsWith('PREFIX') || fln.startsWith('prefix') || fln.trim() === '') {
content = content.slice(1);
fln = content[0];
}
Expand Down
11 changes: 4 additions & 7 deletions lib/ResultWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class ResultWriter {
this.getResultString(results),
(err: any) => {
if (err) {
throw new Error(`in writeResultsToFile: could not write TPF-query result to file: result.srj`);
reject(new Error(`in writeResultsToFile: could not write TPF-query result to file: result.srj`));
}
resolve();
// Else: ok
Expand Down Expand Up @@ -55,7 +55,7 @@ export class ResultWriter {
private getResultString(results: IQueryResult): string {
switch (results.type) {
case QueryType.SELECT:
return this.bindingsToSparqlJsonResult(<Bindings[]> results.value);
return this.bindingsToSparqlJsonResult(<Bindings[]> results.value, results.variables);
case QueryType.ASK:
return this.booleanToSparqlJsonResult(<boolean> results.value);
case QueryType.CONSTRUCT:
Expand Down Expand Up @@ -83,12 +83,9 @@ export class ResultWriter {
* Transform the bindings to the SPARQLJsonResult format used for testing
* @param bindings The bindings returned from the query-engine
*/
private bindingsToSparqlJsonResult(bindings: Bindings[]): string {
private bindingsToSparqlJsonResult(bindings: Bindings[], variables: string[]): string {
const head: any = {};
head.vars = [];
if (bindings.length > 0 && bindings[0].size) {
bindings[0].keySeq().forEach(key => head.vars.push(key.slice(1)));
}
head.vars = variables.map(key => key.slice(1));

const results: any = {};
results.bindings = [];
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@
"@types/nock": "^10.0.3",
"@types/node": "^17.0.9",
"minimist": "^1.2.0",
"nock": "^10.0.6"
"nock": "^10.0.6",
"node-fetch": "^2.6.7",
"stream-to-string": "^1.2.0"
},
"devDependencies": {
"@comunica/actor-sparql-serialize-sparql-json": "^1.22.0",
Expand Down
11 changes: 7 additions & 4 deletions test/HttpInterceptor-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import * as fs from 'fs';
import * as Path from 'path';
import * as fse from 'fs-extra';
import * as nock from 'nock';
import fetch from 'node-fetch';
import { HttpInterceptor } from '../lib/HttpInterceptor';

global.fetch = fetch;

describe('HttpInterceptor', () => {
beforeEach(() => {
if (!fs.existsSync(jestTestFolder)) {
Expand All @@ -29,14 +32,14 @@ describe('HttpInterceptor', () => {
:_ a :test`);

await interceptor.interceptResponse({
headers: '', method: 'GET', path: '/path/', port: undefined, protocol: 'http:', hostname: 'ex.org', query: '',
input: 'http://ex.org/path/',
}).then(() => {
fs.readdir(jestTestFolder, (error, files) => {
const filename: string = crypto.createHash('sha1')
.update(fn)
.digest('hex');
const fileContent: string = fs.readFileSync(Path.join(jestTestFolder, filename), 'utf8');
expect(fileContent.startsWith(`# Query: ${''}
expect(fileContent.startsWith(`# Query: null
# Hashed IRI: ${fn}
# Content-type: ${ct}`)).toBeTruthy();
fse.removeSync(Path.join(jestTestFolder, filename));
Expand All @@ -53,7 +56,7 @@ describe('HttpInterceptor', () => {
:_ a :test`);

return expect(interceptor.interceptResponse({
headers: '', method: 'GET', path: '/path/', port: undefined, protocol: 'http:', hostname: 'ex.org', query: '',
input: 'http://ex.org/path/',
})).resolves.toBeUndefined();
});

Expand All @@ -64,7 +67,7 @@ describe('HttpInterceptor', () => {
:_ a :test`);

return expect(interceptor.interceptResponse({
headers: '', method: 'GET', path: '/path/', port: undefined, protocol: 'http:', hostname: 'ex.org', query: '',
input: 'http://ex.org/path/',
})).resolves.toBeUndefined();
});

Expand Down
8 changes: 8 additions & 0 deletions test/QueryExecutor-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,39 @@ describe('QueryExecutor', () => {
return expect(queryExecutor.runQuery(
'SELECT * WHERE { ?s ?p <http://dbpedia.org/resource/Belgium>. ?s ?p ?o } LIMIT 5',
[ 'TPF@http://fragments.dbpedia.org/2015/en' ],
fetch,
)).resolves.toBeTruthy();
});

it('should resolve ASK', () => {
return expect(queryExecutor.runQuery(
'ASK { ?s ?p <http://dbpedia.org/resource/Belgium>. }',
[ 'TPF@http://fragments.dbpedia.org/2015/en' ],
fetch,
)).resolves.toBeTruthy();
});

it('should resolve CONSTRUCT', () => {
return expect(queryExecutor.runQuery(
'CONSTRUCT WHERE { ?s ?p ?o. } LIMIT 5',
[ 'TPF@http://fragments.dbpedia.org/2015/en' ],
fetch,
)).resolves.toBeTruthy();
});

it('should error on undefined CLI input', () => {
return expect(queryExecutor.runQuery(
'CONSTRUCT WHERE { ?s ?p ?o. } LIMIT 5',
[ 'UNKNOWN@http://fragments.dbpedia.org/2015/en' ],
fetch,
)).rejects.toBeTruthy();
});

it('should error on undefined query-type', () => {
return expect(queryExecutor.runQuery(
'UNKNOWN WHERE { ?s ?p ?o. } LIMIT 5',
[ 'UNKNOWN@http://fragments.dbpedia.org/2015/en' ],
fetch,
)).rejects.toBeTruthy();
});

Expand All @@ -52,20 +57,23 @@ describe('QueryExecutor', () => {
`PREFIX ola: <http://ex.org>
CONSTRUCT WHERE { ?s ?p ?o. } LIMIT 5`,
[ 'TPF@http://fragments.dbpedia.org/2015/en' ],
fetch,
)).resolves.toBeTruthy();
});

it('should work with FILE', () => {
return expect(queryExecutor.runQuery(
`CONSTRUCT WHERE { ?s ?p ?o. } LIMIT 5`,
[ 'FILE@https://ruben.verborgh.org/profile/#me' ],
fetch,
)).resolves.toBeTruthy();
});

it('should work with SPARQL', () => {
return expect(queryExecutor.runQuery(
`SELECT * WHERE { ?s ?p <http://dbpedia.org/resource/Belgium>. ?s ?p ?o } LIMIT 5`,
[ 'SPARQL@https://dbpedia.org/sparql' ],
fetch,
)).resolves.toBeTruthy();
});
});
Expand Down

0 comments on commit 409793e

Please sign in to comment.