Skip to content
This repository has been archived by the owner on Jun 13, 2023. It is now read-only.

Commit

Permalink
fix(neo4j): handle calls to remote neo4j server (#570)
Browse files Browse the repository at this point in the history
* fix(neo4j): hanlde calls to remote neo4j server

* fix(lint-fix): rm redundant space adn return statemants
  • Loading branch information
sagivr2020 authored Feb 16, 2022
1 parent 790c54a commit 20e23fc
Showing 1 changed file with 110 additions and 161 deletions.
271 changes: 110 additions & 161 deletions src/events/neo4j.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ const DEFAULT_HOST = 'neo4j';


/**
* Extracts the relevant arguments from provided arguments
* @returns {Object} Object of the extracted arguments
*/
* Extracts the relevant arguments from provided arguments
* @returns {Object} Object of the extracted arguments
*/
function getArgsFromFunction(...args) {
let relevantArgs = {};
switch (args[args.length - 1]) {
Expand All @@ -30,13 +30,15 @@ function getArgsFromFunction(...args) {
query: args[0],
params: args[1],
transactionConfig: args[2],
operation: 'session',
};
break;

case 'Transaction':
relevantArgs = {
query: args[0],
params: args[1],
operation: 'transaction',
};
break;
default:
Expand All @@ -48,10 +50,10 @@ function getArgsFromFunction(...args) {


/**
* Gets session's address info from Transaction/Session instance
* @param {neo4j.Session|neo4j.Transaction} session A DB session (could be Session of Transaction)
* @returns {Object} json with address info { host, port }
*/
* Gets session's address info from Transaction/Session instance
* @param {neo4j.Session|neo4j.Transaction} session A DB session (could be Session of Transaction)
* @returns {Object} json with address info { host, port }
*/
function getAddressInfo(session) {
let port = DEFAULT_PORT;
let host = DEFAULT_HOST;
Expand All @@ -75,8 +77,10 @@ function getAddressInfo(session) {

if (connectionHolder && connectionHolder._connectionProvider) {
const connectionProvider = connectionHolder._connectionProvider;
port = connectionProvider._address.port();
host = connectionProvider._address.host();
port = connectionProvider._address === undefined ? connectionProvider._seedRouter._port :
connectionProvider._address.port();
host = connectionProvider._address === undefined ? connectionProvider._seedRouter._host :
connectionProvider._address.host();
}

return {
Expand Down Expand Up @@ -145,85 +149,6 @@ function getNeo4JSessionMetadata(session) {
}


/**
* Create new patched methods for all relevant Observable methods and return
* @param {neo4j.session.Session | neo4j.transaction.Transaction} session A session
* with Neo4j server
* @param {Object} observer An observer instance to patch
* @param {IArguments} sessionArguments The arguments of the observer
* @param {serverlessEvent.Event} dbApiEvent The event of the current run
* @param {number} startTime The Event start time
* @returns {Object} All the patched methods
*/
function getPatchedSubscribeMethods(
session,
observer,
sessionArguments,
dbApiEvent,
startTime
) {
const records = [];

return {
...observer,
// eslint-disable-next-line require-jsdoc
onNext(record) {
records.push(record);
return observer.onNext.apply(session, sessionArguments);
},

// eslint-disable-next-line require-jsdoc
onCompleted(summary) {
switch (summary.queryType) {
case 's':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Schema, Write' }
);
break;

case 'r':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Read' }
);
break;

case 'w':
eventInterface.addToMetadata(
dbApiEvent,
{ operation_executed: 'Write' },
{ write_stats: summary.counters._stats }
);
break;
case 'rw':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Read, Write' },
{ write_stats: summary.counters._stats }
);
break;

default:
utils.debugLog(`Unkown query type: ${summary.queryType}`);
break;
}

dbApiEvent.setDuration(utils.createDurationTimestamp(startTime));
tracer.addEvent(dbApiEvent);
return observer.onCompleted.apply(session, sessionArguments);
},

// eslint-disable-next-line require-jsdoc
onError(err) {
eventInterface.setException(dbApiEvent, err);
// eslint-disable-next-line prefer-rest-params
return observer.onError.apply(session, sessionArguments);
},
};
}


/**
* Create new serverlessEvent for Neo4J run event (any type of run) and
* add relevant metadata
Expand Down Expand Up @@ -264,97 +189,121 @@ function createNewNeo4jEvent(session, startTime) {


/**
* Wraps neo4j.Session.run functions with tracing
* Wraps neo4j.Transaction.run and neo4j.Session.run functions with tracing
* @param {Function} wrappedFunction The function to wrap from Neo4j
* @returns {Function} The wrapped function
*/
function neo4jSessionRunWrapper(wrappedFunction) {
return function internalNeo4jSessionRunWrapper(...args) {
function neo4jTransactionSessionRunWrapper(wrappedFunction) {
return function internalNeo4jTransactionSessionRunWrapper(...args) {
const relevantArgs = getArgsFromFunction(...args, this.constructor.name);
const {
query, params, transactionConfig,
} = relevantArgs;

utils.debugLog('User called Neo4j wrapped Session run function');
let resultResponse;
try {
const startTime = Date.now();
const dbApiEvent = createNewNeo4jEvent(this, startTime);

eventInterface.addToMetadata(
dbApiEvent,
{},
{ query, param: params, transaction_config: transactionConfig }
);

resultResponse = wrappedFunction.apply(
this,
[query, params, transactionConfig]
);
if (relevantArgs.operation === 'session') {
const {
query, params, transactionConfig,
} = relevantArgs;

const originalSubscribe = resultResponse.subscribe;
utils.debugLog('User called Neo4j wrapped Session run function');

// Override the Result subscribe with patched subscriber
// eslint-disable-next-line func-names
resultResponse.subscribe = function (observer) {
originalSubscribe.call(this, getPatchedSubscribeMethods(
this,
observer,
arguments,
eventInterface.addToMetadata(
dbApiEvent,
startTime
));
};

return resultResponse;
} catch (error) {
tracer.addException(error);
return resultResponse;
}
};
}


/**
* Wraps neo4j.Transaction.run functions with tracing
* @param {Function} wrappedFunction The function to wrap from Neo4j
* @returns {Function} The wrapped function
*/
function neo4jTransactionRunWrapper(wrappedFunction) {
return function internalNeo4jTransactionRunWrapper(...args) {
const relevantArgs = getArgsFromFunction(...args, this.constructor.name);
const {
query, params,
} = relevantArgs;

utils.debugLog('User called Neo4j wrapped Transaction run function');
{},
{ query, param: params, transaction_config: transactionConfig }
);
} else {
const {
query, params,
} = relevantArgs;

utils.debugLog('User called Neo4j wrapped Transaction run function');

let resultResponse;
try {
const startTime = Date.now();
const dbApiEvent = createNewNeo4jEvent(this, startTime);

eventInterface.addToMetadata(
dbApiEvent,
{},
{ query, param: params }
);
eventInterface.addToMetadata(
dbApiEvent,
{},
{ query, param: params }
);
}

resultResponse = wrappedFunction.apply(this, [query, params]);
resultResponse = wrappedFunction.apply(this, args);

const originalSubscribe = resultResponse.subscribe;

// Override the Result subscribe with patched subscriber
// eslint-disable-next-line func-names
resultResponse.subscribe = function (observer) {
originalSubscribe.call(this, getPatchedSubscribeMethods(
this,
observer,
arguments,
dbApiEvent,
startTime
));
const records = [];

originalSubscribe.call(this, {
...observer,
// eslint-disable-next-line require-jsdoc
onKeys() {
// eslint-disable consistent-return
if (!observer.onKeys) return;
// eslint-disable-next-line consistent-return
return observer.onKeys.apply(this, arguments);
},
// eslint-disable-next-line require-jsdoc
onNext(record) {
records.push(record);
// eslint-disable consistent-return
if (!observer.onNext) return;
// eslint-disable-next-line consistent-return
return observer.onNext.apply(this, arguments);
},

// eslint-disable-next-line require-jsdoc
onCompleted(summary) {
switch (summary.queryType) {
case 's':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Schema, Write' }
);
break;

case 'r':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Read' }
);
break;

case 'w':
eventInterface.addToMetadata(
dbApiEvent,
{ operation_executed: 'Write' },
{ write_stats: summary.counters._stats }
);
break;
case 'rw':
eventInterface.addToMetadata(
dbApiEvent,
{ items_count: records.length, operation_executed: 'Read, Write' },
{ write_stats: summary.counters._stats }
);
break;

default:
utils.debugLog(`Unkown query type: ${summary.queryType}`);
break;
}

dbApiEvent.setDuration(utils.createDurationTimestamp(startTime));
tracer.addEvent(dbApiEvent);
return observer.onCompleted.apply(this, arguments);
},

// eslint-disable-next-line require-jsdoc
onError(err) {
eventInterface.setException(dbApiEvent, err);
// eslint-disable-next-line prefer-rest-params
return observer.onError.apply(this, arguments);
},
});
};

return resultResponse;
Expand All @@ -374,28 +323,28 @@ module.exports = {
moduleUtils.patchModule(
'neo4j-driver/lib/transaction.js',
'run',
neo4jTransactionRunWrapper,
neo4jTransactionSessionRunWrapper,
Transaction => Transaction.default.prototype
);

moduleUtils.patchModule(
'neo4j-driver/lib/session.js',
'run',
neo4jSessionRunWrapper,
neo4jTransactionSessionRunWrapper,
Session => Session.default.prototype
);

moduleUtils.patchModule(
'neo4j-driver-core/lib/transaction.js',
'run',
neo4jTransactionRunWrapper,
neo4jTransactionSessionRunWrapper,
Transaction => Transaction.default.prototype
);

moduleUtils.patchModule(
'neo4j-driver-core/lib/session.js',
'run',
neo4jSessionRunWrapper,
neo4jTransactionSessionRunWrapper,
Session => Session.default.prototype
);
},
Expand Down

0 comments on commit 20e23fc

Please sign in to comment.