Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options for env, startedBy and region and add a trap for SIGINT #1

Merged
merged 14 commits into from
Jun 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# EditorConfig helps developers define and maintain consistent
# coding styles between different editors and IDEs
# editorconfig.org

root = true

[*]
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
indent_style = space
indent_size = 2

[*.js]
indent_style = space
indent_size = 2

[*.md]
trim_trailing_whitespace = false
18 changes: 18 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"env": {
"es6": true,
"mocha": true,
"node": true
},
"extends": "eslint:recommended",
"parserOptions": {
"ecmaVersion": 6,
"sourceType": "script"
},
"rules": {
"no-unused-vars": [ "error", { "argsIgnorePattern": "^_" } ],
"no-console": "off",
"strict": ["error", "global"],
"one-var": ["error", { "var": "never", "let": "never", "const": "never" }]
}
}
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ As a module: `npm install ecs-task-runner --save`
Usage
-----

ECS Task Runner requires an already existing ECS cluster and Task Definition. The Task Definition must send it's logs to AWS Cloudwatch and the ECS hosts will need an IAM role that has permission to do that.
ECS Task Runner requires an already existing ECS cluster and Task Definition. The Task Definition must send it's logs to AWS Cloudwatch (using awslogs-stream-prefix) and the ECS hosts will need an IAM role that has permission to do that.

### CLI Tool

Expand All @@ -22,6 +22,9 @@ Options:
--task-definition [required]
--container-name [required]
--cmd [required]
--started-by
--env
--region (default: us-east-1)
```

#### cluster
Expand All @@ -36,6 +39,18 @@ The name of your container in your Task Definition that you want to run this com
#### cmd
The command you want to run

#### started-by
If provided, this will show up as startedBy in your ECS console

#### env
This option is a key/value pair defined as `key=value` and can be repeated multiple times. Each
pair is passed as an environment variable to the container, where `key` is the name of the env var
and `value` is it's value.

#### region
The AWS region used when accessing ECS and CloudWatch. If nothing is provided falls back to `us-east-1`.
The `AWS_DEFAULT_REGION` environment variable has precendence over this setting.

### Example Module Usage

```
Expand Down
75 changes: 69 additions & 6 deletions bin/ecs-task-runner
Original file line number Diff line number Diff line change
@@ -1,17 +1,62 @@
#!/usr/bin/env node
'use strict'

var async = require('async');
var ecsTaskRunner = require('../');
const _ = require('lodash'),
async = require('async'),
ecsTaskRunner = require('../'),
colors = require('colors'),
yargs = require('yargs');

var argv = require('yargs')
.demand([ 'cluster', 'task-definition', 'container-name', 'cmd' ])
const argv = yargs
.option('cluster', {
alias: 'c',
describe: 'Name of the cluster to run a task on',
demandOption: true
})
.option('task-definition', {
alias: 't',
describe: 'The task (family:revision) to run',
demandOption: true
})
.option('container-name', {
alias: 'n',
describe: 'Name of the container within the task definition to run',
demandOption: true
})
.option('cmd', {
describe: 'The command to run within the container',
demandOption: true
})
.option('started-by', {
describe: 'Describes who the container was started by'
})
.option('env', {
array: true,
describe: 'key=value Pass an additional environment variable to the container',
coerce: opts => {
return _.map(opts, item => {
let pieces = _.split(item, '=', 2);
return { name: pieces[0], value: pieces[1] };
})
}
})
.option('region', {
alias: 'r',
default: 'us-east-1',
describe: 'The region used when talking to ECS and CloudWatch'
})
.help()
.wrap(yargs.terminalWidth())
.argv;

var options = {
const options = {
clusterArn: argv.cluster,
taskDefinitionArn: argv.taskDefinition,
containerName: argv.containerName,
cmd: argv.cmd
cmd: argv.cmd,
region: argv.region,
startedBy: argv.startedBy,
env: argv.env
};

ecsTaskRunner(options, function(err, stream) {
Expand All @@ -34,5 +79,23 @@ ecsTaskRunner(options, function(err, stream) {
process.exit(stream.logStream.exitCode);
});

process.on('SIGINT', () => {
if (stream.taskRunner && stream.taskId) {
console.log(`Received SIGINT. Asking ECS to stop task: ${stream.taskId}`);

const params = {
clusterArn: options.clusterArn,
taskId: stream.taskId,
reason: 'User requested interrupt'
};

stream.taskRunner.stop(params, () => {
stream.logStream.shutDown();
});
} else {
process.exit(2);
}
});

stream.pipe(process.stdout);
});
46 changes: 25 additions & 21 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
'use strict';

var async = require('async');
var randomstring = require('randomstring');
var _ = require('lodash');
var AWS = require('aws-sdk');
var combiner = require('stream-combiner');

var taskRunner = require('./lib/taskrunner');
var LogStream = require('./lib/log-stream');
var FormatStream = require('./lib/format-transform-stream');
const _ = require('lodash');
const async = require('async');
const AWS = require('aws-sdk');
const combiner = require('stream-combiner');
const FormatStream = require('./lib/format-transform-stream');
const LogStream = require('./lib/log-stream');
const randomstring = require('randomstring');
const taskRunner = require('./lib/taskrunner');

module.exports = function(options, cb) {
AWS.config.update({
region: process.env.AWS_DEFAULT_REGION || 'us-east-1'
region: process.env.AWS_DEFAULT_REGION || options.region
});

var containerDefinition = null;
var loggingDriver = null;
var logOptions = null;
var loggingDriver = null;
var logOptions = null;

// Generate a random string we will use to know when
// the log stream is finished.
Expand All @@ -27,7 +27,7 @@ module.exports = function(options, cb) {

async.waterfall([
function(next) {
var ecs = new AWS.ECS();
const ecs = new AWS.ECS();
ecs.describeTaskDefinition({ taskDefinition: options.taskDefinitionArn }, function(err, result) {
if (err) return next(err);

Expand All @@ -53,29 +53,33 @@ module.exports = function(options, cb) {
function(next) {
var params = {
clusterArn: options.clusterArn,
taskDefinitionArn: options.taskDefinitionArn,
containerName: options.containerName,
cmd: options.cmd,
endOfStreamIdentifier: endOfStreamIdentifier
containerName: options.containerName,
endOfStreamIdentifier: endOfStreamIdentifier,
env: options.env,
startedBy: options.startedBy,
taskDefinitionArn: options.taskDefinitionArn
}

taskRunner.run(params, next);
}
], function(err, taskDefinition) {
if (err) return cb(err);

var taskArn = taskDefinition.tasks[0].taskArn;
var taskId = taskArn.substring(taskArn.lastIndexOf('/')+1);
const taskArn = taskDefinition.tasks[0].taskArn;
const taskId = taskArn.substring(taskArn.lastIndexOf('/')+1);
const formatter = new FormatStream();

var formatter = new FormatStream();
var logs = new LogStream({
const logs = new LogStream({
logGroup: logOptions['awslogs-group'],
logStream: `${logOptions['awslogs-stream-prefix']}/${options.containerName}/${taskId}`,
endOfStreamIdentifier: endOfStreamIdentifier
});

var stream = combiner(logs, formatter);
stream.logStream = logs;
stream.taskRunner = taskRunner;
stream.taskId = taskId;

cb(null, stream);
});
Expand Down
4 changes: 2 additions & 2 deletions lib/aws.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

var AWS = require('aws-sdk');
var region = process.env.AWS_DEFAULT_REGION || 'us-east-1';
const AWS = require('aws-sdk');
const region = process.env.AWS_DEFAULT_REGION || 'us-east-1';

AWS.config.update({
region: region
Expand Down
7 changes: 2 additions & 5 deletions lib/format-transform-stream.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
'use strict'

var Transform = require('stream').Transform;
var moment = require('moment');

require("colors");
const Transform = require('stream').Transform;

class FormatTransformStream extends Transform {
constructor(options) {
constructor(_options) {
super({ objectMode: true });
}

Expand Down
25 changes: 17 additions & 8 deletions lib/log-stream.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
'use strict'

var Readable = require('stream').Readable;
var _ = require('lodash');
var async = require('async');
var AWS = require('aws-sdk');
const _ = require('lodash');
const AWS = require('aws-sdk');
const moment = require('moment');
const Readable = require('stream').Readable;

// Take cloudwatchlogs.getLogEvents data and
// return logs events as strings
Expand All @@ -30,9 +30,10 @@ class LogStream extends Readable {
this.logsReceived = false;
this.streamEnded = false;
this.cloudwatchlogs = new AWS.CloudWatchLogs();
this.stopRequested = false;
}

fetchLogs(cb) {
fetchLogs(_cb) {
this.pending = true;

var params = {
Expand All @@ -42,7 +43,7 @@ class LogStream extends Readable {
nextToken: this.nextToken
};

var next = (err, data) => {
var next = (_err, _data) => {
setTimeout(this._read.bind(this), this.options.durationBetweenPolls);
};

Expand All @@ -62,7 +63,7 @@ class LogStream extends Readable {

// If we haven't recieved any logs at all and timeoutBeforeFirstLogs duration has passed. Fail
if (!this.logsReceived && (Date.now() - this.startTime) > this.options.timeoutBeforeFirstLogs) {
var err = new Error(`No logs recieved before timeoutBeforeFirstLogs option set at '${timeoutBeforeFirstLogs}'`);
let err = new Error(`No logs recieved before timeoutBeforeFirstLogs option set at '${this.options.timeoutBeforeFirstLogs}'`);
return process.nextTick(() => this.emit('error', err));
}

Expand All @@ -72,6 +73,10 @@ class LogStream extends Readable {
var endOfStreamIdentifierBase64 = new Buffer(this.options.endOfStreamIdentifier).toString('base64');
var endEvent = _.find(data.events, (event) => event.message.includes(endOfStreamIdentifierBase64));

if (this.stopRequested) {
this.exitCode = 130;
}

if (endEvent) {
this.streamEnded = true;

Expand All @@ -85,14 +90,18 @@ class LogStream extends Readable {
});
}

shutDown() {
this.stopRequested = true;
}

_read() {
var active = true;
while (active && this.eventBuffer.length) active = this.push(this.eventBuffer.shift());

// Downstream buffers are full. Lets give them 100ms to recover
if (!active) return setTimeout(this._read.bind(this), 100);

if (this.streamEnded) return this.push(null);
if (this.streamEnded || this.stopRequested) return this.push(null);
if (active && !this.pending) this.fetchLogs();
}
}
Expand Down
Loading