kafboy

A low-latency HTTP server for writing to Kafka. Optimized for heavy loads and hundreds of partition workers, with support for batching and more. Written in Erlang. Powered by ekaf and Cowboy.

For the ordered_round_robin partition strategy, see https://github.com/helpshift/ekaf for more information.

Architecture

With Kafka 0.8, clients take greater responsibility for deciding which broker and partition to publish to for a given topic.

kafboy is an HTTP wrapper over the ekaf client that takes care of routing HTTP requests to the right Kafka broker socket. kafboy is cluster-aware: a request arriving on any node can be routed to the right process in the cluster.

Simply send a POST with the desired JSON to one of the following paths:

Fire and forget

% fire and forget asynchronous call. the event is immediately sent to kafka asynchronously
POST /async/topic

Synchronous calls

% synchronous call that returns with the response after sending to kafka
% `NOTE: a reply is not sent until after kafka responds, so this is not recommended for low latency needs`
POST /sync/topic

Batching

% the event is added to a queue and sent to the broker in a batch.
% batch size and flush timeout are configurable
POST /batch/async/topic

The payload is expected to be JSON, but kafboy can be configured to send the data as is. The server does very little else in terms of dealing with Kafka: it simply calls ekaf's produce function.
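
As a rough illustration of the three paths above, here is a minimal client sketch using OTP's built-in `httpc`. The module name `kafboy_client_sketch`, the base URL (port 9903, borrowed from the quick start demo), the `application/json` content type, and the 5 second timeout are all assumptions for this example, not part of kafboy itself.

```erlang
-module(kafboy_client_sketch).
-export([post_event/3, path/2]).

%% Assumption: kafboy listens on localhost:9903, as in the quick start demo.
-define(BASE_URL, "http://localhost:9903").

%% Map a delivery mode to the default path listed in this README.
path(async, Topic)       -> "/async/" ++ Topic;
path(sync, Topic)        -> "/sync/" ++ Topic;
path(batch_async, Topic) -> "/batch/async/" ++ Topic.

%% POST a JSON binary to kafboy and return the HTTP status and body.
post_event(Mode, Topic, Json) when is_binary(Json) ->
    {ok, _} = application:ensure_all_started(inets),
    Request = {?BASE_URL ++ path(Mode, Topic), [], "application/json", Json},
    case httpc:request(post, Request, [{timeout, 5000}], [{body_format, binary}]) of
        {ok, {{_Version, Status, _Reason}, _Headers, Body}} ->
            {ok, Status, Body};
        {error, Reason} ->
            {error, Reason}
    end.
```

With a kafboy node running, a call might look like `kafboy_client_sketch:post_event(batch_async, "ekaf", <<"{\"hello\":\"a\"}">>).` The reply depends on the mode and on any edit callback that is configured.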

Configuring kafboy

{kafboy,[
    % optional. you get to edit the json before it goes to kafka over here
    {kafboy_callback_edit_json, {my_module, massage_json}},
    % M:F({post, Topic, Req, Json, Callback}) will be called. return with what you want to send to kafka
    % if an error occurs, M:F({error, StatusCode, Message}) will be called

    % optional.
    {kafboy_load_balancer, "http://localhost:8080/disco"},
    % should return plaintext of a node name with the right cookie eg: `node2@some-host`
    % can be used to distribute work to other nodes if ekaf thinks this one is too busy

    % optional, see more in kafboy_app.erl
    {kafboy_routes_async_batch,["/1/foo/:topic"]},
    {kafboy_routes_async,[]},
    {kafboy_routes_sync,[]}
]}

In this example, you have to implement my_module:massage_json/1, along the lines of:

massage_json({post, Topic, Req, Body, Callback})->
    Callback ! { edit_json_callback, Topic, Body }.

Here is a more elaborate example:

%% Let's check the contents of Body,
%% and if it's valid, add an extra field
%% and then submit to kafka
massage_json({post, Topic, _Req, Body, CallbackPid})->
    case Body of
        [{<<"hello">>, Foo}] ->
            % either reply like this
            CallbackPid ! { edit_json_callback, Topic, Foo };
        [] ->
            CallbackPid ! { edit_json_callback, {error, <<Topic/binary,".insufficient">>}};
        _ ->
            %% reply to the client first
            CallbackPid ! { edit_json_callback, {200, <<"{\"ok\":\"fast reply\"}">>}},

            %% then directly call ekaf, adding this msg to a batch
            Final = jsx:encode([{<<"extra">>,<<"true">>}| Body]),
            ekaf:produce_async_batched(Topic, Final)
    end;
massage_json({error, Status, Message}) ->
    io:format("~n some ~p error: ~p",[Status, Message]),
    ok.
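
The callback protocol above is plain message passing, so it can be exercised without a running server. The sketch below is one way to do that: it passes `self()` as the callback pid and waits for the reply. The module name `massage_json_sketch` and the helper `try_it/2` are hypothetical, `undefined` stands in for the Cowboy request, and only the two message shapes shown in this README are assumed.

```erlang
-module(massage_json_sketch).
-export([massage_json/1, try_it/2]).

%% Reject an empty body with an error message derived from the topic.
massage_json({post, Topic, _Req, [], CallbackPid}) ->
    CallbackPid ! {edit_json_callback, {error, <<Topic/binary, ".insufficient">>}};
%% Otherwise hand the body back unchanged for kafboy to send.
massage_json({post, Topic, _Req, Body, CallbackPid}) ->
    CallbackPid ! {edit_json_callback, Topic, Body};
massage_json({error, _Status, _Message}) ->
    ok.

%% Hypothetical helper: call the callback with self() as the reply pid
%% and report which message came back.
try_it(Topic, Body) ->
    massage_json({post, Topic, undefined, Body, self()}),
    receive
        {edit_json_callback, Topic, Edited}   -> {produce, Topic, Edited};
        {edit_json_callback, {error, Reason}} -> {error, Reason}
    after 1000 ->
        timeout
    end.
```

Keeping the decision logic in pure clauses like these makes it easy to check each branch from the Erlang shell before wiring the module into `kafboy_callback_edit_json`.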

kafboy handles sending batched requests (with a configurable batch size), broker disconnections, and retries up to a maximum count.

For the ekaf API, see http://github.com/helpshift/ekaf

Quick start

On terminal 1

git clone https://github.com/helpshift/kafboy
cd kafboy
rebar get-deps compile
erl -pa deps/*/ebin -pa ebin -s kafboy_demo

On terminal 2

curl localhost:9903/batch/async/ekaf -XPOST  -d 'test=a'
{"ok":"fast reply"}
 
curl localhost:9903/batch/async/ekaf -XPOST  -d 'hello=a'
{"ok":1}

curl localhost:9903/batch/async/ekaf -XPOST 
{"error":"ekaf.insufficient"}

Configuring ekaf

An example ekaf config:

{ekaf,[

    % required.
    {ekaf_bootstrap_broker, {"localhost", 9091} },
    % pass the {BrokerHost,Port} of at least one permanent broker. Ideally this should be
    %       the IP of a load balancer so that any broker can be contacted


    % optional
    {ekaf_per_partition_workers,100},
    % how big the connection pool is per partition
    % eg: if the topic has 3 partitions, then with this setting 300 workers will be started


    % optional
    {ekaf_max_buffer_size, [{<<"topic">>,10000},                % for specific topic
                            {ekaf_max_buffer_size,100}]},       % for other topics
    % how many events should the worker wait for before flushing to kafka as a batch


    % optional
    {ekaf_partition_strategy, random}
    % if you are not bothered about the order, use random for speed
    % else the default is ordered_round_robin


]},

To see how to configure the number of workers per topic+partition, the buffer batch size, the buffer flush TTL, and more, see the extensive README for ekaf: https://github.com/helpshift/ekaf
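
The same keys can in principle be set from code instead of a config file. The sketch below assumes ekaf reads these keys from its application environment (which the config tuples above suggest) and that it is called before ekaf is started. The module name `ekaf_env_sketch` and both functions are hypothetical helpers; `application:set_env/3` is standard OTP. It also spells out the worker arithmetic from the comment above: partitions multiplied by workers per partition.

```erlang
-module(ekaf_env_sketch).
-export([apply_config/1, expected_workers/2]).

%% Copy a proplist of {Key, Value} pairs into the ekaf application environment.
apply_config(Config) ->
    lists:foreach(
      fun({Key, Value}) -> ok = application:set_env(ekaf, Key, Value) end,
      Config).

%% Workers started for one topic: one pool of PerPartition workers per partition.
expected_workers(Partitions, PerPartition) ->
    Partitions * PerPartition.
```

For example, `ekaf_env_sketch:apply_config([{ekaf_bootstrap_broker, {"localhost", 9091}}, {ekaf_per_partition_workers, 100}]).` stores both values, and `ekaf_env_sketch:expected_workers(3, 100)` gives the 300 workers mentioned in the config comments.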

License

Copyright 2014, Helpshift, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Add a feature request at https://github.com/helpshift/ekaf or check the ekaf web server at https://github.com/helpshift/kafboy
