Skip to content

funbox/kashka

master
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Code
This branch is 15 commits ahead of netDalek:master.

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Kashka

Elixir Kafka REST proxy client for Confluent REST Proxy.

Examples

Produce

{:ok, conn} = Kashka.Kafka.produce("http://localhost:8082/", "topic_name', [%{value: %{foo: "bar"}}])
:ok == Kashka.Kafka.close(conn)

See Kashka.Kafka.produce/4 for details.

Consume

defmodule TestModule do
  @behaviour Kashka.GenConsumer

  def init(conn, _args) do
    {:ok, conn, :state}
  end

  def handle_message_set(conn, :state, message_set) do
    IO.inspect(message_set)

    #[%{"key" => nil, "offset" => 0, "partition" => 0, "topic" => "test_name", "value" => %{"foo" => "bar"}}]

    {:ok, conn, :state}
  end
end

args = [
  url: "http://localhost:8082/",
  instance_id: "my",
  consumer_group: "consumer_group",
  topics: ["topic_name"],
  module: TestModule,
  delete_on_exists: false,
  retry_on_exists: true,
  consumer_opts: %{"auto.offset.reset": :earliest, "auto.commit.enable": true},
]

{:ok, pid} = GenConsumer.start_link(args)

See Kashka.GenConsumer for details.

More

See tests for more examples.

Kafka URL explanation

Kafka URL can be a String or a Keyword list. Keyword list can contain url, headers, fix_schema, fix_port, fix_host keys.

Examples:

[url: "http://127.0.0.1:8811", headers: [{"host", "smth.com"}]]

With fix_* keys:

[url: "http://127.0.0.1", fix_port: true, fix_schema: true, fix_host: true, headers: [{"host", "smth.com"}]]

fix_* keys used to automatically preprocess the URL returned from the consumer creation method.

For example if create_consumer method returns https://smth1.com:443/comsumers/group/instances/name as a base_uri, it is transformed to http://127.0.0.1:80/comsumers/group/instances/name with host header smth1.com.

It can be helpful while connecting to Kafka REST API through Nginx proxy.

protocols: option can be used for explicit specification of HTTP protocols negotiation with Kafka REST server. It can take the following values: :http1, :http2 or :any by default.

How to run tests

  1. Run doker-compose up.
  2. Run mix test in separate terminal.

Roadmap

  • Make protection from endless loop in Kashka.Http.request.
  • Correctly process other errors in Kashka.Http.request.

Sponsored by FunBox

About

Elixir Kafka REST proxy client for Confluent REST Proxy

Topics

Resources

License

Stars

Watchers

Forks

Languages

  • Elixir 99.9%
  • Makefile 0.1%