/
MemoryMgmt.chpl
184 lines (151 loc) · 6.69 KB
/
MemoryMgmt.chpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
module MemoryMgmt {
use Subprocess;
use Logging;
use ArkoudaMemDiagnosticsCompat;
use ArkoudaFileCompat;
private config const logLevel = LogLevel.DEBUG;
private config const logChannel = LogChannel.CONSOLE;
const mmLogger = new Logger(logLevel,logChannel);
/*
* Indicates whether static locale host memory or dynamically-captured
* memory allocated to the Arkouda process is used to estimate whether
* sufficient memory is available to execute the requested command.
*/
enum MemMgmtType {STATIC,DYNAMIC}
/*
* The percentage of currently available memory on each locale host
* that is the limit for memory allocated to each Arkouda locale.
*/
config const availableMemoryPct: real = 90;
/*
* Config param that indicates whether the static memory mgmt logic in
* ServerConfig or dynamic memory mgmt in this module will be used. The
* default is static memory mgmt.
*/
config const memMgmtType = MemMgmtType.STATIC;
record LocaleMemoryStatus {
var total_mem: uint(64);
var avail_mem: uint(64);
var pct_avail_mem: int;
var arkouda_mem_alloc: uint(64);
var mem_used: uint(64);
var locale_id: int;
var locale_hostname: string;
}
proc getArkoudaPid() : string throws {
var pid = spawn(["pgrep","arkouda_server"], stdout=pipeStyle.pipe);
var pid_string:string;
var line:string;
while pid.stdout.readLine(line) {
pid_string = line.strip();
}
pid.close();
return pid_string;
}
proc getArkoudaMemAlloc() : uint(64) throws {
var pid = getArkoudaPid();
var sub = spawn(["pmap", pid], stdout=pipeStyle.pipe);
var malloc_string:string;
var line:string;
while sub.stdout.readLine(line) {
if line.find("total") > 0 {
var splits = line.split('total');
malloc_string = splits(1).strip().strip('K');
}
}
sub.close();
return malloc_string:uint(64) * 1000;
}
proc getAvailMemory() : uint(64) throws {
var aFile = open('/proc/meminfo', ioMode.r);
var lines = aFile.reader().lines();
var line : string;
var memAvail:uint(64);
for line in lines do {
if line.find('MemAvailable:') >= 0 {
var splits = line.split('MemAvailable:');
memAvail = splits[1].strip().strip(' kB'):uint(64);
break;
}
}
return (AutoMath.round(availableMemoryPct/100 * memAvail)*1000):uint(64);
}
proc getTotalMemory() : uint(64) throws {
var aFile = open('/proc/meminfo', ioMode.r);
var lines = aFile.reader().lines();
var line : string;
var totalMem:uint(64);
for line in lines do {
if line.find('MemTotal:') >= 0 {
var splits = line.split('MemTotal:');
totalMem = splits[1].strip().strip(' kB'):uint(64);
break;
}
}
return totalMem*1000:uint(64);
}
proc getLocaleMemoryStatuses() throws {
var memStatuses: [0..numLocales-1] LocaleMemoryStatus;
coforall loc in Locales with (ref memStatuses) {
on loc {
var availMem = getAvailMemory();
var totalMem = getTotalMemory();
var pctAvailMem = (availMem:real/totalMem)*100:int;
memStatuses[here.id] = new LocaleMemoryStatus(total_mem=totalMem,
avail_mem=availMem,
pct_avail_mem=pctAvailMem:int,
arkouda_mem_alloc=getArkoudaMemAlloc(),
mem_used=memoryUsed(),
locale_id=here.id,
locale_hostname=here.hostname);
}
}
return memStatuses;
}
proc localeMemAvailable(reqMemory) : bool throws {
var arkMemAlloc = getArkoudaMemAlloc();
var arkMemUsed = memoryUsed();
var availMemory = getAvailMemory();
mmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
"locale: %s reqMemory: %i arkMemAlloc: %i arkMemUsed: %i availMemory: %i".doFormat(here.id,
reqMemory,
arkMemAlloc,
arkMemUsed,
availMemory));
var newArkoudaMemory = reqMemory:int + arkMemUsed:int;
if newArkoudaMemory:int <= arkMemAlloc:int {
return true;
} else {
if newArkoudaMemory:int <= availMemory {
return true;
} else {
var msg = "Arkouda memory request %i on locale %s exceeds available memory %i".doFormat(newArkoudaMemory,
here.id,
availMemory);
mmLogger.error(getModuleName(),getRoutineName(),getLineNumber(),msg);
return false;
}
}
}
/*
* Returns a boolean indicating whether there is either sufficient memory within the
* memory allocated to Arkouda on each locale host; if true for all locales, returns true.
*
* If the reqMemory exceeds the memory currently allocated to at least one locale, each locale
* host is checked to see if there is memory available to allocate more memory to each
* corresponding locale. If there is insufficient memory available on at least one locale,
* returns false. If there is sufficient memory on all locales to allocate sufficient,
* additional memory to Arkouda to execute the command, returns true.
*/
proc isMemAvailable(reqMemory) : bool throws {
var overMemLimit : bool = false;
coforall loc in Locales with (ref overMemLimit) {
on loc {
if !localeMemAvailable(reqMemory) {
overMemLimit = true;
}
}
}
return if overMemLimit then false else true;
}
}