Liiklus

Liiklus [li:klus] ("traffic" in Estonian) is an RSocket/gRPC-based gateway for event-based systems, from people who think that Kafka is too low-level.

Why

  • horizontally scalable RSocket/gRPC streaming gateway
  • supports as many client languages as RSocket+gRPC do (Java, Go, C++, Python, etc...)
  • reactive first
  • per-partition backpressure-aware sources
  • at-least-once/at-most-once delivery guarantees
  • pluggable event storage (Kafka, Pulsar, Kinesis, etc...)
  • pluggable positions storage (DynamoDB, Cassandra, Redis, etc...)
  • WIP: cold event storage support (S3, Minio, SQL, key/value, etc...)

Who is using

  • https://vivy.com/ - 25+ microservices, an abstraction in front of Kafka for the Shared Log Infrastructure (Event Sourcing / CQRS)

Quick Start

The easiest (and recommended) way to run Liiklus is with Docker:

# storage_positions_type=MEMORY is only for testing, DO NOT use in production
$ docker run \
    -e kafka_bootstrapServers=some.kafka.host:9092 \
    -e storage_positions_type=MEMORY \
    -p 6565:6565 \
    bsideup/liiklus:$LATEST_VERSION

Here $LATEST_VERSION stands for the latest released version of Liiklus.

Now use LiiklusService.proto to generate your client.

The clients must implement the following algorithm:

  1. Subscribe to the assignments:
    stub.subscribe(SubscribeRequest(
        topic="your-topic",
        group="your-consumer-group",
        [autoOffsetReset="earliest|latest"]
    ))
    
  2. For every emitted reply of Subscribe, using the same channel, subscribe to the records:
    stub.receive(ReceiveRequest(
        assignment=reply.getAssignment()
    ))
    
  3. ACK records
    stub.ack(AckRequest(
        assignment=reply.getAssignment(),
        offset=record.getOffset()
    ))
    
    Note 1: If you ACK a record before processing it, you get at-most-once delivery; if you ACK it after processing, you get at-least-once delivery.
    Note 2: It's recommended to ACK every n-th record, or every n seconds, to reduce the load on the positions storage.
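The difference described in Note 1 can be illustrated with a small in-memory simulation. This is only a sketch: the class and method names are hypothetical and not part of the Liiklus API, and it assumes that after a restart consumption resumes right after the last ACKed offset. The consumer crashes once while handling a given offset; depending on whether the ACK comes before or after processing, that record is either lost or processed twice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory simulation; none of these names belong to the Liiklus API.
class DeliverySemanticsSketch {

    /**
     * Consumes offsets 0..lastOffset, crashing once while handling crashAt.
     * Returns the offsets whose processing completed, in order.
     */
    static List<Long> run(boolean ackBeforeProcessing, long lastOffset, long crashAt) {
        List<Long> processed = new ArrayList<>();
        long acked = -1;         // last ACKed offset, as the positions storage would keep it
        boolean crashed = false; // the simulated crash happens only once

        while (acked < lastOffset) {
            // Assumption: after a (re)start, consumption resumes right after the last ACKed offset.
            for (long offset = acked + 1; offset <= lastOffset; offset++) {
                if (ackBeforeProcessing) {
                    acked = offset; // ACK first ...
                    if (offset == crashAt && !crashed) { crashed = true; break; } // ... crash before processing
                    processed.add(offset);
                } else {
                    processed.add(offset); // process first ...
                    if (offset == crashAt && !crashed) { crashed = true; break; } // ... crash before the ACK
                    acked = offset;
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("at-most-once:  " + run(true, 4, 2));  // [0, 1, 3, 4]: offset 2 is lost
        System.out.println("at-least-once: " + run(false, 4, 2)); // [0, 1, 2, 2, 3, 4]: offset 2 is processed twice
    }
}
```

With at-least-once delivery, the processing step should therefore tolerate duplicates, for example by being idempotent.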

Java example:

Example code using Project Reactor and reactive-grpc:

var stub = ReactorLiiklusServiceGrpc.newReactorStub(channel);
stub
    .subscribe(
        SubscribeRequest.newBuilder()
            .setTopic("user-events")
            .setGroup("analytics")
            .setAutoOffsetReset(AutoOffsetReset.EARLIEST)
            .build()
    )
    .flatMap(reply -> stub
        .receive(ReceiveRequest.newBuilder().setAssignment(reply.getAssignment()).build())
        .window(1000) // ACK every 1000th record
        .concatMap(
            batch -> batch
                .map(ReceiveReply::getRecord)
                // TODO process instead of Mono.delay(), e.g. by indexing to ElasticSearch
                // delayUntil (rather than concatMap) keeps the record, so its offset can be ACKed below
                .delayUntil(record -> Mono.delay(Duration.ofMillis(100)))
                .sample(Duration.ofSeconds(5)) // ACK every 5 seconds
                .onBackpressureLatest()
                .delayUntil(record -> stub.ack(
                    AckRequest.newBuilder()
                        .setAssignment(reply.getAssignment())
                        .setOffset(record.getOffset())
                        .build()
                )),
            1
        )
    )
    .blockLast();

Also check examples/java/ for a complete example.
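For clients that do not use Project Reactor, the "ACK every n-th record or every n seconds" policy from the example above can be approximated with a small counter-and-clock helper. The following is a minimal sketch under that assumption; AckThrottle is a hypothetical name and not something Liiklus provides, and the clock is passed in explicitly so the logic stays easy to test.

```java
import java.time.Duration;

// Hypothetical helper, not part of Liiklus: decides when an ACK is due.
class AckThrottle {
    private final int everyNRecords;
    private final long everyNanos;
    private int sinceLastAck = 0;
    private long lastAckNanos;

    AckThrottle(int everyNRecords, Duration everyInterval, long nowNanos) {
        this.everyNRecords = everyNRecords;
        this.everyNanos = everyInterval.toNanos();
        this.lastAckNanos = nowNanos;
    }

    /** Call after processing each record; returns true when that record's offset should be ACKed. */
    boolean shouldAck(long nowNanos) {
        sinceLastAck++;
        boolean due = sinceLastAck >= everyNRecords || nowNanos - lastAckNanos >= everyNanos;
        if (due) {
            // Reset both the record counter and the timer once an ACK is due.
            sinceLastAck = 0;
            lastAckNanos = nowNanos;
        }
        return due;
    }

    public static void main(String[] args) {
        AckThrottle throttle = new AckThrottle(1000, Duration.ofSeconds(5), System.nanoTime());
        int acks = 0;
        for (long offset = 0; offset < 2500; offset++) {
            // Process the record here, then check whether an ACK is due.
            if (throttle.shouldAck(System.nanoTime())) {
                acks++; // a real client would call stub.ack(...) with this offset
            }
        }
        System.out.println("ACKs sent: " + acks);
    }
}
```

A real client would probably also ACK the last processed offset on shutdown, so that records handled since the previous ACK are not redelivered after a restart.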

Configuration

The project is based on Spring Boot and uses its configuration system.
Please check application.yml for the available configuration keys.

License

See LICENSE.
