Skip to content

Commit

Permalink
first draft of working with unix
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdanRada committed Mar 7, 2016
1 parent fe4d048 commit bf260ea
Show file tree
Hide file tree
Showing 4 changed files with 597 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
source 'http://rubygems.org'

gemspec

gem 'reel', github: 'celluloid/reel', branch: 'unix_server'
72 changes: 72 additions & 0 deletions examples/simple_test0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
require 'bundler/setup'
require 'celluloid_pubsub'
require 'logger'

debug_enabled = ENV['DEBUG'].present? && ENV['DEBUG'].to_s == 'true'
log_file_path = File.join(File.expand_path(File.dirname(__FILE__)), 'log', 'celluloid_pubsub.log')

# actor that subscribes to a channel
class FirstActor
include Celluloid

def initialize(options = {})
@client = CelluloidPubsub::UnixClient.new({ actor: Actor.current, channel: 'test_channel' }.merge(options))
end

def on_message(message)
if @client.succesfull_subscription?(message)
puts "subscriber got successful subscription #{message.inspect}"
@client.publish('test_channel2', 'data' => ' subscriber got successfull subscription') # the message needs to be a Hash
else
puts "subscriber got message #{message.inspect}"
end
end

def on_close(code, reason)
puts "websocket connection closed: #{code.inspect}, #{reason.inspect}"
terminate
end


end

# actor that publishes a message in a channel
class SecondActor
include Celluloid

def initialize(options = {})
@client = CelluloidPubsub::UnixClient.new({ actor: Actor.current, channel: 'test_channel2' }.merge(options))
end

def on_message(message)
if @client.succesfull_subscription?(message)
puts "publisher got successful subscription #{message.inspect}"
@client.publish('test_channel', 'data' => ' my_message') # the message needs to be a Hash
else
puts "publisher got message #{message.inspect}"
end
end

def on_close(code, reason)
puts "websocket connection closed: #{code.inspect}, #{reason.inspect}"
terminate
end

end


# please don't use the BaseActor class to supervise actors. This is subject to change . This class is used only to test backward compatibility.
# For more information on how to supervise actors please see Celluloid wiki.
CelluloidPubsub::BaseActor.setup_actor_supervision(CelluloidPubsub::UnixServer, actor_name: :web_server, args: {enable_debug: debug_enabled, adapter: nil,log_file_path: log_file_path })
CelluloidPubsub::BaseActor.setup_actor_supervision(FirstActor, actor_name: :first_actor, args: {enable_debug: debug_enabled })
CelluloidPubsub::BaseActor.setup_actor_supervision(SecondActor, actor_name: :second_actor, args: {enable_debug: debug_enabled })

signal_received = false

Signal.trap('INT') do
puts "\nAn interrupt signal has been triggered!"
signal_received = true
end

puts 'Exited succesfully! =)'
sleep 0.1 until signal_received
260 changes: 260 additions & 0 deletions lib/celluloid_pubsub/unix_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
require_relative './helper'
module CelluloidPubsub
# worker that subscribes to a channel or publishes to a channel
# if it used to subscribe to a channel the worker will dispatch the messages to the actor that made the
# connection in the first place.
#
# @!attribute actor
# @return [Celluloid::Actor] actor to which callbacks will be delegated to
#
# @!attribute options
# @return [Hash] the options that can be used to connect to webser and send additional data
#
# @!attribute channel
# @return [String] The channel to which the client will subscribe to
class UnixClient
include CelluloidPubsub::BaseActor


attr_accessor :actor, :options, :channel
finalizer :shutdown
# receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to
# when receiving messages from a channel
#
# @param [Hash] options the options that can be used to connect to webser and send additional data
# @option options [String] :actor The actor that made the connection
# @option options [String] :channel The channel to which the client will subscribe to once the connection is open
# @option options [String] :log_file_path The path to the log file where debug messages will be printed, otherwise will use STDOUT
# @option options [String]:hostname The hostname on which the webserver runs on
# @option options [String] :port The port on which the webserver runs on
# @option options [String] :path The request path that the webserver accepts
#
# @return [void]
#
# @api public
def initialize(options)
@options = options.stringify_keys!
@actor ||= @options.fetch('actor', nil)
@channel ||= @options.fetch('channel', nil)
raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
supervise_actors
setup_celluloid_logger
end

# the method will return the path to the log file where debug messages will be printed
#
# @return [String, nil] return the path to the log file where debug messages will be printed
#
# @api public
def log_file_path
@log_file_path ||= @options.fetch('log_file_path', nil)
end

# the method will link the current actor to the actor that is attached to, and the connection to the current actor
#
# @return [void]
#
# @api public
def supervise_actors
current_actor = Actor.current
@actor.link current_actor if @actor.respond_to?(:link)
current_actor.link connection
end

# the method will return the client that is used to
#
#
# @return [Celluloid::WebSocket::Client] the websocket connection used to connect to server
#
# @api public
def connection
#@connection ||= Celluloid::WebSocket::Client.new("ws://#{hostname}:#{port}#{path}", Actor.current)
@connection ||=Net::BufferedIO.new Celluloid::IO::UNIXSocket.new(CelluloidPubsub::WebServer::UNIX_PATH)
end

# the method will return the hostname of the server
#
#
# @return [String] the hostname where the server runs on
#
# @api public
def hostname
@hostname ||= @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
end

# the method will return the port on which the server accepts connections
#
#
# @return [String] the port on which the server accepts connections
#
# @api public
def port
@port ||= @options.fetch('port', CelluloidPubsub::WebServer::PORT)
end

# the method will return the path of the URL on which the servers acccepts the connection
#
#
# @return [String] the URL path that the server is mounted on
#
# @api public
def path
@path ||= @options.fetch('path', CelluloidPubsub::WebServer::PATH)
end

# the method will terminate the current actor
#
#
# @return [void]
#
# @api public
def shutdown
log_debug "#{self.class} tries to 'shudown'"
terminate
end

# checks if debug is enabled
#
#
# @return [boolean]
#
# @api public
def debug_enabled?
@options.fetch('enable_debug', false).to_s == 'true'
end

# subscribes to a channel . need to be used inside the connect block passed to the actor
#
# @param [string] channel
#
# @return [void]
#
# @api public
def subscribe(channel)
log_debug("#{@actor.class} tries to subscribe to channel #{channel}")
async.send_action('subscribe', channel)
end

# publishes to a channel some data (can be anything)
#
# @param [string] channel
# @param [#to_s] data
#
# @return [void]
#
# @api public
def publish(channel, data)
send_action('publish', channel, data)
end

# unsubscribes current client from a channel
#
# @param [string] channel
#
# @return [void]
#
# @api public
def unsubscribe(channel)
send_action('unsubscribe', channel)
end

# unsubscribes all clients subscribed to a channel
#
# @param [string] channel
#
# @return [void]
#
# @api public
def unsubscribe_clients(channel)
send_action('unsubscribe_clients', channel)
end

# unsubscribes all clients from all channels
#
# @return [void]
#
# @api public
def unsubscribe_all
send_action('unsubscribe_all')
end

# callback executes after connection is opened and delegates action to actor
#
# @return [void]
#
# @api public
def on_open
log_debug("#{@actor.class} websocket connection opened")
async.subscribe(@channel) if @channel.present?
end

# callback executes when actor receives a message from a subscribed channel
# and parses the message using JSON.parse and dispatches the parsed
# message to the original actor that made the connection
#
# @param [JSON] data
#
# @return [void]
#
# @api public
def on_message(data)
message = JSON.parse(data)
log_debug("#{@actor.class} received JSON #{message}")
@actor.async.on_message(message)
end

# callback executes when connection closes
#
# @param [String] code
#
# @param [String] reason
#
# @return [void]
#
# @api public
def on_close(code, reason)
connection.terminate
terminate
log_debug("#{@actor.class} dispatching on close #{code} #{reason}")
@actor.async.on_close(code, reason)
end

private

# method used to send an action to the webserver reactor , to a chanel and with data
#
# @param [String] action
# @param [String] channel
# @param [Hash] data
#
# @return [void]
#
# @api private
def send_action(action, channel = nil, data = {})
data = data.is_a?(Hash) ? data : {}
publishing_data = { 'client_action' => action, 'channel' => channel, 'data' => data }.reject { |_key, value| value.blank? }
async.chat(publishing_data)
end

# method used to send messages to the webserver
# checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser
# otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value
#
# @param [Hash] message
#
# @return [void]
#
# @api private
def chat(message)
final_message = nil
if message.is_a?(Hash)
final_message = message.to_json
else
final_message = JSON.dump(action: 'message', message: message)
end
log_debug("#{@actor.class} sends JSON #{final_message}")
request = Net::HTTP::Get.new('/')
request.exec(connection, final_message, CelluloidPubsub::WebServer::UNIX_PATH)
end
end
end
Loading

0 comments on commit bf260ea

Please sign in to comment.