diff --git a/agent_core/core/impl/action/manager.py b/agent_core/core/impl/action/manager.py index 84e7c4a0..3693982a 100644 --- a/agent_core/core/impl/action/manager.py +++ b/agent_core/core/impl/action/manager.py @@ -32,6 +32,15 @@ nest_asyncio.apply() + +def _to_pretty_json(value: Any) -> str: + """Serialize a value to pretty-printed JSON for readable logs and event streams.""" + try: + return json.dumps(value, indent=2, ensure_ascii=False, default=str) + except (TypeError, ValueError): + return str(value) + + # Type aliases for hooks OnActionStartHook = Callable[[str, Any, Dict, str, str], Any] # (run_id, action, inputs, parent_id, started_at) -> awaitable OnActionEndHook = Callable[[str, Any, Dict, str, str, str], Any] # (run_id, action, outputs, status, parent_id, ended_at) -> awaitable @@ -205,10 +214,11 @@ async def execute_action( # Log to event stream # Only pass session_id when is_running_task=True (task stream exists) # When no task exists, use global stream by not passing task_id + pretty_input = _to_pretty_json(input_data) self._log_event_stream( is_gui_task=is_gui_task, event_type="action_start", - event=f"Running action {action.name} with input: {input_data}.", + event=f"Running action {action.name} with input: {pretty_input}.", display_message=f"Running {action.display_name}", action_name=action.name, session_id=session_id if is_running_task else None, @@ -293,10 +303,11 @@ async def execute_action( # Only pass session_id when is_running_task=True (task stream exists) output_has_error = outputs and outputs.get("status") == "error" display_status = "failed" if (status == "error" or output_has_error) else "completed" + pretty_output = _to_pretty_json(outputs) self._log_event_stream( is_gui_task=is_gui_task, event_type="action_end", - event=f"Action {action.name} completed with output: {outputs}.", + event=f"Action {action.name} completed with output: {pretty_output}.", display_message=f"{action.display_name} → {display_status}", action_name=action.name, session_id=session_id if is_running_task else None, diff --git a/agent_core/core/prompts/action.py b/agent_core/core/prompts/action.py index f7c0a15b..e2c5b215 100644 --- a/agent_core/core/prompts/action.py +++ b/agent_core/core/prompts/action.py @@ -205,8 +205,8 @@ * Default reads first 2000 lines - check has_more to know if more exists * Use offset to skip to specific line numbers * Use limit to control how many lines to read -- To find specific content in large files: - 1. Use grep_files with keywords to locate relevant sections +- To find specific content in files: + 1. Use grep_files with a regex pattern to locate relevant sections (use output_mode='content' for lines with line numbers, or 'files_with_matches' to discover files first) 2. Note the line numbers from grep results 3. Use read_file with appropriate offset to read that section - DO NOT repeatedly read entire large files - use targeted reading with offset/limit diff --git a/agent_file_system/AGENT.md b/agent_file_system/AGENT.md index 426f8b5d..910a24a6 100644 --- a/agent_file_system/AGENT.md +++ b/agent_file_system/AGENT.md @@ -26,7 +26,7 @@ Efficient File Reading: File Actions: - read_file: General reading with pagination (offset/limit) -- grep_files: Search for keywords, returns matching chunks with line numbers +- grep_files: Search files/directories for regex patterns with three output modes: 'files_with_matches' (discover files), 'content' (matching lines with line numbers), 'count' (match counts). Supports glob/file_type filtering, before/after context lines, case_insensitive, and multiline. - stream_read + stream_edit: Use together for file modifications Avoid: Reading entire large files repeatedly - use grep + targeted offset/limit reads instead diff --git a/app/data/action/grep_files.py b/app/data/action/grep_files.py index 1cd7c372..a60d891d 100644 --- a/app/data/action/grep_files.py +++ b/app/data/action/grep_files.py @@ -1,6 +1,68 @@ from agent_core import action -# Common output schema for all platforms +_INPUT_SCHEMA = { + "pattern": { + "type": "string", + "example": "def \\w+\\(", + "description": "Regex pattern to search for. Supports full regex syntax (e.g., 'def \\w+\\(' to find function definitions, 'TODO:.*' to find TODOs). For literal text search, just use the plain text (special regex chars will need escaping)." + }, + "path": { + "type": "string", + "example": "/workspace/project", + "description": "File or directory path to search in. If a directory, searches all files recursively. If a file, searches only that file. Defaults to current working directory if not provided." + }, + "glob": { + "type": "string", + "example": "*.py", + "description": "Glob pattern to filter which files to search (e.g., '*.py' for Python files, '*.{js,ts}' for JS/TS files, 'test_*.py' for test files). Only applies when path is a directory." + }, + "file_type": { + "type": "string", + "example": "py", + "description": "Filter by file extension type (e.g., 'py', 'js', 'json', 'md'). Shorthand alternative to glob — 'py' is equivalent to glob '*.py'. If both glob and file_type are provided, glob takes priority." + }, + "output_mode": { + "type": "string", + "example": "content", + "description": "Controls what is returned. 'files_with_matches' (default): returns only file paths that contain matches. 'content': returns matching lines with line numbers and optional context. 'count': returns the number of matches per file." + }, + "case_insensitive": { + "type": "boolean", + "example": True, + "description": "If true, search is case-insensitive. Default is false (case-sensitive)." + }, + "before_context": { + "type": "integer", + "example": 2, + "description": "Number of lines to show BEFORE each match. Only applies when output_mode is 'content'. Default is 0." + }, + "after_context": { + "type": "integer", + "example": 2, + "description": "Number of lines to show AFTER each match. Only applies when output_mode is 'content'. Default is 0." + }, + "context": { + "type": "integer", + "example": 3, + "description": "Number of context lines to show both before AND after each match (shorthand for setting before_context and after_context to the same value). Only applies when output_mode is 'content'. Overridden by explicit before_context/after_context if provided." + }, + "multiline": { + "type": "boolean", + "example": False, + "description": "If true, enables multiline mode where '.' matches newlines and patterns can span across lines. Default is false." + }, + "head_limit": { + "type": "integer", + "example": 50, + "description": "Maximum number of results to return. For 'files_with_matches': max file paths. For 'content': max output lines. For 'count': max file entries. Default is 250. Pass 0 for unlimited results (no truncation). If results are truncated, the applied_limit field in the response tells you it happened — use offset to paginate through the rest." + }, + "offset": { + "type": "integer", + "example": 0, + "description": "Number of results to skip before returning. Use with head_limit for pagination. Default is 0." + } +} + _OUTPUT_SCHEMA = { "status": { "type": "string", @@ -9,262 +71,348 @@ }, "message": { "type": "string", - "example": "Found 5 matching chunks", - "description": "Status message or error description." + "example": "Found matches in 5 files", + "description": "Summary message or error description." }, - "chunks": { - "type": "array", - "example": [ - "[line 275] ...some text chunk...", - "[line 937] ...another text chunk..." - ], - "description": "List of formatted chunks for the requested range." + "mode": { + "type": "string", + "example": "content", + "description": "The output mode that was used." }, - "total_matches": { + "num_files": { "type": "integer", - "example": 23, - "description": "Total number of matched chunks available." + "example": 5, + "description": "Number of files that contained matches." }, - "returned_range": { + "filenames": { "type": "array", - "example": [1, 5], - "description": "The 1-based [start, end] chunk indices that were requested (clamped to available matches)." - } -} - -# Common input schema for all platforms -_INPUT_SCHEMA = { - "input_file": { - "type": "string", - "example": "/path/to/input.txt", - "description": "Absolute path to the input text file to search." + "example": ["/workspace/project/main.py", "/workspace/project/utils.py"], + "description": "List of file paths that contained matches." }, - "keywords": { - "type": "array", - "example": ["Mt. Fuji", "visibility"], - "description": "List of plain-text keywords to search for (OR-ed together, case-insensitive).", - "default": [] + "content": { + "type": "string", + "example": "File: /workspace/main.py\n10:def hello():\n11- pass\n--\n25:def world():\n26- return 1\n", + "description": "Matching lines with line numbers. Match lines use ':' after the line number (e.g., '10:matched line'), context lines use '-' (e.g., '11-context line'). Non-contiguous groups are separated by '--'. For single-file searches, the filepath is shown once at the top to save tokens. For multi-file searches, each file section is prefixed with 'File: path'. Only populated when output_mode is 'content'." }, - "chunk_size": { + "num_lines": { "type": "integer", - "example": 300, - "description": "Approximate number of words per chunk.", - "default": 300 + "example": 15, + "description": "Number of content lines returned. Only populated when output_mode is 'content'." }, - "overlap": { + "num_matches": { "type": "integer", - "example": 50, - "description": "Number of overlapping words between consecutive chunks.", - "default": 50 + "example": 42, + "description": "Total number of matches across all files. Only populated when output_mode is 'count'." }, - "chunk_start": { + "applied_limit": { "type": "integer", - "example": 1, - "description": "1-based start index of the matched chunk range to return.", - "default": 1 + "example": 250, + "description": "The head_limit that was applied, or null if unlimited (head_limit=0). If your results were truncated to this limit, use offset to paginate through the rest." }, - "chunk_end": { + "applied_offset": { "type": "integer", - "example": 5, - "description": "1-based end index of the matched chunk range to return.", - "default": 5 + "example": 0, + "description": "The offset that was applied." } } @action( name="grep_files", - description="Searches a text file for keywords and returns matching chunks with pagination.", + description=( + "Searches files for a regex pattern and returns results. " + "Supports searching a single file or an entire directory recursively. " + "Three output modes: " + "'files_with_matches' (default) returns file paths containing matches — use for discovery. " + "'content' returns matching lines with line numbers and optional before/after context — use to read matched code. " + "In content mode, match lines use ':' after line number (e.g., '10:matched line'), " + "context lines use '-' (e.g., '11-context line'), and non-contiguous groups are separated by '--'. " + "'count' returns match counts per file — use for quick frequency checks. " + "Supports glob and file_type filtering, case-insensitive search, and multiline patterns. " + "Use with read_file: first grep_files to find relevant line numbers, then read_file with offset to read that section." + ), mode="CLI", platforms=["linux", "windows", "darwin"], action_sets=["core"], input_schema=_INPUT_SCHEMA, output_schema=_OUTPUT_SCHEMA, test_payload={ - "input_file": "/path/to/input.txt", - "keywords": ["Mt. Fuji", "visibility"], - "chunk_size": 300, - "overlap": 50, - "chunk_start": 1, - "chunk_end": 5, + "pattern": "Mt\\. Fuji|visibility", + "path": "/path/to/input.txt", + "output_mode": "content", + "case_insensitive": True, + "head_limit": 50, "simulated_mode": True } ) def grep_files(input_data: dict) -> dict: - """Searches a text file for keywords and returns matching chunks with pagination.""" + """Searches files for a regex pattern and returns results.""" import os import re + import fnmatch - def chunk_text(text, chunk_size=300, overlap=50): - """Split text into overlapping word chunks.""" - words = re.findall(r'\S+', text or '') - if not words: - return [] - if chunk_size <= 0: - chunk_size = 300 - if overlap < 0: - overlap = 0 - step = max(1, chunk_size - overlap) - n = len(words) - segments = [] - for start in range(0, n, step): - end = min(start + chunk_size, n) - chunk_words = words[start:end] - if not chunk_words: - break - chunk_text_val = ' '.join(chunk_words).strip() - if not chunk_text_val: + # --- Helper functions (must be inside for sandboxed execution) --- + + def make_error(message): + return { + 'status': 'error', + 'message': message, + 'mode': None, + 'num_files': 0, + 'filenames': [], + 'content': None, + 'num_lines': None, + 'num_matches': None, + 'applied_limit': None, + 'applied_offset': None + } + + def collect_files(directory, glob_pat=None, max_files=10000): + SKIP_DIRS = { + '.git', '.svn', '.hg', '__pycache__', 'node_modules', + '.venv', 'venv', '.env', '.tox', '.mypy_cache', + '.pytest_cache', 'dist', 'build', '.idea', '.vscode' + } + collected = [] + for root, dirs, files in os.walk(directory): + dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith('.')] + for fname in files: + if fname.startswith('.'): + continue + if glob_pat and not fnmatch.fnmatch(fname, glob_pat): + continue + collected.append(os.path.join(root, fname)) + if len(collected) >= max_files: + return collected + return collected + + def format_content_lines(fpath, lines, sorted_indices, display_map, single_file, first_file): + result = [] + if single_file: + if first_file: + result.append(f'File: {fpath}') + else: + if not first_file: + result.append('--') + result.append(f'File: {fpath}') + + prev_ln = None + for ln in sorted_indices: + if ln >= len(lines): continue - has_leading = start > 0 - has_trailing = end < n - segments.append({ - 'text': chunk_text_val, - 'start_word_index': start + 1, - 'has_leading_ellipsis': bool(has_leading), - 'has_trailing_ellipsis': bool(has_trailing) - }) - return segments + if prev_ln is not None and ln > prev_ln + 1: + result.append('--') + separator = ':' if display_map[ln] else '-' + result.append(f'{ln + 1}{separator}{lines[ln]}') + prev_ln = ln + return result + + # --- Main logic --- simulated_mode = input_data.get('simulated_mode', False) if simulated_mode: return { 'status': 'success', - 'message': 'Found 1 matching chunk(s)', - 'chunks': ['[line 10] Test chunk with keyword'], - 'total_matches': 1, - 'returned_range': [1, 5] + 'message': 'Found matches in 2 files', + 'mode': 'content', + 'num_files': 2, + 'filenames': ['/path/to/input.txt', '/path/to/other.txt'], + 'content': 'File: /path/to/input.txt\n10:Mt. Fuji is visible today\n11-The mountain was clear\n--\nFile: /path/to/other.txt\n5:visibility is low\n', + 'num_lines': 5, + 'num_matches': None, + 'applied_limit': 50, + 'applied_offset': 0 } + # --- Parse and validate inputs --- + pattern_str = input_data.get('pattern') + if not pattern_str: + return make_error('pattern is required.') + + search_path = input_data.get('path') or os.getcwd() + output_mode = input_data.get('output_mode', 'files_with_matches') + if output_mode not in ('files_with_matches', 'content', 'count'): + output_mode = 'files_with_matches' + + case_insensitive = bool(input_data.get('case_insensitive', False)) + multiline_mode = bool(input_data.get('multiline', False)) + glob_pattern = input_data.get('glob') + file_type = input_data.get('file_type') + + # Context lines (only for content mode) try: - input_file = input_data.get('input_file') - if not input_file: - return { - 'status': 'error', - 'message': 'input_file is required', - 'chunks': [], - 'total_matches': 0, - 'returned_range': [0, 0] - } - - if not os.path.isfile(input_file): - return { - 'status': 'error', - 'message': f'Input file does not exist: {input_file}', - 'chunks': [], - 'total_matches': 0, - 'returned_range': [0, 0] - } - - keywords = input_data.get('keywords') or [] - if not keywords: - return { - 'status': 'error', - 'message': 'keywords must be a non-empty array', - 'chunks': [], - 'total_matches': 0, - 'returned_range': [0, 0] - } + ctx = int(input_data.get('context', 0)) + except (TypeError, ValueError): + ctx = 0 + try: + before_ctx = int(input_data.get('before_context', ctx)) + except (TypeError, ValueError): + before_ctx = ctx + try: + after_ctx = int(input_data.get('after_context', ctx)) + except (TypeError, ValueError): + after_ctx = ctx + before_ctx = max(0, before_ctx) + after_ctx = max(0, after_ctx) + # Pagination + raw_limit = input_data.get('head_limit') + try: + head_limit = int(raw_limit) if raw_limit is not None else 250 + except (TypeError, ValueError): + head_limit = 250 + try: + offset = int(input_data.get('offset', 0)) + except (TypeError, ValueError): + offset = 0 + if head_limit < 0: + head_limit = 250 + unlimited = (head_limit == 0) + if offset < 0: + offset = 0 + + # --- Compile regex --- + flags = 0 + if case_insensitive: + flags |= re.IGNORECASE + if multiline_mode: + flags |= re.DOTALL | re.MULTILINE + + try: + regex = re.compile(pattern_str, flags) + except re.error as e: + return make_error(f'Invalid regex pattern: {e}') + + # --- Collect files to search --- + if not os.path.exists(search_path): + return make_error(f'Path does not exist: {search_path}') + + if os.path.isfile(search_path): + files_to_search = [search_path] + else: + if glob_pattern: + active_glob = glob_pattern + elif file_type: + active_glob = f'*.{file_type.lstrip(".")}' + else: + active_glob = None + files_to_search = collect_files(search_path, active_glob) + + # --- Search each file --- + matched_filenames = [] + content_lines = [] + total_match_count = 0 + count_entries = [] + is_single_file = len(files_to_search) == 1 + + for fpath in files_to_search: try: - chunk_size = int(input_data.get('chunk_size', 300)) - except (TypeError, ValueError): - chunk_size = 300 - try: - overlap = int(input_data.get('overlap', 50)) - except (TypeError, ValueError): - overlap = 50 - try: - start_idx = int(input_data.get('chunk_start', 1)) - except (TypeError, ValueError): - start_idx = 1 - try: - end_idx = int(input_data.get('chunk_end', 5)) - except (TypeError, ValueError): - end_idx = 5 - - # Normalize values - if chunk_size <= 0: - chunk_size = 300 - if overlap < 0: - overlap = 0 - if start_idx < 1: - start_idx = 1 - if end_idx < 1: - end_idx = 1 - if end_idx < start_idx: - start_idx, end_idx = end_idx, start_idx - - with open(input_file, 'r', encoding='utf-8', errors='ignore') as f: - content = f.read() - - segments = chunk_text(content, chunk_size=chunk_size, overlap=overlap) - - if not segments: - return { - 'status': 'success', - 'message': 'File is empty or has no content', - 'chunks': [], - 'total_matches': 0, - 'returned_range': [start_idx, end_idx] - } - - pattern = re.compile('(' + '|'.join(re.escape(k) for k in keywords) + ')', re.I) - matched_segments = [s for s in segments if pattern.search(s['text'])] - - total_matches = len(matched_segments) - if total_matches == 0: - return { - 'status': 'success', - 'message': 'No matches found for the given keywords', - 'chunks': [], - 'total_matches': 0, - 'returned_range': [start_idx, end_idx] - } - - start_idx_clamped = max(1, min(start_idx, total_matches)) - end_idx_clamped = max(1, min(end_idx, total_matches)) - if end_idx_clamped < start_idx_clamped: - start_idx_clamped, end_idx_clamped = end_idx_clamped, start_idx_clamped - - start_zero = start_idx_clamped - 1 - end_zero_excl = end_idx_clamped - - page_segments = matched_segments[start_zero:end_zero_excl] - - def clean_text(s): - s = s.strip() - s = re.sub(r'\s+', ' ', s) - return s - - formatted_chunks = [] - for seg in page_segments: - text_clean = clean_text(seg['text']) - if not text_clean: + with open(fpath, 'r', encoding='utf-8', errors='ignore') as f: + file_content = f.read() + except (OSError, IOError): + continue + + if not file_content: + continue + + lines = file_content.split('\n') + + if multiline_mode: + matches = list(regex.finditer(file_content)) + if not matches: continue - display_text = text_clean - if seg.get('has_leading_ellipsis'): - display_text = '...' + display_text - if seg.get('has_trailing_ellipsis'): - if not display_text.endswith('...'): - display_text = display_text + '...' - line_no = int(seg.get('start_word_index', 1)) - para = f"[line {line_no}] {display_text}" - formatted_chunks.append(para) + matched_line_nums = set() + for m in matches: + start_line = file_content[:m.start()].count('\n') + end_line = file_content[:m.end()].count('\n') + for ln in range(start_line, end_line + 1): + matched_line_nums.add(ln) + else: + matched_line_nums = set() + for i, line in enumerate(lines): + if regex.search(line): + matched_line_nums.add(i) + + if not matched_line_nums: + continue + + matched_filenames.append(fpath) + match_count = len(matched_line_nums) + total_match_count += match_count + + if output_mode == 'count': + count_entries.append(f'{fpath}: {match_count}') + elif output_mode == 'content': + display_map = {} + for ln in matched_line_nums: + display_map[ln] = True + for ctx_ln in range(max(0, ln - before_ctx), min(len(lines), ln + after_ctx + 1)): + if ctx_ln not in display_map: + display_map[ctx_ln] = False + sorted_indices = sorted(display_map.keys()) + file_lines = format_content_lines( + fpath, lines, sorted_indices, display_map, is_single_file, + first_file=(len(content_lines) == 0) + ) + content_lines.extend(file_lines) + + # --- Apply pagination and build output --- + def paginate(items): + after_offset = items[offset:] + if unlimited: + return after_offset + return after_offset[:head_limit] + + effective_limit = None if unlimited else head_limit + + if output_mode == 'files_with_matches': + total = len(matched_filenames) + paginated = paginate(matched_filenames) return { 'status': 'success', - 'message': f'Found {total_matches} matching chunk(s)', - 'chunks': formatted_chunks, - 'total_matches': total_matches, - 'returned_range': [start_idx_clamped, end_idx_clamped] + 'message': f'Found matches in {total} file(s)', + 'mode': 'files_with_matches', + 'num_files': total, + 'filenames': paginated, + 'content': None, + 'num_lines': None, + 'num_matches': None, + 'applied_limit': effective_limit, + 'applied_offset': offset } - except Exception as e: + elif output_mode == 'content': + total_lines = len(content_lines) + paginated = paginate(content_lines) + content_str = '\n'.join(paginated) + if paginated: + content_str += '\n' return { - 'status': 'error', - 'message': str(e), - 'chunks': [], - 'total_matches': 0, - 'returned_range': [0, 0] + 'status': 'success', + 'message': f'Found {total_match_count} match(es) in {len(matched_filenames)} file(s)', + 'mode': 'content', + 'num_files': len(matched_filenames), + 'filenames': matched_filenames, + 'content': content_str, + 'num_lines': len(paginated), + 'num_matches': None, + 'applied_limit': effective_limit, + 'applied_offset': offset + } + + else: # count + paginated = paginate(count_entries) + return { + 'status': 'success', + 'message': f'Total: {total_match_count} match(es) in {len(matched_filenames)} file(s)', + 'mode': 'count', + 'num_files': len(matched_filenames), + 'filenames': matched_filenames, + 'content': '\n'.join(paginated) + '\n' if paginated else '', + 'num_lines': None, + 'num_matches': total_match_count, + 'applied_limit': effective_limit, + 'applied_offset': offset } diff --git a/app/data/action/skill_management.py b/app/data/action/skill_management.py new file mode 100644 index 00000000..7daca570 --- /dev/null +++ b/app/data/action/skill_management.py @@ -0,0 +1,126 @@ +# core/data/action/skill_management.py +""" +Skill Management Actions + +These actions allow the agent to dynamically list and switch skills during task execution. +Both actions belong to the 'core' set and are always available. +""" + +from agent_core import action + + +@action( + name="list_skills", + description=( + "List all enabled skills with their names and descriptions. " + "Use this to discover available skills before using 'use_skill'." + ), + default=False, + mode="ALL", + action_sets=["core"], + input_schema={}, + output_schema={ + "skills": { + "type": "object", + "description": "Dictionary of enabled skill names to their descriptions.", + }, + }, + test_payload={ + "simulated_mode": True, + }, +) +def list_skills(input_data: dict) -> dict: + """List all enabled skills with their names and descriptions.""" + simulated_mode = input_data.get("simulated_mode", False) + + if simulated_mode: + return { + "skills": { + "pdf": "Read and create PDF documents", + "docx": "Read and create Word documents", + }, + } + + import app.internal_action_interface as iai + + try: + result = iai.InternalActionInterface.list_skills() + return result + except Exception as e: + return {"error": str(e)} + + +@action( + name="use_skill", + description=( + "Activate a skill for the current task, replacing the current skill in the system prompt. " + "ONLY use this action when the current skill need to be completely replaced with a new skill. " + "If you only need to read a skill's instructions while keeping the current skill in context, " + "find the skill directory and use 'read_file' on the skill's SKILL.md file instead. " + "Use 'list_skills' first to see enabled skill first." + ), + default=False, + mode="ALL", + action_sets=["core"], + parallelizable=False, + input_schema={ + "skill_name": { + "type": "string", + "description": "Name of the skill to activate.", + "example": "pdf", + }, + }, + output_schema={ + "success": { + "type": "boolean", + "description": "Whether the skill was activated successfully.", + }, + "active_skill": { + "type": "string", + "description": "Name of the now-active skill.", + }, + "skill_description": { + "type": "string", + "description": "Description of the activated skill.", + }, + "previous_skills": { + "type": "array", + "description": "List of previously active skill names that were replaced.", + }, + "added_action_sets": { + "type": "array", + "description": "Action sets that were added as recommended by the skill.", + }, + }, + test_payload={ + "skill_name": "pdf", + "simulated_mode": True, + }, +) +def use_skill(input_data: dict) -> dict: + """Activate a skill, replacing the current skill in the system prompt.""" + skill_name = input_data.get("skill_name", "") + simulated_mode = input_data.get("simulated_mode", False) + + if not skill_name: + return { + "success": False, + "error": "No skill_name specified.", + } + + if simulated_mode: + return { + "success": True, + "active_skill": skill_name, + "skill_description": "Simulated skill description", + "previous_skills": [], + "added_action_sets": [], + } + + import app.internal_action_interface as iai + + try: + result = iai.InternalActionInterface.use_skill(skill_name) + return result + except Exception as e: + return {"success": False, "error": str(e)} diff --git a/app/data/action/web_fetch.py b/app/data/action/web_fetch.py index 6dd9906b..361139fd 100644 --- a/app/data/action/web_fetch.py +++ b/app/data/action/web_fetch.py @@ -2,16 +2,15 @@ @action( name="web_fetch", - description="""Fetches content from a URL and returns processed markdown content. -- Takes a URL and an optional prompt describing what information to extract -- Fetches the URL content and converts HTML to markdown -- Uses two-tier extraction: fast static extraction first, then Jina Reader API for JS-rendered sites -- Handles redirects: when redirecting to a different host, returns redirect info -- HTTP URLs are automatically upgraded to HTTPS -- Use web_search action first to find relevant URLs, then use this to read full content - -IMPORTANT: This action may fail for authenticated or private URLs. For sites requiring -authentication (Google Docs, Confluence, Jira, etc.), use specialized authenticated tools.""", + description=( + "Fetches a URL and returns cleaned text/markdown content. " + "Use web_search first to find URLs, then web_fetch to read them. " + "Two modes: 'full' (default) returns extracted page content up to max_content_length chars. " + "'title' returns only the page title (cheap, no content extraction). " + "When content exceeds max_content_length, the full content is saved to a temp file " + "and content_file path is returned — use grep_files to search it or read_file with offset/limit to paginate. " + "HTTP is auto-upgraded to HTTPS (except localhost). Follows up to 10 redirects automatically." + ), mode="CLI", action_sets=["core"], input_schema={ @@ -21,182 +20,214 @@ "description": "The URL to fetch content from. Must be a valid http(s) URL.", "required": True }, - "prompt": { + "mode": { "type": "string", - "example": "Extract the main points and key takeaways from this article", - "description": "Optional prompt describing what information to extract from the page. If provided, content will be structured around this prompt." + "example": "full", + "description": "What to return. 'full' (default): extracted page content up to max_content_length, overflow saved to content_file. 'title': only the page title, no content extraction." }, "timeout": { "type": "number", - "example": 30, - "description": "Request timeout in seconds. Defaults to 30." + "example": 20, + "description": "Request timeout in seconds. Defaults to 20." }, "max_content_length": { "type": "integer", - "example": 50000, - "description": "Maximum content length in characters. Content exceeding this will be truncated. Defaults to 50000." + "example": 5000, + "description": "Maximum content length in characters returned inline. Content beyond this is saved to content_file — use grep_files to search it or read_file with offset/limit to paginate through it. Defaults to 5000. Pass 0 to return all content inline (use sparingly — large pages waste tokens)." }, "use_jina_fallback": { "type": "boolean", "example": True, - "description": "Use Jina Reader API as fallback for JS-rendered sites. Defaults to True." - }, - "min_content_length": { - "type": "integer", - "example": 200, - "description": "Minimum content length to consider extraction successful. Below this triggers fallback. Defaults to 200." + "description": "Use Jina Reader API as fallback for JS-rendered sites when static extraction yields too little content. Defaults to True." } }, output_schema={ "status": { "type": "string", "example": "success", - "description": "'success', 'redirect', or 'error'." + "description": "'success' or 'error'." }, - "url": { - "type": "string", - "description": "The original requested URL." + "status_code": { + "type": "integer", + "example": 200, + "description": "HTTP status code (e.g., 200, 404, 500)." }, - "final_url": { + "status_text": { "type": "string", - "description": "The final URL after any redirects (same host only)." + "example": "OK", + "description": "HTTP status reason (e.g., 'OK', 'Not Found')." }, - "redirect_url": { + "url": { "type": "string", - "description": "Present when status='redirect'. The URL to follow for cross-host redirects." + "description": "The final URL after following redirects." }, "title": { "type": "string", - "description": "The page title." + "description": "The page title, if extracted." }, "content": { "type": "string", - "description": "The extracted content in markdown format." + "description": "The extracted page content in markdown/text format, up to max_content_length chars. Empty when mode is 'title'." }, "content_length": { "type": "integer", - "description": "Length of the content in characters." + "description": "Length of the inline content in characters." + }, + "total_content_length": { + "type": "integer", + "description": "Total length of the full extracted content before truncation. Compare with content_length to know how much was cut." }, "was_truncated": { "type": "boolean", - "description": "True if content was truncated due to max_content_length." + "description": "True if content was truncated to max_content_length. When true, content_file contains the full content — use grep_files to search it or read_file with offset/limit to paginate." }, - "prompt_used": { + "content_file": { "type": "string", - "description": "The prompt that was applied (if any)." + "description": "Absolute path to the full content file when was_truncated is true. Use grep_files(pattern, path=content_file) to search for specific information, or read_file(file_path=content_file, offset=N, limit=M) to paginate. Null if content was not truncated." }, "message": { "type": "string", "description": "Error or informational message." - }, - "extraction_method": { - "type": "string", - "description": "Method used for extraction: 'static' (trafilatura/BeautifulSoup) or 'jina' (Jina Reader API)." } }, requirement=["requests", "beautifulsoup4", "trafilatura", "lxml"], test_payload={ "url": "https://example.com/article", - "prompt": "Summarize the main content", - "timeout": 30, + "timeout": 20, "simulated_mode": True } ) def web_fetch(input_data: dict) -> dict: - """ - Fetches content from a URL and returns processed markdown content. - Uses two-tier extraction: fast static extraction first, then Jina Reader API for JS-rendered sites. - """ + """Fetches a URL and returns cleaned text/markdown content.""" import re + import os + import tempfile from urllib.parse import urlparse + from datetime import datetime, timezone - simulated_mode = input_data.get('simulated_mode', False) - url = str(input_data.get('url', '')).strip() - prompt = str(input_data.get('prompt', '')).strip() if input_data.get('prompt') else None - timeout = float(input_data.get('timeout', 30)) - max_content_length = int(input_data.get('max_content_length', 50000)) - use_jina_fallback = input_data.get('use_jina_fallback', True) - min_content_length = int(input_data.get('min_content_length', 200)) + # --- Helper functions (must be inside for sandboxed execution) --- - def _make_error(message, url=''): + def make_error(message, err_url='', status_code=0, status_text=''): return { 'status': 'error', - 'url': url, - 'final_url': '', + 'status_code': status_code, + 'status_text': status_text, + 'url': err_url, 'title': '', 'content': '', 'content_length': 0, + 'total_content_length': 0, 'was_truncated': False, - 'prompt_used': prompt or '', + 'content_file': None, 'message': message } - def _make_redirect(original_url, redirect_url): + def make_result(res_url, title, content, total_content_length, + status_code, status_text, + was_truncated=False, content_file=None, message=''): return { - 'status': 'redirect', - 'url': original_url, - 'final_url': '', - 'redirect_url': redirect_url, - 'title': '', - 'content': '', - 'content_length': 0, - 'was_truncated': False, - 'prompt_used': prompt or '', - 'message': f'Redirect to different host detected. Please make a new request to: {redirect_url}' + 'status': 'success', + 'status_code': status_code, + 'status_text': status_text, + 'url': res_url, + 'title': title or '', + 'content': content, + 'content_length': len(content), + 'total_content_length': total_content_length, + 'was_truncated': was_truncated, + 'content_file': content_file, + 'message': message } - # Validate URL - if not url: - return _make_error('URL is required.') + def save_content_file(content, file_url, sess_id): + save_dir = None + if sess_id: + try: + current = os.path.abspath(__file__) + for _ in range(10): + current = os.path.dirname(current) + if os.path.isdir(os.path.join(current, 'agent_file_system')): + save_dir = os.path.join(current, 'agent_file_system', 'workspace', 'tmp', sess_id) + break + except Exception: + pass - # Auto-upgrade HTTP to HTTPS - if url.startswith('http://'): - url = 'https://' + url[7:] + if not save_dir: + save_dir = tempfile.gettempdir() - if not re.match(r'^https?://', url, re.I): - return _make_error('A valid http(s) URL is required.', url) + os.makedirs(save_dir, exist_ok=True) - # Parse original URL for host comparison - try: - original_parsed = urlparse(url) - original_host = original_parsed.netloc.lower() - except Exception as e: - return _make_error(f'Invalid URL format: {str(e)}', url) + try: + domain = urlparse(file_url).hostname or 'unknown' + domain = domain.replace('.', '_') + except Exception: + domain = 'unknown' - # Simulated mode for testing - if simulated_mode: - mock_content = f"""# Test Page Title + ts = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S%f') + filename = f'web_fetch_{domain}_{ts}.md' + file_path = os.path.join(save_dir, filename) -This is simulated content fetched from {url}. + with open(file_path, 'w', encoding='utf-8') as f: + f.write(f'\n\n') + f.write(content) -## Main Content + return file_path -This is the main body of the page content, converted to markdown format. + # --- Main logic --- -- Point 1: Important information -- Point 2: More details -- Point 3: Additional context + simulated_mode = input_data.get('simulated_mode', False) + url = str(input_data.get('url', '')).strip() + fetch_mode = str(input_data.get('mode', 'full')).strip().lower() + if fetch_mode not in ('full', 'title'): + fetch_mode = 'full' + timeout = float(input_data.get('timeout', 20)) + raw_max = input_data.get('max_content_length') + try: + max_content_length = int(raw_max) if raw_max is not None else 5000 + except (TypeError, ValueError): + max_content_length = 5000 + if max_content_length < 0: + max_content_length = 5000 + unlimited = (max_content_length == 0) + use_jina_fallback = input_data.get('use_jina_fallback', True) + session_id = input_data.get('_session_id', '') -## Summary + # --- Validate URL --- + if not url: + return make_error('URL is required.') -This is a test page demonstrating the web_fetch action functionality. -""" - if prompt: - mock_content = f"**Prompt:** {prompt}\n\n---\n\n{mock_content}" + # Auto-upgrade HTTP to HTTPS (except localhost) + if url.startswith('http://'): + try: + parsed = urlparse(url) + host = parsed.hostname or '' + if host not in ('localhost', '127.0.0.1', '::1'): + url = 'https://' + url[7:] + except Exception: + url = 'https://' + url[7:] - return { - 'status': 'success', - 'url': url, - 'final_url': url, - 'title': 'Test Page Title', - 'content': mock_content, - 'content_length': len(mock_content), - 'was_truncated': False, - 'prompt_used': prompt or '', - 'message': '' - } + if not re.match(r'^https?://', url, re.I): + return make_error('A valid http(s) URL is required.', url) - # Fetch the URL + # --- Simulated mode --- + if simulated_mode: + mock_content = ( + "# Test Page Title\n\n" + "This is simulated content fetched from the URL.\n\n" + "## Main Content\n\n" + "- Point 1: Important information\n" + "- Point 2: More details\n" + "- Point 3: Additional context\n\n" + "## Summary\n\n" + "This is a test page demonstrating the web_fetch action." + ) + if fetch_mode == 'title': + return make_result(url, 'Test Page Title', '', 0, 200, 'OK') + return make_result( + url, 'Test Page Title', mock_content, len(mock_content), 200, 'OK' + ) + + # --- Fetch the URL --- try: import requests from bs4 import BeautifulSoup @@ -208,39 +239,27 @@ def _make_redirect(original_url, redirect_url): 'Accept-Language': 'en-US,en;q=0.9' } - # First, make a HEAD request to check for redirects without downloading content - try: - head_response = requests.head(url, headers=headers, timeout=timeout, allow_redirects=True) - final_url = str(head_response.url) - final_parsed = urlparse(final_url) - final_host = final_parsed.netloc.lower() - - # Check if redirect is to a different host - if final_host != original_host: - return _make_redirect(url, final_url) - except requests.exceptions.RequestException: - # HEAD failed, continue with GET - pass - - # Fetch the content - response = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True, stream=True) + # Fetch content — follow up to 10 redirects automatically + response = requests.get( + url, headers=headers, timeout=timeout, + allow_redirects=True, stream=True + ) response.raise_for_status() + status_code = response.status_code + status_text = response.reason or '' final_url = str(response.url) - final_parsed = urlparse(final_url) - final_host = final_parsed.netloc.lower() - - # Double-check for cross-host redirect - if final_host != original_host: - return _make_redirect(url, final_url) # Check content type content_type = response.headers.get('Content-Type', '') if not any(t in content_type for t in ('text/html', 'application/xhtml+xml', 'text/plain')): - return _make_error(f'Unsupported content-type: {content_type}', url) + return make_error( + f'Unsupported content-type: {content_type}', final_url, + status_code=status_code, status_text=status_text + ) - # Read content with size limit - max_bytes = max_content_length * 4 # Rough estimate for UTF-8 + # Read content with size limit (raw bytes cap to prevent memory issues) + max_bytes = 500000 # 500KB raw cap content_bytes = b'' for chunk in response.iter_content(chunk_size=65536): if chunk: @@ -251,13 +270,32 @@ def _make_redirect(original_url, redirect_url): encoding = response.encoding or 'utf-8' html_text = content_bytes.decode(encoding, errors='replace') - # === TIER 1: Fast Static Extraction === + # === Extract title (needed for both modes) === title = '' + try: + meta = trafilatura.metadata.extract_metadata(content_bytes, url=final_url) + if meta and getattr(meta, 'title', None): + title = meta.title.strip() + except Exception: + pass + + if not title: + try: + soup_title = BeautifulSoup(html_text[:5000], 'lxml') + if soup_title.title and soup_title.title.string: + title = soup_title.title.string.strip() + except Exception: + pass + + # === Title mode: return just the title === + if fetch_mode == 'title': + return make_result(final_url, title, '', 0, status_code, status_text) + + # === Full mode: extract content === content_md = '' - extraction_method = 'static' + min_content_length = 200 try: - # Try trafilatura for main content extraction content_md = trafilatura.extract( content_bytes, url=final_url, @@ -265,42 +303,27 @@ def _make_redirect(original_url, redirect_url): include_tables=True, output_format='markdown' ) or '' - - # Try to get title from metadata - try: - meta = trafilatura.metadata.extract_metadata(content_bytes, url=final_url) - if meta and getattr(meta, 'title', None): - title = meta.title.strip() - except Exception: - pass - except Exception: pass - # Fallback to BeautifulSoup if trafilatura fails + # Fallback to BeautifulSoup if not content_md or len(content_md) < min_content_length: - soup = BeautifulSoup(html_text, 'lxml') - - # Get title - if not title and soup.title and soup.title.string: - title = soup.title.string.strip() + try: + soup = BeautifulSoup(html_text, 'lxml') - # Remove script/style elements - for tag in soup(['script', 'style', 'noscript', 'nav', 'footer', 'header']): - tag.decompose() + for tag in soup(['script', 'style', 'noscript', 'nav', 'footer', 'header']): + tag.decompose() - # Get text content - text = soup.get_text('\n') - # Clean up whitespace - text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text) - bs_content = text.strip() + text = soup.get_text('\n') + text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text) + bs_content = text.strip() - # Use BeautifulSoup content if better than trafilatura - if len(bs_content) > len(content_md or ''): - content_md = bs_content + if len(bs_content) > len(content_md or ''): + content_md = bs_content + except Exception: + pass - # === TIER 2: Jina Reader API Fallback === - # Use Jina if static extraction got insufficient content + # === Jina Reader API Fallback === if use_jina_fallback and (not content_md or len(content_md) < min_content_length): try: jina_url = f"https://r.jina.ai/{url}" @@ -312,62 +335,74 @@ def _make_redirect(original_url, redirect_url): if jina_response.status_code == 200: jina_content = jina_response.text.strip() - - # Jina returns markdown with title as first line if jina_content and len(jina_content) > min_content_length: content_md = jina_content - extraction_method = 'jina' - - # Extract title from Jina response (usually first # heading) - title_match = re.match(r'^#\s*(.+?)[\n\r]', jina_content) - if title_match and not title: - title = title_match.group(1).strip() + if not title: + title_match = re.match(r'^#\s*(.+?)[\n\r]', jina_content) + if title_match: + title = title_match.group(1).strip() except Exception: - # Jina fallback failed, continue with whatever we have pass - # === Content Quality Check === - # Clean and validate content + # === Clean content === if content_md: - # Remove excessive whitespace content_md = re.sub(r'\n{4,}', '\n\n\n', content_md) content_md = content_md.strip() - # Check if truncation is needed + if not content_md: + return make_result( + final_url, title, '', 0, status_code, status_text, + message='No content could be extracted. Site may require JavaScript rendering — use browser tools (Playwright) instead.' + ) + + total_content_length = len(content_md) + + # === Truncation + file save === was_truncated = False - if len(content_md) > max_content_length: - content_md = content_md[:max_content_length] - # Try to truncate at a sentence boundary - last_period = content_md.rfind('.') + content_file = None + + if not unlimited and total_content_length > max_content_length: + content_file = save_content_file(content_md, final_url, session_id) + + truncated = content_md[:max_content_length] + last_period = truncated.rfind('.') if last_period > max_content_length * 0.8: - content_md = content_md[:last_period + 1] - content_md += '\n\n[Content truncated due to length...]' + truncated = truncated[:last_period + 1] + content_md = truncated was_truncated = True - # Build result with extraction method info + # === Build message === message = '' - if not content_md or len(content_md) < min_content_length: - message = 'Warning: Extracted content may be incomplete. Site may require JavaScript rendering or authentication.' - - return { - 'status': 'success', - 'url': url, - 'final_url': final_url, - 'title': title or '', - 'content': content_md, - 'content_length': len(content_md), - 'was_truncated': was_truncated, - 'prompt_used': prompt or '', - 'message': message, - 'extraction_method': extraction_method - } + if was_truncated: + message = ( + f'Content truncated to {len(content_md)} chars. ' + f'Full content ({total_content_length} chars) saved to content_file. ' + f'Use grep_files(pattern, path=content_file) to search for specific info, ' + f'or read_file(file_path=content_file, offset=N, limit=M) to paginate.' + ) + + return make_result( + final_url, title, content_md, total_content_length, + status_code, status_text, + was_truncated=was_truncated, content_file=content_file, + message=message + ) - except requests.exceptions.Timeout: - return _make_error(f'Request timed out after {timeout} seconds.', url) - except requests.exceptions.ConnectionError as e: - return _make_error(f'Connection error: {str(e)}', url) - except requests.exceptions.HTTPError as e: - return _make_error(f'HTTP error: {str(e)}', url) except Exception as e: - return _make_error(f'Unexpected error: {str(e)}', url) + sc, st = 0, '' + if hasattr(e, 'response') and e.response is not None: + sc = e.response.status_code + st = e.response.reason or '' + + error_type = type(e).__name__ + if 'Timeout' in error_type: + msg = f'Request timed out after {timeout} seconds.' + elif 'ConnectionError' in error_type: + msg = f'Connection error: {str(e)}' + elif 'HTTPError' in error_type: + msg = f'HTTP error: {str(e)}' + else: + msg = f'Fetch failed: {str(e)}' + + return make_error(msg, url, status_code=sc, status_text=st) diff --git a/app/data/agent_file_system_template/AGENT.md b/app/data/agent_file_system_template/AGENT.md index 426f8b5d..910a24a6 100644 --- a/app/data/agent_file_system_template/AGENT.md +++ b/app/data/agent_file_system_template/AGENT.md @@ -26,7 +26,7 @@ Efficient File Reading: File Actions: - read_file: General reading with pagination (offset/limit) -- grep_files: Search for keywords, returns matching chunks with line numbers +- grep_files: Search files/directories for regex patterns with three output modes: 'files_with_matches' (discover files), 'content' (matching lines with line numbers), 'count' (match counts). Supports glob/file_type filtering, before/after context lines, case_insensitive, and multiline. - stream_read + stream_edit: Use together for file modifications Avoid: Reading entire large files repeatedly - use grep + targeted offset/limit reads instead diff --git a/app/internal_action_interface.py b/app/internal_action_interface.py index 076d1623..a0aa2019 100644 --- a/app/internal_action_interface.py +++ b/app/internal_action_interface.py @@ -1001,3 +1001,77 @@ def list_action_sets(cls) -> Dict[str, Any]: "available_sets": available_sets, "current_sets": current_sets, } + + @classmethod + def list_skills(cls) -> Dict[str, Any]: + """ + List all enabled skills with their names and descriptions. + + Returns: + Dictionary with skill names mapped to descriptions. + """ + from agent_core.core.impl.skill.manager import skill_manager + + skills = skill_manager.list_skills_for_selection() + return {"skills": skills} + + @classmethod + def use_skill(cls, skill_name: str) -> Dict[str, Any]: + """ + Activate a skill for the current task, replacing the current skill + in the system prompt. Invalidates and re-creates LLM session caches + so the updated system prompt takes effect. + + Args: + skill_name: Name of the skill to activate. + + Returns: + Dictionary with success status and skill details. + """ + if cls.task_manager is None: + raise RuntimeError("InternalActionInterface not initialized with TaskManager.") + + from agent_core.core.impl.skill.manager import skill_manager + + # Validate skill exists and is enabled + skill = skill_manager.get_skill(skill_name) + if not skill: + return {"success": False, "error": f"Skill '{skill_name}' not found."} + if not skill.enabled: + return {"success": False, "error": f"Skill '{skill_name}' is not enabled."} + + # Get current task and save previous skills + task = cls.task_manager.get_task() + if not task: + return {"success": False, "error": "No active task."} + + previous_skills = list(task.selected_skills) + + # Replace selected skills + task.selected_skills = [skill_name] + + # Add skill-recommended action sets (if any new ones) + added_action_sets = [] + recommended_sets = skill_manager.get_skill_action_sets([skill_name]) + if recommended_sets: + current_sets = set(task.action_sets) + new_sets = [s for s in recommended_sets if s not in current_sets] + if new_sets: + cls.add_action_sets(new_sets) # This also invalidates caches + added_action_sets = new_sets + else: + # No new action sets but system prompt still changed — invalidate caches + cls._invalidate_action_selection_caches() + else: + # No recommended sets — still need to invalidate for skill change + cls._invalidate_action_selection_caches() + + logger.info(f"[SKILL] Activated skill '{skill_name}' (replaced: {previous_skills})") + + return { + "success": True, + "active_skill": skill_name, + "skill_description": skill.description, + "previous_skills": previous_skills, + "added_action_sets": added_action_sets, + } diff --git a/app/ui_layer/browser/frontend/src/pages/Tasks/TasksPage.module.css b/app/ui_layer/browser/frontend/src/pages/Tasks/TasksPage.module.css index b9892244..10f8f49e 100644 --- a/app/ui_layer/browser/frontend/src/pages/Tasks/TasksPage.module.css +++ b/app/ui_layer/browser/frontend/src/pages/Tasks/TasksPage.module.css @@ -255,6 +255,8 @@ .detailList dd { color: var(--text-primary); + white-space: pre-wrap; + word-break: break-word; } .mono { @@ -306,6 +308,8 @@ /* Expandable value for long text */ .expandableValue { display: inline; + white-space: pre-wrap; + word-break: break-word; } .expandButton { diff --git a/app/ui_layer/events/transformer.py b/app/ui_layer/events/transformer.py index e1fd6706..3bca0d10 100644 --- a/app/ui_layer/events/transformer.py +++ b/app/ui_layer/events/transformer.py @@ -283,21 +283,26 @@ def _create_task_end_event( @classmethod def _python_str_to_json(cls, python_str: str) -> str: - """Convert Python dict/list string representation to JSON. + """Convert a JSON or Python dict/list string to pretty-printed JSON. - Uses ast.literal_eval to safely parse Python literals, - then json.dumps to convert to proper JSON. + Tries json.loads first (handles pretty-printed JSON with null/true/false), + falls back to ast.literal_eval for legacy Python dict repr (None/True/False). """ import ast import json + # Try JSON first (handles pretty-printed JSON from _to_pretty_json) + try: + parsed = json.loads(python_str) + return json.dumps(parsed, indent=2, ensure_ascii=False) + except (json.JSONDecodeError, TypeError): + pass + + # Fallback: Python literal (legacy format) try: - # Parse Python literal (dict, list, etc.) parsed = ast.literal_eval(python_str) - # Convert to JSON string - return json.dumps(parsed) + return json.dumps(parsed, indent=2, ensure_ascii=False) except (ValueError, SyntaxError): - # If parsing fails, return original string return python_str @classmethod