In [None]:
import json
import re

import docker
import docker.models.containers

import time
import paramiko


def create_ssh_client(
    hostname="localhost", port=2222, username="root", password="1234"
):
    """Open a password-authenticated SSH connection and return the client.

    Host keys that are not yet known are accepted automatically
    (``paramiko.AutoAddPolicy``).
    """
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(
        hostname=hostname,
        port=port,
        username=username,
        password=password,
    )
    return ssh


def create_shell(client: paramiko.SSHClient):
    """Open an interactive shell channel on ``client`` with a 1000-column terminal."""
    return client.invoke_shell(width=1000)


def clean_terminal_output(output):
    """Strip terminal control noise from decoded shell output.

    Removes three kinds of text:

    * CSI escape sequences (``ESC [ ... <final byte>``), such as ``ESC[?2004h``;
    * OSC sequences (``ESC ] ... `` terminated by BEL or by ``ESC`` + backslash),
      which shells use for things like the window title and which would
      otherwise be left in the text;
    * carriage returns.

    Args:
        output: Decoded terminal text.

    Returns:
        ``output`` with the sequences above removed.
    """
    control_noise = re.compile(
        r"(\x1b\[[0-?]*[ -/]*[@-~])"  # CSI sequence
        r"|(\x1b\][^\x07\x1b]*(?:\x07|\x1b\\))"  # OSC sequence
        r"|(\r)"
    )
    return control_noise.sub("", output)


def extract_relevant_output(output):
    """Trim shell output to start at the first prompt and report the last prompt.

    A prompt is text shaped like ``user@host:path#`` or ``user@host:path$``.

    Args:
        output: Cleaned terminal text.

    Returns:
        A ``(text, last_prompt)`` tuple.

        * When at least one prompt is present, ``text`` is the output from the
          first prompt onwards (stripped) and ``last_prompt`` is the final
          prompt string found.
        * When no prompt is present, ``text`` is the output unchanged (or ``""``
          if it is only whitespace) and ``last_prompt`` is ``None``. Previously
          this case raised ``IndexError``.
    """
    prompt_re = re.compile(r"(\w+@\w+:[~\/\w-]*[#\$])")

    # Splitting on a capturing group keeps the prompts; whitespace-only
    # fragments between them are dropped before re-joining.
    pieces = prompt_re.split(output)
    text = "".join(piece for piece in pieces if piece.strip())

    prompts = prompt_re.findall(text)
    if not prompts:
        return text, None

    # Discard anything printed before the first prompt.
    text = text[text.find(prompts[0]):].strip()
    return text, prompts[-1]


def execute_command_shell(channel: paramiko.Channel, command: str):
    """Send ``command`` to an interactive shell channel and return its cleaned output.

    Args:
        channel: Open shell channel.
        command: Text to send; a newline is appended.

    Returns:
        Everything read from the channel, decoded as UTF-8 (undecodable bytes
        are replaced), right-stripped and passed through
        ``clean_terminal_output``. Returns an empty result if the channel was
        closed before any data arrived.
    """
    channel.send(command + "\n")

    # Wait for the first bytes. A closed channel with an empty buffer never
    # becomes ready, so stop waiting in that case instead of spinning forever.
    while not channel.recv_ready():
        if getattr(channel, "closed", False):
            break
        time.sleep(0.1)

    # Fixed grace period before draining, presumably so the rest of the output
    # can arrive.
    time.sleep(0.5)

    chunks = []
    while channel.recv_ready():
        chunks.append(channel.recv(1024))

    # Terminal output is not guaranteed to be valid UTF-8; never raise on it.
    processed_output = b"".join(chunks).decode("utf-8", errors="replace").rstrip()
    return clean_terminal_output(processed_output)


class DummyOutput:
    """Minimal result record: an exit code plus the captured output."""

    output: str
    exit_code: int

    def __init__(self, code, o):
        self.output, self.exit_code = o, code


class Container:
    """Docker container controlled through an SSH shell session.

    The constructor starts ``image`` detached with container port 22 published
    on ``port``, connects over SSH and opens one interactive shell channel.
    ``execute`` sends commands through that single channel;
    ``execute_independent`` runs a one-off program with ``docker exec`` instead.
    """

    # language -> (compiler, source path) for languages that need a build step.
    _COMPILERS = {
        "c++": ("g++", "/tmp/main.cpp"),
        "c": ("gcc", "/tmp/main.c"),
    }

    def __init__(
        self,
        image: str,
        port: int = 2222,
        username="root",
        password="1234",
        connect_timeout: float = 30.0,
    ):
        """Start the container and open the SSH shell.

        Args:
            image: Docker image to run.
            port: Host port that container port 22 is published on.
            username: SSH user name.
            password: SSH password.
            connect_timeout: Seconds to keep retrying the SSH connection while
                the SSH server inside the new container is not reachable yet.

        Raises:
            Whatever the Docker or SSH layer raises; if start-up fails after
            the container was created, the container is stopped before the
            exception propagates.
        """
        self.image = image
        # Defined up front so close()/__del__ work even if start-up fails midway.
        self.container = None
        self.ssh_client = None
        self.channel = None
        self.last_line = None

        self.client = docker.from_env()
        self.container: docker.models.containers.Container = self.client.containers.run(
            image,
            detach=True,
            tty=True,
            stdin_open=True,
            remove=True,
            labels={"created_by": "os-pipeline"},
            ports={"22/tcp": port},
        )
        try:
            self.ssh_client = self._connect_ssh(
                port, username, password, connect_timeout
            )
            self.channel = create_shell(self.ssh_client)
        except Exception:
            # Do not leave a running container behind when SSH setup fails.
            self.close()
            raise

    @staticmethod
    def _connect_ssh(port, username, password, timeout):
        """Connect via ``create_ssh_client``, retrying until ``timeout`` seconds pass."""
        deadline = time.monotonic() + timeout
        while True:
            try:
                return create_ssh_client(
                    port=port, username=username, password=password
                )
            except paramiko.AuthenticationException:
                # Wrong credentials will not fix themselves; fail immediately.
                raise
            except (OSError, EOFError, paramiko.SSHException):
                # The SSH server may not be accepting connections yet.
                if time.monotonic() >= deadline:
                    raise
                time.sleep(0.5)

    def close(self):
        """Close the channel and SSH client and stop the container (best effort).

        Each step runs independently so that a failure in one does not skip
        the others. Safe to call more than once.
        """
        for attr, method in (
            ("channel", "close"),
            ("ssh_client", "close"),
            ("container", "stop"),
        ):
            resource = getattr(self, attr, None)
            if resource is None:
                continue
            try:
                getattr(resource, method)()
            except Exception:
                # Best-effort cleanup: keep going with the remaining resources.
                pass
            setattr(self, attr, None)

    def __del__(self):
        try:
            self.close()
        except Exception:
            # Never let errors escape from a finalizer.
            pass

    def execute(self, command: str):
        """Run ``command`` in the shared shell channel and return a ``DummyOutput``.

        Non-string commands are rejected with ``DummyOutput(-1, b"")``.
        Otherwise the raw output is prefixed with the prompt remembered from
        the previous call, trimmed by ``extract_relevant_output``, and returned
        with exit code 0 (the real exit status is not inspected).
        """
        if not isinstance(command, str):
            return DummyOutput(-1, b"")

        print("---------SEND CMD---------")
        print(command)
        print("---------EXECUTING---------")
        result = execute_command_shell(self.channel, command)
        print(result)
        if self.last_line is not None:
            # The echoed command follows the prompt printed at the end of the
            # previous call, so re-attach that prompt before trimming.
            result = self.last_line + result

        result, last_line = extract_relevant_output(result)
        self.last_line = last_line
        print("---------RESULT---------")
        print(result)
        return DummyOutput(0, result)

    def execute_independent(self, command, *params):
        """Run a one-off program inside the container via ``exec_run``.

        Args:
            command: ``(language, source)`` pair; ``language`` is one of
                ``"bash"``, ``"python"``, ``"c++"`` or ``"c"``.
            *params: Extra command-line arguments for the program.

        Returns:
            The ``exec_run`` result of the program. For ``"c"``/``"c++"``, if
            compilation fails the result of the failed build step is returned
            instead, so a stale ``/tmp/a.out`` is never executed.

        Raises:
            ValueError: If ``language`` is not supported.
        """
        import shlex

        language, command = command
        if language == "bash":
            cmd = ["bash", "-c", command]
            if params:
                cmd.append("--")
                cmd.extend(params)
        elif language == "python":
            cmd = ["python3", "-c", command, *params]
        elif language in self._COMPILERS:
            compiler, source_path = self._COMPILERS[language]
            # shlex.quote keeps the source byte-for-byte intact through the shell.
            build = (
                f"printf '%s\\n' {shlex.quote(command)} > {source_path} && "
                f"{compiler} -o /tmp/a.out {source_path}"
            )
            compiled = self.container.exec_run(["bash", "-c", build])
            if compiled.exit_code != 0:
                return compiled
            cmd = ["/tmp/a.out", *params]
        else:
            raise ValueError("Unsupported language")
        return self.container.exec_run(cmd)

In [1]:
import sys
from pathlib import Path

# Put the directory two levels above the current working directory first on
# sys.path (presumably the repository root, so that `src` is importable;
# confirm against the notebook's location). Must run before the import below.
sys.path.insert(0, str(Path().resolve().parent.parent))

from src.server.tasks.os_interaction.task import Container

# NOTE(review): constructing Container has side effects (in the class shown in
# this notebook it starts a Docker container and opens an SSH session).
container = Container(image="local-os/ssh")

  "class": algorithms.Blowfish,


In [3]:
# Smoke test: send one multi-line command through `container.execute`.
# Any exception is printed instead of raised; in that case `res` is not
# assigned, so later cells that read `res` depend on this cell succeeding.
try:
    # command = 'cd ~'
    # command = 'cd /'
    # command = 'while true; do :; done'
    # command = 'cd /home'
    # command = 'ls'
    # command = 'pwd'
    # The continuation lines of this literal keep their four leading spaces,
    # which are sent to the shell as part of the command.
    command = """cd ~
    echo hello world > test.txt
    cat test.txt"""
#     command = """
# # find ~ -type f -name '*.c'
# # """
#     command = """
# ps -eo user=,pid= | grep -v '^root' | awk '{print $2}' | sort -n | head -n 1 | awk '{print $1}'
# """
    # command = 'cat test.txt'
    # command = """cat test.txt"""
    res = container.execute(command=command)
except Exception as ex:
    print(ex)

---------SEND CMD---------
cd ~
    echo hello world > test.txt
    cat test.txt
---------EXECUTING---------
cd ~
root@adcfe07c0b9d:~#     echo hello world > test.txt
root@adcfe07c0b9d:~#     cat test.txt
hello world
root@adcfe07c0b9d:~#
---------RESULT---------
root@adcfe07c0b9d:~#cd ~
root@adcfe07c0b9d:~#     echo hello world > test.txt
root@adcfe07c0b9d:~#     cat test.txt
hello world
root@adcfe07c0b9d:~#


In [None]:
import docker

image = "local-os/ssh"
client = docker.from_env()
# Annotated assignment, not a chained one: `a = B = value` would also rebind
# `docker.models.containers.Container` to the running container object and
# break every later use of that class in this kernel.
container: docker.models.containers.Container = client.containers.run(
    image,
    detach=True,
    tty=True,
    stdin_open=True,
    remove=True,
    labels={"created_by": "os-pipeline"},
    ports={"22/tcp": 2222},
)


In [None]:
import paramiko
import time
import re


def create_client(hostname="localhost", port=2222, username="root", password="1234"):
    """Return a connected paramiko SSH client using password authentication.

    Unknown host keys are accepted automatically (``paramiko.AutoAddPolicy``).
    """
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(
        hostname=hostname,
        port=port,
        username=username,
        password=password,
    )
    return ssh


def create_shell(client: paramiko.SSHClient):
    """Open an interactive shell channel on ``client`` with a 1000-column terminal."""
    return client.invoke_shell(width=1000)


def clean_terminal_output(output):
    """Strip terminal control noise from decoded shell output.

    Removes CSI escape sequences (``ESC [ ...``), OSC sequences (``ESC ] ...``
    terminated by BEL or by ``ESC`` + backslash, e.g. window-title updates) and
    carriage returns. The OSC case would otherwise be left in the text.

    Args:
        output: Decoded terminal text.

    Returns:
        ``output`` with those sequences removed.
    """
    control_noise = re.compile(
        r"(\x1b\[[0-?]*[ -/]*[@-~])"  # CSI sequence
        r"|(\x1b\][^\x07\x1b]*(?:\x07|\x1b\\))"  # OSC sequence
        r"|(\r)"
    )
    return control_noise.sub("", output)


def extract_relevant_output(output):
    """Return the text between the first and the last shell prompt.

    A prompt looks like ``user@host:path#`` or ``user@host:path$``. The result
    starts at the first prompt, stops just before the last one, and is
    stripped. Without any prompt the text is returned as is (minus a
    whitespace-only input, which becomes ``""``).
    """
    prompt_re = re.compile(r"(\w+@\w+:[~\/\w-]*[#\$])")

    # Re-join the split pieces, dropping whitespace-only fragments.
    text = "".join(chunk for chunk in prompt_re.split(output) if chunk.strip())

    found = prompt_re.findall(text)
    if not found:
        return text

    start = text.find(found[0])
    end = text.rfind(found[-1])
    return text[start:end].strip()


def execute_command_shell(channel: paramiko.Channel, command: str):
    """Send ``command`` to the shell channel, print and return the relevant output.

    The raw output is decoded as UTF-8 (undecodable bytes are replaced),
    cleaned with ``clean_terminal_output`` and printed, then reduced with
    ``extract_relevant_output``; that reduced text is printed and returned.
    """
    channel.send(command + "\n")

    # Wait for the first bytes. A closed channel with an empty buffer never
    # becomes ready, so stop waiting in that case instead of spinning forever.
    while not channel.recv_ready():
        if getattr(channel, "closed", False):
            break
        time.sleep(0.1)

    # Fixed grace period before draining, presumably so the rest of the output
    # can arrive.
    time.sleep(0.5)

    chunks = []
    while channel.recv_ready():
        chunks.append(channel.recv(1024))

    # Terminal output is not guaranteed to be valid UTF-8; never raise on it.
    processed_output = b"".join(chunks).decode("utf-8", errors="replace").rstrip()
    res = clean_terminal_output(processed_output)
    print(res)
    relevant = extract_relevant_output(res)
    print()
    print(relevant)
    return relevant


# Run one multi-line command over a fresh SSH shell. try/finally guarantees the
# channel and client are closed even if sending or reading the command fails.
client = create_client()
try:
    channel = create_shell(client)
    try:
        # execute_command_shell(channel, "stty sane")
        # command = "ps -eo user=,pid= | grep -v '^root' | awk '{print $2}' | sort -n | head -n 1 | awk '{print $1}'"
        command = """
cd /home
echo hello world
mkdir tst
cd tst
echo this is a test > test.txt
cat test.txt
""".strip()
        execute_command_shell(channel, command)
    finally:
        channel.close()
finally:
    client.close()

In [None]:
# Byte length of a sample pipeline. Raw string: `\;` is not a valid Python
# escape, so a normal literal triggers an invalid-escape warning.
len(r"find /etc -type f -exec ls -l {} \; | awk '{print $5, $9}' |".encode())

In [None]:
# 150-character filler string: "012" repeated 50 times.
'012' * 50

In [None]:
# Byte length of a long sample `echo` command line (scratch check).
len(b'echo 012345678901234567890123456789012345678901234567890123456789012345678901234"')

In [None]:
# Close the shell channel and then the SSH client it was created from.
# Both names come from the earlier cell that calls create_client()/create_shell().
channel.close()
client.close()

In [None]:
# Display the current `command` string (assigned in an earlier cell).
command

In [None]:
# Remove carriage returns from a previous result's output.
# `res` is assigned in an earlier cell (the `container.execute(...)` call).
cleaned = re.sub('\r', '', res.output)
cleaned

In [None]:
# Show the cleaned output and the command side by side.
cleaned, command

In [None]:
# Is the command text contained verbatim in the cleaned output?
command in cleaned

In [None]:
# Remove the command from the output; re.escape makes it match literally.
re.sub(re.escape(command), '', cleaned)

In [None]:
# Remove every literal occurrence of the command using str.replace.
cleaned.replace(command, '')

In [None]:
# Display the `command` string again.
command

In [None]:
# Byte values of the command; not the cell's last expression, so not displayed.
list(command.encode())

# Pair the command's bytes with the cleaned output's bytes position by position;
# zip() stops at the shorter of the two.
zip_command_and_cleaned = list(zip(command.encode(), cleaned.encode()))
print(zip_command_and_cleaned)


In [None]:
# Compare lengths: the zipped list is only as long as the shorter input.
len(cleaned), len(command), len(zip_command_and_cleaned)

In [None]:
# Decode the last four paired bytes: command side first, cleaned-output side second.
(
    bytes(pair[0] for pair in zip_command_and_cleaned[-4:]).decode(),
    bytes(pair[1] for pair in zip_command_and_cleaned[-4:]).decode(),
)

In [None]:
import itertools

# Keep the command-side byte of every leading pair whose two bytes are equal;
# takewhile stops at the first mismatch.
result = [
    command_byte
    for command_byte, _ in itertools.takewhile(
        lambda pair: pair[0] == pair[1], zip_command_and_cleaned
    )
]

# Show the matching prefix as a list of byte values
print(result)

In [None]:
# Turn the list of byte values in `result` back into text.
bytes(result).decode()

In [None]:
# Remove the command from the output at byte level. re.escape is required:
# the command contains regex metacharacters (e.g. `|`, `$`, `{`), so using it
# unescaped as a pattern would not match the literal text.
re.sub(re.escape(command.encode()), b'', cleaned.encode())

In [None]:
# NOTE(review): `reset_socket` is not defined or imported anywhere in the
# visible notebook, so this cell fails on a fresh kernel. Confirm where it
# comes from or remove the cell.
reset_socket(container)


In [None]:
# Captured raw shell bytes used as a test sample. Besides CSI sequences it
# contains an OSC sequence (`\x1b]0;...\x07`) and a carriage return inserted in
# the middle of the echoed command ("sor\rrt").
a = b"\r\n\x1b[?2004l\r\x1b[?2004h\x1b]0;root@46e6e0db0346: /\x07root@46e6e0db0346:/# ps -eo user=,pid= | grep -v \'^root\' | awk \'{print $2}\' | sor\rrt -n | head -n 1 | awk \'{print $1}\'\r\n\x1b[?2004l\r"

In [None]:
# Print the sample; for a bytes object this shows its escaped repr.
print(a)

In [None]:
# Drop every carriage return from the raw sample bytes.
a.replace(b"\r", b"")

In [None]:
import re

def clean_terminal_output(output):
    """Strip terminal control noise from decoded shell output.

    Removes CSI escape sequences (``ESC [ ...``), OSC sequences (``ESC ] ...``
    terminated by BEL or by ``ESC`` + backslash, e.g. window-title updates) and
    carriage returns. The OSC case would otherwise be left in the text.
    """
    control_noise = re.compile(
        r'(\x1b\[[0-?]*[ -/]*[@-~])'  # CSI sequence
        r'|(\x1b\][^\x07\x1b]*(?:\x07|\x1b\\))'  # OSC sequence
        r'|(\r)'
    )
    return control_noise.sub('', output)

# Sample output string
output = b'pwd\r\n\x1b[?2004l\r/home\r\n\x1b[?2004hroot@89032952e974:/home# '

# Cleaning the output
cleaned_output = clean_terminal_output(output.decode("utf-8"))
cleaned_output

In [None]:
# Print the cleaned text so its newlines are rendered rather than escaped.
print(cleaned_output)

In [None]:
import re

def extract_relevant_output(output):
    """Return the text between a prompt and that same prompt's next occurrence.

    The regex looks for a prompt-like token (``user@host:...`` ending in ``#``,
    ``$`` or ``>``), whitespace, and then the shortest text up to the next
    identical prompt. Every copy of the prompt is removed from that match and
    the remainder is stripped. If nothing matches, ``output`` is returned
    unchanged.
    """
    # Adjust the pattern if your prompt significantly differs from this format
    prompt_span = re.compile(r'(\S+@\S+:[^\n]+[#\$>])\s+.*?\1', re.DOTALL)

    found = prompt_span.search(output)
    if found is None:
        return output

    prompt = found.group(1)
    return found.group().replace(prompt, '').strip()

# Sample transcript: echoed commands first, then the prompt-by-prompt session
command_output = """
Last login: Sat Dec 23 18:05:28 2023 from 172.17.0.1
cd ~
echo hello world
echo this is a test > test.txt
cat test.txt
root@89032952e974:~# cd ~
root@89032952e974:~# echo hello world
hello world
root@89032952e974:~# echo this is a test > test.txt
root@89032952e974:~# cat test.txt
this is a test
root@89032952e974:~#
""".strip()

cleaned_output = extract_relevant_output(command_output)
print(cleaned_output)


In [None]:
def extract_relevant_output_v4(output):
    """Return the text between the first and the last shell prompt.

    A prompt looks like ``user@host:path#`` or ``user@host:path$``. The result
    starts at the first prompt, stops just before the last one, and is
    stripped. Without any prompt the text is returned as is (minus a
    whitespace-only input, which becomes ``''``).
    """
    prompt_re = re.compile(r'(\w+@\w+:[~\/\w-]*[#\$])')

    # Re-join the split pieces, dropping whitespace-only fragments.
    text = ''.join(chunk for chunk in prompt_re.split(output) if chunk.strip())

    found = prompt_re.findall(text)
    if not found:
        return text

    start = text.find(found[0])
    end = text.rfind(found[-1])
    return text[start:end].strip()

# Apply the updated function to the sample transcript.
# `command_output` is the multi-line sample string assigned in an earlier cell.
cleaned_output_v4 = extract_relevant_output_v4(command_output)
print(cleaned_output_v4)


In [None]:
# Step-by-step version of the prompt extraction, run at top level so the
# intermediate values can be inspected. `command_output` comes from an earlier cell.
pattern = re.compile(r'(\w+@\w+:[~\/\w-]*[#\$])')

# Split the output based on the pattern (the capturing group keeps the prompts)
parts = pattern.split(command_output)

# Drop whitespace-only pieces and reconstruct the output
cleaned_output = ''.join([part for part in parts if part.strip()])

# List every prompt found in the reconstructed output
prompts = pattern.findall(cleaned_output)
prompts