From 593c870426bed99eea4354a20c708b1ec32ce0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Lu=CC=88hrs?= Date: Tue, 16 May 2017 10:36:40 +0200 Subject: [PATCH] Implemented helper to use multiple kernels created extern for pythons multiprocessing.Process class --- duell.py | 75 +++++++++++++++++++++++++- duell/Duell.hx | 1 + duell/helpers/ParallelProcessHelper.hx | 73 +++++++++++++++++++++++++ python/multiprocessing/Process.hx | 46 ++++++++++++++++ 4 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 duell/helpers/ParallelProcessHelper.hx create mode 100644 python/multiprocessing/Process.hx diff --git a/duell.py b/duell.py index 2b839d5..fee60c6 100644 --- a/duell.py +++ b/duell.py @@ -24,6 +24,7 @@ from os import path as python_lib_os_Path from subprocess import Popen as python_lib_subprocess_Popen import urllib.parse as python_lib_urllib_Parse +from multiprocessing import Process as python_multiprocessing_Process from sys import path as python_sys_Path from urllib import request as python_urllib_Request @@ -3991,6 +3992,76 @@ def cutoutMetadata(value): _hx_classes["duell.helpers.LogHelper"] = duell_helpers_LogHelper +class duell_helpers_ParallelProcessHelper: + _hx_class_name = "duell.helpers.ParallelProcessHelper" + _hx_fields = ["processList"] + _hx_methods = ["run", "getLogicalKernels"] + + def __init__(self): + self.processList = None + + def run(self,jobs): + if (jobs is None): + return + numberKernels = self.getLogicalKernels() + self.processList = list() + duell_helpers_LogHelper.info("",(("Running jobs on " + Std.string(numberKernels)) + " Kernels ... ")) + _g1 = 0 + _g = len(jobs) + while (_g1 < _g): + i = _g1 + _g1 = (_g1 + 1) + job = (jobs[i] if i >= 0 and i < len(jobs) else None) + args = None + if hasattr(job,(("_hx_" + "args") if ("args" in python_Boot.keywords) else (("_hx_" + "args") if (((((len("args") > 2) and ((ord("args"[0]) == 95))) and ((ord("args"[1]) == 95))) and ((ord("args"[(len("args") - 1)]) != 95)))) else "args"))): + args = tuple(Reflect.field(job,"args")) + else: + args = None + p = python_multiprocessing_Process(Reflect.field(job,"group"), job.job, Reflect.field(job,"name"), args) + _this = self.processList + _this.append(p) + p.start() + if ((numberKernels == len(self.processList)) or ((i == ((len(jobs) - 1))))): + _g2 = 0 + _g3 = self.processList + while (_g2 < len(_g3)): + proc = (_g3[_g2] if _g2 >= 0 and _g2 < len(_g3) else None) + _g2 = (_g2 + 1) + proc.join() + self.processList = [] + + def getLogicalKernels(self): + numberKernelsString = "" + numberKernels = 0 + if (duell_helpers_PlatformHelper.get_hostPlatform() == duell_helpers_Platform.WINDOWS): + dp = duell_objects_DuellProcess("", "wmlc", ["cpu", "get", "NumberOfLogicalProcessors"], _hx_AnonObject({'block': True, 'systemCommand': True, 'errorMessage': "grabbing number of kernels"})) + dpOutput = dp.getCompleteStdout().toString() + rows = dpOutput.split("\n") + if (len(rows) > 1): + numberKernelsString = StringTools.trim((rows[1] if 1 < len(rows) else None)) + else: + numberKernelsString = "" + else: + dp1 = duell_objects_DuellProcess("", "sysctl", ["-n", "hw.logicalcpu_max"], _hx_AnonObject({'block': True, 'systemCommand': True, 'errorMessage': "grabbing number of kernels"})) + numberKernelsString = dp1.getCompleteStdout().toString() + try: + numberKernels = Std.parseInt(numberKernelsString) + except Exception as _hx_e: + _hx_e1 = _hx_e.val if isinstance(_hx_e, _HxException) else _hx_e + e = _hx_e1 + duell_helpers_LogHelper.warn(("Error grabbing number of kernels! " + Std.string(e))) + if ((numberKernels != 0) and ((numberKernels is not None))): + return numberKernels + else: + return 1 + + @staticmethod + def _hx_empty_init(_hx_o): + _hx_o.processList = None +duell_helpers_ParallelProcessHelper._hx_class = duell_helpers_ParallelProcessHelper +_hx_classes["duell.helpers.ParallelProcessHelper"] = duell_helpers_ParallelProcessHelper + + class duell_helpers_PathHelper: _hx_class_name = "duell.helpers.PathHelper" _hx_statics = ["mkdir", "unescape", "escape", "expand", "stripQuotes", "isLink", "removeDirectory", "getRecursiveFileListUnderFolder", "getRecursiveFolderListUnderFolder", "getFolderListUnderFolder", "getHomeFolder", "isPathRooted"] @@ -4990,8 +5061,8 @@ def run(self,e): while (_g_head is not None): e3 = None def _hx_local_0(): - nonlocal _g_head nonlocal _g_val + nonlocal _g_head _g_val = (_g_head[0] if 0 < len(_g_head) else None) _g_head = (_g_head[1] if 1 < len(_g_head) else None) return _g_val @@ -5041,8 +5112,8 @@ def _hx_local_0(): while (_g_head1 is not None): p = None def _hx_local_3(): - nonlocal _g_head1 nonlocal _g_val1 + nonlocal _g_head1 _g_val1 = (_g_head1[0] if 0 < len(_g_head1) else None) _g_head1 = (_g_head1[1] if 1 < len(_g_head1) else None) return _g_val1 diff --git a/duell/Duell.hx b/duell/Duell.hx index d7e301b..4463386 100644 --- a/duell/Duell.hx +++ b/duell/Duell.hx @@ -39,6 +39,7 @@ import duell.objects.DuellConfigJSON; import duell.helpers.AskHelper; import duell.helpers.DuellLibHelper; import duell.objects.DuellLib; +import duell.helpers.ParallelProcessHelper; import duell.commands.RunCommand; import duell.commands.BuildCommand; diff --git a/duell/helpers/ParallelProcessHelper.hx b/duell/helpers/ParallelProcessHelper.hx new file mode 100644 index 0000000..f1fb541 --- /dev/null +++ b/duell/helpers/ParallelProcessHelper.hx @@ -0,0 +1,73 @@ +package duell.helpers; + +import haxe.Constraints.Function; + +import python.Tuple; +import python.multiprocessing.Process; + +import duell.objects.DuellProcess; +import duell.helpers.PlatformHelper; +import duell.helpers.LogHelper; + +using StringTools; + +typedef ParallelJob = { + @:optional var group:Int; + var job:Function; + @:optional var name:String; + @:optional var args:Array; +} + +class ParallelProcessHelper { + + private var processList:Array; + + public function new() {} + + public function run( jobs:Array ):Void { + if( jobs == null ) return; + + var numberKernels = getLogicalKernels(); + processList = new Array(); + LogHelper.info('', 'Running jobs on $numberKernels Kernels ... '); + for( i in 0...jobs.length ) { + var job = jobs[ i ]; + var args = Reflect.hasField( job, 'args' ) ? new Tuple( job.args ) : null; + var p = new Process( Reflect.field( job, 'group' ), + job.job, + Reflect.field( job, 'name'), + args); + processList.push( p ); + p.start(); + + if( numberKernels == processList.length || i == jobs.length-1 ) { + for ( proc in processList ) proc.join(); + + processList = []; + } + } + } + + private function getLogicalKernels():Int { + var numberKernelsString = ""; + var numberKernels = 0; + + if (PlatformHelper.hostPlatform == Platform.WINDOWS){ + var dp = new DuellProcess("", "wmlc", ["cpu", "get", "NumberOfLogicalProcessors"], {block:true, systemCommand:true, errorMessage: "grabbing number of kernels"}); + var dpOutput = dp.getCompleteStdout().toString(); + var rows = dpOutput.split("\n"); + numberKernelsString = rows.length > 1 ? rows[1].trim() : ""; + } else { + var dp = new DuellProcess("", "sysctl", ["-n", "hw.logicalcpu_max"], {block:true, systemCommand:true, errorMessage: "grabbing number of kernels"}); + numberKernelsString = dp.getCompleteStdout().toString(); + } + + try { + numberKernels = Std.parseInt( numberKernelsString ); + } catch( e:Dynamic ) { + LogHelper.warn("Error grabbing number of kernels! " + e); + } + + return numberKernels != 0 && numberKernels != null ? numberKernels : 1; + } +} \ No newline at end of file diff --git a/python/multiprocessing/Process.hx b/python/multiprocessing/Process.hx new file mode 100644 index 0000000..7113b12 --- /dev/null +++ b/python/multiprocessing/Process.hx @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2003-2015, GameDuell GmbH + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE,a DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package python.multiprocessing; + +import python.Tuple; + +import haxe.Constraints.Function; + +@:pythonImport("multiprocessing", "Process") +extern class Process { + + var name:String; + var pid:Int; + var exitcode:Int; + + function new(group:Null=null, target:Function=null, name:String=null, ?args:Tuple ); + function run():Void; + function start():Void; + function join( ?timeout:Int ):Void; + function is_alive():Bool; + function terminate():Void; +} \ No newline at end of file