Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 204 additions & 6 deletions services/file_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,38 @@
import tempfile
import contextlib
import logging
from typing import IO
import hashlib
import asyncio
import mimetypes
from datetime import datetime, timezone, timedelta
from typing import IO, Optional, List, Dict, Any
from services.exceptions import DocumentSaveError
from concurrent.futures import ThreadPoolExecutor

UPLOAD_DIR = os.path.join(os.path.dirname(__file__), '..', 'data', 'uploads')
DEFAULT_MAX_UPLOAD_BYTES = int(os.environ.get("MAX_UPLOAD_BYTES", str(50 * 1024 * 1024))) # 50 MiB
_ENV_UPLOAD_DIR = os.environ.get("UPLOAD_DIR")
UPLOAD_DIR = os.path.abspath(_ENV_UPLOAD_DIR) if _ENV_UPLOAD_DIR else os.path.join(os.path.dirname(__file__), '..', 'data', 'uploads')

os.makedirs(UPLOAD_DIR, exist_ok=True)

# Optional comma-separated allowed content types (e.g. 'application/pdf,text/plain')
_ALLOWED_TYPES = os.environ.get("ALLOWED_CONTENT_TYPES")
ALLOWED_CONTENT_TYPES = set([t.strip() for t in _ALLOWED_TYPES.split(',')]) if _ALLOWED_TYPES else None

# Shared thread pool for offloading blocking file operations when used from async code
_THREAD_POOL: Optional[ThreadPoolExecutor] = None

def _get_thread_pool() -> ThreadPoolExecutor:
    """Return the shared executor for blocking file I/O, creating it lazily.

    NOTE(review): initialization is not guarded by a lock — two threads racing
    here could each build a pool (one would leak). Harmless in practice, but
    confirm single-threaded first use if that matters.
    """
    global _THREAD_POOL
    pool = _THREAD_POOL
    if pool is None:
        pool = ThreadPoolExecutor(max_workers=4)
        _THREAD_POOL = pool
    return pool

def save_upload_file(upload_file: IO, filename: str) -> str:
# Sanitize filename: remove path, reject empty/unsafe
logger = logging.getLogger(__name__)
base = os.path.basename(filename)
if not base or base in {'.', '..'} or any(c in base for c in '\/:*?"<>|'):
if not base or base in {'.', '..'} or any(c in base for c in '\\/:*?"<>|'):
logger.error("Invalid filename for upload: %s", filename)
raise DocumentSaveError("Invalid filename.")

Comment on lines 39 to 46
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix unused max_bytes: plumb through async wrapper to sync save

max_bytes in save_upload_file_async is ignored; wire it through and make sync save accept it.

Apply this diff:

-def save_upload_file(upload_file: IO, filename: str) -> str:
+def save_upload_file(upload_file: IO, filename: str, max_bytes: Optional[int] = None) -> str:
@@
-    # Default max bytes
-    max_bytes = DEFAULT_MAX_UPLOAD_BYTES
+    # Default max bytes
+    if max_bytes is None:
+        max_bytes = DEFAULT_MAX_UPLOAD_BYTES
@@
 async def save_upload_file_async(upload_file: IO, filename: str, max_bytes: Optional[int] = None) -> str:
     """Async wrapper that offloads the blocking save call to a thread pool."""
     loop = asyncio.get_running_loop()
-    return await loop.run_in_executor(_get_thread_pool(), save_upload_file, upload_file, filename)
+    return await loop.run_in_executor(_get_thread_pool(), save_upload_file, upload_file, filename, max_bytes)

Also applies to: 58-60, 179-183

🤖 Prompt for AI Agents
In services/file_service.py around lines 39-46 (and also update usages at 58-60
and 179-183), the async wrapper save_upload_file_async currently ignores the
max_bytes argument; update the sync function signature to accept an optional
max_bytes parameter, plumb max_bytes from the async wrapper into the sync call,
and enforce the byte limit inside the sync save (validate file size or stream
read versus max_bytes and raise DocumentSaveError on overflow). Also update all
callsites and tests to pass max_bytes through and ensure logging includes the
filename and reason when rejecting due to size.

Expand All @@ -35,23 +55,201 @@ def save_upload_file(upload_file: IO, filename: str) -> str:
logger.error("Unsafe file path detected: %s", final_path)
raise DocumentSaveError("Unsafe file path.")

# Write file atomically in chunks
# Default max bytes
max_bytes = DEFAULT_MAX_UPLOAD_BYTES

# Write file atomically in chunks while enforcing size limits and computing checksum
sha256 = hashlib.sha256()
total = 0

try:
with tempfile.NamedTemporaryFile(dir=UPLOAD_DIR, delete=False) as tmp:
for chunk in iter(lambda: upload_file.read(8192), b""):
tmp.write(chunk)
temp_path = tmp.name
# Support starlette UploadFile which provides .file
source = getattr(upload_file, 'file', upload_file)
for chunk in iter(lambda: source.read(8192), b""):
if not chunk:
break
tmp.write(chunk)
total += len(chunk)
sha256.update(chunk)
if total > max_bytes:
logger.warning("Upload exceeds max allowed size: %d bytes", total)
raise DocumentSaveError("Uploaded file exceeds maximum allowed size.")
os.replace(temp_path, final_path)
try:
os.chmod(final_path, 0o600)
except OSError:
pass # Best effort, ignore chmod errors
except DocumentSaveError:
with contextlib.suppress(Exception):
if 'temp_path' in locals() and os.path.exists(temp_path):
os.remove(temp_path)
raise
except OSError as e:
logger.exception("OSError during file save: %s", filename)
with contextlib.suppress(Exception):
if 'temp_path' in locals() and os.path.exists(temp_path):
os.remove(temp_path)
raise DocumentSaveError("Failed to save file securely.") from e

# Optionally detect mime type (by extension); more accurate detection can be added
try:
mime, _ = mimetypes.guess_type(final_path)
except Exception:
mime = None

logger.info("Saved upload %s as %s (%d bytes, mime=%s)", base, final_path, total, mime)
return final_path
Comment on lines +96 to 103
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Enforce allowed MIME types and use detect_mime_type()

ALLOWED_CONTENT_TYPES is unused. Detect MIME and reject disallowed types; clean up the saved file if rejected.

Apply this diff:

-    # Optionally detect mime type (by extension); more accurate detection can be added
-    try:
-        mime, _ = mimetypes.guess_type(final_path)
-    except Exception:
-        mime = None
-
-    logger.info("Saved upload %s as %s (%d bytes, mime=%s)", base, final_path, total, mime)
+    # Detect MIME (with python-magic fallback) and enforce allowed types if configured
+    mime = detect_mime_type(final_path)
+    if ALLOWED_CONTENT_TYPES is not None and mime not in ALLOWED_CONTENT_TYPES:
+        logger.warning("Disallowed content type: %s for %s", mime, base)
+        with contextlib.suppress(Exception):
+            os.remove(final_path)
+        raise DocumentSaveError("Uploaded file type is not allowed.")
+
+    logger.info("Saved upload %s as %s (%d bytes, mime=%s)", base, final_path, total, mime)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Optionally detect mime type (by extension); more accurate detection can be added
try:
mime, _ = mimetypes.guess_type(final_path)
except Exception:
mime = None
logger.info("Saved upload %s as %s (%d bytes, mime=%s)", base, final_path, total, mime)
return final_path
# Detect MIME (with python-magic fallback) and enforce allowed types if configured
mime = detect_mime_type(final_path)
if ALLOWED_CONTENT_TYPES is not None and mime not in ALLOWED_CONTENT_TYPES:
logger.warning("Disallowed content type: %s for %s", mime, base)
with contextlib.suppress(Exception):
os.remove(final_path)
raise DocumentSaveError("Uploaded file type is not allowed.")
logger.info("Saved upload %s as %s (%d bytes, mime=%s)", base, final_path, total, mime)
return final_path



def save_upload_file_with_meta(upload_file: IO, filename: str, max_bytes: Optional[int] = None) -> Dict[str, Any]:
"""Save file and return metadata: path, size, sha256, mime_type.

This keeps the original save behavior but returns useful metadata for callers.
"""
logger = logging.getLogger(__name__)
# If caller passes a custom max_bytes use it, otherwise use default
if max_bytes is None:
max_bytes = DEFAULT_MAX_UPLOAD_BYTES

base = os.path.basename(filename)
if not base:
raise DocumentSaveError("Invalid filename.")

ext = os.path.splitext(base)[1]
safe_name = f"{uuid.uuid4().hex}{ext}"
final_path = os.path.abspath(os.path.join(UPLOAD_DIR, safe_name))

upload_dir_abs = os.path.abspath(UPLOAD_DIR)
if not final_path.startswith(upload_dir_abs + os.sep):
logger.error("Unsafe file path detected: %s", final_path)
raise DocumentSaveError("Unsafe file path.")

Comment on lines +116 to +128
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Apply same filename sanitization here as in save_upload_file

Inconsistent checks; mirror the stricter validation to avoid edge-case surprises.

Apply this diff:

-    base = os.path.basename(filename)
-    if not base:
-        raise DocumentSaveError("Invalid filename.")
+    base = os.path.basename(filename)
+    if not base or base in {'.', '..'} or any(c in base for c in '\\/:*?"<>|'):
+        logger.error("Invalid filename for upload: %s", filename)
+        raise DocumentSaveError("Invalid filename.")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
base = os.path.basename(filename)
if not base:
raise DocumentSaveError("Invalid filename.")
ext = os.path.splitext(base)[1]
safe_name = f"{uuid.uuid4().hex}{ext}"
final_path = os.path.abspath(os.path.join(UPLOAD_DIR, safe_name))
upload_dir_abs = os.path.abspath(UPLOAD_DIR)
if not final_path.startswith(upload_dir_abs + os.sep):
logger.error("Unsafe file path detected: %s", final_path)
raise DocumentSaveError("Unsafe file path.")
base = os.path.basename(filename)
if not base or base in {'.', '..'} or any(c in base for c in '\\/:*?"<>|'):
logger.error("Invalid filename for upload: %s", filename)
raise DocumentSaveError("Invalid filename.")
ext = os.path.splitext(base)[1]
safe_name = f"{uuid.uuid4().hex}{ext}"
final_path = os.path.abspath(os.path.join(UPLOAD_DIR, safe_name))
upload_dir_abs = os.path.abspath(UPLOAD_DIR)
if not final_path.startswith(upload_dir_abs + os.sep):
logger.error("Unsafe file path detected: %s", final_path)
raise DocumentSaveError("Unsafe file path.")
🤖 Prompt for AI Agents
In services/file_service.py around lines 116 to 128, the current filename
handling builds a UUID-based safe_name but lacks the stricter checks used in
save_upload_file; update this block to reuse or mirror the same
sanitization/validation: validate and normalize the incoming filename (strip
path components, reject names containing os.sep or leading dots), enforce the
allowed/extensions rule and length limits used by save_upload_file, derive the
extension safely (lowercased) and build safe_name from a validated ext, then
compute final_path and re-check that final_path resides inside the absolute
UPLOAD_DIR before writing; if any check fails, log the full context and raise
DocumentSaveError.

sha256 = hashlib.sha256()
total = 0

try:
with tempfile.NamedTemporaryFile(dir=UPLOAD_DIR, delete=False) as tmp:
temp_path = tmp.name
source = getattr(upload_file, 'file', upload_file)
for chunk in iter(lambda: source.read(8192), b""):
if not chunk:
break
tmp.write(chunk)
total += len(chunk)
sha256.update(chunk)
if total > max_bytes:
logger.warning("Upload exceeds max allowed size: %d bytes", total)
raise DocumentSaveError("Uploaded file exceeds maximum allowed size.")
os.replace(temp_path, final_path)
try:
os.chmod(final_path, 0o600)
except OSError:
pass
except DocumentSaveError:
with contextlib.suppress(Exception):
if 'temp_path' in locals() and os.path.exists(temp_path):
os.remove(temp_path)
raise
except OSError as e:
logger.exception("OSError during file save: %s", filename)
with contextlib.suppress(Exception):
if 'temp_path' in locals() and os.path.exists(temp_path):
os.remove(temp_path)
raise DocumentSaveError("Failed to save file securely.") from e

try:
mime, _ = mimetypes.guess_type(final_path)
except Exception:
mime = None

meta = {
'path': final_path,
'original_name': base,
'size': total,
'sha256': sha256.hexdigest(),
'mime_type': mime,
'saved_at': datetime.now(timezone.utc).isoformat(),
}
logger.info("Saved upload metadata: %s", {k: meta[k] for k in ('original_name','path','size','mime_type')})
return meta
Comment on lines +162 to +176
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Return accurate MIME and enforce allowed types in metadata save

Use detect_mime_type() and enforce ALLOWED_CONTENT_TYPES with cleanup on violation.

Apply this diff:

-    try:
-        mime, _ = mimetypes.guess_type(final_path)
-    except Exception:
-        mime = None
+    mime = detect_mime_type(final_path)
+    if ALLOWED_CONTENT_TYPES is not None and mime not in ALLOWED_CONTENT_TYPES:
+        logger.warning("Disallowed content type: %s for %s", mime, base)
+        with contextlib.suppress(Exception):
+            os.remove(final_path)
+        raise DocumentSaveError("Uploaded file type is not allowed.")
 
     meta = {
         'path': final_path,
         'original_name': base,
         'size': total,
         'sha256': sha256.hexdigest(),
         'mime_type': mime,
         'saved_at': datetime.now(timezone.utc).isoformat(),
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try:
mime, _ = mimetypes.guess_type(final_path)
except Exception:
mime = None
meta = {
'path': final_path,
'original_name': base,
'size': total,
'sha256': sha256.hexdigest(),
'mime_type': mime,
'saved_at': datetime.now(timezone.utc).isoformat(),
}
logger.info("Saved upload metadata: %s", {k: meta[k] for k in ('original_name','path','size','mime_type')})
return meta
mime = detect_mime_type(final_path)
if ALLOWED_CONTENT_TYPES is not None and mime not in ALLOWED_CONTENT_TYPES:
logger.warning("Disallowed content type: %s for %s", mime, base)
with contextlib.suppress(Exception):
os.remove(final_path)
raise DocumentSaveError("Uploaded file type is not allowed.")
meta = {
'path': final_path,
'original_name': base,
'size': total,
'sha256': sha256.hexdigest(),
'mime_type': mime,
'saved_at': datetime.now(timezone.utc).isoformat(),
}
logger.info("Saved upload metadata: %s", {k: meta[k] for k in ('original_name','path','size','mime_type')})
return meta



async def save_upload_file_async(upload_file: IO, filename: str, max_bytes: Optional[int] = None) -> str:
    """Async wrapper that offloads the blocking save call to a thread pool.

    Fixes a defect where ``max_bytes`` was accepted but silently ignored:
    when a caller supplies a limit, the save is routed through
    ``save_upload_file_with_meta`` (which accepts and enforces the limit)
    and the saved path is returned. Without a limit, the original code path
    via ``save_upload_file`` is preserved unchanged.

    Raises:
        DocumentSaveError: propagated from the worker thread on validation,
            size-limit, or I/O failure — same contract as the sync functions.
    """
    loop = asyncio.get_running_loop()
    if max_bytes is not None:
        # save_upload_file() takes no size limit; the metadata variant does.
        meta = await loop.run_in_executor(
            _get_thread_pool(), save_upload_file_with_meta, upload_file, filename, max_bytes
        )
        return meta['path']
    return await loop.run_in_executor(_get_thread_pool(), save_upload_file, upload_file, filename)


async def save_upload_file_with_meta_async(upload_file: IO, filename: str, max_bytes: Optional[int] = None) -> Dict[str, Any]:
    """Run the blocking metadata-returning save in the shared thread pool.

    Returns the same metadata dict as ``save_upload_file_with_meta``;
    ``max_bytes`` is forwarded unchanged.
    """
    pool = _get_thread_pool()
    running_loop = asyncio.get_running_loop()
    return await running_loop.run_in_executor(
        pool, save_upload_file_with_meta, upload_file, filename, max_bytes
    )


def calculate_checksum(path: str) -> str:
    """Return the hex-encoded SHA-256 digest of the file at *path*.

    Reads in fixed-size chunks so arbitrarily large files never need to be
    held in memory at once.
    """
    digest = hashlib.sha256()
    with open(path, 'rb') as fh:
        while chunk := fh.read(8192):
            digest.update(chunk)
    return digest.hexdigest()


def detect_mime_type(path: str) -> Optional[str]:
"""Detect mime type by extension or fallback to mimetypes. If python-magic is available it will be used."""
try:
import magic as _magic # type: ignore
except Exception:
mime, _ = mimetypes.guess_type(path)
return mime
try:
m = _magic.Magic(mime=True)
return m.from_file(path)
except Exception:
return None


def list_uploads() -> List[Dict[str, Any]]:
    """Return basic metadata for each regular file in UPLOAD_DIR.

    Each entry carries: name, absolute-ish path (as joined), size in bytes,
    and mtime as an ISO-8601 UTC timestamp. Subdirectories are skipped.
    """
    uploads: List[Dict[str, Any]] = []
    for entry_name in os.listdir(UPLOAD_DIR):
        full_path = os.path.join(UPLOAD_DIR, entry_name)
        if not os.path.isfile(full_path):
            continue
        st = os.stat(full_path)
        uploads.append({
            'name': entry_name,
            'path': full_path,
            'size': st.st_size,
            'mtime': datetime.fromtimestamp(st.st_mtime, timezone.utc).isoformat(),
        })
    return uploads


def delete_upload(name_or_path: str) -> bool:
    """Delete an uploaded file, constrained to regular files inside UPLOAD_DIR.

    Accepts a bare filename (resolved against UPLOAD_DIR) or an absolute path,
    but refuses to remove anything outside UPLOAD_DIR — the previous version
    would delete any absolute path it was handed, a path-traversal hazard.
    Non-files (directories, missing paths) are also refused.

    Returns:
        True only when a file was actually removed; False otherwise.
    """
    logger = logging.getLogger(__name__)
    if os.path.isabs(name_or_path):
        path = name_or_path
    else:
        path = os.path.join(UPLOAD_DIR, name_or_path)
    try:
        path = os.path.abspath(path)
        upload_dir_abs = os.path.abspath(UPLOAD_DIR)
        # Security check: target must live under UPLOAD_DIR and be a regular file.
        if not path.startswith(upload_dir_abs + os.sep) or not os.path.isfile(path):
            logger.warning("Refusing to delete path outside UPLOAD_DIR or non-file: %s", path)
            return False
        os.remove(path)
        return True
    except Exception:
        logger.exception("Failed to delete upload: %s", path)
        return False

Comment on lines +230 to +243
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Block path traversal: restrict deletes to UPLOAD_DIR and files only

Allowing absolute paths enables deleting arbitrary files. Constrain deletions to files under UPLOAD_DIR.

Apply this diff:

 def delete_upload(name_or_path: str) -> bool:
-    """Delete an upload by filename or absolute path. Returns True if deleted."""
-    if os.path.isabs(name_or_path):
-        path = name_or_path
-    else:
-        path = os.path.join(UPLOAD_DIR, name_or_path)
-    try:
-        if os.path.exists(path):
-            os.remove(path)
-            return True
-    except Exception:
-        logging.getLogger(__name__).exception("Failed to delete upload: %s", path)
-    return False
+    """Delete an upload by filename or absolute path within UPLOAD_DIR. Returns True if deleted."""
+    logger = logging.getLogger(__name__)
+    if os.path.isabs(name_or_path):
+        path = name_or_path
+    else:
+        path = os.path.join(UPLOAD_DIR, name_or_path)
+    try:
+        path = os.path.abspath(path)
+        upload_dir_abs = os.path.abspath(UPLOAD_DIR)
+        # Ensure path is inside UPLOAD_DIR and is a regular file
+        if not path.startswith(upload_dir_abs + os.sep) or not os.path.isfile(path):
+            logger.warning("Refusing to delete path outside UPLOAD_DIR or non-file: %s", path)
+            return False
+        os.remove(path)
+        return True
+    except Exception:
+        logger.exception("Failed to delete upload: %s", path)
+        return False
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def delete_upload(name_or_path: str) -> bool:
"""Delete an upload by filename or absolute path. Returns True if deleted."""
if os.path.isabs(name_or_path):
path = name_or_path
else:
path = os.path.join(UPLOAD_DIR, name_or_path)
try:
if os.path.exists(path):
os.remove(path)
return True
except Exception:
logging.getLogger(__name__).exception("Failed to delete upload: %s", path)
return False
def delete_upload(name_or_path: str) -> bool:
"""Delete an upload by filename or absolute path within UPLOAD_DIR. Returns True if deleted."""
logger = logging.getLogger(__name__)
if os.path.isabs(name_or_path):
path = name_or_path
else:
path = os.path.join(UPLOAD_DIR, name_or_path)
try:
path = os.path.abspath(path)
upload_dir_abs = os.path.abspath(UPLOAD_DIR)
# Ensure path is inside UPLOAD_DIR and is a regular file
if not path.startswith(upload_dir_abs + os.sep) or not os.path.isfile(path):
logger.warning("Refusing to delete path outside UPLOAD_DIR or non-file: %s", path)
return False
os.remove(path)
return True
except Exception:
logger.exception("Failed to delete upload: %s", path)
return False
🤖 Prompt for AI Agents
In services/file_service.py around lines 230-243, the delete_upload function
currently accepts absolute paths and can delete files outside UPLOAD_DIR; change
it to resolve the requested path against UPLOAD_DIR only, compute both
UPLOAD_DIR and target via Path.resolve(), ensure the resolved target starts with
the resolved UPLOAD_DIR path (reject if not), verify the target is a regular
file (is_file()) and not a directory or symlink pointing outside, then remove it
and return True; on any check failure return False and keep the existing
exception logging for actual removal errors.


def cleanup_old_uploads(days: int = 7) -> int:
    """Remove uploads whose mtime is older than *days* days.

    Iterates the current upload listing and delegates deletion to
    ``delete_upload``; only successful deletions are counted.

    Returns:
        The number of files removed.
    """
    threshold = datetime.now(timezone.utc) - timedelta(days=days)
    removed = 0
    for entry in list_uploads():
        modified_at = datetime.fromisoformat(entry['mtime'])
        if modified_at >= threshold:
            continue
        if delete_upload(entry['path']):
            removed += 1
    logging.getLogger(__name__).info("cleanup_old_uploads removed %d files older than %d days", removed, days)
    return removed