77from dataclasses import dataclass , field
88from enum import Enum
99
10+ from cortex .validators import DANGEROUS_PATTERNS
11+
1012
1113class TaskStatus (Enum ):
1214 PENDING = "pending"
@@ -55,36 +57,23 @@ async def run_single_task(task: ParallelTask, executor, timeout: int, log_callba
5557 log_callback (f"Starting { task .name } …" , "info" )
5658
5759 # Validate command for dangerous patterns
58- DANGEROUS_PATTERNS = [
59- r'rm\s+-rf\s+[/\*]' ,
60- r'rm\s+--no-preserve-root' ,
61- r'dd\s+if=.*of=/dev/' ,
62- r'curl\s+.*\|\s*sh' ,
63- r'curl\s+.*\|\s*bash' ,
64- r'wget\s+.*\|\s*sh' ,
65- r'wget\s+.*\|\s*bash' ,
66- r'\beval\s+' ,
67- r'base64\s+-d\s+.*\|' ,
68- r'>\s*/etc/' ,
69- r'chmod\s+777' ,
70- r'chmod\s+\+s' ,
71- ]
72-
7360 for pattern in DANGEROUS_PATTERNS :
7461 if re .search (pattern , task .command , re .IGNORECASE ):
7562 task .status = TaskStatus .FAILED
76- task .error = f "Command blocked: matches dangerous pattern"
63+ task .error = "Command blocked: matches dangerous pattern"
7764 task .end_time = time .time ()
7865 if log_callback :
7966 log_callback (f"Finished { task .name } (failed)" , "error" )
8067 return False
8168
8269 try :
8370 # Run command in executor (thread pool) to avoid blocking the event loop
84- loop = asyncio .get_event_loop ()
71+ loop = asyncio .get_running_loop ()
8572 result = await asyncio .wait_for (
8673 loop .run_in_executor (
8774 executor ,
75+ # Use shell=True carefully - commands are validated against dangerous patterns above.
76+ # shell=True is required to support complex shell commands (e.g., pipes, redirects).
8877 lambda : subprocess .run (
8978 task .command ,
9079 shell = True ,
@@ -148,7 +137,7 @@ async def run_parallel_install(
148137 log_callback: Optional callback for logging (called with message and level)
149138
150139 Returns:
151- Tuple of (success: bool, tasks: List[ParallelTask])
140+ tuple[ bool, List[ParallelTask]]: Success status and list of all tasks
152141 """
153142 if not commands :
154143 return True , []
@@ -190,14 +179,31 @@ async def run_parallel_install(
190179 while pending or running :
191180 # Start tasks whose dependencies are met
192181 ready_to_start = []
193- for task_name in list ( pending ):
182+ for task_name in pending . copy ( ):
194183 task = tasks [task_name ]
195- deps_met = all (dep in completed for dep in task .dependencies )
184+ # When stop_on_error=False, accept both completed and failed dependencies
185+ if stop_on_error :
186+ deps_met = all (dep in completed for dep in task .dependencies )
187+ else :
188+ deps_met = all (dep in completed or dep in failed for dep in task .dependencies )
196189
197190 if deps_met :
198191 ready_to_start .append (task_name )
199192 pending .remove (task_name )
200193
194+ # If no tasks can be started and none are running, we're stuck (deadlock/cycle detection)
195+ if not ready_to_start and not running and pending :
196+ # Mark remaining tasks as skipped - they have unresolvable dependencies
197+ for task_name in pending :
198+ task = tasks [task_name ]
199+ if task .status == TaskStatus .PENDING :
200+ task .status = TaskStatus .SKIPPED
201+ task .error = "Task could not run because dependencies never completed"
202+ if log_callback :
203+ log_callback (f"{ task_name } skipped due to unresolved dependencies" , "error" )
204+ failed .update (pending )
205+ break
206+
201207 # Create tasks for ready items
202208 for task_name in ready_to_start :
203209 coro = run_single_task (tasks [task_name ], executor , timeout , log_callback )
@@ -217,10 +223,20 @@ async def run_parallel_install(
217223 # Process completed tasks
218224 for task_coro in done :
219225 # Find which task this is
220- for task_name , running_coro in list ( running .items () ):
226+ for task_name , running_coro in running .items ():
221227 if running_coro is task_coro :
222228 task = tasks [task_name ]
223- success = task_coro .result ()
229+
230+ # Handle cancelled tasks
231+ try :
232+ success = task_coro .result ()
233+ except asyncio .CancelledError :
234+ # Task was cancelled due to stop_on_error
235+ task .status = TaskStatus .SKIPPED
236+ task .error = "Task cancelled due to dependency failure"
237+ failed .add (task_name )
238+ del running [task_name ]
239+ break
224240
225241 if success :
226242 completed .add (task_name )
0 commit comments