Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ausimian committed Jan 26, 2022
0 parents commit a678d30
Show file tree
Hide file tree
Showing 17 changed files with 1,823 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
32 changes: 32 additions & 0 deletions .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: CI

on: [push, pull_request]

jobs:
build:

name: Build and test
runs-on: ubuntu-latest

env:
MIX_ENV: test
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up Elixir
uses: erlef/setup-beam@988e02bfe678367a02564f65ca2e37726dc0268f
with:
elixir-version: '1.12.3' # Define the elixir version [required]
otp-version: '24.1' # Define the OTP version [required]
- name: Restore dependencies cache
uses: actions/cache@v2
with:
path: deps
key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }}
restore-keys: ${{ runner.os }}-mix-
- name: Install dependencies
run: mix deps.get
- name: Run tests
run: mix coveralls.github
38 changes: 38 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Release

on: workflow_dispatch

jobs:
build:

name: Publish hex package
runs-on: ubuntu-latest

env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
HEX_API_KEY: ${{ secrets.HEX_API_KEY }}
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Fetch tags
run: git fetch --tags --force
- name: Set up Elixir
uses: erlef/setup-beam@988e02bfe678367a02564f65ca2e37726dc0268f
with:
elixir-version: '1.12.3' # Define the elixir version [required]
otp-version: '24.1' # Define the OTP version [required]
- name: Restore dependencies cache
uses: actions/cache@v2
with:
path: deps
key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }}
restore-keys: ${{ runner.os }}-mix-
- name: Install dependencies
run: mix deps.get
- name: Compile
run: mix compile
- name: Build hex package
run: mix hex.build
- name: Publish hex package
run: mix hex.publish --yes
29 changes: 29 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
stagger-*.tar

# Temporary files, for example, from tests.
/tmp/

# LSP
.elixir_ls
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Changelog

## 0.1.0 (2022-01-26) - Initial revision

22 changes: 22 additions & 0 deletions LICENSE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# MIT License

Copyright (c) 2022 Nick Gunn

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Stagger

[![Build Status](https://github.com/ausimian/stagger/actions/workflows/elixir.yml/badge.svg)](https://github.com/ausimian/stagger/actions?query=workflow%3A%22CI%22)
[![Coverage Status](https://coveralls.io/repos/github/ausimian/stagger/badge.svg?branch=main)](https://coveralls.io/github/ausimian/stagger?branch=main)
[![Hex](https://img.shields.io/hexpm/v/stagger.svg)](https://hex.pm/packages/stagger)
[![Hex Docs](https://img.shields.io/badge/hex-docs-blue.svg)](https://hexdocs.pm/stagger)

Point-to-point, durable message-queues as GenStage producers.

Stagger enables the creation of GenStage processes that enqueue terms to simple,
file-backed message-queues, allowing the producer and consumer to run independently
of each other, possibly at different times.

+----------+ +----------+ +----------+ +------------+
| Upstream | | MsgQueue | | MsgQueue | | Downstream |
| | -> | | <- | | <---> | |
| Client | | Producer | | Consumer | | Processing |
+----------+ +----------+ +----------+ +------------+
| | read
write | |
+------+
| FILE |
| |
| |
+------+

Your upstream client writes its events into the message-queue (provided by
Stagger), which persists them to local storage. Your (GenStage) consumer, subscribes
to the producer and receives events, via this local storage.
## Installation

The package can be installed by adding `stagger` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:stagger, "~> 0.1.0"}
]
end
```

The docs can be found at <https://hexdocs.pm/stagger>.

## Copyright and License

Copyright (c) 2022, Nick Gunn

Stagger runtime source code is licensed under the [MIT License](LICENSE.md).
Stagger test source code is licensed under the [GPL3 License](test/LICENSE).

129 changes: 129 additions & 0 deletions lib/stagger.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
defmodule Stagger do
@moduledoc ~S"""
Point-to-point, durable message-queues as GenStage producers.
Stagger enables the creation of GenStage processes that enqueue terms to simple,
file-backed message-queues, allowing the producer and consumer to run independently
of each other, possibly at different times.
+----------+ +----------+ +----------+ +------------+
| Upstream | | MsgQueue | | MsgQueue | | Downstream |
| | -> | | <- | | <---> | |
| Client | | Producer | | Consumer | | Processing |
+----------+ +----------+ +----------+ +------------+
| | read
write | |
+------+
| FILE |
| |
| |
+------+
Your upstream client writes its events into the message-queue (provided by
Stagger), which persists them to local storage. Your (GenStage) consumer, subscribes
to the producer and receives events, via this local storage.
## Producers
Upstream clients must first open their message-queue, via `open/1`, and then use the
resulting process to enqueue writes, via `write/2`.
{:ok, pid} = Stagger.open("/path/to/msg/queue")
...
:ok = Stagger.write(pid, "foo")
:ok = Stagger.write(pid, "bar")
:ok = Stagger.write(pid, "baz")
The process created via `open/1` is the GenStage MsgQueue - by writing entries to it,
it will satisfy demand from a downstream consumer.
## Consumers
Downstream clients are GenStage consumers. They must also open the message-queue, via
`open/1` and then subscribe use existing GenStage subscription facilities:
def init(args) do
{:ok, pid} = Stagger.open("/path/to/msg/queue")
{:ok, sub} = GenStage.sync_subscribe(self(), to: pid, ack: last_processed())
...
end
def handle_events(events, _from, stage) do
...
end
## Sequence numbers
Sequence numbers are used to control the events seen by a subscriber. Every event
delivered to a consumer is a 2-tuple of `{seqno, msg}` and it is the consumer's
responsibility to successfully record this sequence number as having been
processed.
A consumer must indicate its last-processed sequence number by passing `ack: N` in
the subscription options (pass `ack: 0` when no such number exists) whenever it
(re)subscribes. Event delivery will resume from the Nth + 1 event.
Every message _written_ to the message-queue is assigned an incrementing sequence number
by the Stagger process. When an existing message queue is re-opened, the process will
first recover the last written number, using that as the base for any subsequent writes.
## Purging
In order to prevent unconstrained growth of the message-queue file, a consumer may
periodically purge the queue of old entries by passing a `purge: N` option when it
(re)subscribes e.g:
last = last_processed()
{:ok, sub} = GenStage.sync_subscribe(self(), to: pid, ack: last, purge: last)
All entries _up to and including_ N are removed from the head of message-queue file.
The value of N will be capped to no more than the value of the last ack'd message.
To summarize:
- `ack: N` determines that the next delivered message will have a seqno of N + 1
- `purge: M` is a hint to the producer to remove messages 1..M from the head of
the message queue.
## Why not RabbitMQ?
If you think you need something like RabbitMQ, you probably do :-). Stagger is
intended to be a lightweight durable message queue with minimal dependencies.
"""

@doc """
Open a message-queue file, returning the pid responsible for managing it.
The resulting pid can be used by upstream clients to enqueue messages via `write/2`, or
may be used as the target of a GenStage subscribe operation.
The following option may be passed to the function:
* `hibernate_after: N` - After a period of _N_ milliseconds, the returned process will
hibernate. If unspecified, defaults to 15 seconds. Pass `hibernate_after: :infinite`
to inhibit this behaviour. This option only takes effect if the process managing the
queue is created by the call to `open/2`.
"""
@spec open(binary, Keyword.t) :: :ignore | {:error, any} | {:ok, any} | {:ok, pid, any}
def open(path, opts \\ []) when is_binary(path) do
args = Map.new(opts) |> Map.put(:path, path)
case DynamicSupervisor.start_child(Stagger.MsgQueueSupervisor, {Stagger.MsgQueue, args}) do
{:ok, pid} ->
{:ok, pid}
{:error, {:already_started, pid}} ->
{:ok, pid}
error ->
error
end
end

@doc """
Write a term to the message-queue.
"""
@spec write(pid :: atom | pid | {atom, any} | {:via, atom, any}, term :: any, timeout :: :infinite | non_neg_integer()) :: :ok | {:error, any}
def write(pid, term, timeout \\ 5000) do
Stagger.MsgQueue.write(pid, term, timeout)
end

end
18 changes: 18 additions & 0 deletions lib/stagger/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Stagger.Application do
@moduledoc false

use Application

@impl true
def start(_type, _args) do
children = [
# A registry used to hold each MsgQueue, keyed by the path of the MsgQueue file
%{id: Stagger.MsgQueueRegistry, start: {Registry, :start_link, [[keys: :unique, name: Stagger.MsgQueueRegistry]]}},
# The supervisor for the MsgQueue processes
{DynamicSupervisor, strategy: :one_for_one, name: Stagger.MsgQueueSupervisor},
]

opts = [strategy: :one_for_one, name: Stagger.Supervisor]
Supervisor.start_link(children, opts)
end
end

0 comments on commit a678d30

Please sign in to comment.