Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.1.3 (June 5, 2020)

* Fix `timeout is not a function` bug

## 2.1.2 (May 22, 2020)

* Update sailor version to 2.6.7
Expand Down
1 change: 1 addition & 0 deletions lib/actions/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ async function ProcessAction(msg, cfg) {
rowCount = 0;
self.logger.trace('Emitting message %j', messageToEmit);
await self.emit('data', messageToEmit);
await init(cfg);
}, TIMEOUT_BETWEEN_EVENTS);

let row = msg.body.writer;
Expand Down
5 changes: 4 additions & 1 deletion lib/actions/writeFromJson.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ async function init(cfg) {
util.addRetryCountInterceptorToAxios(ax);
readyFlag = true;
}
async function ProcessAction(msg) {

async function ProcessAction(msg, cfg) {
// eslint-disable-next-line consistent-this
const self = this;
const { inputObject } = msg.body;
Expand All @@ -83,6 +84,7 @@ async function ProcessAction(msg) {
}

if (timeout) {
this.logger.info('Clearing timeout...');
clearTimeout(timeout);
}

Expand Down Expand Up @@ -114,6 +116,7 @@ async function ProcessAction(msg) {
rowCount = 0;
self.logger.trace('Emitting message %j', messageToEmit);
await self.emit('data', messageToEmit);
await init(cfg);
}, TIMEOUT_BETWEEN_EVENTS);

let row = inputObject;
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "csv-component",
"version": "2.1.2",
"version": "2.1.3",
"description": "CSV Component for elastic.io platform",
"main": "index.js",
"scripts": {
Expand Down
150 changes: 150 additions & 0 deletions spec/writeTimeoutError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/* eslint-disable no-unused-vars */
const chai = require('chai');
const chaiAsPromised = require('chai-as-promised');
const fs = require('fs');
const nock = require('nock');
const sinon = require('sinon');
const logger = require('@elastic.io/component-logger')();

chai.use(chaiAsPromised);
const { expect } = require('chai');

if (fs.existsSync('.env')) {
// eslint-disable-next-line global-require
require('dotenv').config();
} else {
process.env.ELASTICIO_API_USERNAME = 'name';
process.env.ELASTICIO_API_KEY = 'key';
}

const write = require('../lib/actions/write.js');
const writeFromJson = require('../lib/actions/writeFromJson.js');

// eslint-disable-next-line func-names
describe('CSV Write Timeout', function () {
this.timeout(180000);

let emit;
let cfg;

before(async () => {
nock('https://api.elastic.io', { encodedQueryParams: true })
.post('/v2/resources/storage/signed-url')
.reply(200,
{ put_url: 'https://examlple.mock/putUrl', get_url: 'https://examlple.mock/getUrl' })
.persist();

nock('https://examlple.mock')
.put('/putUrl')
.reply(200, {})
.persist();
});

beforeEach(() => {
emit = sinon.spy();
});

describe('raw', () => {
before(() => {
cfg = {
writer: {
columns: [
{ property: 'header1' },
{ property: 'header2' },
],
},
};
});

it('should write', async () => {
await write.init.call({
logger,
}, cfg);

const msg1 = {
body: {
inputObject: {
ProductKey: 'text11',
CategoryGroup_1: 'text12',
},
},
};

for (let i = 0; i < 3; i += 1) {
// eslint-disable-next-line no-await-in-loop
await write.process.call({
emit,
logger,
}, msg1, cfg);
// eslint-disable-next-line no-await-in-loop
await new Promise(resolve => setTimeout(resolve, 1000));
}
await new Promise(resolve => setTimeout(resolve, 12000));
expect(emit.getCalls().length).to.equal(4);
expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(1);
for (let i = 0; i < 3; i += 1) {
// eslint-disable-next-line no-await-in-loop
await write.process.call({
emit,
logger,
}, msg1, cfg);
// eslint-disable-next-line no-await-in-loop
await new Promise(resolve => setTimeout(resolve, 1000));
}

await new Promise(resolve => setTimeout(resolve, 12000));
expect(emit.getCalls().length).to.equal(8);
expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(2);
});
});

describe('From Object', () => {
before(() => {
cfg = {
includeHeaders: 'Yes',
separator: 'semicolon',
};
});

it('should write from object', async () => {
await writeFromJson.init.call({
logger,
}, cfg);

const msg1 = {
body: {
inputObject: {
ProductKey: 'text11',
CategoryGroup_1: 'text12',
},
},
};

for (let i = 0; i < 3; i += 1) {
// eslint-disable-next-line no-await-in-loop
await writeFromJson.process.call({
emit,
logger,
}, msg1, cfg);
// eslint-disable-next-line no-await-in-loop
await new Promise(resolve => setTimeout(resolve, 1000));
}
await new Promise(resolve => setTimeout(resolve, 12000));
expect(emit.getCalls().length).to.equal(4);
expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(1);
for (let i = 0; i < 3; i += 1) {
// eslint-disable-next-line no-await-in-loop
await writeFromJson.process.call({
emit,
logger,
}, msg1, cfg);
// eslint-disable-next-line no-await-in-loop
await new Promise(resolve => setTimeout(resolve, 1000));
}

await new Promise(resolve => setTimeout(resolve, 12000));
expect(emit.getCalls().length).to.equal(8);
expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(2);
});
});
});