Writing Custom AnyCable Server

Vladimir Dementyev edited this page Oct 21, 2017 · 4 revisions

You can write your own server to handle cable clients and connect them to your business logic through AnyCable.

Saying "cable clients" we want to underline that AnyCable doesn't depend on any transport protocol (e.g. WebSockets), you can use any protocol for client-server communication (e.g. RTMP, custom TCP, long-polling, etc).

Requirements

The server should be able to:

  • Communicate with gRPC server as a gRPC client

gRPC provides libraries for most popular languages (see docs. If there is no gRPC support for you favourite language, you can build it yourself (the minimal implementation for AnyCable) – it's just HTTP2 + Protocol Buffers.

See erlgrpc for the example of minimal gRPC client.

  • Subscribe to Redis channels.

We use Redis to receive broadcast events from the application. For now it's the only option, but we're going to provide alternatives.

Step-by-step

Let's go through all steps to implement a custom server (using abstract language).

Step 1. Server

First of all, you need a server – the entry point for clients connections – which can handle incoming data and disconnection events.

interface Server {
 # Invoked on socket connection.
 # socket_handle is an entity (object/record/whatever) representing connection socket 
 func socket_conn(socket_handle);
 
 # Invoked on socket disconnection
 func socket_disconn(socket_handle);

 # Invoker on incoming message
 func socket_data(socket_handle, msg);
}

Step 2. Hub

Hub stores information about clients subscriptions and has the following interface:

interface Hub {
  # Subscribe socket to the stream.
  # We also need a channel_id to sign messages with it (see below)
  func add(socket_handle, channel_id, stream);
  
  # Unsubscribe socket from all streams for the given channel
  func remove(socket_handle, channel_id);

  # Broadcast a message to all subscribed sockets
  func broadcast(stream, msg);
}

Why do we need a channel_id? Unfortunately, this is required by Action Cable client. The JS client doesn't know about streams, only about channels. So it needs a channel identifier to be present in incoming messages to resolve channels.

Moreover, there is no uniqueness restrictions on streams names – the same stream name can be used for different channels.

Thus, our broadcast function may look like this:

func broadcast(stream, msg) {
  # Assume that we have a nested structure to store subscriptions:
  # sockets2streams
  #           |
  #           stream1
  #           |    |
  #           |    channel1 - (socket1, ..., socketN)
  #           |    |
  #           |    channel2 – ( ... )
  #           |
  #           stream2 ...
  #     
  for (channel in channels_for_stream(stream)) {
    channel_msg = msg_for_channel(msg, channel.id())
    for (socket in channel.sockets()) {
      socket.transmit(channel_msg)
    }
  }
}

# msg – JSON encoded string
# We should transform into another JSON "{"identifier":<identifier>,"message": <msg>}"
func msg_for_channel(msg, identifier) {
  return json_encode(['identifier', 'message'], [identifier, json_decode(msg)]); 
}

Step 3. Pinger

Action Cable clients assumes that a server sends a special message – ping – every 3 seconds (configurable). Thus we should implement a pinger.

Pinger is a simple entity that holds a list of active sockets and broadcast a message to them every X seconds.

interface Pinger {
  # Add socket to the active list
  func register(socket_handle);
  
  # Remove socket from the active list
  func unregister(socket_handle);
}

And we need a kind of loop method:

func loop() {
  while(true) {
    var msg = ping_message()
    for (socket in active_sockets) {
      socket.transmit(msg)
    }
    sleep(INTERVAL)
  }
}


func ping_message() {
  return json_encode(['type', 'message'], ['ping', time.utc()])
}

Step 4. gRPC Client

Then you have to build a gRPC client using rpc.proto.

It has a simple interface with only three methods: Connect, Disconnect and Command. Let's go to the Step 4 to see, how to use these methods and their return values.

Step 5. Server – RPC communication.

Now, when we already have server and RPC client, let's fit them together.

Client Connection

Every time a client is connected to our server we should invoke Connect method to authorize connection:

func socket_conn(socket_handle) {
  # We need request URL and cookies (if we want to use cookie-based authentication)
  var url = socket_handle.url()
  # Extract Cookie header and build a map { 'Cookie' => cookie_val }
  # NOTE: you MAY provide more headers if you want
  var headers = header('Cookie', socket_handle.header('Cookie'));

  # Then generate a payload (build protobuf msg)
  # ConnectionRequest contains fields:
  #   path - string - request URL
  #   headers - map<string><string>
  var payload = pb::ConnectionRequest(url, headers)
  
  # Make a call and get a response – ConnectionResponse:
  #    status – Status::SUCCESS | Status::ERROR – status enum is a part of rpc.proto
  #    identifiers – string (connection identifiers string used by the app)
  #    transmissions - list of strings (repeated string)
  var response = rpc::Connect(payload)
  
  # handle response
  if (response.status() == pb::Status::SUCCESS) {
    # store identifiers for the socket
    # we will use them in later calls
    socket_handle.setIdentifiers(response.identifiers())
    
    # transmit messages to socket
    # NOTE: typically Connect returns only "welcome" message
    socket_handle.transmit(response.transmissions())
   
    # register socket to pinger
    pinger.register(socket_handle)
  } else {
    # if Status is not SUCCESS we should disconnect the socket
    socket_handle.close()
    
    # non-SUCCESS status could be:
    #  - ERROR - there was an en exception during the call; in this case we also have response.error_msg()
    #  - FAILURE - application-level "rejection" (e.g. authentication failed)
  }
}

Client Commands

Command is an incoming message from the client. We should distinguish "subscribe" and "unsubscribe" command from others, 'cause they're responsible for subscriptions.

func socket_data(socket_handle, msg) {
  var decoded = json_decode(msg)
  var type = decoded.key("type")
  # Every command is associated with the specified channel
  var identifier = decoded.key("identifier")
  var data = decoded.key("data")

  # Generate a payload (build protobuf msg)
  # CommandMessage contains fields:
  #   command - string
  #   identifier - string (channel identifier)
  #   connection_identifiers - string (identifiers from Connect call)
  #   data – string (additional provided data)
  var payload = pb::CommandMessage(type, identifier, socket_handle.identifiers(), data)
  
  # Make a call and get a response – ConnectionResponse:
  #    status – Status::SUCCESS | Status::FAILURE | Status::ERROR– status enum is a part of rpc.proto
  #    disconnect – bool – whether to disconnect client or not
  #    stop_streams – bool – whether to stop all existing subscriptions for the channel
  #    streams – list of strings – new subscriptions
  #    transmissions - list of strings – messages to send to the client
  #    error_msg – error message in case of ERROR
  var response = rpc::Command(payload)
  
  # handle response
  if (response.status() == pb::Status::SUCCESS) {
    # First, handle subscription commands
    # We should track client subscriptions in order to call `#unsubscribe` callbacks on disconnection
    if (type == "subscribe") {
      socket_handle.addSubscripton(identifier)
    }

    if (type == "unsubscribe") {
      socket_handle.removeSubscription(identifier)
    }
    
    # Then handle other response information
    # If response contains disconnect flag set to true
    # The we immediately disconnect the client 
    if (response.disconnect()) {
      return socket_handle.close()
    }
    
    if (response.stop_streams()) {
      # Stop all subscriptions for the channel
      hub.remove(socket_handle, identifier)
    }
    
    # Add new subscriptions
    for (stream in response.streams()) {
      hub.add(socket_handle, identifier, stream)
    }

    # And, finally, transmit messages
    socket_handle.transmit(response.transmissions())
  } else {
    # in case of failure you may want to disconnect the client
    socket_handle.close()
  }
}

Client Disconnection

When client disconnects we should remove its subscriptions, de-register from pinger and invoke #disconnect/#unsubscribe callbacks in the app.

func socket_disconn(socket_handle) {
  # De-register socket from pinger
  pinger.unregister(socket_handle)
  
  # Remove subscriptions
  var subscriptions = socket_handle.subscriptions()
  
  for (channel in subscriptions) {
    hub.remove(socket_handle, channel)
  }
 
  # And only after that notify the app thru RPC

  # Extract Cookie header and build a map { 'Cookie' => cookie_val }
  var headers = header('Cookie', socket_handle.header('Cookie'));
  # Then generate a payload (build protobuf msg)
  # DisconnectRequest contains fields:
  #  identifiers – string – connection identifiers
  #  subscriptions – list of strings – connections channels
  #  path – string – request URL
  #  headers - map of strings
  var payload = pb::DisconnectRequest(socket_handle.identifier(), subscriptions, socket_handle.url(), headers)
  
  # Make a call and get a response – DisconnectResponse:
  #    status – Status::SUCCESS | Status::ERROR – status enum is a part of rpc.proto
  # Actually, response status does not matter here, we should cleanup  
  rpc::Disconnect(payload)
}

NOTE: It make sense to call Disconnect asynchronously or using a queue in order to avoid RPC calls spikes caused by mass-disconnection.

You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.
Press h to open a hovercard with more details.