Skip to content

Commit

Permalink
Support reporting to firebase metrics on logs and stop writing new lo…
Browse files Browse the repository at this point in the history
…gs when reaching the limit (#11)
  • Loading branch information
itai-codefresh committed Nov 11, 2018
1 parent 166cd94 commit baad313
Show file tree
Hide file tree
Showing 7 changed files with 1,572 additions and 169 deletions.
67 changes: 54 additions & 13 deletions lib/ContainerLogger.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
'use strict';

const EventEmitter = require('events');
const Q = require('q');
const logger = require('cf-logs').Logger('codefresh:containerLogger');
const CFError = require('cf-errors');
const LoggerStrategy = require('./enums').LoggerStrategy;

class ContainerLogger {
class ContainerLogger extends EventEmitter {

constructor(containerId, containerInterface, firebaseLogger, firebaseLastUpdate, loggerStrategy) {
this.containerId = containerId;
this.containerInterface = containerInterface;
this.firebaseLogger = firebaseLogger;
this.firebaseLastUpdate = firebaseLastUpdate;
this.loggerStrategy = loggerStrategy;
this.tty = false;
constructor({
containerId,
containerInterface,
firebaseLogger,
firebaseLastUpdate,
firebaseMetricsLogs,
logSizeLimit,
isWorkflowLogSizeExceeded, // eslint-disable-line
loggerStrategy
}) {
super();
this.containerId = containerId;
this.containerInterface = containerInterface;
this.firebaseLogger = firebaseLogger;
this.firebaseLastUpdate = firebaseLastUpdate;
this.firebaseMetricsLogs = firebaseMetricsLogs;
this.loggerStrategy = loggerStrategy;
this.tty = false;
this.logSizeLimit = logSizeLimit;
this.logSize = 0;
this.isWorkflowLogSizeExceeded = isWorkflowLogSizeExceeded;
this.stepFinished = false;
}

start() {
Expand All @@ -34,18 +50,24 @@ class ContainerLogger {
// See documentation of the docker api here: https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
if (this.tty) {
this._handleTtyStream(stdout, false);
this._handleTtyStream(stderr, true);
if (stderr) {
this._handleTtyStream(stderr, true);
}
} else {
this._handleNonTtyStream(stdout, false);
}

stdout.on('end', () => {
this.stepFinished = true;
logger.info(`stdout end event was fired for container: ${this.containerId}`);
});

stderr.on('end', () => {
logger.info(`stderr end event was fired for container: ${this.containerId}`);
});
if (stderr) {
stderr.on('end', () => {
this.stepFinished = true;
logger.info(`stderr end event was fired for container: ${this.containerId}`);
});
}
}, (err) => {
return Q.reject(new CFError({
cause: err,
Expand Down Expand Up @@ -105,13 +127,32 @@ class ContainerLogger {
logger.info(`Listening on stream 'readable' event for container: ${this.containerId}`);
}

_stepLogSizeExceeded() {
return this.logSize > this.logSizeLimit;
}

_logMessageToFirebase(message, isError) {
if (this.logSizeLimit && (this._stepLogSizeExceeded() || this.isWorkflowLogSizeExceeded()) && !isError) {
if (!this.logExceededLimitsNotified) {
this.logExceededLimitsNotified = true;
message = `\x1B[01;93mLog size exceeded for ${this._stepLogSizeExceeded() ? 'this step' : 'the workflow'}.\nThe step will continue to execute until it finished but new logs will not be stored.\x1B[0m\r\n`;
} else {
return;
}
}

if (isError) {
message = `\x1B[31m${message}\x1B[0m`;
}

this.firebaseLogger.push(message);
this.firebaseLastUpdate.set(new Date().getTime());

if (this.logSizeLimit) {
this.logSize += Buffer.byteLength(message);
this.firebaseMetricsLogs.child('total').set(this.logSize);
}
this.emit('message.logged');
}

}
Expand Down
10 changes: 8 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ cflogs.init(loggerOptions);

const Logger = require('./logger');


const logger = new Logger(process.env.LOGGER_ID, process.env.FIREBASE_AUTH_URL, process.env.FIREBASE_SECRET, process.env.LISTEN_ON_EXISTING);
const logger = new Logger({
loggerId: process.env.LOGGER_ID,
firebaseAuthUrl: process.env.FIREBASE_AUTH_URL,
firebaseSecret: process.env.FIREBASE_SECRET,
findExistingContainers: process.env.LISTEN_ON_EXISTING,
firebaseMetricsLogsUrl: process.env.FIREBASE_METRICS_LOGS_URL,
logSizeLimit: process.env.LOG_SIZE_LIMIT ? (parseInt(process.env.LOG_SIZE_LIMIT) * 1000000) : undefined,
});

logger.validate();
logger.start();
70 changes: 68 additions & 2 deletions lib/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,23 @@ const ContainerLogger = require('./ContainerLogger');

class Logger {

constructor(loggerId, firebaseAuthUrl, firebaseSecret, findExistingContainers) {
constructor({
loggerId,
firebaseAuthUrl,
firebaseSecret,
findExistingContainers,
firebaseMetricsLogsUrl,
logSizeLimit
}) {
this.state = { status: 'init' };
this.firebaseAuthUrl = firebaseAuthUrl;
this.firebaseSecret = firebaseSecret;
this.loggerId = loggerId;
this.findExistingContainers = findExistingContainers === 'true';
this.firebaseMetricsLogsUrl = firebaseMetricsLogsUrl;
this.logSizeLimit = logSizeLimit;
this.containerLoggers = [];
this.logSize = 0;

let dockerSockPath;
if (fs.existsSync('/var/run/codefresh/docker.sock')) {
Expand Down Expand Up @@ -75,6 +86,7 @@ class Logger {
return;
}
logger.info(`Authenticated to firebase url: ${this.firebaseAuthUrl}`);
this.firebaseMetricsLogs = new Firebase(this.firebaseMetricsLogsUrl);

this._listenForNewContainers();

Expand Down Expand Up @@ -118,6 +130,19 @@ class Logger {
});
}


logLimitExceeded() {
// TODO in the future when we allow a workflow to use multuple dinds, this will not be correct
// we need to get the total size of logs from all dinds
return this.logSizeLimit && this._getTotalLogSize() > this.logSizeLimit;
}

_getTotalLogSize() {
return _.reduce(this.containerLoggers, (sum, containerLogger) => {
return sum + containerLogger.logSize;
}, 0);
}

/**
* receives a container and decides if to start listening on it
* @param loggerId
Expand All @@ -134,6 +159,12 @@ class Logger {
const receivedFirebaseLastUpdateUrl = _.get(container,
'Labels',
_.get(container, 'Actor.Attributes'))['io.codefresh.logger.firebase.lastUpdateUrl'];
const receivedFirebaseMetricsLogsUrl = _.get(container,
'Labels',
_.get(container, 'Actor.Attributes'))['io.codefresh.logger.firebase.metricsLogs'];
const receivedLogSizeLimit = _.get(container,
'Labels',
_.get(container, 'Actor.Attributes'))['io.codefresh.logger.logSizeLimit'];
const loggerStrategy = _.get(container, 'Labels', _.get(container, 'Actor.Attributes'))['io.codefresh.logger.strategy'];

if (!containerId) {
Expand Down Expand Up @@ -167,6 +198,11 @@ class Logger {
return;
}

if (!receivedFirebaseMetricsLogsUrl) {
logger.error(`Container: ${containerId} does contain a loggerFirebaseMetricsLogsUrl label`);
return;
}

if (!loggerStrategy) {
logger.error(`Container: ${containerId} does contain a loggerStrategy label`);
return;
Expand Down Expand Up @@ -210,8 +246,33 @@ class Logger {
return;
}

let firebaseMetricsLogs;
try {
firebaseMetricsLogs = new Firebase(receivedFirebaseMetricsLogsUrl);
} catch (err) {
const error = new CFError({
cause: err,
message: `Failed to create a new firebase metricsLogs ref`
});
logger.error(error.toString());
return;
}

const logSizeLimit = receivedLogSizeLimit ? (parseInt(receivedLogSizeLimit) * 1000000) : undefined;

const containerInterface = this.docker.getContainer(containerId);
const containerLogger = new ContainerLogger(containerId, containerInterface, firebaseLogger, firebaseLastUpdate, loggerStrategy);
const containerLogger = new ContainerLogger({
containerId,
containerInterface,
firebaseLogger,
firebaseLastUpdate,
firebaseMetricsLogs,
logSizeLimit,
isWorkflowLogSizeExceeded: this.logLimitExceeded.bind(this),
loggerStrategy
});
this.containerLoggers.push(containerLogger);
containerLogger.on('message.logged', this._updateTotalLogSize.bind(this));

containerLogger.start()
.done(() => {
Expand All @@ -227,6 +288,11 @@ class Logger {
});
}

_updateTotalLogSize() {
this.logSize = this._getTotalLogSize();
this.firebaseMetricsLogs.child('total').set(this.logSize);
}

/**
* Will check if a container was already handled (no matter what the handling status is)
* @param containerId
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"dependencies": {
"cf-errors": "^0.1.11",
"cf-logs": "git+https://github.com/codefresh-io/cf-logs.git#ceba4f309e52a077747a0c6bf9c3ad02e762dc4b",
"dockerode": "^2.3.0",
"docker-events": "0.0.2",
"dockerode": "^2.3.0",
"firebase": "git+https://github.com/codefresh-io/firebase.git#80b2ed883ff281cd67b53bd0f6a0bbd6f330fed5",
"forever": "^0.15.3",
"lodash": "^4.15.0",
Expand All @@ -22,6 +22,7 @@
"gulp-env": "^0.2.0",
"gulp-istanbul": "^0.10.4",
"gulp-jshint": "^1.11.0",
"gulp-mocha": "^6.0.0",
"gulp-mocha-co": "^0.4.1-co.3",
"gulp-rimraf": "^0.1.1",
"isparta": "^4.0.0",
Expand Down

0 comments on commit baad313

Please sign in to comment.