WIP: Add GitHub actions #50

Closed, wants to merge 16 commits
2 changes: 2 additions & 0 deletions .env
@@ -0,0 +1,2 @@
# Docker confluentinc version tag
TAG=5.5.4
17 changes: 17 additions & 0 deletions .github/workflows/github-actions-demo.yml
@@ -0,0 +1,17 @@
name: GitHub Actions Demo
on: [push]
jobs:
Explore-GitHub-Actions:
runs-on: ubuntu-latest
steps:
- run: echo "🎉 The job was automatically triggered by a ${{ github.event_name }} event."
- run: echo "🐧 This job is now running on a ${{ runner.os }} server hosted by GitHub!"
- run: echo "🔎 The name of your branch is ${{ github.ref }} and your repository is ${{ github.repository }}."
- name: Check out repository code
uses: actions/checkout@v2
- run: echo "💡 The ${{ github.repository }} repository has been cloned to the runner."
- run: echo "🖥️ The workflow is now ready to test your code on the runner."
- name: List files in the repository
run: |
ls ${{ github.workspace }}
- run: echo "🍏 This job's status is ${{ job.status }}."
9 changes: 9 additions & 0 deletions .gitignore
@@ -1,2 +1,11 @@
node_modules/
/.idea
/dist

*.crt
*.cert
*.pem
*.key
**/*.secret.*

yarn-error.log
8 changes: 8 additions & 0 deletions .prettierrc
@@ -0,0 +1,8 @@
{
"semi": false,
"tabWidth": 2,
"proseWrap": "never",
"printWidth": 120,
"arrowParens": "always",
"trailingComma": "es5"
}
165 changes: 86 additions & 79 deletions README.md
@@ -1,119 +1,126 @@
# avro-schema-registry
# schematic-kafka

Confluent Schema Registry client to easily serialize and deserialize Kafka messages, with only one peer dependency.
This work is based on [avro-schema-registry](https://github.com/bencebalogh/avro-schema-registry).

# Quickstart
There are a couple of differences, the obvious being a pure typescript implementation.

```
const registry = require('avro-schema-registry')('https://host.com:8081');
This lib is **schema type agnostic**: it works with whatever serialization protocol you want to use, but it does not handle serialization itself.

const schema = {type: 'string'};
const message = 'test message';
## Quickstart

registry.encodeMessage('topic', schema, message)
.then((msg) => {
console.log(msg); // <Buffer 00 00 00 00 01 18 74 65 73 74 20 6d 65 73 73 61 67 65>
### Install

return registry.decode(msg);
})
.then((msg) => {
console.log(msg); // test message
});
```
npm install avsc schematic-kafka
# or
yarn add avsc schematic-kafka
```

registry.encodeById(1, message)
.then((msg) => {
console.log(msg); // <Buffer 00 00 00 00 01 18 74 65 73 74 20 6d 65 73 73 61 67 65>
### Use

return registry.decode(msg);
```
import { KafkaRegistryHelper, SchemaType } from "schematic-kafka"
import { parse, Type as AVSCInstance } from "avsc"

// create instance
const registry = new KafkaRegistryHelper({ baseUrl: "https://schemaRegistryHost:8081" })
.withSchemaHandler(SchemaType.AVRO, (schema: string) => {
// if you want to customize your encoder, this is where you'd do it
const avsc: AVSCInstance = parse(schema)
return {
encode: (message: any) => {
return avsc.toBuffer(message)
},
decode: (message: Buffer) => {
return avsc.fromBuffer(message)
},
}
})

// how to decode a message from kafka
// AVSC returns parsed JSON, so decodedMessage is already an object, ready to use
const decodedMessage = await registry.decode(rawMessageFromKafka)

// how to encode a message with a schema
// where
// - subject: the kafka topic plus a -key or -value postfix
// - message: the actual message to send (in whatever format
//   the schema handler's encode function expects)
// - schemaType: (optional) AVRO/PROTOBUF/JSON
// - schema: (optional) serialized schema to be used
// returns a Buffer that you can send to the kafka broker
const encodeResult = await registry.encodeForSubject(subject, message, SchemaType.AVRO, schema)
```

# Install
For more examples, take a look at `src/kafka-registry-helper.testcontainer.spec.ts`.

## How this library works

This is what a Kafka message looks like when you send or receive it.

```
npm install avsc // if not already installed
npm install avro-schema-registry
[ 1 byte | 0 | 0 indicates this message is schema encoded ]
[ 4 bytes | number | schema id ]
[ n bytes | msg | protocol encoded message ]
```

# Doc
The first byte being a zero tells us that the following four bytes contain the schema id. With this schema id we can request the schema type (AVRO, PROTOBUF or JSON) and schema (serialized representation of the schema for the respective schema type) from the schema registry.
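The header layout above can be sketched in TypeScript. This is a hypothetical helper for illustration, not part of this library's API:

```typescript
// Hypothetical helper (not part of this library's API): split a
// Confluent-framed Kafka message into its schema id and payload.
function parseWireFormat(raw: Buffer): { schemaId: number; payload: Buffer } {
  // magic byte: 0 marks the message as schema encoded
  if (raw.readUInt8(0) !== 0) {
    throw new Error("not a schema-encoded message (magic byte != 0)")
  }
  // bytes 1..4: big-endian schema id
  const schemaId = raw.readUInt32BE(1)
  // remaining bytes: the protocol-encoded message
  return { schemaId, payload: raw.subarray(5) }
}
```

With the extracted schema id, the registry can be asked for the schema type and schema before the payload is handed to the matching decoder.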

The module exports a single function, which expects a `url` parameter (a Confluent Schema Registry endpoint) and an optional auth object. The function returns an object.
This library decodes the Kafka message header and then calls the appropriate decoder that you provide, passing the schema as an argument.

Every method returns a Promise.
Every method uses an internal cache to store already retrieved schemas, so if the same id or schema is used again no additional network call is performed. Schemas are cached together with their parsing options.
## Documentation

## Authentication with the Schema Registry
### Client SSL authentication

You can set username and password in the url object:
```
require('avro-schema-registry')('https://username:password@host.com:8081');
```
This library uses Node's http/https request. As such, you can provide an Agent to modify your requests.

You can pass in an optional second parameter for the registry, with the username and password:
```
require('avro-schema-registry')('https://host.com:8081', {username: 'username', password: 'password'});
import { Agent } from "https"

const agent = new Agent({
key: readFileSync("./client.key"),
cert: readFileSync("./client.cert"),
})
new KafkaRegistryHelper({ baseUrl: "https://schemaRegistryHost:8081", agent })
...
```

If the url contains authentication information and an authentication object parameter is also given, the object takes precedence.

## decode
Parameters:
- msg: object to decode
- parseOptions: parsing options to pass to `avsc.parse`, default: `null`
### Basic authentication

Decodes an avro encoded buffer into a javascript object.

## decodeMessage
Same as **decode**, only exists for backward compatibility reasons.

## encodeKey
Parameters:
- topic: the topic to register the schema, if it doesn't exist already in the registry. The schema will be put under the subject `${topic}-key`
- schema: object representing an avro schema
- msg: message object to be encoded
- parseOptions: parsing options to pass to `avsc.parse`, default: `null`

Encodes an object into an avro encoded buffer.
```
new KafkaRegistryHelper({ baseUrl: "https://schemaRegistryHost:8081", username: "username", password: "password" })

## encodeMessage
Parameters:
- topic: the topic to register the schema, if it doesn't exist already in the registry. The schema will be put under the subject `${topic}-value`
- schema: object representing an avro schema
- msg: message object to be encoded
- parseOptions: parsing options to pass to `avsc.parse`, default: `null`
// OR

Encodes a message object into an avro encoded buffer.
new KafkaRegistryHelper({ baseUrl: "https://username:password@schemaRegistryHost:8081" })
```

## encodeById
Parameters:
- id: schema id in the registry
- msg: message object to be encoded
- parseOptions: parsing options to pass to `avsc.parse`, default: `null`
# Doc

Encodes a message object into an avro encoded buffer by fetching the schema from the registry.
The module exports a single function, which expects a `url` parameter (a Confluent Schema Registry endpoint) and an optional auth object. The function returns an object.

## encodeMessageByTopicName
Every method returns a Promise. Every method uses an internal cache to store already retrieved schemas, so if the same id or schema is used again no additional network call is performed. Schemas are cached together with their parsing options.
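The caching behaviour can be illustrated with a minimal sketch, using assumed names rather than the library's actual internals: the schema promise is remembered per id, so only the first lookup per id performs a request.

```typescript
// Minimal caching sketch (assumed names, not this library's internals):
// remember the schema promise per id so repeated lookups skip the network.
class SchemaCache {
  private byId = new Map<number, Promise<string>>()

  getSchema(id: number, fetcher: (id: number) => Promise<string>): Promise<string> {
    let cached = this.byId.get(id)
    if (!cached) {
      cached = fetcher(id) // only the first call per id hits the registry
      this.byId.set(id, cached)
    }
    return cached
  }
}
```

Caching the promise (rather than the resolved value) also deduplicates concurrent lookups for the same id.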

Tries to get an already existing schema from the schema registry and encodes the message with the obtained schema. Note that the latest schema version will be used.
## Authentication with the Schema Registry

This may be useful when the topic consumer is responsible for providing the schema for the topic's messages.
You can set username and password in the url object:

Parameters:
```
require('avro-schema-registry')('https://username:password@host.com:8081');
```

- topic: topic name to fetch schema for
- msg: message object to be encoded
- parseOptions: parsing options to pass to `avsc.parse`, default: `null`
You can pass in an optional second parameter for the registry, with the username and password:

## getSchemaByTopicName
```
require('avro-schema-registry')('https://host.com:8081', {username: 'username', password: 'password'});
```

This method tries to get an already existing schema from the schema registry. Note that the latest schema version will be used.
If the url contains authentication information and an authentication object parameter is also given, the object takes precedence.

Parameters:
## feature x

- topic: topic name to fetch schema for
- parseOptions: parsing options to pass to `avsc.parse`, default: `null`
TODO: document. Hint: most methods have JSDoc comments on them. Have a look.

# Peer dependency
# Dependencies

The module has no dependencies, only one peer dependency: [avsc](https://github.com/mtth/avsc)
The module has only tslib as a dependency.
69 changes: 69 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,69 @@
version: "3.1"

services:
zookeeper:
image: confluentinc/cp-zookeeper:${TAG}
hostname: zookeeper
# ports:
# - 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-server:${TAG}
restart: always
hostname: broker
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: "true"
CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

kafka-tools:
image: confluentinc/cp-kafka:${TAG}
command: ["tail", "-f", "/dev/null"]

schema-registry:
image: confluentinc/cp-schema-registry:${TAG}
hostname: schema-registry
depends_on:
- zookeeper
- broker
ports:
- 8081:8081
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:9092"

control-center:
image: confluentinc/cp-enterprise-control-center:${TAG}
hostname: control-center
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- 9021:9021
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:9092"
CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
20 changes: 0 additions & 20 deletions index.d.ts

This file was deleted.

9 changes: 9 additions & 0 deletions jest.config.js
@@ -0,0 +1,9 @@
module.exports = {
preset: "ts-jest",
testEnvironment: "node",
globals: {
"ts-jest": {
tsconfig: "tsconfig.spec.json",
},
},
}