Skip to content

Commit

Permalink
build: v2.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
danangjoyoo committed Dec 28, 2022
1 parent 4f2a287 commit 38d287d
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,30 +91,36 @@ To check it, you can just print out the `os.getpid()`
import os
from worker import worker, process
@worker(multiproc=True)
def run_in_new_process_from_worker(parent_pid):
print(f"from {parent_pid} running in a new process {os.getpid()} - from worker.mutliproc==True")
return "return from process"
@process
def run_in_new_process(parent_pid):
print(f"from {parent_pid} running in a new process {os.getpid()}")
print(f"from {parent_pid} running in a new process {os.getpid()} - from process")
return "return from process"
@worker
def run_in_new_thread(parent_pid):
print(f"from {parent_pid} running in a new thread {os.getpid()}")
print(f"from {parent_pid} running in a new thread {os.getpid()} - from worker.multiproc==False")
return "return from thread"
print(f"this is on main thread {os.getpid()}")
run_in_new_process_from_worker(os.getpid())
run_in_new_process(os.getpid())
run_in_new_thread(os.getpid())
```

then run the script
```
(venv) danangjoyoo@danangjoyoo:~/dev/expython/multiproc$ python test2.py
this is on main thread 58951
from 58951 running in a new thread 58951
from 58951 running in a new process 58953
this is on main thread 29535
from 29535 running in a new process 29537 - from worker.mutliproc==True
from 29535 running in a new thread 29535 - from worker.multiproc==False
from 29535 running in a new process 29538 - from process
```

you can see the different of process id between running in a new process and thread
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
this_directory = Path(__file__).parent
long_description = (this_directory / "README.md").read_text()

VERSION = '2.0.2'
VERSION = '2.1.0'
DESCRIPTION = 'Simplify and master control (run and stop) the python threads (workers)'

# Setting up
Expand Down
11 changes: 8 additions & 3 deletions worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ def worker(function: FunctionType) -> ThreadedFunction: ...
def worker(
name: Optional[str] = "",
on_abort: Optional[FunctionType] = None,
keyboard_interrupt: Optional[bool] = True
keyboard_interrupt: Optional[bool] = True,
multiproc: bool = False
) -> ThreadedFunction: ...


@overload
def worker(
on_abort: Optional[FunctionType] = None,
keyboard_interrupt: Optional[bool] = True
keyboard_interrupt: Optional[bool] = True,
multiproc: bool = False
) -> ThreadedFunction: ...


Expand All @@ -34,7 +36,7 @@ def worker(
name: Optional[str] = "",
on_abort: Optional[FunctionType] = None,
keyboard_interrupt: Optional[bool] = True,
timeout: float = 0,
multiproc: bool = False,
**kargs
):
"""
Expand All @@ -47,6 +49,9 @@ def worker(
- @worker(name="looping backapp", keyboard_interrupt=True)
- @worker(keyboard_interrupt=True, on_abort: lambda: print("its over"))
"""
if multiproc:
return process

if args:
if type(args[0]) == FunctionType:
assert not inspect.iscoroutinefunction(args[0]), "please use `async_worker` instead for coroutine function"
Expand Down
37 changes: 29 additions & 8 deletions worker/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def execute_in_process_event_loop(self, conn: Connection):
input_params = conn.recv()
args = input_params.get("args", [])
kwargs = input_params.get("kwargs", {})
result = self.raw_func(*args, **kwargs)
result = await self.raw_func(*args, **kwargs)
try:
json.dumps(result)
except Exception as e:
Expand All @@ -64,9 +64,23 @@ async def execute_in_process_event_loop(self, conn: Connection):
def create_and_run(self, *args, **kwargs):
self.parent_con, self.child_con = Pipe()
if self.is_async:
self.proc = Process(target=asyncio.run(self.execute_in_process_event_loop(self.child_con)))
self.proc = Process(
target=lambda: (
asyncio.run(
self.execute_in_process_event_loop(
self.child_con
)
)
)
)
else:
self.proc = Process(target=lambda: self.execute_in_process(self.child_con))
self.proc = Process(
target=lambda: (
self.execute_in_process(
self.child_con
)
)
)
self.proc.start()
self.parent_con.send({"args": args, "kwargs": kwargs})
return self.parent_con
Expand All @@ -87,8 +101,15 @@ def receive(self):
@staticmethod
def create_process(function: FunctionType):
assert isinstance(function, FunctionType), "only accept function for process"
def run_in_different_proc(*args, **kwargs):
pc = ProcessConnector(function)
pc.create_and_run(*args, **kwargs)
return pc
return run_in_different_proc
if inspect.iscoroutinefunction(function):
async def run_in_different_proc(*args, **kwargs):
pc = ProcessConnector(function)
pc.create_and_run(*args, **kwargs)
return pc
return run_in_different_proc
else:
def run_in_different_proc(*args, **kwargs):
pc = ProcessConnector(function)
pc.create_and_run(*args, **kwargs)
return pc
return run_in_different_proc

0 comments on commit 38d287d

Please sign in to comment.