From fcd5c7f92eee5c804f7bec5fc67d53ac2b027b1d Mon Sep 17 00:00:00 2001 From: Maciek Sakrejda Date: Fri, 10 Feb 2017 21:04:43 -0800 Subject: [PATCH] Fix :topics:retention-time and :topics:compaction --- commands/topics_compaction.js | 24 ++++-- commands/topics_retention_time.js | 29 +++++-- lib/clusters.js | 15 +++- test/commands/topics_compaction_test.js | 93 +++++++++++++++++---- test/commands/topics_retention_time_test.js | 85 +++++++++++++++++-- 5 files changed, 206 insertions(+), 40 deletions(-) diff --git a/commands/topics_compaction.js b/commands/topics_compaction.js index e0a9ad7..09d27e1 100644 --- a/commands/topics_compaction.js +++ b/commands/topics_compaction.js @@ -5,6 +5,8 @@ let co = require('co') let parseBool = require('../lib/shared').parseBool let withCluster = require('../lib/clusters').withCluster let request = require('../lib/clusters').request +let topicConfig = require('../lib/clusters').topicConfig +let fetchProvisionedInfo = require('../lib/clusters').fetchProvisionedInfo const VERSION = 'v0' @@ -19,15 +21,27 @@ function * compaction (context, heroku) { msg += ` on ${context.args.CLUSTER}` } yield withCluster(heroku, context.app, context.args.CLUSTER, function * (addon) { + const topicName = context.args.TOPIC + let [ topicInfo, addonInfo ] = yield [ + topicConfig(heroku, addon.id, topicName), + fetchProvisionedInfo(heroku, addon) + ] + let retentionTime + if (enabled) { + retentionTime = addonInfo.capabilities.supports_mixed_cleanup_policy ? topicInfo.retention_time_ms : null + } else { + retentionTime = addonInfo.limits.minimum_retention_ms + } + let cleanupPolicy = { + compaction: enabled, + retention_time_ms: retentionTime + } + yield cli.action(msg, co(function * () { - const topicName = context.args.TOPIC return yield request(heroku, { method: 'PUT', body: { - topic: { - name: topicName, - compaction: enabled - } + topic: Object.assign({ name: topicName }, cleanupPolicy) }, path: `/data/kafka/${VERSION}/clusters/${addon.id}/topics/${topicName}` }) diff --git a/commands/topics_retention_time.js b/commands/topics_retention_time.js index 242ec96..c4f901e 100644 --- a/commands/topics_retention_time.js +++ b/commands/topics_retention_time.js @@ -5,13 +5,20 @@ let co = require('co') let parseDuration = require('../lib/shared').parseDuration let withCluster = require('../lib/clusters').withCluster let request = require('../lib/clusters').request +let topicConfig = require('../lib/clusters').topicConfig +let fetchProvisionedInfo = require('../lib/clusters').fetchProvisionedInfo const VERSION = 'v0' function * retentionTime (context, heroku) { - let parsed = parseDuration(context.args.VALUE) - if (parsed == null) { - cli.exit(1, `Unknown retention time '${context.args.VALUE}'; expected value like '36h' or '10d'`) + let parsed + if (context.args.VALUE === 'disable') { + parsed = null + } else { + parsed = parseDuration(context.args.VALUE) + if (parsed == null) { + cli.exit(1, `Unknown retention time '${context.args.VALUE}'; expected 'disable' or value like '36h' or '10d'`) + } } let msg = `Setting retention time for topic ${context.args.TOPIC} to ${context.args.VALUE}` @@ -19,15 +26,21 @@ function * retentionTime (context, heroku) { msg += ` on ${context.args.CLUSTER}` } yield withCluster(heroku, context.app, context.args.CLUSTER, function * (addon) { + const topicName = context.args.TOPIC + let [ topicInfo, addonInfo ] = yield [ + topicConfig(heroku, addon.id, topicName), + fetchProvisionedInfo(heroku, addon) + ] + let cleanupPolicy = { + retention_time_ms: parsed, + compaction: (!parsed || (addonInfo.capabilities.supports_mixed_cleanup_policy && topicInfo.compaction_enabled)) + } + yield cli.action(msg, co(function * () { - const topicName = context.args.TOPIC return yield request(heroku, { method: 'PUT', body: { - topic: { - name: topicName, - retention_time_ms: parsed - } + topic: Object.assign({ name: topicName }, cleanupPolicy) }, path: `/data/kafka/${VERSION}/clusters/${addon.id}/topics/${topicName}` }) diff --git a/lib/clusters.js b/lib/clusters.js index 9683b36..d48f092 100644 --- a/lib/clusters.js +++ b/lib/clusters.js @@ -78,6 +78,18 @@ function * topicConfig (heroku, addonId, topic) { return forTopic } +// Fetch kafka info about a provisioned cluster or exit with failure +function fetchProvisionedInfo (heroku, addon) { + return heroku.request({ + host: host(addon), + method: 'get', + path: `/data/kafka/v0/clusters/${addon.id}` + }).catch(err => { + if (err.statusCode !== 404) throw err + cli.exit(1, `${cli.color.addon(addon.name)} is not yet provisioned.\nRun ${cli.color.cmd('heroku kafka:wait')} to wait until the cluster is provisioned.`) + }) +} + function request (heroku, params) { const h = host() debug(`picked shogun host: ${h}`) @@ -92,5 +104,6 @@ module.exports = { HerokuKafkaClusters: HerokuKafkaClusters, withCluster, request, - topicConfig + topicConfig, + fetchProvisionedInfo } diff --git a/test/commands/topics_compaction_test.js b/test/commands/topics_compaction_test.js index 08b03fd..8209ad4 100644 --- a/test/commands/topics_compaction_test.js +++ b/test/commands/topics_compaction_test.js @@ -29,6 +29,14 @@ describe('kafka:topics:compaction', () => { return `/data/kafka/v0/clusters/${cluster}/topics/${topic}` } + let topicListUrl = (cluster) => { + return `/data/kafka/v0/clusters/${cluster}/topics` + } + + let infoUrl = (cluster) => { + return `/data/kafka/v0/clusters/${cluster}` + } + beforeEach(() => { kafka = nock('https://kafka-api.heroku.com:443') cli.mockConsole() @@ -52,26 +60,77 @@ describe('kafka:topics:compaction', () => { }) const validEnable = [ 'enable', 'on' ] - validEnable.forEach((value) => { - it(`turns compaction on with argument ${value}`, () => { - kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), - { topic: { name: 'topic-1', compaction: true } }).reply(200) - - return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }}) - .then(() => expect(cli.stderr).to.equal('Enabling compaction for topic topic-1... done\n')) - .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + const validDisable = [ 'disable', 'off' ] + + describe('if the cluster supports a mixed cleanup policy', () => { + beforeEach(() => { + kafka.get(topicListUrl('00000000-0000-0000-0000-000000000000')) + .reply(200, { topics: [ { name: 'topic-1', retention_time_ms: 123 } ] }) + kafka.get(infoUrl('00000000-0000-0000-0000-000000000000')) + .reply(200, { + capabilities: { supports_mixed_cleanup_policy: true }, + limits: { minimum_retention_ms: 20 } + }) + }) + + validEnable.forEach((value) => { + it(`uses the original retention and turns compaction on with argument ${value}`, () => { + kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), + { topic: { name: 'topic-1', compaction: true, retention_time_ms: 123 } }) + .reply(200) + + return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }}) + .then(() => expect(cli.stderr).to.equal('Enabling compaction for topic topic-1... done\n')) + .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + }) + }) + + validDisable.forEach((value) => { + it(`turns compaction off and sets retention to plan minimum with argument ${value}`, () => { + kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), + { topic: { name: 'topic-1', compaction: false, retention_time_ms: 20 } }) + .reply(200) + + return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }}) + .then(() => expect(cli.stderr).to.equal('Disabling compaction for topic topic-1... done\n')) + .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + }) }) }) - const validDisable = [ 'disable', 'off' ] - validDisable.forEach((value) => { - it(`turns compaction off with argument ${value}`, () => { - kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), - { topic: { name: 'topic-1', compaction: false } }).reply(200) - - return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }}) - .then(() => expect(cli.stderr).to.equal('Disabling compaction for topic topic-1... done\n')) - .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + describe('if the cluster does not support a mixed cleanup policy', () => { + beforeEach(() => { + kafka.get(topicListUrl('00000000-0000-0000-0000-000000000000')) + .reply(200, { topics: [ { name: 'topic-1', retention_time_ms: 123 } ] }) + kafka.get(infoUrl('00000000-0000-0000-0000-000000000000')) + .reply(200, { + capabilities: { supports_mixed_cleanup_policy: false }, + limits: { minimum_retention_ms: 20 } + }) + }) + + validEnable.forEach((value) => { + it(`turns off retention and turns compaction on with argument ${value}`, () => { + kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), + { topic: { name: 'topic-1', compaction: true, retention_time_ms: null } }) + .reply(200) + + return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }}) + .then(() => expect(cli.stderr).to.equal('Enabling compaction for topic topic-1... done\n')) + .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + }) + }) + + validDisable.forEach((value) => { + it(`turns compaction off and sets retention to plan minimum with argument ${value}`, () => { + kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), + { topic: { name: 'topic-1', compaction: false, retention_time_ms: 20 } }) + .reply(200) + + return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }}) + .then(() => expect(cli.stderr).to.equal('Disabling compaction for topic topic-1... done\n')) + .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + }) }) }) }) diff --git a/test/commands/topics_retention_time_test.js b/test/commands/topics_retention_time_test.js index 1f9d46f..7e78d9e 100644 --- a/test/commands/topics_retention_time_test.js +++ b/test/commands/topics_retention_time_test.js @@ -29,6 +29,14 @@ describe('kafka:topics:retention-time', () => { return `/data/kafka/v0/clusters/${cluster}/topics/${topic}` } + let topicListUrl = (cluster) => { + return `/data/kafka/v0/clusters/${cluster}/topics` + } + + let infoUrl = (cluster) => { + return `/data/kafka/v0/clusters/${cluster}` + } + beforeEach(() => { kafka = nock('https://kafka-api.heroku.com:443') cli.mockConsole() @@ -46,18 +54,77 @@ describe('kafka:topics:retention-time', () => { args: { TOPIC: 'topic-1', VALUE: '1 fortnight' }, flags: {}})) .then(() => expect(cli.stdout).to.be.empty) - .then(() => expect(cli.stderr).to.equal(` ▸ Unknown retention time '1 fortnight'; expected value like '36h' or '10d'\n`)) + .then(() => expect(cli.stderr).to.equal(` ▸ Unknown retention time '1 fortnight'; expected 'disable' or value like + ▸ '36h' or '10d' +`)) }) }) - it('sets retention time to the specified value', () => { - kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), - { topic: { name: 'topic-1', retention_time_ms: 60000 } }).reply(200) + describe('if the cluster supports a mixed cleanup policy', () => { + beforeEach(() => { + kafka.get(topicListUrl('00000000-0000-0000-0000-000000000000')) + .reply(200, { topics: [ { name: 'topic-1', retention_time_ms: 123, compaction_enabled: false } ] }) + kafka.get(infoUrl('00000000-0000-0000-0000-000000000000')) + .reply(200, { + capabilities: { supports_mixed_cleanup_policy: true }, + limits: { minimum_retention_ms: 20 } + }) + }) + + it('sets retention time and leaves compaction as is if a value is specified', () => { + kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), + { topic: { name: 'topic-1', retention_time_ms: 60000, compaction: false } }).reply(200) + + return cmd.run({app: 'myapp', + args: { TOPIC: 'topic-1', VALUE: '60s' }, + flags: {}}) + .then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to 60s... done\n')) + .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + }) - return cmd.run({app: 'myapp', - args: { TOPIC: 'topic-1', VALUE: '60s' }, - flags: {}}) - .then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to 60s... done\n')) - .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + it('clears retention and turns on compaction if `disabled` is specified', () => { + kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), + { topic: { name: 'topic-1', retention_time_ms: null, compaction: true } }).reply(200) + + return cmd.run({app: 'myapp', + args: { TOPIC: 'topic-1', VALUE: 'disable' }, + flags: {}}) + .then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to disable... done\n')) + .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + }) + }) + + describe('if the cluster does not support a mixed cleanup policy', () => { + beforeEach(() => { + kafka.get(topicListUrl('00000000-0000-0000-0000-000000000000')) + .reply(200, { topics: [ { name: 'topic-1', retention_time_ms: 123, compaction_enabled: true } ] }) + kafka.get(infoUrl('00000000-0000-0000-0000-000000000000')) + .reply(200, { + capabilities: { supports_mixed_cleanup_policy: false }, + limits: { minimum_retention_ms: 20 } + }) + }) + + it('sets retention time and turns off compaction if a value is specified', () => { + kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), + { topic: { name: 'topic-1', retention_time_ms: 60000, compaction: false } }).reply(200) + + return cmd.run({app: 'myapp', + args: { TOPIC: 'topic-1', VALUE: '60s' }, + flags: {}}) + .then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to 60s... done\n')) + .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + }) + + it('clears retention and turns on compaction if `disabled` is specified', () => { + kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'), + { topic: { name: 'topic-1', retention_time_ms: null, compaction: true } }).reply(200) + + return cmd.run({app: 'myapp', + args: { TOPIC: 'topic-1', VALUE: 'disable' }, + flags: {}}) + .then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to disable... done\n')) + .then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')) + }) }) })