diff --git a/pyproject.toml b/pyproject.toml index bbaeb0e..babd002 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "socketdev" -version = "3.0.14" +version = "3.0.16" requires-python = ">= 3.9" dependencies = [ 'requests', diff --git a/socketdev/__init__.py b/socketdev/__init__.py index 820f7fa..04a7227 100644 --- a/socketdev/__init__.py +++ b/socketdev/__init__.py @@ -25,6 +25,7 @@ from socketdev.analytics import Analytics from socketdev.alerttypes import AlertTypes from socketdev.basics import Basics +from socketdev.uploadmanifests import UploadManifests from socketdev.log import log __author__ = "socket.dev" @@ -74,6 +75,7 @@ def __init__(self, token: str, timeout: int = 1200): self.analytics = Analytics(self.api) self.alerttypes = AlertTypes(self.api) self.basics = Basics(self.api) + self.uploadmanifests = UploadManifests(self.api) @staticmethod def set_timeout(timeout: int): diff --git a/socketdev/uploadmanifests/__init__.py b/socketdev/uploadmanifests/__init__.py new file mode 100644 index 0000000..38c5a2c --- /dev/null +++ b/socketdev/uploadmanifests/__init__.py @@ -0,0 +1,67 @@ +import os +import logging +from typing import List, Optional, Union +from ..utils import Utils + +log = logging.getLogger("socketdev") + + +class UploadManifests: + def __init__(self, api): + self.api = api + + def upload_manifest_files(self, org_slug: str, file_paths: List[str], workspace: Optional[str] = None, base_path: Optional[str] = None, base_paths: Optional[List[str]] = None, use_lazy_loading: bool = True) -> str: + """ + Upload manifest files to Socket API and return tarHash. + + Args: + org_slug: Organization slug + file_paths: List of manifest file paths to upload + workspace: Base directory path to make paths relative to + base_path: Optional base path to strip from key names for cleaner file organization + base_paths: Optional list of base paths to strip from key names (takes precedence over base_path) + use_lazy_loading: Whether to use lazy file loading (default: True) + + Returns: + str: The tarHash from the upload response + + Raises: + Exception: If upload fails + """ + # Filter to only existing files + valid_files = [f for f in file_paths if os.path.exists(f) and os.path.isfile(f)] + + if not valid_files: + raise Exception("No valid manifest files found to upload") + + # Prepare files for upload using the utility function + if use_lazy_loading: + loaded_files = Utils.load_files_for_sending_lazy( + valid_files, + workspace=workspace, + base_path=base_path, + base_paths=base_paths + ) + else: + # Fallback to basic file loading if needed + loaded_files = [] + for file_path in valid_files: + key = os.path.basename(file_path) + with open(file_path, 'rb') as f: + loaded_files.append((key, (key, f.read()))) + + # Make the upload request + path = f"orgs/{org_slug}/upload-manifest-files" + response = self.api.do_request(path=path, files=loaded_files, method="POST") + + if response.status_code != 200: + raise Exception(f"Upload failed with status {response.status_code}: {response.text}") + + result = response.json() + tar_hash = result.get('tarHash') + + if not tar_hash: + raise Exception("Server did not return a tarHash") + + log.info(f"Successfully uploaded {len(valid_files)} manifest files, tarHash: {tar_hash}") + return tar_hash \ No newline at end of file diff --git a/socketdev/utils/__init__.py b/socketdev/utils/__init__.py index e9f116b..718eeee 100644 --- a/socketdev/utils/__init__.py +++ b/socketdev/utils/__init__.py @@ -1,4 +1,4 @@ -from typing import Literal, List, Tuple +from typing import Literal, List, Tuple, Optional import logging import os import weakref @@ -233,7 +233,7 @@ def validate_integration_type(integration_type: str) -> IntegrationType: return integration_type # type: ignore @staticmethod - def load_files_for_sending_lazy(files: List[str], workspace: str = None, max_open_files: int = 100, base_path: str = None, base_paths: List[str] = None) -> List[Tuple[str, Tuple[str, LazyFileLoader]]]: + def load_files_for_sending_lazy(files: List[str], workspace: Optional[str] = None, max_open_files: int = 100, base_path: Optional[str] = None, base_paths: Optional[List[str]] = None) -> List[Tuple[str, Tuple[str, LazyFileLoader]]]: """ Prepares files for sending to the Socket API using lazy loading. @@ -268,78 +268,75 @@ def load_files_for_sending_lazy(files: List[str], workspace: str = None, max_ope # Normalize file path if "\\" in file_path: file_path = file_path.replace("\\", "/") - - for file_path in files: - # Normalize file path - if "\\" in file_path: - file_path = file_path.replace("\\", "/") - # Skip directories - if os.path.isdir(file_path): - continue + # Skip directories + if os.path.isdir(file_path): + continue - # Handle file path splitting safely - if "/" in file_path: - _, name = file_path.rsplit("/", 1) - else: - name = file_path + # Handle file path splitting safely + if "/" in file_path: + _, name = file_path.rsplit("/", 1) + else: + name = file_path - # Calculate the key name for the form data - key = file_path - path_stripped = False + # Calculate the key name for the form data + key = file_path + path_stripped = False - # If base_paths is provided, try to strip one of the paths from the file path - if base_paths: - for bp in base_paths: - normalized_base_path = bp.rstrip("/") + "/" if not bp.endswith("/") else bp - if key.startswith(normalized_base_path): - key = key[len(normalized_base_path):] - path_stripped = True - break - elif key.startswith(bp.rstrip("/")): - stripped_base = bp.rstrip("/") - if key.startswith(stripped_base + "/") or key == stripped_base: - key = key[len(stripped_base):] - key = key.lstrip("/") - path_stripped = True - break - elif base_path: - normalized_base_path = base_path.rstrip("/") + "/" if not base_path.endswith("/") else base_path + # If base_paths is provided, try to strip one of the paths from the file path + if base_paths: + for bp in base_paths: + normalized_base_path = bp.rstrip("/") + "/" if not bp.endswith("/") else bp if key.startswith(normalized_base_path): key = key[len(normalized_base_path):] path_stripped = True - elif key.startswith(base_path.rstrip("/")): - stripped_base = base_path.rstrip("/") + break + elif key.startswith(bp.rstrip("/")): + stripped_base = bp.rstrip("/") if key.startswith(stripped_base + "/") or key == stripped_base: key = key[len(stripped_base):] key = key.lstrip("/") path_stripped = True - - # If workspace is provided and no base paths matched, fall back to workspace logic - if not path_stripped and workspace and file_path.startswith(workspace): - key = file_path[len(workspace):] - # Remove all leading slashes (for absolute paths) - while key.startswith("/"): - key = key[1:] + break + elif base_path: + normalized_base_path = base_path.rstrip("/") + "/" if not base_path.endswith("/") else base_path + if key.startswith(normalized_base_path): + key = key[len(normalized_base_path):] path_stripped = True + elif key.startswith(base_path.rstrip("/")): + stripped_base = base_path.rstrip("/") + if key.startswith(stripped_base + "/") or key == stripped_base: + key = key[len(stripped_base):] + key = key.lstrip("/") + path_stripped = True + + # If workspace is provided and no base paths matched, fall back to workspace logic + if not path_stripped and workspace and file_path.startswith(workspace): + key = file_path[len(workspace):] + # Remove all leading slashes (for absolute paths) + while key.startswith("/"): + key = key[1:] + path_stripped = True + + # Clean up relative path prefixes, but preserve filename dots + while key.startswith("./"): + key = key[2:] + while key.startswith("../"): + key = key[3:] + # Remove any remaining leading slashes (for absolute paths) + while key.startswith("/"): + key = key[1:] - # Clean up relative path prefixes, but preserve filename dots - while key.startswith("./"): - key = key[2:] - while key.startswith("../"): - key = key[3:] - # Remove any remaining leading slashes (for absolute paths) + # Remove Windows drive letter if present (C:/...) + if len(key) > 2 and key[1] == ':' and (key[2] == '/' or key[2] == '\\'): + key = key[2:] while key.startswith("/"): key = key[1:] - # Remove Windows drive letter if present (C:/...) - if len(key) > 2 and key[1] == ':' and (key[2] == '/' or key[2] == '\\'): - key = key[2:] - while key.startswith("/"): - key = key[1:] + # Create lazy file loader instead of opening file immediately + lazy_file = LazyFileLoader(file_path, key) + payload = (key, (key, lazy_file)) + send_files.append(payload) - # Create lazy file loader instead of opening file immediately - lazy_file = LazyFileLoader(file_path, key) - payload = (key, (key, lazy_file)) - send_files.append(payload) - return send_files + log.debug(f"Prepared {len(send_files)} files for lazy loading") + return send_files diff --git a/socketdev/version.py b/socketdev/version.py index c5af2d9..cf5900b 100644 --- a/socketdev/version.py +++ b/socketdev/version.py @@ -1 +1 @@ -__version__ = "3.0.14" +__version__ = "3.0.16"