diff --git a/.gitignore b/.gitignore index 0bbd4a9..4b01810 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ /docs/ -/lib/ -/bin/ +lib/ +bin/ /.shards/ *.dwarf diff --git a/examples/multi_backend/README.md b/examples/multi_backend/README.md new file mode 100644 index 0000000..7d6cc31 --- /dev/null +++ b/examples/multi_backend/README.md @@ -0,0 +1,39 @@ +# Multi-Backend example + +This is an example Turbo+Cable app to demo the use of multiple backends with the Cable shard. + +## Installation + +To use the Redis and NATS backends, you will need to have access to running +Redis and NATS servers. A package manager for your operating system can simplify +the installation of them on your machine. + +If you don't want to install a NATS server, you can use publicly available servers. For example, there is a public NATS server available at `demo.nats.io` — just don't use it for production. 😄 + +Once you have Redis and NATS installed, install the Crystal dependencies: + +```shell +shards install +``` + +## Usage + +To use either backend, specify the url in the `CABLE_BACKEND_URL` environment variable: + +```shell +CABLE_BACKEND_URL=redis:/// +CABLE_BACKEND_URL=nats:/// +CABLE_BACKEND_URL=nats://demo.nats.io/ +``` + +If you would like to see the messages passing through Redis when using the Redis backend, you can use the Redis CLI with the following command: + +```shell +redis-cli subscribe time +``` + +If you would like to see the messages passing through NATS when using the NATS backend, you can use the [NATS CLI](https://github.com/nats-io/natscli), which may need to be installed separately from the NATS server. + +```shell +nats sub time +``` diff --git a/examples/multi_backend/shard.lock b/examples/multi_backend/shard.lock new file mode 100644 index 0000000..3feb84e --- /dev/null +++ b/examples/multi_backend/shard.lock @@ -0,0 +1,42 @@ +version: 2.0 +shards: + base32: + git: https://github.com/jgaskins/base32.git + version: 0.1.1+git.commit.18f5647c42dae4de654e3003825fab43dc95b029 + + cable: + path: ../.. + version: 0.2.2 + + cron_parser: + git: https://github.com/kostya/cron_parser.git + version: 0.4.0 + + db: + git: https://github.com/crystal-lang/crystal-db.git + version: 0.12.0 + + future: + git: https://github.com/crystal-community/future.cr.git + version: 1.0.0 + + habitat: + git: https://github.com/luckyframework/habitat.git + version: 0.4.7 + + nats: + git: https://github.com/jgaskins/nats.git + version: 1.3.3 + + redis: + git: https://github.com/jgaskins/redis.git + version: 0.7.0 + + tasker: + git: https://github.com/spider-gazelle/tasker.git + version: 2.1.4 + + turbo: + git: https://github.com/jgaskins/turbo.git + version: 0.1.0+git.commit.8685616e26d7903d1559f5f3f8b96085bc10af12 + diff --git a/examples/multi_backend/shard.yml b/examples/multi_backend/shard.yml new file mode 100644 index 0000000..1164a26 --- /dev/null +++ b/examples/multi_backend/shard.yml @@ -0,0 +1,23 @@ +name: multi_backend +version: 0.1.0 + +authors: + - Jamie Gaskins + +targets: + multi_backend: + main: src/multi_backend.cr + +dependencies: + cable: + path: ../.. + nats: + github: jgaskins/nats + redis: + github: jgaskins/redis + turbo: + github: jgaskins/turbo + +crystal: 1.9.2 + +license: MIT diff --git a/examples/multi_backend/src/multi_backend.cr b/examples/multi_backend/src/multi_backend.cr new file mode 100644 index 0000000..d4976d3 --- /dev/null +++ b/examples/multi_backend/src/multi_backend.cr @@ -0,0 +1,48 @@ +require "turbo/cable" +require "cable/backend/nats" +require "cable/backend/redis/backend" + +module AppCable + class Connection < Cable::Connection + identified_by id + + getter id = UUID.random.to_s + + def connect + end + end +end + +Cable.configure do |settings| + settings.route = "/cable" # the URL your JS Client will connect + # settings.url = "redis:///" + # settings.url = ENV.fetch("NATS_URL", "nats:///") + settings.url = ENV.fetch("CABLE_BACKEND_URL", "redis:///") +end + +Turbo::StreamsChannel.signing_key = "this is my signing key" + +spawn do + loop do + duration = Time.measure do + Turbo::StreamsChannel.broadcast_update_to "time", + message: Time.local.to_s + end + sleep 1.second - duration + end +end + +http = HTTP::Server.new([ + HTTP::LogHandler.new, + Cable::Handler(AppCable::Connection).new, +]) do |context| + context.response << <<-HTML + + #{Turbo.javascript_tag} + #{Turbo.cable_tag} + #{Turbo::Frame.new(id: "time") { }} + #{Turbo.stream_from "time"} + HTML +end + +http.listen 3200 diff --git a/src/backend/nats.cr b/src/backend/nats.cr new file mode 100644 index 0000000..2984d0b --- /dev/null +++ b/src/backend/nats.cr @@ -0,0 +1,64 @@ +require "nats" + +module Cable + class NATSBackend < BackendCore + register "nats" + + getter nats : NATS::Client do + NATS::Client.new(URI.parse(Cable.settings.url)) + end + getter streams = Hash(String, Set(NATS::Subscription)).new { |streams, channel| + streams[channel] = Set(NATS::Subscription).new + } + + def subscribe_connection + nats + end + + def publish_connection + nats + end + + def close_subscribe_connection + nats.close rescue nil + end + + def close_publish_connection + nats.close rescue nil + end + + def open_subscribe_connection(channel) + nats + end + + def publish_message(stream_identifier : String, message : String) + nats.publish stream_identifier, message + end + + def subscribe(stream_identifier : String) + subscription = nats.subscribe stream_identifier, queue_group: object_id.to_s do |msg| + Cable.server.fiber_channel.send({ + stream_identifier, + String.new(msg.body), + }) + end + streams[stream_identifier] << subscription + end + + def unsubscribe(stream_identifier : String) + if subscriptions = streams.delete(stream_identifier) + subscriptions.each do |subscription| + nats.unsubscribe subscription + end + end + end + + def ping_redis_subscribe + nats.ping + end + + def ping_redis_publish + nats.ping + end + end +end diff --git a/src/backend/redis/backend.cr b/src/backend/redis/backend.cr index 08a4f9a..e193813 100644 --- a/src/backend/redis/backend.cr +++ b/src/backend/redis/backend.cr @@ -1,5 +1,10 @@ +require "redis" + module Cable class RedisBackend < Cable::BackendCore + register "redis" + register "rediss" + # connection management getter redis_subscribe : Redis::Connection = Redis::Connection.new(URI.parse(Cable.settings.url)) getter redis_publish : Redis::Client = Redis::Client.new(URI.parse(Cable.settings.url)) diff --git a/src/cable.cr b/src/cable.cr index af3f441..f1e2761 100644 --- a/src/cable.cr +++ b/src/cable.cr @@ -1,6 +1,5 @@ require "habitat" require "json" -require "redis" require "./cable/**" # TODO: Write documentation for `Cable` @@ -32,8 +31,12 @@ module Cable setting token : String = "token", example: "token" setting url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379" setting disable_sec_websocket_protocol_header : Bool = false - setting backend_class : Cable::BackendCore.class = Cable::RedisBackend, example: "Cable::RedisBackend" - setting redis_ping_interval : Time::Span = 15.seconds + setting backend_class : Cable::BackendCore.class = Cable::RegistryBackend, example: "Cable::RedisBackend" + setting backend_ping_interval : Time::Span = 15.seconds + @[Deprecated("Use backend_ping_interval")] + setting redis_ping_interval : Time::Span do + backend_ping_interval + end setting restart_error_allowance : Int32 = 20 setting on_error : Proc(Exception, String, Nil) = ->(exception : Exception, message : String) do Cable::Logger.error(exception: exception) { message } diff --git a/src/cable/backend_core.cr b/src/cable/backend_core.cr index 9d04b9c..3038b28 100644 --- a/src/cable/backend_core.cr +++ b/src/cable/backend_core.cr @@ -1,5 +1,9 @@ module Cable abstract class BackendCore + def self.register(uri_scheme : String, backend : BackendCore.class = self) + ::Cable::RegistryBackend.register uri_scheme, backend + end + # connection management abstract def subscribe_connection abstract def publish_connection @@ -21,4 +25,32 @@ module Cable abstract def ping_redis_subscribe abstract def ping_redis_publish end + + class RegistryBackend < BackendCore + REGISTERED_BACKENDS = {} of String => BackendCore.class + + def self.register(uri_scheme : String, backend : BackendCore.class = self) + REGISTERED_BACKENDS[uri_scheme] = backend + end + + @backend : BackendCore + + def initialize + @backend = REGISTERED_BACKENDS[URI.parse(::Cable.settings.url).scheme].new + end + + delegate( + subscribe_connection, + publish_connection, + close_subscribe_connection, + close_publish_connection, + open_subscribe_connection, + publish_message, + subscribe, + unsubscribe, + ping_redis_subscribe, + ping_redis_publish, + to: @backend + ) + end end