/
test_c2_fvec.py
124 lines (101 loc) · 5.55 KB
/
test_c2_fvec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import unittest, sys, time
# Make the cloned-test helper modules importable from the usual relative locations.
sys.path.extend(['.','..','../..','py'])
import h2o, h2o_cmd, h2o_import as h2i, h2o_glm, h2o_common, h2o_exec as h2e
import h2o_print
# Set True to follow each parse with a binomial GLM on the parsed frame.
DO_GLM = False
# Set True to record cpu/disk/network stats into the benchmark log during parse.
LOG_MACHINE_STATS = False
print "Assumes you ran ../build_for_clone.py in this directory"
print "Using h2o-nodes.json. Also the sandbox dir"
class releaseTest(h2o_common.ReleaseCommon, unittest.TestCase):
def sub_c2_fvec_long(self):
# a kludge
h2o.setup_benchmark_log()
avgMichalSize = 116561140
bucket = 'home-0xdiag-datasets'
### importFolderPath = 'more1_1200_link'
importFolderPath = 'manyfiles-nflx-gz'
print "Using .gz'ed files in", importFolderPath
csvFilenameList= [
("*[1][0-4][0-9].dat.gz", "file_50_A.dat.gz", 50 * avgMichalSize, 1800),
# ("*[1][0-9][0-9].dat.gz", "file_100_A.dat.gz", 100 * avgMichalSize, 3600),
]
if LOG_MACHINE_STATS:
benchmarkLogging = ['cpu', 'disk', 'network']
else:
benchmarkLogging = []
pollTimeoutSecs = 120
retryDelaySecs = 10
for trial, (csvFilepattern, csvFilename, totalBytes, timeoutSecs) in enumerate(csvFilenameList):
csvPathname = importFolderPath + "/" + csvFilepattern
# double import still causing problems?
# (importResult, importPattern) = h2i.import_only(bucket=bucket, path=csvPathname, schema='local')
# importFullList = importResult['files']
# importFailList = importResult['fails']
# print "\n Problem if this is not empty: importFailList:", h2o.dump_json(importFailList)
# this accumulates performance stats into a benchmark log over multiple runs
# good for tracking whether we're getting slower or faster
h2o.cloudPerfH2O.change_logfile(csvFilename)
h2o.cloudPerfH2O.message("")
h2o.cloudPerfH2O.message("Parse " + csvFilename + " Start--------------------------------")
start = time.time()
parseResult = h2i.import_parse(bucket=bucket, path=csvPathname, schema='local',
hex_key=csvFilename + ".hex", timeoutSecs=timeoutSecs,
retryDelaySecs=retryDelaySecs,
pollTimeoutSecs=pollTimeoutSecs,
benchmarkLogging=benchmarkLogging)
elapsed = time.time() - start
print "Parse #", trial, "completed in", "%6.2f" % elapsed, "seconds.", \
"%d pct. of timeout" % ((elapsed*100)/timeoutSecs)
print "Parse result['destination_key']:", parseResult['destination_key']
h2o_cmd.columnInfoFromInspect(parseResult['destination_key'], exceptionOnMissingValues=False)
if totalBytes is not None:
fileMBS = (totalBytes/1e6)/elapsed
msg = '{!s} jvms, {!s}GB heap, {:s} {:s} {:6.2f} MB/sec for {:.2f} secs'.format(
len(h2o.nodes), h2o.nodes[0].java_heap_GB, csvFilepattern, csvFilename, fileMBS, elapsed)
print msg
h2o.cloudPerfH2O.message(msg)
if DO_GLM:
# these are all the columns that are enums in the dataset...too many for GLM!
x = range(542) # don't include the output column
# remove the output too! (378)
ignore_x = []
for i in [3,4,5,6,7,8,9,10,11,14,16,17,18,19,20,424,425,426,540,541]:
x.remove(i)
ignore_x.append(i)
# plus 1 because we are no longer 0 offset
x = ",".join(map(lambda x: "C" + str(x+1), x))
ignore_x = ",".join(map(lambda x: "C" + str(x+1), ignore_x))
GLMkwargs = {
'ignored_cols': ignore_x,
'family': 'binomial',
'response': 'C379',
'max_iter': 4,
'n_folds': 1,
'family': 'binomial',
'alpha': 0.2,
'lambda': 1e-5
}
# convert to binomial
execExpr="A.hex=%s" % parseResult['destination_key']
h2e.exec_expr(execExpr=execExpr, timeoutSecs=180)
execExpr="A.hex[,%s]=(A.hex[,%s]>%s)" % ('379', '379', 15)
h2e.exec_expr(execExpr=execExpr, timeoutSecs=180)
aHack = {'destination_key': "A.hex"}
start = time.time()
glm = h2o_cmd.runGLM(parseResult=aHack, timeoutSecs=timeoutSecs, **GLMkwargs)
elapsed = time.time() - start
h2o.check_sandbox_for_errors()
h2o_glm.simpleCheckGLM(self, glm, None, **GLMkwargs)
msg = '{:d} jvms, {:d}GB heap, {:s} {:s} GLM: {:6.2f} secs'.format(
len(h2o.nodes), h2o.nodes[0].java_heap_GB, csvFilepattern, csvFilename, elapsed)
print msg
h2o.cloudPerfH2O.message(msg)
h2o_cmd.checkKeyDistribution()
#***********************************************************************
# these will be tracked individual by jenkins, which is nice
#***********************************************************************
def test_B_c2_fvec_long(self):
h2o.beta_features = True
self.sub_c2_fvec_long()
if __name__ == '__main__':
    # h2o's unittest wrapper: parses h2o CLI args, then runs unittest.main()
    h2o.unit_main()