-
Notifications
You must be signed in to change notification settings - Fork 191
/
app_dev_dag_zhuozhao.py
159 lines (143 loc) · 4.07 KB
/
app_dev_dag_zhuozhao.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# _*_ coding : utf-8 _*_
#
# Parsl DAG Application for strategy performance
# based on app_dag_zhuzhao.py
# Author: Takuya Kurihana
#
import argparse
import numpy as np
import psutil
import parsl
from parsl import *
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.launchers import SingleNodeLauncher
import logging
from parsl.app.app import python_app
# Argument settings
p = argparse.ArgumentParser()
p.add_argument(
"--executor",
help="executor option for configuration setting ",
type=str,
default='HighThroughput'
)
args = p.parse_args()
print('Argparse: --executor='+args.executor, flush=True)
# Here we can add htex_strategy for option
# config
print(type(args.executor) )
if args.executor == 'ThreadPool':
config = Config(
executors=[ThreadPoolExecutor(
#label='threads',
label='htex_local',
max_threads=5)
],
)
elif args.executor == 'HighThroughput':
config = Config(
executors=[
HighThroughputExecutor(
label="htex_local",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
# tasks_per_node=1, # For HighThroughputExecutor, this option sho<
launcher=SingleNodeLauncher(),
),
)
],
#strategy='htex_aggressive',
#strategy='htex_totaltime',
strategy='simple',
)
# TODO:
#try:
#except:
# raise NameError("Invalid parsed argument")
# Load config
print(config)
dfk = parsl.load(config)
@App('python', dfk)
def sleeper(dur=5):
import time
time.sleep(dur)
@App('python', dfk)
def cpu_stress(dur=30):
import time
s = 0
start = time.time()
for i in range(10**8):
s += i
if time.time() - start >= dur:
break
return s
@python_app
def inc(inputs=[]):
import time
import psutil
import numpy as np
start = time.time()
sleep_duration = 60.0
_inputs = np.asarray(inputs)
mems = _inputs[0].tolist()
cpus = _inputs[1].tolist()
x = 0
while True:
x += 1
end = time.time()
if (end - start) % 10 == 0:
mems += [psutil.virtual_memory().percent]
cpus += [psutil.cpu_percent()]
if end - start >= sleep_duration:
break
mems += [psutil.virtual_memory().percent]
cpus += [psutil.cpu_percent()]
return [mems, cpus]
@python_app
def add_inc(inputs=[]):
import time
import psutil
import numpy as np
start = time.time()
sleep_duration = 60.0
res = 0
_inputs = np.asarray(inputs)
mems = _inputs[0].tolist()
cpus = _inputs[1].tolist()
while True:
res += 1
end = time.time()
if (end - start) % 10 == 0:
mems += [psutil.virtual_memory().percent]
cpus += [psutil.cpu_percent()]
if end - start >= sleep_duration:
break
mems += [psutil.virtual_memory().percent]
cpus += [psutil.cpu_percent()]
return [mems, cpus]
if __name__ == "__main__":
total = 10
half = int(total / 2)
one_third = int(total / 3)
two_third = int(total / 3 * 2)
mems = [psutil.virtual_memory().percent]
cpus = [psutil.cpu_percent()]
inputs = [mems, cpus]
futures_1 = [inc(inputs) for i in range(total)]
futures_2 = [add_inc(inputs=futures_1[0:half]), add_inc(inputs=futures_1[half:total])]
futures_3 = [inc(futures_2[0]) for _ in range(half)] + [inc(futures_2[1]) for _ in range(half)]
futures_4 = [add_inc(inputs=futures_3[0:one_third]), add_inc(inputs=futures_3[one_third:two_third]), add_inc(inputs=futures_3[two_third:total])]
print([f.result() for f in futures_4])
print("Done")
# plotting
outputs = np.asarray([f.result() for f in futures_4])
print(outputs.shape)
cdir='/home/tkurihana/scratch-midway2/parsl/parsl/dataflow/workspace'
np.save(cdir+'/'+"outputs", outputs)