Skip to content

Commit

Permalink
Fastly fanout server (#7)
Browse files Browse the repository at this point in the history
* feat: fanout

* feat: CI

* feat: bump version

* feat: bump version js

* feat: added server param in broadcast

* fix: bump version
  • Loading branch information
aablinov committed May 26, 2023
1 parent 5098d78 commit 920f60f
Show file tree
Hide file tree
Showing 17 changed files with 242 additions and 57 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ jobs:
MERCURE_DOMAIN: ${{ secrets.MERCURE_DOMAIN }}
MERCURE_PUBLISHER_KEY: ${{ secrets.MERCURE_PUBLISHER_KEY }}
MERCURE_SUBSCRIBER_KEY: ${{ secrets.MERCURE_SUBSCRIBER_KEY }}
FANOUT_FASTLY_KEY: ${{ secrets.FANOUT_FASTLY_KEY }}
FANOUT_SERVICE_ID: ${{ secrets.FANOUT_SERVICE_ID }}
FANOUT_SERVICE_URL: ${{ secrets.FANOUT_SERVICE_URL }}

name: ${{ format('Tests (Ruby {0}, Rails {1})', matrix.ruby-version, matrix.rails-version) }}
runs-on: ubuntu-latest
Expand All @@ -42,7 +45,16 @@ jobs:
run: |
bin/rails db:create db:migrate
- name: Run tests
- name: Run tests [mercure]
env:
TURBO_TRAIN_TEST_SERVER: mercure
run: |
bin/test test/**/*_test.rb
- name: Run tests [fanout]
env:
TURBO_TRAIN_TEST_SERVER: fanout
run: |
bin/test test/**/*_test.rb

2 changes: 1 addition & 1 deletion app/assets/javascripts/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@uscreentv/turbo-train",
"version": "0.0.2",
"version": "0.0.3",
"description": "",
"main": "turbo-train.js",
"type": "module",
Expand Down
12 changes: 2 additions & 10 deletions app/assets/javascripts/turbo-train.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import { Turbo } from "@hotwired/turbo-rails"

export default class TurboTrain extends HTMLElement {
static get observedAttributes() {
return [ 'href', 'session', 'name' ];
return [ 'href' ];
}

constructor() {
super();
}

connectedCallback() {
this.eventSource = new EventSource(`${this.href}/mercure?topic=${this.name}&authorization=${this.session}`);
this.eventSource = new EventSource(this.href);
Turbo.connectStreamSource(this.eventSource);
}

Expand All @@ -21,14 +21,6 @@ export default class TurboTrain extends HTMLElement {
get href() {
return this.getAttribute('href');
}

get session() {
return this.getAttribute('session');
}

get name() {
return this.getAttribute('name');
}
}

if (
Expand Down
2 changes: 1 addition & 1 deletion app/assets/javascripts/turbo-train.min.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions app/helpers/turbo/train/streams_helper.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module Turbo::Train::StreamsHelper
def turbo_train_from(*streamables, **attributes)
attributes[:name] = Turbo::Train.signed_stream_name(streamables)
attributes[:session] = Turbo::Train.encode({ platform: "web" })
attributes[:href] = Turbo::Train.configuration.url
attributes[:href] = Turbo::Train.server(attributes[:server]&.to_sym).listen_url(streamables, platform: "web")
tag.turbo_train_stream_source(**attributes)
end
end
4 changes: 3 additions & 1 deletion lib/turbo/train.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
require "turbo/train/version"
require "turbo/train/config"
require 'turbo/train/broadcasts'
require 'turbo/train/server'
require 'turbo/train/base_server'
require 'turbo/train/mercure_server'
require 'turbo/train/fanout_server'
require 'turbo/train/test_server'
require 'turbo/train/test_helper'
require "turbo/train/engine"
27 changes: 27 additions & 0 deletions lib/turbo/train/base_server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module Turbo
module Train
class BaseServer
attr_reader :configuration

def initialize(configuration)
@configuration = configuration
end

def publish(topics:, data:)
raise NotImplementedError
end

def server_config
raise NotImplementedError
end

def publish_url
server_config.publish_url
end

def listen_url(topic, **options)
server_config.listen_url(topic, **options)
end
end
end
end
6 changes: 3 additions & 3 deletions lib/turbo/train/broadcasts.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Turbo::Train::Broadcasts
def broadcast(streamables, content:)
def broadcast(streamables, content:, server: nil)
topics = if streamables.is_a?(Array)
streamables.map { |s| signed_stream_name(s) }
else
Expand All @@ -11,13 +11,13 @@ def broadcast(streamables, content:)
data: content
}

Turbo::Train.server.publish(topics: topics, data: data)
Turbo::Train.server(server).publish(topics: topics, data: data)
end

def broadcast_action_to(*streamables, action:, target: nil, targets: nil, **rendering)
broadcast(streamables, content: turbo_stream_action_tag(action, target: target, targets: targets, template:
rendering.delete(:content) || rendering.delete(:html) || (rendering.any? ? render_format(:html, **rendering) : nil)
))
), server: rendering.delete(:server))
end

def broadcast_render_to(*streamables, **rendering)
Expand Down
62 changes: 59 additions & 3 deletions lib/turbo/train/config.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,74 @@
module Turbo
module Train
class Configuration
attr_accessor :mercure_domain, :publisher_key, :subscriber_key, :skip_ssl_verification
class MercureConfiguration
attr_accessor :mercure_domain, :publisher_key, :subscriber_key

def initialize
super
@mercure_domain = 'localhost'
@publisher_key = 'test'
@subscriber_key = 'testing'
@skip_ssl_verification = Rails.env.development? || Rails.env.test?
end

def url
"https://#{mercure_domain}/.well-known"
end

def publish_url
"#{url}/mercure"
end

def listen_url(topic, platform: 'web')
"#{url}/mercure?topic=#{Turbo::Train.signed_stream_name(topic)}&authorization=#{jwt_auth_token({ platform: platform })}"
end

def jwt_auth_token(payload)
structured_payload = { mercure: { payload: payload } }
JWT.encode structured_payload, subscriber_key, ALGORITHM
end
end

class FanoutConfiguration
attr_accessor :fastly_api_url, :service_url, :fastly_key, :service_id

def initialize
super
@fastly_api_url = 'https://api.fastly.com'
@service_url = 'https://johnny-cage-fake-url.edgecompute.app'
@fastly_key = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
end

def publish_url
"#{@fastly_api_url}/service/#{@service_id}/publish/"
end

def listen_url(topic, **)
"#{service_url}/stream/sse?topic=#{Turbo::Train.signed_stream_name(topic)}"
end
end

class Configuration
attr_accessor :skip_ssl_verification, :mercure, :fanout, :default_server

def initialize
@skip_ssl_verification = Rails.env.development? || Rails.env.test?
@mercure = nil
@fanout = nil
@default_server = :mercure
end

def server(server_name)
case server_name
when :mercure
@mercure ||= MercureConfiguration.new
yield(@mercure)
when :fanout
@fanout ||= FanoutConfiguration.new
yield(@fanout)
else
raise ArgumentError, "Unknown server name: #{server_name}"
end
end
end

class << self
Expand Down
36 changes: 36 additions & 0 deletions lib/turbo/train/fanout_server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module Turbo
module Train
class FanoutServer < BaseServer
def publish(topics:, data:)
uri = URI(server_config.publish_url)
req = Net::HTTP::Post.new(uri)
req['Fastly-Key'] = server_config.fastly_key

message = data[:data].gsub("\n", "")
payload = {items: []}

Array(topics).each do |topic|
payload[:items] << {channel: topic, formats: { 'http-stream': { content: "event: message\ndata: #{message}\n\n" } } }
end

req.body = payload.to_json

opts = {
use_ssl: uri.scheme == 'https'
}

if configuration.skip_ssl_verification
opts[:verify_mode] = OpenSSL::SSL::VERIFY_NONE
end

Net::HTTP.start(uri.host, uri.port, opts) do |http|
http.request(req)
end
end

def server_config
configuration.fanout
end
end
end
end
16 changes: 7 additions & 9 deletions lib/turbo/train/server.rb → lib/turbo/train/mercure_server.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
module Turbo
module Train
class Server
attr_reader :configuration

def initialize(configuration)
@configuration = configuration
end

class MercureServer < BaseServer
def publish(topics:, data:)
payload = { mercure: { publish: topics } }
token = JWT.encode payload, configuration.publisher_key, ALGORITHM
token = JWT.encode payload, server_config.publisher_key, ALGORITHM

uri = URI("#{configuration.url}/mercure")
uri = URI(publish_url)

req = Net::HTTP::Post.new(uri)
req['Content-Type'] = 'application/x-www-form-urlencoded'
Expand All @@ -30,6 +24,10 @@ def publish(topics:, data:)
http.request(req)
end
end

def server_config
configuration.mercure
end
end
end
end
42 changes: 40 additions & 2 deletions lib/turbo/train/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,31 @@ module Turbo
module Train
module TestHelper
def before_setup
Turbo::Train.instance_variable_set(:@server, Turbo::Train::TestServer.new(Turbo::Train.configuration))
test_server = case ENV.fetch('TURBO_TRAIN_TEST_SERVER', 'mercure').to_sym
when :mercure
Turbo::Train::TestServer.new(Turbo::Train.mercure_server, Turbo::Train.configuration)
when :fanout
Turbo::Train::TestServer.new(Turbo::Train.fanout_server, Turbo::Train.configuration)
else
raise "Unknown test server: #{ENV['TURBO_TRAIN_TEST_SERVER']}"
end

Turbo::Train.instance_variable_set(:@server, test_server)
super
end

def after_teardown
test_server = case ENV.fetch('TURBO_TRAIN_TEST_SERVER', 'mercure').to_sym
when :mercure
Turbo::Train.mercure_server
when :fanout
Turbo::Train.fanout_server
else
raise "Unknown test server: #{ENV['TURBO_TRAIN_TEST_SERVER']}"
end

super
Turbo::Train.instance_variable_set(:@server, Turbo::Train::Server.new(Turbo::Train.configuration))
Turbo::Train.instance_variable_set(:@server, test_server)
end

def assert_broadcast_on(stream, data, &block)
Expand All @@ -36,6 +54,26 @@ def assert_broadcast_on(stream, data, &block)

assert message, "No messages sent with #{data} to #{Turbo::Train.stream_name_from(stream)}"
end

def assert_body_match(r)
if Turbo::Train.server.real_server.is_a?(Turbo::Train::FanoutServer)
assert_match "Published\n", r.body
elsif Turbo::Train.server.real_server.is_a?(Turbo::Train::MercureServer)
assert_match /urn:uuid:.*/, r.body
else
raise "Unknown server type"
end
end

def assert_response_from_mercure_server(r)
assert_equal r.code, '200'
assert_match /urn:uuid:.*/, r.body
end

def assert_response_from_fanout_server(r)
assert_equal r.code, '200'
assert_match "Published\n", r.body
end
end
end
end
9 changes: 5 additions & 4 deletions lib/turbo/train/test_server.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
module Turbo
module Train
class TestServer < Server
attr_reader :configuration, :channels_data
class TestServer < BaseServer
attr_reader :configuration, :channels_data, :real_server

def initialize(configuration)
def initialize(real_server, configuration)
@configuration = configuration
@channels_data = {}
@real_server = real_server
end

def publish(topics:, data:)
Expand All @@ -14,7 +15,7 @@ def publish(topics:, data:)
@channels_data[topic] << data[:data]
end

super
real_server.publish(topics: topics, data: data) if real_server
end

def broadcasts(channel)
Expand Down
Loading

0 comments on commit 920f60f

Please sign in to comment.