Skip to content
This repository has been archived by the owner on Jun 13, 2023. It is now read-only.

fix(neo4j): handle calls to remote neo4j server #570

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 110 additions & 161 deletions src/events/neo4j.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ const DEFAULT_HOST = 'neo4j';


/**
* Extracts the relevant arguments from provided arguments
* @returns {Object} Object of the extracted arguments
*/
* Extracts the relevant arguments from provided arguments
* @returns {Object} Object of the extracted arguments
*/
function getArgsFromFunction(...args) {
let relevantArgs = {};
switch (args[args.length - 1]) {
Expand All @@ -30,13 +30,15 @@ function getArgsFromFunction(...args) {
query: args[0],
params: args[1],
transactionConfig: args[2],
operation: 'session',
};
break;

case 'Transaction':
relevantArgs = {
query: args[0],
params: args[1],
operation: 'transaction',
};
break;
default:
Expand All @@ -48,10 +50,10 @@ function getArgsFromFunction(...args) {


/**
* Gets session's address info from Transaction/Session instance
* @param {neo4j.Session|neo4j.Transaction} session A DB session (could be Session of Transaction)
* @returns {Object} json with address info { host, port }
*/
* Gets session's address info from Transaction/Session instance
* @param {neo4j.Session|neo4j.Transaction} session A DB session (could be Session of Transaction)
* @returns {Object} json with address info { host, port }
*/
function getAddressInfo(session) {
let port = DEFAULT_PORT;
let host = DEFAULT_HOST;
Expand All @@ -75,8 +77,10 @@ function getAddressInfo(session) {

if (connectionHolder && connectionHolder._connectionProvider) {
const connectionProvider = connectionHolder._connectionProvider;
port = connectionProvider._address.port();
host = connectionProvider._address.host();
port = connectionProvider._address === undefined ? connectionProvider._seedRouter._port :
connectionProvider._address.port();
host = connectionProvider._address === undefined ? connectionProvider._seedRouter._host :
connectionProvider._address.host();
}

return {
Expand Down Expand Up @@ -145,85 +149,6 @@ function getNeo4JSessionMetadata(session) {
}


/**
* Create new patched methods for all relevant Observable methods and return
* @param {neo4j.session.Session | neo4j.transaction.Transaction} session A session
* with Neo4j server
* @param {Object} observer An observer instance to patch
* @param {IArguments} sessionArguments The arguments of the observer
* @param {serverlessEvent.Event} dbApiEvent The event of the current run
* @param {number} startTime The Event start time
* @returns {Object} All the patched methods
*/
function getPatchedSubscribeMethods(
session,
observer,
sessionArguments,
dbApiEvent,
startTime
) {
const records = [];

return {
...observer,
// eslint-disable-next-line require-jsdoc
onNext(record) {
records.push(record);
return observer.onNext.apply(session, sessionArguments);
},

// eslint-disable-next-line require-jsdoc
onCompleted(summary) {
switch (summary.queryType) {
case 's':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Schema, Write' }
);
break;

case 'r':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Read' }
);
break;

case 'w':
eventInterface.addToMetadata(
dbApiEvent,
{ operation_executed: 'Write' },
{ write_stats: summary.counters._stats }
);
break;
case 'rw':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Read, Write' },
{ write_stats: summary.counters._stats }
);
break;

default:
utils.debugLog(`Unkown query type: ${summary.queryType}`);
break;
}

dbApiEvent.setDuration(utils.createDurationTimestamp(startTime));
tracer.addEvent(dbApiEvent);
return observer.onCompleted.apply(session, sessionArguments);
},

// eslint-disable-next-line require-jsdoc
onError(err) {
eventInterface.setException(dbApiEvent, err);
// eslint-disable-next-line prefer-rest-params
return observer.onError.apply(session, sessionArguments);
},
};
}


/**
* Create new serverlessEvent for Neo4J run event (any type of run) and
* add relevant metadata
Expand Down Expand Up @@ -264,97 +189,121 @@ function createNewNeo4jEvent(session, startTime) {


/**
* Wraps neo4j.Session.run functions with tracing
* Wraps neo4j.Transaction.run and neo4j.Session.run functions with tracing
* @param {Function} wrappedFunction The function to wrap from Neo4j
* @returns {Function} The wrapped function
*/
function neo4jSessionRunWrapper(wrappedFunction) {
return function internalNeo4jSessionRunWrapper(...args) {
function neo4jTransactionSessionRunWrapper(wrappedFunction) {
return function internalNeo4jTransactionSessionRunWrapper(...args) {
const relevantArgs = getArgsFromFunction(...args, this.constructor.name);
const {
query, params, transactionConfig,
} = relevantArgs;

utils.debugLog('User called Neo4j wrapped Session run function');
let resultResponse;
try {
const startTime = Date.now();
const dbApiEvent = createNewNeo4jEvent(this, startTime);

eventInterface.addToMetadata(
dbApiEvent,
{},
{ query, param: params, transaction_config: transactionConfig }
);

resultResponse = wrappedFunction.apply(
this,
[query, params, transactionConfig]
);
if (relevantArgs.operation === 'session') {
const {
query, params, transactionConfig,
} = relevantArgs;

const originalSubscribe = resultResponse.subscribe;
utils.debugLog('User called Neo4j wrapped Session run function');

// Override the Result subscribe with patched subscriber
// eslint-disable-next-line func-names
resultResponse.subscribe = function (observer) {
originalSubscribe.call(this, getPatchedSubscribeMethods(
this,
observer,
arguments,
eventInterface.addToMetadata(
dbApiEvent,
startTime
));
};

return resultResponse;
} catch (error) {
tracer.addException(error);
return resultResponse;
}
};
}


/**
* Wraps neo4j.Transaction.run functions with tracing
* @param {Function} wrappedFunction The function to wrap from Neo4j
* @returns {Function} The wrapped function
*/
function neo4jTransactionRunWrapper(wrappedFunction) {
return function internalNeo4jTransactionRunWrapper(...args) {
const relevantArgs = getArgsFromFunction(...args, this.constructor.name);
const {
query, params,
} = relevantArgs;

utils.debugLog('User called Neo4j wrapped Transaction run function');
{},
{ query, param: params, transaction_config: transactionConfig }
);
} else {
const {
query, params,
} = relevantArgs;

utils.debugLog('User called Neo4j wrapped Transaction run function');

let resultResponse;
try {
const startTime = Date.now();
const dbApiEvent = createNewNeo4jEvent(this, startTime);

eventInterface.addToMetadata(
dbApiEvent,
{},
{ query, param: params }
);
eventInterface.addToMetadata(
dbApiEvent,
{},
{ query, param: params }
);
}

resultResponse = wrappedFunction.apply(this, [query, params]);
resultResponse = wrappedFunction.apply(this, args);

const originalSubscribe = resultResponse.subscribe;

// Override the Result subscribe with patched subscriber
// eslint-disable-next-line func-names
resultResponse.subscribe = function (observer) {
originalSubscribe.call(this, getPatchedSubscribeMethods(
this,
observer,
arguments,
dbApiEvent,
startTime
));
const records = [];

originalSubscribe.call(this, {
...observer,
// eslint-disable-next-line require-jsdoc
onKeys() {
// eslint-disable consistent-return
if (!observer.onKeys) return;
// eslint-disable-next-line consistent-return
return observer.onKeys.apply(this, arguments);
},
// eslint-disable-next-line require-jsdoc
onNext(record) {
records.push(record);
// eslint-disable consistent-return
if (!observer.onNext) return;
// eslint-disable-next-line consistent-return
return observer.onNext.apply(this, arguments);
},

// eslint-disable-next-line require-jsdoc
onCompleted(summary) {
switch (summary.queryType) {
case 's':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Schema, Write' }
);
break;

case 'r':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Read' }
);
break;

case 'w':
eventInterface.addToMetadata(
dbApiEvent,
{ operation_executed: 'Write' },
{ write_stats: summary.counters._stats }
);
break;
case 'rw':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Read, Write' },
{ write_stats: summary.counters._stats }
);
break;

default:
utils.debugLog(`Unkown query type: ${summary.queryType}`);
break;
}

dbApiEvent.setDuration(utils.createDurationTimestamp(startTime));
tracer.addEvent(dbApiEvent);
return observer.onCompleted.apply(this, arguments);
},

// eslint-disable-next-line require-jsdoc
onError(err) {
eventInterface.setException(dbApiEvent, err);
// eslint-disable-next-line prefer-rest-params
return observer.onError.apply(this, arguments);
},
});
};

return resultResponse;
Expand All @@ -374,28 +323,28 @@ module.exports = {
moduleUtils.patchModule(
'neo4j-driver/lib/transaction.js',
'run',
neo4jTransactionRunWrapper,
neo4jTransactionSessionRunWrapper,
Transaction => Transaction.default.prototype
);

moduleUtils.patchModule(
'neo4j-driver/lib/session.js',
'run',
neo4jSessionRunWrapper,
neo4jTransactionSessionRunWrapper,
Session => Session.default.prototype
);

moduleUtils.patchModule(
'neo4j-driver-core/lib/transaction.js',
'run',
neo4jTransactionRunWrapper,
neo4jTransactionSessionRunWrapper,
Transaction => Transaction.default.prototype
);

moduleUtils.patchModule(
'neo4j-driver-core/lib/session.js',
'run',
neo4jSessionRunWrapper,
neo4jTransactionSessionRunWrapper,
Session => Session.default.prototype
);
},
Expand Down