Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Carl Hörberg August 31, 2013
file 47 lines (40 sloc) 1.086 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
require 'sinatra'
require 'sinatra/streaming'
require 'haml'
require 'amqp'

# Application setup: turn off Sinatra's request logging and schedule the
# AMQP connection to be opened on the next EventMachine tick.
configure do
  disable :logging

  EM.next_tick do
    # Use the CloudAMQP URL when provided, otherwise a local broker with the
    # guest account, and install the result as the default AMQP connection.
    amqp_url = ENV.fetch('CLOUDAMQP_URL', "amqp://guest:guest@localhost")
    AMQP.connection = AMQP.connect(amqp_url)

    # Channel used by the POST /publish route.
    PUB_CHAN = AMQP::Channel.new
  end
end

# Landing page: render the :index Haml template.
get('/') { haml :index }

# Publish a fixed greeting to the "f1" fanout exchange over the PUB_CHAN
# channel and answer with an empty 204 response.
post '/publish' do
  exchange = PUB_CHAN.fanout("f1")
  exchange.publish "Hello, world!"

  # Nothing to return in the body.
  204
end

# Stream messages from the "f1" fanout exchange to the client as
# Server-Sent Events. The response is kept open; each request gets its own
# AMQP channel and exclusive queue, which are released when the stream closes.
get '/stream', provides: 'text/event-stream' do
  stream :keep_open do |out|
    AMQP::Channel.new do |channel|
      channel.queue('', exclusive: true) do |queue|
        # create a queue and bind it to the fanout exchange
        queue.bind(channel.fanout("f1")).subscribe do |payload|
          # In the event-stream format a blank line terminates the event and
          # every line of data needs its own "data: " prefix, so a payload
          # containing line breaks must be split into one field per line.
          # The -1 limit keeps trailing empty lines so the client receives
          # the payload unchanged.
          lines = payload.to_s.split(/\r\n|\r|\n/, -1)
          # An empty payload still produces one (empty) data field.
          lines = [''] if lines.empty?

          frame = lines.map { |line| "data: #{line}\n" }.join
          out << "#{frame}\n"
        end
      end

      # add a timer to keep the connection alive; a line starting with ":"
      # is an event-stream comment
      timer = EM.add_periodic_timer(20) { out << ":\n" }

      # clean up when the user closes the stream
      out.callback do
        timer.cancel
        channel.close
      end
    end
  end
end
Something went wrong with that request. Please try again.