# SSH open session as Job queue

NB both SSH options does not provide a mechanism to pass environment variables, which may be helpful

Requirements for Aqueduct -- a linux machine (container) with N cores and SSH server installed

In [1]:
import os, subprocess
from subprocess import PIPE, STDOUT
from io import StringIO
import select
import time

## Functions to support connection, monitoring, and cancellation

In [2]:
def ask(session, poll, command: str) -> str:
    session.stdin.write((command + "\n").encode('utf-8'))
    session.stdin.flush()
    result = ""
    
    while not poll.poll(0):
        time.sleep(1)

    result = session.stdout.read1().decode('utf-8')

    while poll.poll(0):
        line = session.stdout.read1().decode('utf-8')
        result += line
        time.sleep(1)

    return result


def get_session(server: str):
    process = subprocess.Popen(
        ["ssh", server], shell=False,
        stdout=PIPE,
        stderr=STDOUT,
        stdin=PIPE)
    poll_obj = select.poll()
    poll_obj.register(process.stdout, select.POLLIN)    
    
    while not poll_obj.poll(1):
        time.sleep(1)
    
    print(process.stdout.read1().decode('utf-8'))
    time.sleep(1)
    
    while poll_obj.poll(0):
        print(process.stdout.read1().decode('utf-8'))
        time.sleep(1)
    return process, poll_obj


def set_task(session, poll, task: str) -> int:
    return int(ask(ps, poll, f"bash -c 'echo $$ && exec {task}' &"))


def kill_task(session, poll, pid):
    text = ask(ps, poll, f"kill -9 {pids[0]}; echo \"$?\"")
    tpl = text.split('\n')
    return int(tpl[0]), "\n".join(tpl[1:])

## Establish connection

In [3]:
ps, poll = get_session("qiron-server")

Pseudo-terminal will not be allocated because stdin is not a terminal.
Welcome to Ubuntu 20.04.6 LTS (GNU/Linux 5.15.0-1050-azure x86_64)

 * Documentation:  https://help.ubuntu.com
 * Management:     https://landscape.canonical.com
 * Support:        https://ubuntu.com/pro

 * Strictly confined Kubernetes makes edge and IoT secure. Learn how MicroK8s
   just raised the bar for easy, resilient and secure K8s cluster deployment.

   https://ubuntu.com/engage/secure-kubernetes-at-the-edge

Expanded Security Maintenance for Applications is not enabled.

0 updates can be applied immediately.

10 additional security updates can be applied with ESM Apps.
Learn more about enabling ESM Apps service at https://ubuntu.com/esm





## Raw action running

In [4]:
print(ask(ps, poll, "ls"))

bpac
cert_path
chp.py
docs
etc
glibc
hub
jail.py
jupyterhub-event.log
jupyterhub.sqlite
jupyterhub_cookie_secret
mailpass.txt
node-v10.24.1-linux-x64
node-v10.24.1-linux-x64.tar.xz
nohup.out
qec
qec_explorer_api-1.0.0-py3-none-any.whl
sandbox
sbpeedy
sbpeedy-0.1.0-cp310-cp310-linux_x86_64.whl
update_password.py



## Task scheduling and monitoring

In [5]:
pids = [
    set_task(ps, poll, f"sleep {tt}")
    for tt in range(110, 115)
]
print(pids)

[1522179, 1522334, 1522489, 1522644, 1522799]


In [6]:
print(ask(ps, poll, "jobs -l"))
print(ask(ps, poll, f"ps u -p {pids[0]}"))

[1]  1522179 Running                 bash -c 'echo $$ && exec sleep 110' &
[2]  1522334 Running                 bash -c 'echo $$ && exec sleep 111' &
[3]  1522489 Running                 bash -c 'echo $$ && exec sleep 112' &
[4]- 1522644 Running                 bash -c 'echo $$ && exec sleep 113' &
[5]+ 1522799 Running                 bash -c 'echo $$ && exec sleep 114' &

USER         PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
azureus+ 1522179  0.3  0.0   7236   580 ?        S    17:52   0:00 sleep 110



### Task killing

by pid

In [7]:
print(kill_task(ps, poll, pids[1]))
print(kill_task(ps, poll, pids[1]))

(0, '')
(1, "-bash: line 10: kill: (1522179) - No such process\n-bash: line 11: 1522179 Killed                  bash -c 'echo $$ && exec sleep 110'\n")


In [8]:
print("Clean VENV\n", ask(ps, poll, "rm -r ~/.test-venv ; echo $?"))

Clean VENV
 0



In [9]:
print("Install VENV\n", ask(ps, poll, "python -m venv ~/.test-venv ; echo $?"))

Install VENV
 0



In [10]:
print("use VENV\n", ask(ps, poll, "~/.test-venv/bin/python -c 'import os; print(os.environ)'"))

use VENV
 environ({'SHELL': '/bin/bash', 'NVM_INC': '/home/azureuser/.nvm/versions/node/v10.24.1/include/node', 'SSH_AUTH_SOCK': '/tmp/ssh-XLPWqR943e/agent.1522019', 'PWD': '/home/azureuser', 'LOGNAME': 'azureuser', 'XDG_SESSION_TYPE': 'tty', 'MOTD_SHOWN': 'pam', 'HOME': '/home/azureuser', 'LANG': 'C.UTF-8', 'SSH_CONNECTION': '185.219.110.153 46552 10.0.1.4 22', 'NVM_DIR': '/home/azureuser/.nvm', 'SGX_AESM_ADDR': '1', 'XDG_SESSION_CLASS': 'user', 'USER': 'azureuser', 'SHLVL': '1', 'NVM_CD_FLAGS': '', 'XDG_SESSION_ID': '9521', 'XDG_RUNTIME_DIR': '/run/user/1000', 'SSH_CLIENT': '185.219.110.153 46552 22', 'XDG_DATA_DIRS': '/usr/local/share:/usr/share:/var/lib/snapd/desktop', 'PATH': '/home/azureuser/.nvm/versions/node/v10.24.1/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin', 'DBUS_SESSION_BUS_ADDRESS': 'unix:path=/run/user/1000/bus', 'NVM_BIN': '/home/azureuser/.nvm/versions/node/v10.24.1/bin', '_': '/home/azureuser/.test-venv/bin/p

# SSH NOHUP as executor

In [11]:
template = "ssh {server} \"nohup exec {task} 1>/tmp/$$.out 2>/tmp/$$.err & jobs -p\""
server = "qiron-server"

def exec_ssh(command):
    print(template.format(server=server, task=command))
    with subprocess.Popen(
            template.format(server=server, task=command),
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as proc:
            out, err = proc.communicate(3)
            code = proc.returncode
            return (code, out.decode(), err.decode())


def read(pid):
    with subprocess.Popen(
            f"ssh {server} 'cat /tmp/{pid}.out && cat /tmp/{pid}.err'",
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as proc:
            out, err = proc.communicate(3)
            code = proc.returncode
            return (code, out.decode(), err.decode())

## Run a task

In [12]:
exec_ssh("sleep 100")

ssh qiron-server "nohup exec sleep 100 1>/tmp/$$.out 2>/tmp/$$.err & jobs -p"


(0, '1523154\n', '')

## Kill a task

This bit does not work, as PIDs are a little of mess...

In [13]:
code, pid, err = exec_ssh("kill -9 1518706")
read(int(pid))

ssh qiron-server "nohup exec kill -9 1518706 1>/tmp/$$.out 2>/tmp/$$.err & jobs -p"


(1, '', 'cat: /tmp/1523347.out: No such file or directory\n')

# Celery as Job queue

Celery needs:
1. a message queue (redis or RabbitMQ <-- docker image)
   
   ```
   docker run -d -p 5672:5672 rabbitmq
   
   ```
   
2. a celery worker(s) connected to this queue, which has some code implemented and marked with annotation

   ```
   celery -A tasks worker --loglevel=INFO
   
   ```
   
3. This code should also be present on the caller's side.

   ```
   from tasks import run_executable
   
   ```

In [14]:
from tasks import run_executable

## Task setting

In [15]:
detached_job = run_executable.delay("echo $XX; sleep 100; ls", XX="1234qwert")
job = run_executable.delay("echo $XX; sleep 10; ls", XX="1234-----")
print(job.id)

a6532cad-d0f6-495e-b3fb-76b2aae376a2


## Task termination

In [16]:
detached_job.revoke(terminate=True, signal='SIGKILL')

## Status check

In [17]:
import time
while not job.ready():
    print(job.state, job.status, job.id)
    time.sleep(1)
    # job.revoke(terminate=True)

print(job.state, job.status, job.id)
print(job.result)

PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
PENDING PENDING a6532cad-d0f6-495e-b3fb-76b2aae376a2
SUCCESS SUCCESS a6532cad-d0f6-495e-b3fb-76b2aae376a2
[0, b'1234-----\n__pycache__\ntasks.py\n', b'']


In [18]:
job = run_executable.delay("echo $XX", XX="1234qwert")
while not job.ready():
    print(job.state, job.status, job.id)
    time.sleep(1)
print(job.result)

PENDING PENDING 01d3c31d-a90d-4dd5-a4d1-b474c3afe567
[0, b'1234qwert\n', b'']


## flower - monitoring

Should be the latest version, 2.0.1, with celery 5.+

```
python -m pip install celery==5.4.0 flower==2.0.1                   
python -m celery --broker=amqp://guest:guest@localhost:5672// flower --port=8585
```

Here is a docker file which runs RabbitMQ, Celery and Flower interface
```
FROM rabbitmq:latest

WORKDIR /celery

COPY tasks.py .

RUN     apt-get update

# clean installation to be smaller with --no-install-recommends
# but this may lead to a problem https://askubuntu.com/q/65081
RUN     apt-get install -y python3.10
# probably needed for pip installaton
RUN     apt-get install -y gcc
RUN     apt-get install -y python3-pip
RUN     python3 -m pip install celery==5.4.0 flower==2.0.1

EXPOSE  5555

CMD     rabbitmq-server & \
            python3 -m celery --broker=amqp://guest:guest@localhost:5672// -A tasks worker --loglevel=INFO & \
            python3 -m celery --broker=amqp://guest:guest@localhost:5672// flower --port=5555
```


And then:

```
docker build -t celery-worker .
docker run -p 5555:5555 -p 5672:5672 --name -celery-worker-container celery-worker
```