Commit 6d3d9a5: Cleanup

sam-lippert committed Aug 16, 2023 (parent: 29a0ee8)
Showing 2 changed files with 59 additions and 129 deletions.
56 changes: 37 additions & 19 deletions README.md
@@ -1,63 +1,81 @@
# kafka.do

-kafka.do is a simplified interface for Kafka topics, allowing you to easily manage topics for subscriptions and webhooks.
+`kafka.do` is a simple API for managing Kafka-based queues. Webhook URLs can be added to retrofit existing applications to consume from queues without needing to modify the application itself to subscribe to Kafka topics. Below are the available endpoints.

## Endpoints

### List all queues

-- `GET /`
+```http
+GET /
+```

### Consume from a queue

-- `GET /:queue`
+```http
+GET /:queue
+```
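
Consuming is a single HTTP call. As a sketch, assuming a hypothetical queue named `orders` and a response body with a `messages` array (the README does not document the exact response shape):

```js
// Poll the hypothetical "orders" queue for the next batch of messages.
const res = await fetch('https://kafka.do/orders')
const body = await res.json()
// Assumed response shape: { messages: [...] }.
for (const message of body.messages ?? []) {
  console.log(message)
}
```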

### Produce a message to a queue

-- `GET /:queue/send/:message`
+```http
+GET /:queue/send/:message
+```

### Send a batch of messages

-- `POST /:queue/sendBatch`
+```http
+POST /:queue/sendBatch
+```

-```json
-["message1", "message2"]
-```
+Payload:
+
+```json
+["message1", "message2"]
+```
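
As a sketch of a batch send with `fetch`, again using the hypothetical `orders` queue; the payload is the JSON array shown above:

```js
// Send two messages to the hypothetical "orders" queue in a single batch.
const res = await fetch('https://kafka.do/orders/sendBatch', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(['message1', 'message2']),
})
console.log(res.status) // e.g. 200 on success
```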

### Acknowledge all messages as consumed

-- `GET /:queue/ackAll`
+```http
+GET /:queue/ackAll
+```

### Mark all messages to be retried

-- `GET /:queue/retryAll`
+```http
+GET /:queue/retryAll
+```

### Acknowledge a message as consumed

-- `GET /:queue/ack/:messageId`
+```http
+GET /:queue/ack/:messageId
+```

### Mark a message to be retried

-- `GET /:queue/retry/:messageId`
+```http
+GET /:queue/retry/:messageId
+```

### Automatically consume to a webhook URL

-- `GET /:queue/webhook/:url`
+```http
+GET /:queue/webhook/:url
+```
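
Since the target URL is carried in a path segment, it presumably must be percent-encoded. A sketch, with a hypothetical callback URL:

```js
// Register a webhook so messages from "orders" are pushed to our endpoint.
// encodeURIComponent is assumed necessary because the URL sits in the path.
const target = encodeURIComponent('https://example.com/hooks/orders')
await fetch(`https://kafka.do/orders/webhook/${target}`)
```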

### List a queue's webhooks

-- `GET /:queue/webhook`
+```http
+GET /:queue/webhook
+```

## Parameters

Every endpoint except the list endpoint creates the queue if it does not already exist, and accepts the following parameters to change queue behavior (see the example request after this list):

- `maxBatchSize`: The maximum number of messages allowed in each batch.

- `maxBatchTimeout`: The maximum number of seconds to wait until a batch is full.

- `maxRetries`: The maximum number of retries for a message, if it fails or `retryAll` is invoked.

- `deadLetterQueue`: The name of another queue to send a message to if it fails processing at least `maxRetries` times. If a `deadLetterQueue` is not defined, messages that repeatedly fail processing will eventually be discarded. If there is no queue with the specified name, it will be created automatically.

-- `maxConcurrency`: The maximum number of concurrent consumers allowed to run at once. Leaving this unset will mean that the number of invocations will scale to the currently supported maximum.
+- `maxConcurrency`: The maximum number of concurrent consumers allowed to run at once. Leaving this unset means that the number of invocations will scale to the currently supported maximum.
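
The README does not show how these parameters are supplied; assuming they are passed as query-string values, a request that tunes batching and retries for the hypothetical `orders` queue might look like:

```js
// Assumption: queue parameters ride along as query-string values.
const params = new URLSearchParams({
  maxBatchSize: '10',            // up to 10 messages per batch
  maxBatchTimeout: '5',          // wait at most 5 seconds to fill a batch
  maxRetries: '3',               // retry a failing message up to 3 times
  deadLetterQueue: 'orders-dlq', // created automatically if missing
})
await fetch(`https://kafka.do/orders?${params}`)
```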
132 changes: 22 additions & 110 deletions worker.js
@@ -1,8 +1,15 @@
import { Router, json } from 'itty-router'
import { Kafka } from '@upstash/kafka'

+const kafkaConfig = {
+  url: env.KAFKA_URL,
+  username: env.KAFKA_USERNAME,
+  password: env.KAFKA_PASSWORD,
+}
+
const withCtx = async (request, env) => {
-  request.ctx = await env.CTX.fetch(req).then((res) => res.json())
+  request.ctx = await env.CTX.fetch(request).then((res) => res.json())
+  console.log(request)
  if (!request.ctx.user) {
    return Response.redirect('/login')
  }
@@ -12,127 +19,32 @@ const withCtx = async (request, env) => {
    description: 'Cloudflare Worker API for Kafka with webhooks',
    url: 'https://kafka.do',
    endpoints: {
-      topics: request.ctx.origin + '/topics',
-      producer: request.ctx.origin + '/producer/:topic/:message',
-      consumer: request.ctx.origin + '/consumer/:topic',
-      consumerBulk: request.ctx.origin + '/consumer/:topic/:count',
-      fetch: request.ctx.origin + '/fetch/:topic',
-      fetchBulk: request.ctx.origin + '/fetch/:topic/:count',
+      listAll: request.ctx.origin + '/',
+      consume: request.ctx.origin + '/:queue',
+      produce: request.ctx.origin + '/:queue/send/:message',
+      acknowledgeAll: request.ctx.origin + '/:queue/ackAll',
+      retryAll: request.ctx.origin + '/:queue/retryAll',
+      acknowledge: request.ctx.origin + '/:queue/ack/:messageId',
+      retry: request.ctx.origin + '/:queue/retry/:messageId',
+      createWebhook: request.ctx.origin + '/:queue/webhook/:url',
+      listWebhooks: request.ctx.origin + '/:queue/webhook',
    },
    memberOf: 'https://apis.do/pubsub',
    login: request.ctx.origin + '/login',
    logout: request.ctx.origin + '/logout',
    repo: 'https://github.com/drivly/kafka.do',
  }
-  request.kafkaConfig = {
-    url: env.KAFKA_URL,
-    username: env.KAFKA_USERNAME,
-    password: env.KAFKA_PASSWORD,
-  }
}

const router = Router()
router.all('*', withCtx)

-router.get('/topics', async (request) => {
-  const kafka = new Kafka(request.kafkaConfig)
-  const admin = kafka.admin()
-
-  try {
-    const topics = await admin.topics()
-    return json({ api: request.api, topics, user: request.ctx.user }, { status: 200 })
-  } catch (error) {
-    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
-  }
-})
-
-router.get('/producer/:topic/:message', async (request) => {
-  const { topic, message } = request.params
-  const kafka = new Kafka(request.kafkaConfig)
-  const producer = kafka.producer()
-
-  try {
-    const res = await producer.produce(topic, message)
-
-    return json({ api: request.api, topic, message, offset: res.result.baseOffset, user: request.ctx.user }, { status: 200 })
-  } catch (error) {
-    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
-  }
-})
-
-async function consumeMessages(topic, groupId) {
-  const kafka = new Kafka(request.kafkaConfig)
-  const consumer = kafka.consumer()
-
-  try {
-    const messages = await consumer.consume({
-      consumerGroupId: groupId,
-      instanceId: 'instance_1',
-      topics: [topic],
-      autoOffsetReset: 'earliest',
-    })
-
-    return messages
-  } catch (error) {
-    throw error
-  }
-}
-
-router.get('/consumer/:topic', async (request) => {
-  try {
-    const { topic, groupId } = request.params
-    const messages = await consumeMessages(topic, groupId || request.ctx.user.id)
-
-    return json(
-      {
-        api,
-        topic,
-        messages:
-          messages && messages.length > 0
-            ? messages.map((message) => ({
-                message: message.value,
-                offset: message.offset,
-              }))
-            : [],
-        user: request.ctx.user,
-      },
-      { status: 200 }
-    )
-  } catch (error) {
-    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
-  }
-})
-
-router.get('/fetch/:topic', async (request) => {
-  try {
-    const { topic } = request.params
-    const consumer = new Kafka(request.kafkaConfig).consumer()
-
-    const messages = await consumer.fetch({ topic, partition: 0, offset: 0 })
-
-    return json(
-      {
-        api,
-        topic,
-        messages:
-          messages && messages.length > 0
-            ? messages.map((message) => ({
-                message: message.value,
-                offset: message.offset,
-              }))
-            : [],
-        user: request.ctx.user,
-      },
-      { status: 200 }
-    )
-  } catch (error) {
-    return json({ api: request.api, error: error.message, user: request.ctx.user }, { status: 500 })
-  }
-})

-router.get('*', () => new Response('Not Found.', { status: 404 }))
+router.get('*', (request) => json({ api: request.api, error: 'Not Found', user: request.ctx.user }, { status: 404 }))

export default {
-  async fetch(request) {
-    return router.all('*', withCtx).handle(request, env)
-  },
+  fetch(request, env) {
+    return router.handle(request, env)
+  },
}
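
One loose end the cleanup appears to leave: the new module-scope `const kafkaConfig` reads from `env`, but in module-syntax Workers `env` is only passed into handlers, so that line would likely throw when the script is evaluated. A minimal sketch (names assumed) of the same setup that builds the client once `env` is in scope:

```js
import { Router, json } from 'itty-router'
import { Kafka } from '@upstash/kafka'

const router = Router()

// Middleware: construct the Kafka client per request, where env is available.
router.all('*', (request, env) => {
  request.kafka = new Kafka({
    url: env.KAFKA_URL,
    username: env.KAFKA_USERNAME,
    password: env.KAFKA_PASSWORD,
  })
})

export default {
  fetch(request, env) {
    // env is forwarded to every route handler registered above.
    return router.handle(request, env)
  },
}
```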
