<a href="https://colab.research.google.com/github/ashuotaku/sillytavern/blob/main/Scripts/JanitorAI/janitor_scraper.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
!pip install flask flask_cors flask_cloudflared



In [13]:
import json
import logging
import os
import re
import time
import uuid
from datetime import datetime, timezone

from flask import Flask, request, jsonify, abort
from flask_cloudflared import run_with_cloudflared
from flask_cors import CORS

# ──────────────────────────────────────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────────────────────────────────────

# TCP port the Flask server listens on; overridable through the PORT
# environment variable (defaults to 5000).
PORT = int(os.environ.get("PORT", 5000))
# Bearer token clients must present; overridable through the API_KEY
# environment variable (defaults to the placeholder "custom-key").
API_KEY = os.environ.get("API_KEY", "custom-key")
# Set of accepted keys consulted by authenticate_api_key().
VALID_API_KEYS = {API_KEY}

# Directory for storing logs
# The __file__-based variant below is kept for reference when running as a
# plain script; NOTE(review): it is presumably disabled because __file__ is
# not defined inside a notebook cell - confirm.
# BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# NOTE(review): "/content" looks like the Colab workspace root - confirm.
BASE_DIR = "/content"
LOGS_DIR = os.path.join(BASE_DIR, "logs")

# Ensure logs directory exists (runs at import/cell-execution time; a
# pre-existing directory is not an error because of exist_ok=True).
os.makedirs(LOGS_DIR, exist_ok=True)

# ──────────────────────────────────────────────────────────────────────────────
# Flask App Setup
# ──────────────────────────────────────────────────────────────────────────────

# The single Flask application object; all routes and error handlers below
# are registered on it.
app = Flask(__name__)
# NOTE(review): presumably hooks app.run() so that a Cloudflare tunnel is
# opened alongside the local server (the captured cell output shows a
# trycloudflare.com URL) - confirm against flask_cloudflared.
run_with_cloudflared(app)
# Enable cross-origin requests for the app. NOTE(review): assumed to apply
# to every route with flask_cors defaults - confirm.
CORS(app)


# ──────────────────────────────────────────────────────────────────────────────
# Utility Functions
# ──────────────────────────────────────────────────────────────────────────────

def authenticate_api_key():
    """
    Validate the bearer token carried by the current Flask request.

    Reads the ``Authorization`` header, requires it to have the form
    ``Bearer <key>`` and checks ``<key>`` (surrounding whitespace stripped)
    against ``VALID_API_KEYS``.

    Returns nothing on success. On a missing/malformed header or an unknown
    key the request is aborted with HTTP 401; the abort description is a
    JSON string that ``handle_error`` turns back into a JSON response body.
    """
    scheme = "Bearer "
    auth_header = request.headers.get("Authorization", "")

    api_key = None
    if auth_header.startswith(scheme):
        # Take everything after the scheme prefix. Splitting on "Bearer "
        # would truncate a token that itself contains that text and could
        # let "Bearer <valid> Bearer <junk>" authenticate as <valid>.
        api_key = auth_header[len(scheme):].strip()

    if api_key is None or api_key not in VALID_API_KEYS:
        abort(401, description=json.dumps({
            "error": {
                "message": "Invalid API key",
                "type": "invalid_request_error"
            }
        }))


def extract_all_characters(messages):
    """
    Collect candidate character names from the system-role messages.

    Only messages whose role is "system" are inspected. For each of them:
      1. every <system>...</system> block is removed, so tags nested inside
         it are ignored;
      2. every remaining <Tag>...</Tag> pair is examined; the tag name is
         kept when the tag is not one of the known structural tags and its
         body mentions "Name:", "Age:", "Personality:" or
         "Character Details", or is longer than 200 characters;
      3. every Name("Some Name") occurrence contributes "Some Name".

    Messages that are not dicts, or whose content is not a string (for
    example ``None`` or a list of content parts), are skipped instead of
    raising.

    Returns a list of names in order of first appearance, without
    duplicates and without empty names.
    """
    tag_pattern = re.compile(r"<([^>]+)>([\s\S]*?)<\/\1>", re.DOTALL)
    name_quotes_pattern = re.compile(r'Name\s*\(\s*"([^"]+)"\s*\)')
    # Structural tags that never describe a character.
    ignored_tags = {"system", "scenario", "example_dialogs", "roleplay_guidelines", "/"}
    # Substrings that mark a tag body as a character sheet.
    character_markers = ("Name:", "Age:", "Personality:", "Character Details")

    characters = []
    seen = set()

    def remember(name):
        # Keep first-seen order while dropping repeats and blank names; a
        # blank name would match every prefix in later substring checks.
        if name and name not in seen:
            seen.add(name)
            characters.append(name)

    for msg in messages:
        if not isinstance(msg, dict) or msg.get("role") != "system":
            continue
        content = msg.get("content")
        if not isinstance(content, str):
            continue

        # Remove <system>...</system> entirely so nested tags don't confuse us
        content_without_system = re.sub(
            r"<system>[\s\S]*?<\/system>", "", content, flags=re.DOTALL
        )

        # Generic <Tag>...</Tag> blocks
        for match in tag_pattern.finditer(content_without_system):
            tag_name = match.group(1).strip()
            if tag_name.lower() in ignored_tags:
                continue
            tag_contents = match.group(2)
            looks_like_character = len(tag_contents) > 200 or any(
                marker in tag_contents for marker in character_markers
            )
            if looks_like_character:
                remember(tag_name)

        # Explicit Name("...") patterns
        for quoted_name in name_quotes_pattern.findall(content_without_system):
            remember(quoted_name.strip())

    return characters


def normalize_name(s):
    """
    Produce a comparison key for a name.

    Every character that is neither a word character nor whitespace
    (punctuation, emoji, ...) is dropped, runs of whitespace become a single
    space, and the result is trimmed and lowercased.
    """
    only_words_and_spaces = re.sub(r"[^\w\s]", "", s)
    single_spaced = re.sub(r"\s+", " ", only_words_and_spaces)
    trimmed = single_spaced.strip()
    return trimmed.lower()


def detect_user_character(messages, all_characters):
    """
    Guess which character the human user is playing.

    Messages are scanned from newest to oldest. The first user-role message
    whose text starts with a "SomeName:" prefix decides the result:
      - if the normalized prefix equals, contains, or is contained in the
        normalized form of a known character, that character is returned;
      - otherwise the raw (stripped) prefix is returned.

    Messages that are not dicts, have empty or non-string content, or whose
    prefix is blank are skipped. A prefix or character that normalizes to an
    empty string never counts as a match, because the empty string is a
    substring of every string and would otherwise match the first character.

    Returns the character name, the raw prefix, or None when no user message
    carries a prefix.
    """
    prefix_pattern = re.compile(r"^([^:]+):")
    # Normalize the known characters once instead of once per candidate check.
    normalized_characters = [(char, normalize_name(char)) for char in all_characters]

    for msg in reversed(messages):
        if not isinstance(msg, dict) or msg.get("role") != "user":
            continue
        content = msg.get("content", "")
        if not content or not isinstance(content, str):
            continue
        prefix_match = prefix_pattern.match(content)
        if not prefix_match:
            continue
        potential = prefix_match.group(1).strip()
        if not potential:
            # A whitespace-only prefix is not a name; keep looking further back.
            continue

        normalized_pot = normalize_name(potential)
        if normalized_pot:
            for char, normalized_char in normalized_characters:
                # Equality is covered by the substring tests.
                if normalized_char and (
                    normalized_pot in normalized_char
                    or normalized_char in normalized_pot
                ):
                    return char
        # no known character matched, but return the raw prefix anyway
        return potential
    return None


def determine_ai_character(all_characters, user_character):
    """
    Choose the character the AI is playing.

    - No characters at all: "unknown".
    - Exactly one character: that character.
    - A user character is known: the first listed character whose normalized
      name neither equals, contains, nor is contained in the normalized user
      name.
    - Otherwise (or when every character overlaps with the user): the first
      name longer than two characters, falling back to the first name.
    """
    if not all_characters:
        return "unknown"
    if len(all_characters) == 1:
        return all_characters[0]

    if user_character:
        user_key = normalize_name(user_character)

        def overlaps_user(candidate):
            # True when the candidate's key and the user's key are equal or
            # one is a substring of the other.
            candidate_key = normalize_name(candidate)
            return (
                user_key == candidate_key
                or candidate_key in user_key
                or user_key in candidate_key
            )

        for candidate in all_characters:
            if not overlaps_user(candidate):
                return candidate

    # fallback: first name with length > 2, else simply the first name
    return next(
        (name for name in all_characters if len(name) > 2),
        all_characters[0],
    )


def anonymize_user_character(text, user_character):
    """
    Replace standalone occurrences of the user's character name with {{user}}.

    An occurrence counts as standalone when it is not glued to a neighbouring
    word character, so "Ann" is replaced in "Ann:", "Ann." and "Ann's" but
    not inside "Annie". The boundary check is applied only on the sides where
    the name itself starts/ends with a word character, so names that begin or
    end with punctuation or emoji are still matched.

    The replacement is done in a single pass; the inserted "{{user}}" text is
    never re-scanned, which keeps a character literally named "user" from
    being replaced twice.

    Returns ``text`` unchanged when ``user_character`` is empty/None or does
    not occur in ``text``.
    """
    if not user_character or user_character not in text:
        return text

    esc = re.escape(user_character)
    # Lookarounds instead of \b: \b requires a word character on one side
    # and therefore never matches next to a leading/trailing emoji or symbol.
    leading = r"(?<!\w)" if re.match(r"\w", user_character[0]) else ""
    trailing = r"(?!\w)" if re.match(r"\w", user_character[-1]) else ""
    return re.sub(f"{leading}{esc}{trailing}", "{{user}}", text)


def format_message_content(messages, user_character):
    """
    Render the message list as a human-readable text block.

    For each message, depending on role:
      - system    -> "### SYSTEM MESSAGE ###" followed by every <Tag>...</Tag>
                     pair on its own lines (or the raw text when the message
                     contains no tag pair);
      - assistant -> "### ASSISTANT MESSAGE ###" with the content wrapped in
                     <firstmessage>...</firstmessage>;
      - user      -> "### USER MESSAGE ###" followed by the raw content;
      - any other role contributes only the separator.
    Every message is followed by a line of 40 "=" characters.

    Literal backslash-n sequences in the content are turned into real
    newlines, and when ``user_character`` is truthy its occurrences are
    anonymized via ``anonymize_user_character``.

    Content that is ``None`` or missing is treated as an empty string, and
    any other non-string content (for example a list of content parts) is
    rendered as JSON rather than raising.

    Returns the concatenated text ("" for an empty message list).
    """
    tag_pattern = re.compile(r"<([^>]+)>([\s\S]*?)<\/\1>", re.DOTALL)
    separator = "=" * 40 + "\n\n"
    # Collect pieces and join once instead of growing a string with +=.
    parts = []

    for msg in messages:
        role = msg.get("role")
        raw = msg.get("content")
        if raw is None:
            raw = ""
        elif not isinstance(raw, str):
            raw = json.dumps(raw, ensure_ascii=False)
        # Convert literal "\n" into real newlines
        raw = raw.replace(r"\n", "\n")
        if user_character:
            raw = anonymize_user_character(raw, user_character)

        if role == "system":
            parts.append("### SYSTEM MESSAGE ###\n\n")
            tag_blocks = [
                f"<{m.group(1).strip()}>\n{m.group(2)}\n</{m.group(1).strip()}>\n\n"
                for m in tag_pattern.finditer(raw)
            ]
            if tag_blocks:
                parts.extend(tag_blocks)
            else:
                # No tag pairs at all: dump raw as-is
                parts.append(raw + "\n\n")
        elif role == "assistant":
            parts.append("### ASSISTANT MESSAGE ###\n\n")
            parts.append("<firstmessage>\n" + raw + "\n</firstmessage>\n\n")
        elif role == "user":
            parts.append("### USER MESSAGE ###\n\n")
            parts.append(raw + "\n\n")
        parts.append(separator)

    return "".join(parts)


def sanitize_filename(name):
    """
    Turn an arbitrary name into a filename-friendly slug.

    Characters that are neither word characters nor whitespace are removed,
    the result is trimmed and lowercased, each whitespace run becomes one
    underscore, and repeated underscores are collapsed to a single one.
    May return an empty string.
    """
    cleaned = re.sub(r"[^\w\s]", "", name).strip().lower()
    underscored = re.sub(r"\s+", "_", cleaned)
    return re.sub(r"_+", "_", underscored)


def log_request(messages, character_name="unknown", user_character=None):
    """
    Persist one request to the logs directory.

    Writes two files under ``LOGS_DIR``:
      1) request_<safe_char_name>.log - human-readable; each call appends an
         entry, and a header is written first when the file is new.
      2) request_<safe_char_name>_raw.json - the raw messages array as
         pretty-printed JSON; overwritten on every call.

    ``<safe_char_name>`` is ``sanitize_filename(character_name)``, or
    "unknown" when that is empty. ``user_character`` is forwarded to
    ``format_message_content`` for anonymization.

    Returns (log_filename, is_new_character) where ``is_new_character`` is
    True when the .log file did not exist before this call.
    """
    # UTC timestamp in the same "<naive ISO-8601>Z" shape as before, taken
    # from an aware clock because datetime.utcnow() is deprecated.
    now_utc = datetime.now(timezone.utc)
    timestamp = now_utc.replace(tzinfo=None).isoformat() + "Z"
    formatted_content = format_message_content(messages, user_character)

    # Build a sanitized character name for filenames
    safe_char = sanitize_filename(character_name) or "unknown"
    log_filename = os.path.join(LOGS_DIR, f"request_{safe_char}.log")
    raw_filename = os.path.join(LOGS_DIR, f"request_{safe_char}_raw.json")

    # Must be checked before opening in append mode, which creates the file.
    is_new = not os.path.exists(log_filename)
    header = ""
    if is_new:
        header = f"===== LOG FILE FOR CHARACTER: {character_name} =====\nCreated: {timestamp}\n\n"

    log_entry = f"==== Request at {timestamp} ====\n\n{formatted_content}\n\n"
    # Write (append) to the .log file
    with open(log_filename, "a", encoding="utf-8") as f_log:
        f_log.write(header + log_entry)

    # Overwrite the raw JSON file each time
    with open(raw_filename, "w", encoding="utf-8") as f_raw:
        json.dump(messages, f_raw, indent=2, ensure_ascii=False)

    return log_filename, is_new


# ──────────────────────────────────────────────────────────────────────────────
# Routes
# ──────────────────────────────────────────────────────────────────────────────

@app.errorhandler(400)
@app.errorhandler(401)
@app.errorhandler(500)
def handle_error(e):
    """
    Render 400/401/500 errors as JSON.

    When the error's description is a JSON string (as produced by the
    abort() calls in this file) it is decoded and used as the response body.
    Otherwise a generic error object carrying the description text is built.
    The status code is the error's ``code`` attribute, or 500 when the error
    has none.
    """
    status_code = getattr(e, "code", 500)
    try:
        body = json.loads(e.description)
    except Exception:
        # Description missing or not JSON: wrap it in the standard envelope.
        message = getattr(e, "description", "An error occurred")
        body = {
            "error": {
                "message": message,
                "type": "server_error"
            }
        }
    return jsonify(body), status_code


@app.route("/v1/models", methods=["GET"])
def list_models():
    """
    Return the mock model list.

    Requires a valid bearer token (aborts with 401 otherwise). The response
    is a list object containing the single model "mock-model-1", whose
    ``created`` field is the current Unix time in whole seconds, the same
    unit ``chat_completions`` reports.
    """
    authenticate_api_key()

    response = {
        "object": "list",
        "data": [
            {
                "id": "mock-model-1",
                "object": "model",
                # time.time() is a true epoch value. The previous
                # datetime.utcnow().timestamp() read the naive UTC value as
                # local time, and it was also scaled to milliseconds.
                "created": int(time.time()),
                "owned_by": "custom-owner"
            }
        ]
    }
    return jsonify(response)


@app.route("/v1/chat/completions", methods=["POST"])
def chat_completions():
    """
    Accept a chat-completions request, log it, and answer with a fixed reply.

    Steps:
      1. authenticate the bearer token (401 on failure);
      2. parse the JSON body and require ``messages`` to be a list of
         objects (400 otherwise);
      3. detect the characters, the user's character and the AI's character
         from the messages;
      4. write the request to the per-character log files via
         ``log_request``;
      5. return a static chat.completion response with zero token usage.
    """
    authenticate_api_key()

    body = request.get_json(force=True)
    # A body that is valid JSON but not an object (list, string, null, ...)
    # has no .get(); treat it like a missing messages array.
    messages = body.get("messages") if isinstance(body, dict) else None
    if not isinstance(messages, list):
        abort(400, description=json.dumps({
            "error": {
                "message": "Messages array is required",
                "type": "invalid_request_error"
            }
        }))
    if not all(isinstance(message, dict) for message in messages):
        # The helpers below call .get() on every entry.
        abort(400, description=json.dumps({
            "error": {
                "message": "Each message must be an object",
                "type": "invalid_request_error"
            }
        }))

    # 1) Extract all characters from system messages
    all_characters = extract_all_characters(messages)

    # 2) Detect user character
    user_character = detect_user_character(messages, all_characters)

    # 3) Determine AI's character
    ai_character = determine_ai_character(all_characters, user_character)

    # 4) Build a mock response
    mock_response = {
        "id": "mock-" + uuid.uuid4().hex[:9],
        "object": "chat.completion",
        # True Unix epoch seconds; utcnow().timestamp() is skewed by the
        # host's UTC offset because it treats the naive value as local time.
        "created": int(time.time()),
        "model": "mock-model-1",
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": "This is a mock response from the custom OpenAI-compatible server"
                },
                "finish_reason": "stop"
            }
        ],
        "usage": {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0
        }
    }

    # 5) Console logging for debugging
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        # Only serialize the (potentially large) body when it will be emitted.
        logging.debug("Request body: %s", json.dumps(body, indent=2, ensure_ascii=False))
    logging.debug("Detected characters: %s", all_characters)
    logging.debug("User character: %s", user_character or "Unknown")
    logging.debug("AI character: %s", ai_character)

    # 6) Write to log files
    filename, is_new = log_request(messages, ai_character, user_character)
    if is_new:
        logging.info("New character detected: %s - Logs saved to '%s'", ai_character, filename)
    else:
        logging.info("Logs for %s appended to '%s'", ai_character, filename)

    return jsonify(mock_response)


# ──────────────────────────────────────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Configure root logging at DEBUG so the debug/info statements above are
    # eligible to be shown, then announce the effective settings and serve.
    logging.basicConfig(
        format="%(asctime)s [%(levelname)s] %(message)s",
        level=logging.DEBUG,
    )
    startup_messages = (
        f"Mock OpenAI server running on port {PORT}",
        f"Valid API keys: {', '.join(VALID_API_KEYS)}",
        f"Log files will be stored in: {LOGS_DIR}",
    )
    for startup_message in startup_messages:
        logging.info(startup_message)
    # Listen on every interface.
    app.run(port=PORT, host="0.0.0.0")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://172.28.0.12:5000
INFO:werkzeug:Press CTRL+C to quit


 * Running on https://licenses-statements-logic-nb.trycloudflare.com
 * Traffic stats available on http://127.0.0.1:8996/metrics


INFO:werkzeug:127.0.0.1 - - [06/Jun/2025 12:26:04] "OPTIONS /v1/chat/completions HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [06/Jun/2025 12:26:05] "POST /v1/chat/completions HTTP/1.1" 200 -
