Skip to content

Commit

Permalink
Default retention for MT plans to plan minimum on topic creation if n…
Browse files Browse the repository at this point in the history
…ot specified
  • Loading branch information
Maciek Sakrejda committed Feb 14, 2017
1 parent 800a669 commit 8b5bf02
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
15 changes: 11 additions & 4 deletions commands/topics_create.js
Expand Up @@ -6,15 +6,16 @@ let parseDuration = require('../lib/shared').parseDuration
let deprecated = require('../lib/shared').deprecated
let withCluster = require('../lib/clusters').withCluster
let request = require('../lib/clusters').request
let fetchProvisionedInfo = require('../lib/clusters').fetchProvisionedInfo

const VERSION = 'v0'

function * createTopic (context, heroku) {
let flags = Object.assign({}, context.flags)
let retentiomTimeMillis
let retentionTimeMillis
if ('retention-time' in flags) {
let retentiomTimeMillis = parseDuration(flags['retention-time'])
if (parsed == null) {
retentionTimeMillis = parseDuration(flags['retention-time'])
if (retentionTimeMillis == null) {
cli.exit(1, `Could not parse retention time '${flags['retention-time']}'; expected value like '10d' or '36h'`)
}
}
Expand All @@ -25,13 +26,19 @@ function * createTopic (context, heroku) {
msg += ` on ${context.args.CLUSTER}`
}

let addonInfo = yield fetchProvisionedInfo(heroku, addon)

if (addonInfo.shared_cluster && retentionTimeMillis === undefined) {
retentionTimeMillis = addonInfo.limits.minimum_retention_ms
}

yield cli.action(msg, co(function * () {
return yield request(heroku, {
method: 'POST',
body: {
topic: {
name: context.args.TOPIC,
retention_time_ms: retentiomTimeMillis,
retention_time_ms: retentionTimeMillis,
replication_factor: flags['replication-factor'],
partition_count: flags['partitions'],
compaction: flags['compaction'] || false
Expand Down
32 changes: 32 additions & 0 deletions test/commands/topics_create_test.js
Expand Up @@ -29,6 +29,10 @@ describe('kafka:topics:create', () => {
return `/data/kafka/v0/clusters/${cluster}/topics`
}

let infoUrl = (cluster) => {
return `/data/kafka/v0/clusters/${cluster}`
}

beforeEach(() => {
kafka = nock('https://kafka-api.heroku.com:443')

Expand All @@ -50,6 +54,8 @@ describe('kafka:topics:create', () => {
})

it('passes the topic name and specified flags', () => {
kafka.get(infoUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, { shared_cluster: false })
kafka.post(createUrl('00000000-0000-0000-0000-000000000000'),
{
topic: {
Expand All @@ -71,4 +77,30 @@ describe('kafka:topics:create', () => {
expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')
})
})

describe('for multi-tenant plans', () => {
it('defaults retention to the plan minimum if not specified', () => {
kafka.get(infoUrl('00000000-0000-0000-0000-000000000000'))
.reply(200, { shared_cluster: true, limits: { minimum_retention_ms: 66 } })
kafka.post(createUrl('00000000-0000-0000-0000-000000000000'),
{
topic: {
name: 'topic-1',
retention_time_ms: 66,
replication_factor: '3',
partition_count: '7',
compaction: false
}
}).reply(200)

return cmd.run({app: 'myapp',
args: { TOPIC: 'topic-1' },
flags: { 'replication-factor': '3',
'partitions': '7' }})
.then(() => {
expect(cli.stderr).to.equal('Creating topic topic-1... done\n')
expect(cli.stdout).to.equal('Use `heroku kafka:topics:info topic-1` to monitor your topic.\n')
})
})
})
})

0 comments on commit 8b5bf02

Please sign in to comment.