Skip to content

Commit

Permalink
Implemented helper to use multiple kernels
Browse files Browse the repository at this point in the history
created extern for pythons multiprocessing.Process class
  • Loading branch information
clue4gd committed May 16, 2017
1 parent 661d519 commit 593c870
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 2 deletions.
75 changes: 73 additions & 2 deletions duell.py
Expand Up @@ -24,6 +24,7 @@
from os import path as python_lib_os_Path
from subprocess import Popen as python_lib_subprocess_Popen
import urllib.parse as python_lib_urllib_Parse
from multiprocessing import Process as python_multiprocessing_Process
from sys import path as python_sys_Path
from urllib import request as python_urllib_Request

Expand Down Expand Up @@ -3991,6 +3992,76 @@ def cutoutMetadata(value):
_hx_classes["duell.helpers.LogHelper"] = duell_helpers_LogHelper


class duell_helpers_ParallelProcessHelper:
_hx_class_name = "duell.helpers.ParallelProcessHelper"
_hx_fields = ["processList"]
_hx_methods = ["run", "getLogicalKernels"]

def __init__(self):
self.processList = None

def run(self,jobs):
if (jobs is None):
return
numberKernels = self.getLogicalKernels()
self.processList = list()
duell_helpers_LogHelper.info("",(("Running jobs on " + Std.string(numberKernels)) + " Kernels ... "))
_g1 = 0
_g = len(jobs)
while (_g1 < _g):
i = _g1
_g1 = (_g1 + 1)
job = (jobs[i] if i >= 0 and i < len(jobs) else None)
args = None
if hasattr(job,(("_hx_" + "args") if ("args" in python_Boot.keywords) else (("_hx_" + "args") if (((((len("args") > 2) and ((ord("args"[0]) == 95))) and ((ord("args"[1]) == 95))) and ((ord("args"[(len("args") - 1)]) != 95)))) else "args"))):
args = tuple(Reflect.field(job,"args"))
else:
args = None
p = python_multiprocessing_Process(Reflect.field(job,"group"), job.job, Reflect.field(job,"name"), args)
_this = self.processList
_this.append(p)
p.start()
if ((numberKernels == len(self.processList)) or ((i == ((len(jobs) - 1))))):
_g2 = 0
_g3 = self.processList
while (_g2 < len(_g3)):
proc = (_g3[_g2] if _g2 >= 0 and _g2 < len(_g3) else None)
_g2 = (_g2 + 1)
proc.join()
self.processList = []

def getLogicalKernels(self):
numberKernelsString = ""
numberKernels = 0
if (duell_helpers_PlatformHelper.get_hostPlatform() == duell_helpers_Platform.WINDOWS):
dp = duell_objects_DuellProcess("", "wmlc", ["cpu", "get", "NumberOfLogicalProcessors"], _hx_AnonObject({'block': True, 'systemCommand': True, 'errorMessage': "grabbing number of kernels"}))
dpOutput = dp.getCompleteStdout().toString()
rows = dpOutput.split("\n")
if (len(rows) > 1):
numberKernelsString = StringTools.trim((rows[1] if 1 < len(rows) else None))
else:
numberKernelsString = ""
else:
dp1 = duell_objects_DuellProcess("", "sysctl", ["-n", "hw.logicalcpu_max"], _hx_AnonObject({'block': True, 'systemCommand': True, 'errorMessage': "grabbing number of kernels"}))
numberKernelsString = dp1.getCompleteStdout().toString()
try:
numberKernels = Std.parseInt(numberKernelsString)
except Exception as _hx_e:
_hx_e1 = _hx_e.val if isinstance(_hx_e, _HxException) else _hx_e
e = _hx_e1
duell_helpers_LogHelper.warn(("Error grabbing number of kernels! " + Std.string(e)))
if ((numberKernels != 0) and ((numberKernels is not None))):
return numberKernels
else:
return 1

@staticmethod
def _hx_empty_init(_hx_o):
_hx_o.processList = None
duell_helpers_ParallelProcessHelper._hx_class = duell_helpers_ParallelProcessHelper
_hx_classes["duell.helpers.ParallelProcessHelper"] = duell_helpers_ParallelProcessHelper


class duell_helpers_PathHelper:
_hx_class_name = "duell.helpers.PathHelper"
_hx_statics = ["mkdir", "unescape", "escape", "expand", "stripQuotes", "isLink", "removeDirectory", "getRecursiveFileListUnderFolder", "getRecursiveFolderListUnderFolder", "getFolderListUnderFolder", "getHomeFolder", "isPathRooted"]
Expand Down Expand Up @@ -4990,8 +5061,8 @@ def run(self,e):
while (_g_head is not None):
e3 = None
def _hx_local_0():
nonlocal _g_head
nonlocal _g_val
nonlocal _g_head
_g_val = (_g_head[0] if 0 < len(_g_head) else None)
_g_head = (_g_head[1] if 1 < len(_g_head) else None)
return _g_val
Expand Down Expand Up @@ -5041,8 +5112,8 @@ def _hx_local_0():
while (_g_head1 is not None):
p = None
def _hx_local_3():
nonlocal _g_head1
nonlocal _g_val1
nonlocal _g_head1
_g_val1 = (_g_head1[0] if 0 < len(_g_head1) else None)
_g_head1 = (_g_head1[1] if 1 < len(_g_head1) else None)
return _g_val1
Expand Down
1 change: 1 addition & 0 deletions duell/Duell.hx
Expand Up @@ -39,6 +39,7 @@ import duell.objects.DuellConfigJSON;
import duell.helpers.AskHelper;
import duell.helpers.DuellLibHelper;
import duell.objects.DuellLib;
import duell.helpers.ParallelProcessHelper;

import duell.commands.RunCommand;
import duell.commands.BuildCommand;
Expand Down
73 changes: 73 additions & 0 deletions duell/helpers/ParallelProcessHelper.hx
@@ -0,0 +1,73 @@
package duell.helpers;

import haxe.Constraints.Function;

import python.Tuple;
import python.multiprocessing.Process;

import duell.objects.DuellProcess;
import duell.helpers.PlatformHelper;
import duell.helpers.LogHelper;

using StringTools;

typedef ParallelJob = {
@:optional var group:Int;
var job:Function;
@:optional var name:String;
@:optional var args:Array<Dynamic>;
}

class ParallelProcessHelper {

private var processList:Array<Process>;

public function new() {}

public function run( jobs:Array<ParallelJob> ):Void {
if( jobs == null ) return;

var numberKernels = getLogicalKernels();
processList = new Array<Process>();
LogHelper.info('', 'Running jobs on $numberKernels Kernels ... ');
for( i in 0...jobs.length ) {
var job = jobs[ i ];
var args = Reflect.hasField( job, 'args' ) ? new Tuple( job.args ) : null;
var p = new Process( Reflect.field( job, 'group' ),
job.job,
Reflect.field( job, 'name'),
args);
processList.push( p );
p.start();

if( numberKernels == processList.length || i == jobs.length-1 ) {
for ( proc in processList ) proc.join();

processList = [];
}
}
}

private function getLogicalKernels():Int {
var numberKernelsString = "";
var numberKernels = 0;

if (PlatformHelper.hostPlatform == Platform.WINDOWS){
var dp = new DuellProcess("", "wmlc", ["cpu", "get", "NumberOfLogicalProcessors"], {block:true, systemCommand:true, errorMessage: "grabbing number of kernels"});
var dpOutput = dp.getCompleteStdout().toString();
var rows = dpOutput.split("\n");
numberKernelsString = rows.length > 1 ? rows[1].trim() : "";
} else {
var dp = new DuellProcess("", "sysctl", ["-n", "hw.logicalcpu_max"], {block:true, systemCommand:true, errorMessage: "grabbing number of kernels"});
numberKernelsString = dp.getCompleteStdout().toString();
}

try {
numberKernels = Std.parseInt( numberKernelsString );
} catch( e:Dynamic ) {
LogHelper.warn("Error grabbing number of kernels! " + e);
}

return numberKernels != 0 && numberKernels != null ? numberKernels : 1;
}
}
46 changes: 46 additions & 0 deletions python/multiprocessing/Process.hx
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2003-2015, GameDuell GmbH
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE,a DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package python.multiprocessing;

import python.Tuple;

import haxe.Constraints.Function;

@:pythonImport("multiprocessing", "Process")
extern class Process {

var name:String;
var pid:Int;
var exitcode:Int;

function new<T>(group:Null<Int>=null, target:Function=null, name:String=null, ?args:Tuple<Dynamic> );
function run():Void;
function start():Void;
function join( ?timeout:Int ):Void;
function is_alive():Bool;
function terminate():Void;
}

0 comments on commit 593c870

Please sign in to comment.