Skip to content

Commit

Permalink
Update usage of instances to processes
Browse files Browse the repository at this point in the history
This is to help differentiate between AblyD instances and the processes it spools off.
  • Loading branch information
tomczoink committed Jul 30, 2021
1 parent 4f1874f commit d513f30
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 28 deletions.
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -32,22 +32,22 @@ Once the server receives a message in the `command` channel, it will start up an
}
```

The client can identify the instance which has started for them by the `MessageID`, then use the `Prefix` to connect to an input and an output channel for the process. These will be of structure `{Prefix}{Pid}:serverinput` and `{Prefix}{Pid}:serveroutput`.
The client can identify the process which has started for them by the `MessageID`, then use the `Prefix` to connect to an input and an output channel for the process. These will be of structure `{Prefix}{Pid}:serverinput` and `{Prefix}{Pid}:serveroutput`.

Subscribing to the `serveroutput` channel will allow the client to receive any stdout messages from the server. The client can also publish messages into the `serverinput` channel which will be passed into the stdin of the process.

This will continue until the program naturally terminates, resulting in the process dying, or the client submits a message to the `serverinput` with data `KILL`.

## Checking Current State of Processes and an AblyD Instance

AblyD makes use of Ably Presence to identify what AblyD instances exist, and what processes are running on each instance. If you check the presence set of the `command` channel, you'll see each currently active instance present with the following attached data:
AblyD makes use of Ably Presence to identify what AblyD instances exist, and what processes are running on each instance. If you check the presence set of the `command` channel, you'll see each currently active process present with the following attached data:

```json
{
"ServerID": "my-server-id",
"Namespace": "ablyd",
"MaxInstances": 20,
"Instances": {
"MaxProcesses": 20,
"Processes": {
"3490348": "Running",
"Another PID": "Running"
}
Expand Down
4 changes: 2 additions & 2 deletions asyncapispec.yml
Expand Up @@ -210,7 +210,7 @@ components:
payload:
type: object
example:
name: new-instance
name: new-process
data:
MessageID: 384n92923n
Pid: "83483992"
Expand All @@ -228,7 +228,7 @@ components:
type: string
description: Indicates the action the server is peforming
enum:
- new-instance
- new-process
data:
type: object
description: Object containing details on process that has started.
Expand Down
2 changes: 1 addition & 1 deletion examples/bash/count.html
Expand Up @@ -28,7 +28,7 @@ <h1>AblyD Example</h1>
log(`Presence data of ${msg.clientId}`, msg.data);
})

commandChannel.subscribe("new-instance", (msg) => {
commandChannel.subscribe("new-process", (msg) => {
if (msg.data.MessageID == MESSAGEID) {
let inputChannel = ably.channels.get(`${msg.data.ChannelPrefix}${msg.data.Pid}:serverinput`);
let outputChannel = ably.channels.get(`[?rewind=20]${msg.data.ChannelPrefix}${msg.data.Pid}:serveroutput`);
Expand Down
9 changes: 0 additions & 9 deletions libablyd/ablyd_instance_state.go

This file was deleted.

@@ -1,6 +1,6 @@
package libablyd

type AblyDInstanceStartMessage struct {
type AblyDProcessStartMessage struct {
MessageID string `json:"messageId"`
ServerID string `json:"serverID"`
Args [] string `json:"args"`
Expand Down
9 changes: 9 additions & 0 deletions libablyd/ablyd_process_state.go
@@ -0,0 +1,9 @@
package libablyd

type AblyDProcessState struct {
ServerID string
Namespace string

MaxProcesses int
Processes map[int]string
}
21 changes: 11 additions & 10 deletions libablyd/handler.go
Expand Up @@ -14,7 +14,7 @@ type AblyDHandler struct {
ablyRealtime *ably.Realtime // Ably Realtime Instance
ablyCommandChannel *ably.RealtimeChannel

ablyDState AblyDInstanceState // Active instances running
ablyDState AblyDProcessState // Active instances running

log *LogScope

Expand All @@ -26,7 +26,7 @@ func NewAblyDHandler(ablyRealtime *ably.Realtime, config *ProcessConfig, newLog
ablyDHandler.command = config.CommandName

ablyDHandler.startUpCommandChannel(ablyRealtime)
ablyDHandler.ablyDState = AblyDInstanceState{config.ServerID, config.ChannelNamespace, config.MaxForks, make(map[int]string)}
ablyDHandler.ablyDState = AblyDProcessState{config.ServerID, config.ChannelNamespace, config.MaxForks, make(map[int]string)}

ablyDHandler.enterPresence()

Expand All @@ -50,15 +50,15 @@ func (ablyDHandler *AblyDHandler) ListenForCommands(wg *sync.WaitGroup) {
// Subscribe to messages sent on the channel
ablyDHandler.ablyCommandChannel.Subscribe(context.Background(), "start", func(msg *ably.Message) {
stringData := msg.Data.(string)
data := AblyDInstanceStartMessage{}
data := AblyDProcessStartMessage{}
json.Unmarshal([]byte(stringData), &data)

if data.MessageID != "" {
if data.ServerID != "" && data.ServerID != ablyDHandler.config.ServerID {
return
}
if (ablyDHandler.ablyDState.MaxInstances <= len(ablyDHandler.ablyDState.Instances)) {
ablyDHandler.ablyCommandChannel.Publish(context.Background(), "Error", "Failed to create new instance: Max instances reached")
if (ablyDHandler.ablyDState.MaxProcesses <= len(ablyDHandler.ablyDState.Processes)) {
ablyDHandler.ablyCommandChannel.Publish(context.Background(), "Error", "Failed to create new process: Max processes reached")
} else {
// TODO: Should use msg ID not data,
// but this does not currently work https://github.com/ably/ably-go/issues/58
Expand Down Expand Up @@ -97,21 +97,22 @@ func (ablyDHandler *AblyDHandler) Accept(messageID string, args []string) {

// Enter presence of serverinput
channelInput.Presence.Enter(context.Background(), "")
channelOutput.Presence.Enter(context.Background(), "")

ablyEndpoint := NewAblyEndpoint(channelInput, channelOutput, log)

newInstanceMessage := &NewInstanceMessage{MessageID: messageID, Pid: pid,
newProcessMessage := &NewProcessMessage{MessageID: messageID, Pid: pid,
Namespace: ablyDHandler.config.ChannelNamespace, ChannelPrefix: ablyDHandler.config.ChannelPrefix}

ablyDHandler.ablyCommandChannel.Publish(context.Background(), "new-instance", newInstanceMessage)
ablyDHandler.ablyCommandChannel.Publish(context.Background(), "new-process", newProcessMessage)

// Add to our list of active instances
ablyDHandler.ablyDState.Instances[launched.cmd.Process.Pid] = "Running"
// Add to our list of active processes
ablyDHandler.ablyDState.Processes[launched.cmd.Process.Pid] = "Running"
ablyDHandler.ablyCommandChannel.Presence.Update(context.Background(), ablyDHandler.ablyDState)

PipeEndpoints(process, ablyEndpoint)

delete(ablyDHandler.ablyDState.Instances, launched.cmd.Process.Pid)
delete(ablyDHandler.ablyDState.Processes, launched.cmd.Process.Pid)
ablyDHandler.ablyCommandChannel.Presence.Update(context.Background(), ablyDHandler.ablyDState)
channelInput.Detach(context.Background())
channelOutput.Detach(context.Background())
Expand Down
@@ -1,6 +1,6 @@
package libablyd

type NewInstanceMessage struct {
type NewProcessMessage struct {
MessageID string
Pid string
Namespace string
Expand Down

0 comments on commit d513f30

Please sign in to comment.