Skip to content
This repository has been archived by the owner on Jun 17, 2024. It is now read-only.
/ ghost Public archive

A Kafka wrapper with mock testing capabilities for elixir

License

Notifications You must be signed in to change notification settings

IBM/ghost

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

16 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

👻Ghost

The package can be installed by adding ghost to your list of dependencies in mix.exs:

def deps do
  [
    {:ghost, "~> 0.1.0"},
    {:brod, "~> 3.7"} # Only add if you want to use brod for your Kafka client
  ]
end

Available in Hex

Introduction

At IBM many of our internal cloud services connect via a kafka based service bus. While this allows our developers to easily exchange information, it makes testing these services extremely difficult as Kafka is neither transactional nor synchronous in many common usage patterns. Your service may publish a message to kafka, but it can take an indeterminate amount of time to read that message back making unit testing in elixir near impossible. Which is why we developed Ghost.

Ghost aims to offer the same kind testing experience as Ecto provides for databases via the Ecto.Adapter.Sandbox.

How to use it?

Production and Dev Usage

Out of the box Ghost provides a default module (Ghost) that changes it's implementation based on configuration options. These configuration options are as follows

config :ghost, Ghost,
  client: :brod_client,
  adapter: Ghost.VirtualQueue

At IBM we have both our dev.exs and prod.exs file configured somewhat like this, the host name for Kafka obviously changes between deploys, but in general that's what it looks like for both files. You configure brod to make sure you can connect to kafka in a prod environment, and specify that the implementation of Ghost you would like to use is the Kafka backend.

config :brod,
  clients: [
    brod_client: [ # notice this key is equal to client in ghost
      endpoints: [
        {'kafka.dev', 9_095}
      ],
      reconnect_cool_down_seconds: 10,
      auto_start_producers: true
    ]
  ]

config :ghost, Ghost,
  queue_adapter: Ghost.Kafka,
  client: :brod_client # notice this value is equal to the brod config

This configuration uses brod for the Ghost implementation and you would use it like this

iex(1)> Ghost.produce_sync("kafka_topic_name", "key", "payload")
{:ok, 0}

Which will place the message "payload" on the "kafka_topic_name"topic with key set to "key". This a production and dev configuration where we actually want to produce messages to Kafka.

Testing Usage

In the testing configuration we only need to add the following to our test.exs.

config :ghost, Ghost,
  queue_adapter: Ghost.VirtualQueue,
  client: nil

In this case we don't need to configure brod since in the test case we won't actually be producing messages to Kafka.

How does it work?

Every adapter must implement the Ghost.Queue behavior which includes 2 synchronous produce APIs, 2 async produce APIs, and one fetch API. Any new adapters simply implement the API and the implementation can be provided in the configuration.

defmodule Ghost.Queue do
  @doc """
  The `produce_sync/5` function will add a value to the topic and partition synchronously.
  """
  @callback produce_sync(
              atom() | pid(),
              String.t(),
              (String.t(), integer(), binary(), binary() -> {:ok, integer()}),
              binary(),
              binary()
            ) :: {:ok, integer()} | {:error, any()}

  @doc """
  Same as `produce_sync/4`, but uses a predefined partition selection function.
  """
  @callback produce_sync(atom() | pid(), String.t(), binary(), binary()) :: {:ok, integer()} | {:error, any()}

  @doc """
  This is an asynchronous produce to the queue. This function will send a message to the caller
  of a format specified by the specific implementation of `Queue`.
  """
  @callback produce(
              atom() | pid(),
              String.t(),
              (String.t(), integer(), binary(), binary() -> {:ok, integer()}),
              binary(),
              binary()
            ) :: {:ok, any()} | {:error, any()}

  @doc """
  Same as `produce/5`, but uses a predefined partition selection function.
  """
  @callback produce(
              atom() | pid(),
              String.t(),
              binary(),
              binary()
            ) :: {:ok, any()} | {:error, any()}

  @doc """
  Returns a list of messages produced by a pid to a topic in the order that they were produced in.
  """
  @callback fetch(pid(), String.t()) :: [binary()]
end

Limitations

Currently ghost only supports only provides a one kafka adapter out of the box for the brod client provided by Klarna. Internally brod is the prefered client for both Elixir and Erlang code as it is the most up to date and complete Kafka client. An adapter could be provided for kafka_ex by implementing the behavior defined in Ghost.Queue.

Releases

No releases published

Packages

No packages published

Languages