Skip to content

Commit

Permalink
Fix :topics:retention-time and :topics:compaction
Browse files — browse the repository at this point in the history
  • Loading branch information
Maciek Sakrejda committed Feb 11, 2017
1 parent cbe0c8e commit fcd5c7f
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 40 deletions.
24 changes: 19 additions & 5 deletions commands/topics_compaction.js
Expand Up @@ -5,6 +5,8 @@ let co = require('co')
let parseBool = require('../lib/shared').parseBool
let withCluster = require('../lib/clusters').withCluster
let request = require('../lib/clusters').request
let topicConfig = require('../lib/clusters').topicConfig
let fetchProvisionedInfo = require('../lib/clusters').fetchProvisionedInfo

const VERSION = 'v0'

Expand All @@ -19,15 +21,27 @@ function * compaction (context, heroku) {
msg += ` on ${context.args.CLUSTER}`
}
yield withCluster(heroku, context.app, context.args.CLUSTER, function * (addon) {
const topicName = context.args.TOPIC
let [ topicInfo, addonInfo ] = yield [
topicConfig(heroku, addon.id, topicName),
fetchProvisionedInfo(heroku, addon)
]
let retentionTime
if (enabled) {
retentionTime = addonInfo.capabilities.supports_mixed_cleanup_policy ? topicInfo.retention_time_ms : null
} else {
retentionTime = addonInfo.limits.minimum_retention_ms
}
let cleanupPolicy = {
compaction: enabled,
retention_time_ms: retentionTime
}

yield cli.action(msg, co(function * () {
const topicName = context.args.TOPIC
return yield request(heroku, {
method: 'PUT',
body: {
topic: {
name: topicName,
compaction: enabled
}
topic: Object.assign({ name: topicName }, cleanupPolicy)
},
path: `/data/kafka/${VERSION}/clusters/${addon.id}/topics/${topicName}`
})
Expand Down
29 changes: 21 additions & 8 deletions commands/topics_retention_time.js
Expand Up @@ -5,29 +5,42 @@ let co = require('co')
let parseDuration = require('../lib/shared').parseDuration
let withCluster = require('../lib/clusters').withCluster
let request = require('../lib/clusters').request
let topicConfig = require('../lib/clusters').topicConfig
let fetchProvisionedInfo = require('../lib/clusters').fetchProvisionedInfo

const VERSION = 'v0'

function * retentionTime (context, heroku) {
let parsed = parseDuration(context.args.VALUE)
if (parsed == null) {
cli.exit(1, `Unknown retention time '${context.args.VALUE}'; expected value like '36h' or '10d'`)
let parsed
if (context.args.VALUE === 'disable') {
parsed = null
} else {
parsed = parseDuration(context.args.VALUE)
if (parsed == null) {
cli.exit(1, `Unknown retention time '${context.args.VALUE}'; expected 'disable' or value like '36h' or '10d'`)
}
}

let msg = `Setting retention time for topic ${context.args.TOPIC} to ${context.args.VALUE}`
if (context.args.CLUSTER) {
msg += ` on ${context.args.CLUSTER}`
}
yield withCluster(heroku, context.app, context.args.CLUSTER, function * (addon) {
const topicName = context.args.TOPIC
let [ topicInfo, addonInfo ] = yield [
topicConfig(heroku, addon.id, topicName),
fetchProvisionedInfo(heroku, addon)
]
let cleanupPolicy = {
retention_time_ms: parsed,
compaction: (!parsed || (addonInfo.capabilities.supports_mixed_cleanup_policy && topicInfo.compaction_enabled))
}

yield cli.action(msg, co(function * () {
const topicName = context.args.TOPIC
return yield request(heroku, {
method: 'PUT',
body: {
topic: {
name: topicName,
retention_time_ms: parsed
}
topic: Object.assign({ name: topicName }, cleanupPolicy)
},
path: `/data/kafka/${VERSION}/clusters/${addon.id}/topics/${topicName}`
})
Expand Down
15 changes: 14 additions & 1 deletion lib/clusters.js
Expand Up @@ -78,6 +78,18 @@ function * topicConfig (heroku, addonId, topic) {
return forTopic
}

// Look up cluster-level Kafka metadata for a provisioned add-on.
// Resolves with the cluster info from the data API; if the API answers
// 404 (the cluster is not yet provisioned), exits the CLI with a hint
// to run `heroku kafka:wait`. Any other error is propagated unchanged.
function fetchProvisionedInfo (heroku, addon) {
  const requestOptions = {
    host: host(addon),
    method: 'get',
    path: `/data/kafka/v0/clusters/${addon.id}`
  }
  const handleNotProvisioned = (err) => {
    if (err.statusCode !== 404) {
      throw err
    }
    cli.exit(1, `${cli.color.addon(addon.name)} is not yet provisioned.\nRun ${cli.color.cmd('heroku kafka:wait')} to wait until the cluster is provisioned.`)
  }
  return heroku.request(requestOptions).catch(handleNotProvisioned)
}

function request (heroku, params) {
const h = host()
debug(`picked shogun host: ${h}`)
Expand All @@ -92,5 +104,6 @@ module.exports = {
HerokuKafkaClusters: HerokuKafkaClusters,
withCluster,
request,
topicConfig
topicConfig,
fetchProvisionedInfo
}
93 changes: 76 additions & 17 deletions test/commands/topics_compaction_test.js
Expand Up @@ -29,6 +29,14 @@ describe('kafka:topics:compaction', () => {
return `/data/kafka/v0/clusters/${cluster}/topics/${topic}`
}

// Data-API path for the topic collection of the given cluster id.
let topicListUrl = (cluster) => `/data/kafka/v0/clusters/${cluster}/topics`

// Data-API path for the cluster-info endpoint of the given cluster id.
let infoUrl = (cluster) => `/data/kafka/v0/clusters/${cluster}`

beforeEach(() => {
kafka = nock('https://kafka-api.heroku.com:443')
cli.mockConsole()
Expand All @@ -52,26 +60,77 @@ describe('kafka:topics:compaction', () => {
})

const validEnable = [ 'enable', 'on' ]
validEnable.forEach((value) => {
it(`turns compaction on with argument ${value}`, () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', compaction: true } }).reply(200)

return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }})
.then(() => expect(cli.stderr).to.equal('Enabling compaction for topic topic-1... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
const validDisable = [ 'disable', 'off' ]

describe('if the cluster supports a mixed cleanup policy', () => {
beforeEach(() => {
kafka.get(topicListUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, { topics: [ { name: 'topic-1', retention_time_ms: 123 } ] })
kafka.get(infoUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, {
capabilities: { supports_mixed_cleanup_policy: true },
limits: { minimum_retention_ms: 20 }
})
})

validEnable.forEach((value) => {
it(`uses the original retention and turns compaction on with argument ${value}`, () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', compaction: true, retention_time_ms: 123 } })
.reply(200)

return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }})
.then(() => expect(cli.stderr).to.equal('Enabling compaction for topic topic-1... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
})
})

validDisable.forEach((value) => {
it(`turns compaction off and sets retention to plan minimum with argument ${value}`, () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', compaction: false, retention_time_ms: 20 } })
.reply(200)

return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }})
.then(() => expect(cli.stderr).to.equal('Disabling compaction for topic topic-1... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
})
})
})

const validDisable = [ 'disable', 'off' ]
validDisable.forEach((value) => {
it(`turns compaction off with argument ${value}`, () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', compaction: false } }).reply(200)

return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }})
.then(() => expect(cli.stderr).to.equal('Disabling compaction for topic topic-1... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
describe('if the cluster does not support a mixed cleanup policy', () => {
beforeEach(() => {
kafka.get(topicListUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, { topics: [ { name: 'topic-1', retention_time_ms: 123 } ] })
kafka.get(infoUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, {
capabilities: { supports_mixed_cleanup_policy: false },
limits: { minimum_retention_ms: 20 }
})
})

validEnable.forEach((value) => {
it(`turns off retention and turns compaction on with argument ${value}`, () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', compaction: true, retention_time_ms: null } })
.reply(200)

return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }})
.then(() => expect(cli.stderr).to.equal('Enabling compaction for topic topic-1... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
})
})

validDisable.forEach((value) => {
it(`turns compaction off and sets retention to plan minimum with argument ${value}`, () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', compaction: false, retention_time_ms: 20 } })
.reply(200)

return cmd.run({app: 'myapp', args: { TOPIC: 'topic-1', VALUE: value }})
.then(() => expect(cli.stderr).to.equal('Disabling compaction for topic topic-1... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
})
})
})
})
85 changes: 76 additions & 9 deletions test/commands/topics_retention_time_test.js
Expand Up @@ -29,6 +29,14 @@ describe('kafka:topics:retention-time', () => {
return `/data/kafka/v0/clusters/${cluster}/topics/${topic}`
}

// Data-API path for the topic collection of the given cluster id.
let topicListUrl = (cluster) => `/data/kafka/v0/clusters/${cluster}/topics`

// Data-API path for the cluster-info endpoint of the given cluster id.
let infoUrl = (cluster) => `/data/kafka/v0/clusters/${cluster}`

beforeEach(() => {
kafka = nock('https://kafka-api.heroku.com:443')
cli.mockConsole()
Expand All @@ -46,18 +54,77 @@ describe('kafka:topics:retention-time', () => {
args: { TOPIC: 'topic-1', VALUE: '1 fortnight' },
flags: {}}))
.then(() => expect(cli.stdout).to.be.empty)
.then(() => expect(cli.stderr).to.equal(` ▸ Unknown retention time '1 fortnight'; expected value like '36h' or '10d'\n`))
.then(() => expect(cli.stderr).to.equal(` ▸ Unknown retention time '1 fortnight'; expected 'disable' or value like
▸ '36h' or '10d'
`))
})
})

it('sets retention time to the specified value', () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', retention_time_ms: 60000 } }).reply(200)
describe('if the cluster supports a mixed cleanup policy', () => {
beforeEach(() => {
kafka.get(topicListUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, { topics: [ { name: 'topic-1', retention_time_ms: 123, compaction_enabled: false } ] })
kafka.get(infoUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, {
capabilities: { supports_mixed_cleanup_policy: true },
limits: { minimum_retention_ms: 20 }
})
})

it('sets retention time and leaves compaction as is if a value is specified', () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', retention_time_ms: 60000, compaction: false } }).reply(200)

return cmd.run({app: 'myapp',
args: { TOPIC: 'topic-1', VALUE: '60s' },
flags: {}})
.then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to 60s... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
})

return cmd.run({app: 'myapp',
args: { TOPIC: 'topic-1', VALUE: '60s' },
flags: {}})
.then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to 60s... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
it('clears retention and turns on compaction if `disabled` is specified', () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', retention_time_ms: null, compaction: true } }).reply(200)

return cmd.run({app: 'myapp',
args: { TOPIC: 'topic-1', VALUE: 'disable' },
flags: {}})
.then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to disable... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
})
})

describe('if the cluster does not support a mixed cleanup policy', () => {
beforeEach(() => {
kafka.get(topicListUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, { topics: [ { name: 'topic-1', retention_time_ms: 123, compaction_enabled: true } ] })
kafka.get(infoUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, {
capabilities: { supports_mixed_cleanup_policy: false },
limits: { minimum_retention_ms: 20 }
})
})

it('sets retention time and turns off compaction if a value is specified', () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', retention_time_ms: 60000, compaction: false } }).reply(200)

return cmd.run({app: 'myapp',
args: { TOPIC: 'topic-1', VALUE: '60s' },
flags: {}})
.then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to 60s... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
})

it('clears retention and turns on compaction if `disabled` is specified', () => {
kafka.put(topicConfigUrl('00000000-0000-0000-0000-000000000000', 'topic-1'),
{ topic: { name: 'topic-1', retention_time_ms: null, compaction: true } }).reply(200)

return cmd.run({app: 'myapp',
args: { TOPIC: 'topic-1', VALUE: 'disable' },
flags: {}})
.then(() => expect(cli.stderr).to.equal('Setting retention time for topic topic-1 to disable... done\n'))
.then(() => expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n'))
})
})
})

0 comments on commit fcd5c7f

Please sign in to comment.