Skip to content

Commit

Permalink
Merge pull request #83 from sOfekS/main
Browse files Browse the repository at this point in the history
moved consumer's stateChangeHandler out of constructor
  • Loading branch information
ronfarkash committed Jul 17, 2022
2 parents 4986bd9 + 50b5dfd commit bc83fb6
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 39 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ const consumer = new Consumer({
receiveQueueSize: 1000,
logLevel: logLevel.INFO,
// you can also provide logCreator function
stateChangeHandler: ({previousState, newState}) => {
console.log(`Consumer previous state ${previousState}.`)
console.log(`Consumer new state ${newState}.`)
}
})

const run = async () => {
Expand All @@ -101,6 +97,12 @@ const run = async () => {
]});

await consumer.subscribe();

consumer.onStateChange(({previousState, newState}) => {
console.log(`Consumer state has changed from ${previousState} to ${newState}.`);
};
);

await consumer.run({
onMessage: async ({ ack, message, properties, redeliveryCount }) => {
await ack(); // Default is individual ack
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pulsar-flex",
"version": "1.1.0-beta.0",
"version": "1.1.1",
"description": "A package that natively supports pulsar api",
"main": "src/index.js",
"scripts": {
Expand Down
8 changes: 4 additions & 4 deletions scripts/test-setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ console.log(`Setting up environment using JWT and pulsar standalone version ${ve
await asyncExec(`docker rm ${containerName} -f`);
console.log(`Successfully found and removed docker with name ${containerName}`);
} catch (e) {
console.log(`Did not found docker with name ${containerName}`);
console.log(`Did not find docker with name ${containerName}`);
}

const dockerComposeFilePath = path.join(
Expand Down Expand Up @@ -57,16 +57,16 @@ console.log(`Setting up environment using JWT and pulsar standalone version ${ve
console.log(`Found a running pulsar container, continuing`);
}

console.log('Creating test topic public/default/test');
console.log('Creating test topic "public/default/test"');
await asyncExec(
`docker exec ${containerName} /pulsar/bin/pulsar-admin topics create public/default/test`
);
console.log('Creating subscription subscription');
console.log('Creating "subscription" subscription');

await asyncExec(
`docker exec ${containerName} /pulsar/bin/pulsar-admin topics create-subscription -s subscription public/default/test`
);

console.log('You can run the your tests now!');
console.log('You can run your tests now!');
process.exit(0);
})();
10 changes: 8 additions & 2 deletions src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ module.exports = class Consumer {
reconnectInterval = 5000,
logLevel,
logCreator = defaultLogger,
stateChangeHandler = null,
}) {
this._logger = createLogger({ logLevel, logCreator });
this._client = new Pulsar({
Expand Down Expand Up @@ -64,7 +63,7 @@ module.exports = class Consumer {
this._onMessageParams = {};
this._processTimeoutInterval = null;

this._onStateChangeHandler = stateChangeHandler;
this._onStateChangeHandler = null;

this._receiveQueue = new PriorityQueue({
maxQueueSize: receiveQueueSize,
Expand Down Expand Up @@ -197,6 +196,13 @@ module.exports = class Consumer {
}
};

onStateChange = (stateChangeHandler) => {
this._onStateChangeHandler = stateChangeHandler;
this._logger.info(
`Set new state change handler for consumer: ${this._consumerName}(${this._consumerId})`
);
};

_setRedeliveringUnacknowledgedMessages = (redeliveringUnacknowledgedMessages) => {
if (
redeliveringUnacknowledgedMessages &&
Expand Down
41 changes: 13 additions & 28 deletions test/consumer/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,7 @@ describe('Consumer tests', function () {
});
});
describe('Consumer State Change Handling Tests', function () {
let stateChangeConsumerRef;
const stateChangeErrorConsumer = new Consumer({
const stateChangeConsumer = new Consumer({
discoveryServers,
jwt,
topic: 'persistent://public/default/test',
Expand All @@ -630,52 +629,38 @@ describe('Consumer tests', function () {
readCompacted: false,
receiveQueueSize,
logLevel: LEVELS.TRACE,
stateChangeHandler: ({ previousState, newState }) => {
throw new Error('Unexpected fake error!');
},
});
afterEach(async function () {
if (stateChangeErrorConsumer._isSubscribed) {
await stateChangeErrorConsumer.unsubscribe();
}
if (stateChangeConsumerRef._isSubscribed) {
await stateChangeConsumerRef.unsubscribe();
if (stateChangeConsumer._isSubscribed) {
await stateChangeConsumer.unsubscribe();
}
});
it('Should run custom state change function provided.', async function () {
const stateChanged = await new Promise(async (resolve, reject) => {
const stateChangeConsumer = new Consumer({
discoveryServers,
jwt,
topic: 'persistent://public/default/test',
subscription: 'subscription',
subType: Consumer.SUB_TYPES.FAILOVER,
consumerName: 'Consy6',
readCompacted: false,
receiveQueueSize,
logLevel: LEVELS.INFO,
stateChangeHandler: ({ previousState, newState }) => {
if (previousState !== newState) {
resolve(true);
}
},
stateChangeConsumer.onStateChange(({ previousState, newState }) => {
if (previousState !== newState) {
resolve(true);
}
});
stateChangeConsumerRef = stateChangeConsumer;
// triggers state change
await stateChangeConsumer.subscribe();
});
assert.ok(stateChanged);
});
it('Should continue reading even if custom consumer state change function throws errors.', async function () {
stateChangeConsumer.onStateChange(({ previousState, newState }) => {
throw new Error('Unexpected fake error!');
});

let expectedNumOfMessages = 20;
let actualNumOfMessages = 0;
const messages = Array(expectedNumOfMessages).fill('message');
await utils.produceMessages({ messages });

await new Promise(async (resolve, reject) => {
// triggers state change
await stateChangeErrorConsumer.subscribe();
await stateChangeErrorConsumer.run({
await stateChangeConsumer.subscribe();
await stateChangeConsumer.run({
onMessage: async ({ message }) => {
actualNumOfMessages++;
if (actualNumOfMessages === 10) await utils.unloadTopic();
Expand Down

0 comments on commit bc83fb6

Please sign in to comment.