Json merge (#586)
* slowly getting there

* ok let's push it up, looks done to me

* travis cannot handle more than one job at a time

* fix autobounds issue
doutriaux1 committed Nov 15, 2018
1 parent 0adc60a commit 9cf5870
Showing 2 changed files with 211 additions and 23 deletions.
181 changes: 158 additions & 23 deletions pcmdi_metrics/io/base.py
@@ -31,6 +31,25 @@
 basestring = str
 
 
+# Group merged axes
+def groupAxes(axes, final=[], ids=None, separator="_"):
+    if axes == []:
+        return cdms2.createAxis(final, id=separator.join(ids))
+    if final == []:
+        final = [val for val in axes[0]]
+        ids = [ax.id for ax in axes]
+        return groupAxes(axes[1:], final, ids)
+    axis = axes[0]
+    original_length = len(final)
+    final = final * len(axis)
+    idx = 0
+    for val in axis:
+        for i in range(original_length):
+            final[idx] = "{}{}{}".format(final[idx], separator, val)
+            idx += 1
+    return groupAxes(axes[1:], final, ids)
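
To make the new helper concrete: groupAxes recursively folds a list of axes into one string-valued axis, joining the value combinations with the separator. A minimal sketch of its behavior; the season/statistic axes are invented for illustration, and it assumes cdms2 accepts string-valued axes (the same assumption the JSONs class makes elsewhere in this file):

from pcmdi_metrics.io.base import groupAxes
import cdms2

seasons = cdms2.createAxis(["djf", "jja"], id="season")
stats = cdms2.createAxis(["rms", "bias", "cor"], id="statistic")

merged = groupAxes([seasons, stats])
print(merged.id)  # "season_statistic"
# One value per combination, first axis varying fastest:
# "djf_rms", "jja_rms", "djf_bias", "jja_bias", "djf_cor", "jja_cor"
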


 # cdutil region object need a serializer
 def update_dict(d, u):
     for k, v in u.items():
@@ -45,7 +64,10 @@ def update_dict(d, u):
 # Platform
 def populate_prov(prov, cmd, pairs, sep=None, index=1, fill_missing=False):
     try:
-        p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        p = subprocess.Popen(
+            shlex.split(cmd),
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE)
     except Exception:
         return
     out, stde = p.communicate()
@@ -136,7 +158,9 @@ def generateProvenance():
     }
     prov["openGL"] = collections.OrderedDict()
     populate_prov(prov["openGL"], "glxinfo", pairs, sep=":", index=-1)
-    prov["openGL"]["GLX"] = {"server": collections.OrderedDict(), "client": collections.OrderedDict()}
+    prov["openGL"]["GLX"] = {
+        "server": collections.OrderedDict(),
+        "client": collections.OrderedDict()}
     pairs = {
         "version": "GLX version",
     }
@@ -145,12 +169,22 @@ def generateProvenance():
         "vendor": "server glx vendor string",
         "version": "server glx version string",
     }
-    populate_prov(prov["openGL"]["GLX"]["server"], "glxinfo", pairs, sep=":", index=-1)
+    populate_prov(
+        prov["openGL"]["GLX"]["server"],
+        "glxinfo",
+        pairs,
+        sep=":",
+        index=-1)
     pairs = {
         "vendor": "client glx vendor string",
         "version": "client glx version string",
     }
-    populate_prov(prov["openGL"]["GLX"]["client"], "glxinfo", pairs, sep=":", index=-1)
+    populate_prov(
+        prov["openGL"]["GLX"]["client"],
+        "glxinfo",
+        pairs,
+        sep=":",
+        index=-1)
     return prov


@@ -198,8 +232,15 @@ def write(self, data, type='json', *args, **kwargs):
                 'Could not create output directory: %s' % dir_path)
 
         if self.type == 'json':
-            json_version = float(kwargs.get("json_version", data.get("json_version", 3.0)))
-            json_structure = kwargs.get("json_structure", data.get("json_structure", None))
+            json_version = float(
+                kwargs.get(
+                    "json_version",
+                    data.get(
+                        "json_version",
+                        3.0)))
+            json_structure = kwargs.get(
+                "json_structure", data.get(
+                    "json_structure", None))
             if json_version >= 3.0 and json_structure is None:
                 raise Exception(
                     "json_version 3.0 of PMP requires json_structure to be passed" +
@@ -232,7 +273,9 @@ def write(self, data, type='json', *args, **kwargs):
             logging.getLogger("pcmdi_metrics").error('Unknown type: %s' % type)
             raise RuntimeError('Unknown type: %s' % type)
 
-        logging.getLogger("pcmdi_metrics").info('Results saved to a %s file: %s' % (type, file_name))
+        logging.getLogger("pcmdi_metrics").info(
+            'Results saved to a %s file: %s' %
+            (type, file_name))
 
     def get(self, var, var_in_file=None,
             region={}, *args, **kwargs):
@@ -342,7 +385,8 @@ def set_target_grid(self, target, regrid_tool='esmf',
             self.target_grid = target
             self.target_grid_name = target
         else:
-            logging.getLogger("pcmdi_metrics").error('Unknown grid: %s' % target)
+            logging.getLogger("pcmdi_metrics").error(
+                'Unknown grid: %s' % target)
             raise RuntimeError('Unknown grid: %s' % target)
 
     def setup_cdms2(self):
@@ -370,7 +414,8 @@ def addDict2Self(self, json_dict, json_struct, json_version):
             m = V[model]
             for ref in list(m.keys()):
                 aref = m[ref]
-                if not(isinstance(aref, dict) and "source" in aref):  # not an obs key
+                if not(isinstance(aref, dict) and
+                       "source" in aref):  # not an obs key
                     continue
                 reals = list(aref.keys())
                 src = reals.pop(reals.index("source"))
@@ -399,15 +444,19 @@ def addDict2Self(self, json_dict, json_struct, json_version):
                                 domain = "global"
                             sp = new_key.split("_")
                             stat = "_".join(sp[:-1])
-                            stat_dict = areal2[region2 + domain].get(stat, {})
+                            stat_dict = areal2[region2 +
+                                               domain].get(stat, {})
                             season = sp[-1]
                             season_dict = stat_dict
                             stat_dict[season] = reg[k]
                             if stat in areal2[region2 + domain]:
-                                areal2[region2 + domain][stat].update(stat_dict)
+                                areal2[region2 +
+                                       domain][stat].update(stat_dict)
                             else:
-                                areal2[region2 + domain][stat] = stat_dict
-                        # Now we can replace the realization with the correctly formatted one
+                                areal2[region2 +
+                                       domain][stat] = stat_dict
+                        # Now we can replace the realization with the correctly
+                        # formatted one
                         aref[real] = areal2
             # restore ref into model
             m[ref] = aref
@@ -417,7 +466,8 @@ def addDict2Self(self, json_dict, json_struct, json_version):
             m = V[model]
             for ref in list(m.keys()):
                 aref = m[ref]
-                if not(isinstance(aref, dict) and "source" in aref):  # not an obs key
+                if not(isinstance(aref, dict) and
+                       "source" in aref):  # not an obs key
                     continue
                 reals = list(aref.keys())
                 src = reals.pop(reals.index("source"))
@@ -453,10 +503,12 @@ def addDict2Self(self, json_dict, json_struct, json_version):
 
     def get_axes_values_recursive(self, depth, max_depth, data, values):
         for k in list(data.keys()):
-            if k not in self.ignored_keys and (isinstance(data[k], dict) or depth == max_depth):
+            if k not in self.ignored_keys and (
+                    isinstance(data[k], dict) or depth == max_depth):
                 values[depth].add(k)
             if depth != max_depth:
-                self.get_axes_values_recursive(depth + 1, max_depth, data[k], values)
+                self.get_axes_values_recursive(
+                    depth + 1, max_depth, data[k], values)
 
     def get_array_values_from_dict_recursive(self, out, ids, nms, axval, axes):
         if len(axes) > 0:
@@ -477,7 +529,8 @@ def get_array_values_from_dict_recursive(self, out, ids, nms, axval, axes):
             except Exception:
                 out[tuple(ids)] = 1.e20
 
-    def __init__(self, files=[], structure=[], ignored_keys=[], oneVariablePerFile=True):
+    def __init__(self, files=[], structure=[], ignored_keys=[],
+                 oneVariablePerFile=True):
         self.json_version = 3.0
         self.json_struct = structure
         self.data = {}
@@ -530,7 +583,8 @@ def getAxisList(self):
         axes = []
         for a in self.json_struct:
             values.append(set())
-        self.get_axes_values_recursive(0, len(self.json_struct) - 1, self.data, values)
+        self.get_axes_values_recursive(
+            0, len(self.json_struct) - 1, self.data, values)
         autoBounds = cdms2.getAutoBounds()
         cdms2.setAutoBounds("off")
         for i, nm in enumerate(self.json_struct):
@@ -539,17 +593,34 @@
         cdms2.setAutoBounds(autoBounds)
         return self.axes
 
-    def __call__(self, **kargs):
+    def __call__(self, merge=[], **kargs):
         """ Returns the array of values"""
+        ab = cdms2.getAutoBounds()
+        cdms2.setAutoBounds("off")
         axes = self.getAxisList()
+        axes_ids = self.getAxisIds()
         sh = []
         ids = []
+        used_ids = []
         for a in axes:
+            # Regular axis not a merged one
            sh.append(len(a))  # store length to construct array shape
            ids.append(a.id)  # store ids
 
+            used_ids.append(a.id)
+
         # first let's see which vars are actually asked for
         # for now assume all keys means restriction on dims
+        if "merge" in kargs:
+            merge = kargs["merge"]
+            del(kargs["merge"])
+        if not isinstance(merge, (list, tuple)):
+            raise RuntimeError(
+                "merge keyword must be a list of dimensions to merge together")
+
+        if len(merge) > 0 and not isinstance(merge[0], (list, tuple)):
+            merge = [merge, ]
+
         for axis_id in kargs:
             if axis_id not in ids:
                 raise ValueError("Invalid axis '%s'" % axis_id)
@@ -579,7 +650,71 @@ def __call__(self, **kargs):
         # Now let's fill this array
         self.get_array_values_from_dict_recursive(array, [], [], [], axes)
 
-        array = MV2.masked_greater(array, 9.e19)
-        array.id = "pmp"
-        array.setAxisList(axes)
-        return array
+        # Ok at this point we need to take care of merged axes
+        # First let's create the merged axes
+        new_axes = [groupAxes([self.getAxis(x) for x in merger])
+                    for merger in merge]
+        sh2 = list(sh)
+        for merger in merge:  # loop through all possible merging
+            merged_indices = []
+            for id in merger:
+                merged_indices.append(axes_ids.index(id))
+            for indx in merged_indices:
+                sh2[indx] = 1
+            smallest = min(merged_indices)
+            for indx in merged_indices:
+                sh2[smallest] *= sh[indx]
+
+        myorder = []
+        for index in range(len(sh)):
+            if index in myorder:
+                continue
+            for merger in merge:
+                merger = [axes_ids.index(x) for x in merger]
+                if index in merger and index not in myorder:
+                    for indx in merger:
+                        myorder.append(indx)
+            if index not in myorder:  # ok did not find this one anywhere
+                myorder.append(index)
+
+        outData = numpy.transpose(array, myorder)
+        outData = numpy.reshape(outData, sh2)
+
+        yank = []
+        for merger in merge:
+            merger = [axes_ids.index(x) for x in merger]
+            mn = min(merger)
+            merger.remove(mn)
+            yank += merger
+        yank = sorted(yank, reverse=True)
+        for yk in yank:
+            extract = (slice(0, None),) * yk
+            extract += (0,)
+            outData = outData[extract]
+        # Ok now let's apply the newaxes
+        sub = 0
+        outData = MV2.array(outData)
+        merged_axis_done = []
+        for index in range(len(array.shape)):
+            foundInMerge = False
+            for imerge, merger in enumerate(merge):
+                merger = [axes_ids.index(x) for x in merger]
+                if index in merger:
+                    foundInMerge = True
+                    if imerge not in merged_axis_done:
+                        merged_axis_done.append(imerge)
+                        setMergedAxis = imerge
+                    else:
+                        setMergedAxis = -1
+            if not foundInMerge:
+                outData.setAxis(index - sub, axes[index])
+            else:
+                if setMergedAxis == -1:
+                    sub += 1
+                else:
+                    outData.setAxis(index - sub, new_axes[setMergedAxis])
+        outData = MV2.masked_greater(outData, 9.e19)
+        outData.id = "pmp"
+        cdms2.setAutoBounds(ab)
+        return outData
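
Taken together, these changes let a JSONs object collapse several JSON dimensions into one merged axis via the new merge keyword. A minimal sketch; the file name and json_structure below are hypothetical, while the real behavior is pinned down by the new test that follows:

import pcmdi_metrics

# Hypothetical results file written with json_structure ["model", "region", "season"]
J = pcmdi_metrics.io.base.JSONs(["metrics.json"], oneVariablePerFile=False)

full = J()                                # shape (n_model, n_region, n_season)
merged = J(merge=[["region", "season"]])  # shape (n_model, n_region * n_season)
# The collapsed dimension carries a single "region_season" axis whose
# values join the original labels, e.g. "global_djf".
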
53 changes: 53 additions & 0 deletions tests/test_pmp_merge_dims_json.py
@@ -0,0 +1,53 @@
+import pcmdi_metrics
+import json
+import unittest
+import numpy
+
+
+class JSONTest(unittest.TestCase):
+    def testMerge(self):
+        data = {}
+        for i in range(8):
+            data[i] = {}
+            for j in range(7):
+                data[i][j] = {}
+                for k in range(6):
+                    data[i][j][k] = {}
+                    for l in range(5):
+                        data[i][j][k][l] = {}
+                        for m in range(4):
+                            data[i][j][k][l][m] = i + j/10. + k/100. + l/1000. + m/10000.
+
+        out = {"RESULTS": data,
+               "json_structure": ["i", "j", "k", "l", "m"],
+               "json_version": 3.0
+               }
+
+        with open("data.json", "w") as f:
+            json.dump(out, f)
+
+
+        J = pcmdi_metrics.io.base.JSONs(["data.json"], oneVariablePerFile=False)
+
+
+        regular = J()
+        self.assertEqual(regular.shape, (8,7,6,5,4))
+        merged_one = J(merge=[["k", "l"]])
+        self.assertEqual(merged_one.shape, (8,7,30,4))
+
+        merged_two = J(merge=[["j", "m"], ["k", "l"]])
+        self.assertEqual(merged_two.shape, (8,28,30))
+        merged_three = J(merge=[["k", "l"], ["j","m"]])
+        self.assertEqual(merged_three.shape, (8,28,30))
+        merged_four = J(merge=[["j","m"],["k","i"]])
+        self.assertEqual(merged_four.shape, (48, 28, 5))
+
+        merged_five = J(merge=[["i","k","l"], ["j", "m"]])
+        self.assertEqual(merged_five.shape, (240, 28))
+        regular = regular.filled()
+        merged_six = J(merge=["l","m"]).filled()
+        self.assertEqual(merged_six.shape, (8,7,6,20))
+        self.assertTrue(numpy.allclose(regular[0,0,0].flat, merged_six[0,0,0].flat))
+
+        merged_seven = J(merge=["m","l"]).filled()
+        self.assertTrue(numpy.allclose(numpy.transpose(regular[0,0,0]).flat, merged_seven[0,0,0].flat))
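
Assuming a standard repository checkout with cdms2 available, one way to run the new test from the repository root (the exact runner used by PMP's CI may differ) is:

python -m unittest tests/test_pmp_merge_dims_json.py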
