Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "hatchling.build"

[project]
name = "socketsecurity"
version = "2.2.33"
version = "2.2.35"
requires-python = ">= 3.10"
license = {"file" = "LICENSE"}
dependencies = [
Expand Down
2 changes: 1 addition & 1 deletion socketsecurity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__author__ = 'socket.dev'
__version__ = '2.2.33'
__version__ = '2.2.35'
USER_AGENT = f'SocketPythonCLI/{__version__}'
88 changes: 70 additions & 18 deletions socketsecurity/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,13 @@ def empty_head_scan_file() -> List[str]:
Returns:
List containing path to a temporary empty file
"""
# Create a temporary empty file
temp_fd, temp_path = tempfile.mkstemp(suffix='.empty', prefix='socket_baseline_')
# Create a temporary directory and then create our specific filename
temp_dir = tempfile.gettempdir()
temp_path = os.path.join(temp_dir, '.socket.facts.json')

# Close the file descriptor since we just need the path
# The file is already created and empty
os.close(temp_fd)
# Create the empty file
with open(temp_path, 'w') as f:
pass # Creates an empty file

log.debug(f"Created temporary empty file for baseline scan: {temp_path}")
return [temp_path]
Expand Down Expand Up @@ -524,18 +525,42 @@ def create_full_scan_with_report_url(
if save_manifest_tar_path and all_files and paths:
self.save_manifest_tar(all_files, save_manifest_tar_path, paths[0])

# If no supported files found, create empty scan
if not all_files:
return diff

try:
# Create new scan
new_scan_start = time.time()
new_full_scan = self.create_full_scan(all_files, params, base_paths=base_paths)
new_scan_end = time.time()
log.info(f"Total time to create new full scan: {new_scan_end - new_scan_start:.2f}")
except APIFailure as e:
log.error(f"Failed to create full scan: {e}")
raise
log.info("No supported manifest files found - creating empty scan")
empty_files = Core.empty_head_scan_file()
try:
# Create new scan
new_scan_start = time.time()
new_full_scan = self.create_full_scan(empty_files, params, base_paths=base_paths)
new_scan_end = time.time()
log.info(f"Total time to create empty full scan: {new_scan_end - new_scan_start:.2f}")

# Clean up the temporary empty file
for temp_file in empty_files:
try:
os.unlink(temp_file)
log.debug(f"Cleaned up temporary file: {temp_file}")
except OSError as e:
log.warning(f"Failed to clean up temporary file {temp_file}: {e}")
except Exception as e:
# Clean up temp files even if scan creation fails
for temp_file in empty_files:
try:
os.unlink(temp_file)
except OSError:
pass
raise e
else:
try:
# Create new scan
new_scan_start = time.time()
new_full_scan = self.create_full_scan(all_files, params, base_paths=base_paths)
new_scan_end = time.time()
log.info(f"Total time to create new full scan: {new_scan_end - new_scan_start:.2f}")
except APIFailure as e:
log.error(f"Failed to create full scan: {e}")
raise

# Construct report URL
base_socket = "https://socket.dev/dashboard/org"
Expand Down Expand Up @@ -888,8 +913,11 @@ def create_new_diff(
if save_manifest_tar_path and all_files and paths:
self.save_manifest_tar(all_files, save_manifest_tar_path, paths[0])

# If no supported files found, create empty scan for comparison
scan_files = all_files
if not all_files:
return Diff(id="NO_DIFF_RAN", diff_url="", report_url="")
log.info("No supported manifest files found - creating empty scan for diff comparison")
scan_files = Core.empty_head_scan_file()

try:
# Get head scan ID
Expand Down Expand Up @@ -932,19 +960,43 @@ def create_new_diff(
raise e

# Create new scan
temp_files_to_cleanup = []
if not all_files: # We're using empty scan files
temp_files_to_cleanup = scan_files

try:
new_scan_start = time.time()
new_full_scan = self.create_full_scan(all_files, params, base_paths=base_paths)
new_full_scan = self.create_full_scan(scan_files, params, base_paths=base_paths)
new_scan_end = time.time()
log.info(f"Total time to create new full scan: {new_scan_end - new_scan_start:.2f}")
except APIFailure as e:
log.error(f"API Error: {e}")
# Clean up temp files if any
for temp_file in temp_files_to_cleanup:
try:
os.unlink(temp_file)
except OSError:
pass
sys.exit(1)
except Exception as e:
import traceback
log.error(f"Error creating new full scan: {str(e)}")
log.error(f"Stack trace:\n{traceback.format_exc()}")
# Clean up temp files if any
for temp_file in temp_files_to_cleanup:
try:
os.unlink(temp_file)
except OSError:
pass
raise
finally:
# Clean up temporary empty files if they were created
for temp_file in temp_files_to_cleanup:
try:
os.unlink(temp_file)
log.debug(f"Cleaned up temporary file: {temp_file}")
except OSError as e:
log.warning(f"Failed to clean up temporary file {temp_file}: {e}")

# Handle diff generation - now we always have both scans
scans_ready = self.check_full_scans_status(head_full_scan_id, new_full_scan.id)
Expand Down
5 changes: 5 additions & 0 deletions socketsecurity/core/tools/reachability.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def run_reachability_analysis(
concurrency: Optional[int] = None,
additional_params: Optional[List[str]] = None,
allow_unverified: bool = False,
enable_debug: bool = False,
) -> Dict[str, Any]:
"""
Run reachability analysis.
Expand All @@ -123,6 +124,7 @@ def run_reachability_analysis(
concurrency: Concurrency level for analysis (must be >= 1)
additional_params: Additional parameters to pass to coana CLI
allow_unverified: Disable SSL certificate verification (sets NODE_TLS_REJECT_UNAUTHORIZED=0)
enable_debug: Enable debug mode (passes -d flag to coana CLI)

Returns:
Dict containing scan_id and report_path
Expand Down Expand Up @@ -173,6 +175,9 @@ def run_reachability_analysis(
if concurrency:
cmd.extend(["--concurrency", str(concurrency)])

if enable_debug:
cmd.append("-d")

# Add any additional parameters provided by the user
if additional_params:
cmd.extend(additional_params)
Expand Down
3 changes: 2 additions & 1 deletion socketsecurity/socketcli.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ def main_code():
version=config.reach_version,
concurrency=config.reach_concurrency,
additional_params=config.reach_additional_params,
allow_unverified=config.allow_unverified
allow_unverified=config.allow_unverified,
enable_debug=config.enable_debug
)

log.info(f"Reachability analysis completed successfully")
Expand Down
Loading