diff --git a/CHANGELOG.md b/CHANGELOG.md index 5927254..f55afa9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.1.3 (June 5, 2020) + +* Fix `timeout is not a function` bug + ## 2.1.2 (May 22, 2020) * Update sailor version to 2.6.7 diff --git a/lib/actions/write.js b/lib/actions/write.js index 87cea50..4b1f6b6 100644 --- a/lib/actions/write.js +++ b/lib/actions/write.js @@ -88,6 +88,7 @@ async function ProcessAction(msg, cfg) { rowCount = 0; self.logger.trace('Emitting message %j', messageToEmit); await self.emit('data', messageToEmit); + await init(cfg); }, TIMEOUT_BETWEEN_EVENTS); let row = msg.body.writer; diff --git a/lib/actions/writeFromJson.js b/lib/actions/writeFromJson.js index 01cfbde..c1f98f3 100644 --- a/lib/actions/writeFromJson.js +++ b/lib/actions/writeFromJson.js @@ -61,7 +61,8 @@ async function init(cfg) { util.addRetryCountInterceptorToAxios(ax); readyFlag = true; } -async function ProcessAction(msg) { + +async function ProcessAction(msg, cfg) { // eslint-disable-next-line consistent-this const self = this; const { inputObject } = msg.body; @@ -83,6 +84,7 @@ async function ProcessAction(msg) { } if (timeout) { + this.logger.info('Clearing timeout...'); clearTimeout(timeout); } @@ -114,6 +116,7 @@ async function ProcessAction(msg) { rowCount = 0; self.logger.trace('Emitting message %j', messageToEmit); await self.emit('data', messageToEmit); + await init(cfg); }, TIMEOUT_BETWEEN_EVENTS); let row = inputObject; diff --git a/package-lock.json b/package-lock.json index 9c1627c..113d4e1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "csv-component", - "version": "2.1.2", + "version": "2.1.3", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 2752f6a..65c210d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "csv-component", - "version": "2.1.2", + "version": "2.1.3", "description": "CSV Component for elastic.io platform", "main": "index.js", "scripts": { diff --git a/spec/writeTimeoutError.js b/spec/writeTimeoutError.js new file mode 100644 index 0000000..4515d0e --- /dev/null +++ b/spec/writeTimeoutError.js @@ -0,0 +1,150 @@ +/* eslint-disable no-unused-vars */ +const chai = require('chai'); +const chaiAsPromised = require('chai-as-promised'); +const fs = require('fs'); +const nock = require('nock'); +const sinon = require('sinon'); +const logger = require('@elastic.io/component-logger')(); + +chai.use(chaiAsPromised); +const { expect } = require('chai'); + +if (fs.existsSync('.env')) { + // eslint-disable-next-line global-require + require('dotenv').config(); +} else { + process.env.ELASTICIO_API_USERNAME = 'name'; + process.env.ELASTICIO_API_KEY = 'key'; +} + +const write = require('../lib/actions/write.js'); +const writeFromJson = require('../lib/actions/writeFromJson.js'); + +// eslint-disable-next-line func-names +describe('CSV Write Timeout', function () { + this.timeout(180000); + + let emit; + let cfg; + + before(async () => { + nock('https://api.elastic.io', { encodedQueryParams: true }) + .post('/v2/resources/storage/signed-url') + .reply(200, + { put_url: 'https://examlple.mock/putUrl', get_url: 'https://examlple.mock/getUrl' }) + .persist(); + + nock('https://examlple.mock') + .put('/putUrl') + .reply(200, {}) + .persist(); + }); + + beforeEach(() => { + emit = sinon.spy(); + }); + + describe('raw', () => { + before(() => { + cfg = { + writer: { + columns: [ + { property: 'header1' }, + { property: 'header2' }, + ], + }, + }; + }); + + it('should write', async () => { + await write.init.call({ + logger, + }, cfg); + + const msg1 = { + body: { + inputObject: { + ProductKey: 'text11', + CategoryGroup_1: 'text12', + }, + }, + }; + + for (let i = 0; i < 3; i += 1) { + // eslint-disable-next-line no-await-in-loop + await write.process.call({ + emit, + logger, + }, msg1, cfg); + // eslint-disable-next-line no-await-in-loop + await new Promise(resolve => setTimeout(resolve, 1000)); + } + await new Promise(resolve => setTimeout(resolve, 12000)); + expect(emit.getCalls().length).to.equal(4); + expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(1); + for (let i = 0; i < 3; i += 1) { + // eslint-disable-next-line no-await-in-loop + await write.process.call({ + emit, + logger, + }, msg1, cfg); + // eslint-disable-next-line no-await-in-loop + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + await new Promise(resolve => setTimeout(resolve, 12000)); + expect(emit.getCalls().length).to.equal(8); + expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(2); + }); + }); + + describe('From Object', () => { + before(() => { + cfg = { + includeHeaders: 'Yes', + separator: 'semicolon', + }; + }); + + it('should write from object', async () => { + await writeFromJson.init.call({ + logger, + }, cfg); + + const msg1 = { + body: { + inputObject: { + ProductKey: 'text11', + CategoryGroup_1: 'text12', + }, + }, + }; + + for (let i = 0; i < 3; i += 1) { + // eslint-disable-next-line no-await-in-loop + await writeFromJson.process.call({ + emit, + logger, + }, msg1, cfg); + // eslint-disable-next-line no-await-in-loop + await new Promise(resolve => setTimeout(resolve, 1000)); + } + await new Promise(resolve => setTimeout(resolve, 12000)); + expect(emit.getCalls().length).to.equal(4); + expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(1); + for (let i = 0; i < 3; i += 1) { + // eslint-disable-next-line no-await-in-loop + await writeFromJson.process.call({ + emit, + logger, + }, msg1, cfg); + // eslint-disable-next-line no-await-in-loop + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + await new Promise(resolve => setTimeout(resolve, 12000)); + expect(emit.getCalls().length).to.equal(8); + expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(2); + }); + }); +});