Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for alternate backends, including a NATS adapter #79

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/docs/
/lib/
/bin/
lib/
bin/
/.shards/
*.dwarf

Expand Down
39 changes: 39 additions & 0 deletions examples/multi_backend/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Multi-Backend example

This is an example Turbo+Cable app to demo the use of multiple backends with the Cable shard.

## Installation

To use the Redis and NATS backends, you will need to have access to running
Redis and NATS servers. A package manager for your operating system can simplify
the installation of them on your machine.

If you don't want to install a NATS server, you can use publicly available servers. For example, there is a public NATS server available at `demo.nats.io` — just don't use it for production. 😄

Once you have Redis and NATS installed, install the Crystal dependencies:

```shell
shards install
```

## Usage

To use either backend, specify the url in the `CABLE_BACKEND_URL` environment variable:

```shell
CABLE_BACKEND_URL=redis:///
CABLE_BACKEND_URL=nats:///
CABLE_BACKEND_URL=nats://demo.nats.io/
```

If you would like to see the messages passing through Redis when using the Redis backend, you can use the Redis CLI with the following command:

```shell
redis-cli subscribe time
```

If you would like to see the messages passing through NATS when using the NATS backend, you can use the [NATS CLI](https://github.com/nats-io/natscli), which may need to be installed separately from the NATS server.

```shell
nats sub time
```
42 changes: 42 additions & 0 deletions examples/multi_backend/shard.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: 2.0
shards:
base32:
git: https://github.com/jgaskins/base32.git
version: 0.1.1+git.commit.18f5647c42dae4de654e3003825fab43dc95b029

cable:
path: ../..
version: 0.2.2

cron_parser:
git: https://github.com/kostya/cron_parser.git
version: 0.4.0

db:
git: https://github.com/crystal-lang/crystal-db.git
version: 0.12.0

future:
git: https://github.com/crystal-community/future.cr.git
version: 1.0.0

habitat:
git: https://github.com/luckyframework/habitat.git
version: 0.4.7

nats:
git: https://github.com/jgaskins/nats.git
version: 1.3.3

redis:
git: https://github.com/jgaskins/redis.git
version: 0.7.0

tasker:
git: https://github.com/spider-gazelle/tasker.git
version: 2.1.4

turbo:
git: https://github.com/jgaskins/turbo.git
version: 0.1.0+git.commit.8685616e26d7903d1559f5f3f8b96085bc10af12

23 changes: 23 additions & 0 deletions examples/multi_backend/shard.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: multi_backend
version: 0.1.0

authors:
- Jamie Gaskins <jgaskins@hey.com>

targets:
multi_backend:
main: src/multi_backend.cr

dependencies:
cable:
path: ../..
nats:
github: jgaskins/nats
redis:
github: jgaskins/redis
turbo:
github: jgaskins/turbo

crystal: 1.9.2

license: MIT
48 changes: 48 additions & 0 deletions examples/multi_backend/src/multi_backend.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require "turbo/cable"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Cable adopted a bunch of the customizations I'd made to my fork, I didn't have to make a single change to my turbo/cable integration! 💯

require "cable/backend/nats"
require "cable/backend/redis/backend"

module AppCable
class Connection < Cable::Connection
identified_by id

getter id = UUID.random.to_s

def connect
end
end
end

Cable.configure do |settings|
settings.route = "/cable" # the URL your JS Client will connect
# settings.url = "redis:///"
# settings.url = ENV.fetch("NATS_URL", "nats:///")
settings.url = ENV.fetch("CABLE_BACKEND_URL", "redis:///")
end
Comment on lines +16 to +21
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice that in the settings for the demo we aren't actually setting backend_class anywhere. Which backend is used is based entirely on url.


Turbo::StreamsChannel.signing_key = "this is my signing key"

spawn do
loop do
duration = Time.measure do
Turbo::StreamsChannel.broadcast_update_to "time",
message: Time.local.to_s
end
sleep 1.second - duration
end
end

http = HTTP::Server.new([
HTTP::LogHandler.new,
Cable::Handler(AppCable::Connection).new,
]) do |context|
context.response << <<-HTML
<!doctype html>
#{Turbo.javascript_tag}
#{Turbo.cable_tag}
#{Turbo::Frame.new(id: "time") { }}
#{Turbo.stream_from "time"}
Comment on lines +41 to +44
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It blew my mind that this was all we needed to render in order to demo this. 🤯

HTML
end

http.listen 3200
64 changes: 64 additions & 0 deletions src/backend/nats.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
require "nats"

module Cable
class NATSBackend < BackendCore
register "nats"

getter nats : NATS::Client do
NATS::Client.new(URI.parse(Cable.settings.url))
end
getter streams = Hash(String, Set(NATS::Subscription)).new { |streams, channel|
streams[channel] = Set(NATS::Subscription).new
}
Comment on lines +10 to +12
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to use {} instead of do/end here due to precedence.

The reason we had to include this is because you can't unsubscribe from a NATS subject directly. You have to unsubscribe from the subscription id since you can subscribe to the same subject multiple times.


def subscribe_connection
nats
end

def publish_connection
nats
end

def close_subscribe_connection
nats.close rescue nil
end

def close_publish_connection
nats.close rescue nil
end
Comment on lines +22 to +28
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NATS multiplexes everything over a single TCP connection (no connection pool needed), so we ignore any errors caused by closing the client twice.

This might be too naive since NATS::Client#close also drains subscriptions (processes any pending messages and synchronous requests), which can also raise errors, so we may want this to be a bit more precise on what exceptions it swallows silently.


def open_subscribe_connection(channel)
nats
end

def publish_message(stream_identifier : String, message : String)
nats.publish stream_identifier, message
end

def subscribe(stream_identifier : String)
subscription = nats.subscribe stream_identifier, queue_group: object_id.to_s do |msg|
Cable.server.fiber_channel.send({
stream_identifier,
String.new(msg.body),
})
end
streams[stream_identifier] << subscription
end

def unsubscribe(stream_identifier : String)
if subscriptions = streams.delete(stream_identifier)
subscriptions.each do |subscription|
nats.unsubscribe subscription
end
end
end

def ping_redis_subscribe
nats.ping
end

def ping_redis_publish
nats.ping
end
end
end
5 changes: 5 additions & 0 deletions src/backend/redis/backend.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
require "redis"

module Cable
class RedisBackend < Cable::BackendCore
register "redis"
register "rediss"
Comment on lines +5 to +6
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registering which URI schemes you support is as simple as this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐍


# connection management
getter redis_subscribe : Redis::Connection = Redis::Connection.new(URI.parse(Cable.settings.url))
getter redis_publish : Redis::Client = Redis::Client.new(URI.parse(Cable.settings.url))
Expand Down
9 changes: 6 additions & 3 deletions src/cable.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require "habitat"
require "json"
require "redis"
require "./cable/**"

# TODO: Write documentation for `Cable`
Expand Down Expand Up @@ -32,8 +31,12 @@ module Cable
setting token : String = "token", example: "token"
setting url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379"
setting disable_sec_websocket_protocol_header : Bool = false
setting backend_class : Cable::BackendCore.class = Cable::RedisBackend, example: "Cable::RedisBackend"
setting redis_ping_interval : Time::Span = 15.seconds
setting backend_class : Cable::BackendCore.class = Cable::RegistryBackend, example: "Cable::RedisBackend"
setting backend_ping_interval : Time::Span = 15.seconds
@[Deprecated("Use backend_ping_interval")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be curious to see if this shows... Since the method doesn't take annotations in to consideration, this may not actually render. That would be a nice feature for Habitat though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right. It looks like it collects the settings into a data structure and then emits them later instead of emitting them in-place, so this annotation may actually be attached to something else, which would be confusing, or nothing at all. 🤔

setting redis_ping_interval : Time::Span do
backend_ping_interval
end
Comment on lines +37 to +39
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't tell if Habitat actually supports the block format like getter does, but it didn't raise an error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, that's interesting... At this level, setting is a macro that takes a TypeDeclaration. That basically does this..

def self.{{ type_dec.var }} : {{ type_dec.type }}
  {{ type_dec.value }}
end

A little more complex, but that basic idea. So I guess the question is, what does Crystal consider the block here? My guess is the block is passed to setting which never calls yield and so it's ignored 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there's an open issue on it. luckyframework/habitat#54

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very likely. When I looked at the code for the setting macro it looked like there was some indirection that I didn't know how to follow.

What I was aiming for here was something like the block for getter and property to lazily initialize the setting, so if you were using the Redis-specific setting name it would still work, but now that I write that out I'm thinking actually maybe we could just do methods for that instead of setting multiple ivars?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, methods may be fine here. I'm updating habitat issues to track some of these things too.

setting restart_error_allowance : Int32 = 20
setting on_error : Proc(Exception, String, Nil) = ->(exception : Exception, message : String) do
Cable::Logger.error(exception: exception) { message }
Expand Down
32 changes: 32 additions & 0 deletions src/cable/backend_core.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
module Cable
abstract class BackendCore
def self.register(uri_scheme : String, backend : BackendCore.class = self)
::Cable::RegistryBackend.register uri_scheme, backend
end

# connection management
abstract def subscribe_connection
abstract def publish_connection
Expand All @@ -21,4 +25,32 @@ module Cable
abstract def ping_redis_subscribe
abstract def ping_redis_publish
end

class RegistryBackend < BackendCore
REGISTERED_BACKENDS = {} of String => BackendCore.class

def self.register(uri_scheme : String, backend : BackendCore.class = self)
REGISTERED_BACKENDS[uri_scheme] = backend
end

@backend : BackendCore

def initialize
@backend = REGISTERED_BACKENDS[URI.parse(::Cable.settings.url).scheme].new
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the magic behind the automatic runtime backend selection.

TODO: I want to provide a better error than a plain-old KeyError. I'm thinking maybe something like this:

raise Cable::UnknownBackend.new("Cannot find a backend for URI scheme #{scheme.inspect}. Did you require the adapter for it?")

end

delegate(
subscribe_connection,
publish_connection,
close_subscribe_connection,
close_publish_connection,
open_subscribe_connection,
publish_message,
subscribe,
unsubscribe,
ping_redis_subscribe,
ping_redis_publish,
to: @backend
)
end
end
Loading