Skip to content

Commit

Permalink
LIU-388: Code review updates
Browse files Browse the repository at this point in the history
Improve error reporting for failures.
  • Loading branch information
myxie committed Jun 7, 2024
1 parent 32cbacd commit 9818b30
Showing 1 changed file with 154 additions and 61 deletions.
215 changes: 154 additions & 61 deletions daliuge-translator/test/dropmake/test_pg_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA

import sys
import unittest
import json
import pickle
Expand All @@ -28,16 +29,20 @@
from dlg.dropmake.lg import LG
from dlg.dropmake.pgt import PGT, GPGTNoNeedMergeException
from dlg.dropmake.pgtp import MetisPGTP, MySarkarPGTP, MinNumPartsPGTP
from ..dropmake import (SARKAR_PARTITION_RESULTS, SARKAR_PARTITION_RESULTS_GEN_ISLAND,
MINPARTS_RESULTS)

"""
python -m unittest test.dropmake.test_pg_gen
"""
TEST_SSID = 'test_pg_gen'


def get_lg_fname(type, f_name):
def get_lg_fpath(type, f_name):
"""
Get the test data file path based on the logical graph name and type of file we want
:param type: str, type of test data (logical_graph, pickle, pg_spec) we are comparing
:param f_name: name of the original logical graph created in Eagle
:return: str, full path of the file
"""
f_dir = 'logical_graphs'
if type == 'pickle':
f_name = f_name.split('.')[0] + '.pkl'
Expand Down Expand Up @@ -80,23 +85,41 @@ def test_lg_unroll(self):
"test_grpby_gather.graph": 21,
"chiles_simple.graph": 22,
"Plasma_test.graph": 6,
"SharedMemoryTest_update.graph": 8,
}

for lgn, num_keys in lg_names.items():
fp = get_lg_fname("logical_graphs", lgn)
for lg_name, num_keys in lg_names.items():
fp = get_lg_fpath("logical_graphs", lg_name)
lg = LG(fp, ssid=TEST_SSID)
self.assertEqual(len(lg._done_dict.keys()), num_keys)
self.assertEqual(num_keys,
len(lg._done_dict.keys()),
f"Incorrect number of elements when constructing LG "
f"object using: {lg_name}")

drop_list = lg.unroll_to_tpl()
with open(get_lg_fname('pickle', lgn), 'rb') as pk_fp:
with open(get_lg_fpath('pickle', lg_name), 'rb') as pk_fp:
test_unroll = pickle.load(pk_fp)

self.assertEqual(test_unroll, drop_list)
if lgn == "SharedMemoryTest_update.graph":
for drop in drop_list:
if drop["categoryType"] in [CategoryType.DATA, "data"]:
self.assertEqual("SharedMemory", drop["category"])
self.assertListEqual(test_unroll,
drop_list,
f"unroll_to_tpl failed for: {lg_name}")

def test_lg_unroll_sharedmemory(self):
"""
Confirm the SharedMemory data type is correctly unrolled.
"""
lg_name = "SharedMemoryTest_update.graph"
num_keys = 8
fp = get_lg_fpath("logical_graphs", lg_name)
lg = LG(fp, ssid=TEST_SSID)
self.assertEqual(num_keys,
len(lg._done_dict.keys()),
f"Incorrect number of elements when constructing LG "
f"object using: {lg_name}")

drop_list = lg.unroll_to_tpl()
for drop in drop_list:
if drop["categoryType"] in [CategoryType.DATA, "data"]:
self.assertEqual("SharedMemory", drop["category"])


class TestPGGen(unittest.TestCase):
Expand Down Expand Up @@ -126,15 +149,17 @@ def test_pgt_to_json(self):
# "simpleMKN_update.graph", # Currently broken
]

for lgn in lgnames:
fp = get_lg_fname('logical_graphs', lgn)
for lg_name in lgnames:
fp = get_lg_fpath('logical_graphs', lg_name)
lg = LG(fp, ssid=TEST_SSID)
drop_list = lg.unroll_to_tpl()
pgt = PGT(drop_list)
pg_json = pgt.to_gojs_json(visual=True, string_rep=False)
with open(get_lg_fname('pg_spec', lgn), 'r') as json_fp:
with open(get_lg_fpath('pg_spec', lg_name), 'r') as json_fp:
test_json = json.load(json_fp)
self.assertEqual(test_json, pg_json)
self.assertDictEqual(test_json, pg_json,
f"pgt.to_gojs_json failed for: {lg_name}")


class TestPGPartition(unittest.TestCase):
"""
Expand All @@ -150,6 +175,65 @@ class TestPGPartition(unittest.TestCase):
failures are caused by known-breaking changes, as opposed to legitimate bugs!).
"""

SARKAR_PARTITION_RESULTS = {
"testLoop.graph": {
'algo': 'Edge Zero', 'min_exec_time': 30, 'total_data_movement': 50,
'exec_time': 80, 'num_parts': 0
},
"cont_img_mvp.graph": {
'algo': 'Edge Zero', 'min_exec_time': 144, 'total_data_movement': 932,
'exec_time': 444, 'num_parts': 0
},
"test_grpby_gather.graph": {
'algo': 'Edge Zero', 'min_exec_time': 12, 'total_data_movement': 70,
'exec_time': 47, 'num_parts': 0
},
"chiles_simple.graph": {
'algo': 'Edge Zero', 'min_exec_time': 45, 'total_data_movement': 1080,
'exec_time': 285, 'num_parts': 0
}
}

SARKAR_PARTITION_RESULTS_GEN_ISLAND = {
"testLoop.graph": {
'algo': 'Edge Zero',
'min_exec_time': 30,
'total_data_movement': 0,
'exec_time': 30, 'num_parts': 1
},
"cont_img_mvp.graph": {
'algo': 'Edge Zero', 'min_exec_time': 144, 'total_data_movement': 135,
'exec_time': 179, 'num_islands': 2, 'num_parts': 4
},
"test_grpby_gather.graph": {
'algo': 'Edge Zero', 'min_exec_time': 12,
'total_data_movement': 0, 'exec_time': 12, 'num_parts': 1
},
"chiles_simple.graph": {
'algo': 'Edge Zero', 'min_exec_time': 45, 'total_data_movement': 0,
'exec_time': 45, 'num_parts': 1
}
}

MINPARTS_RESULTS = {
"testLoop.graph": {
'algo': 'Lookahead', 'min_exec_time': 30, 'total_data_movement': 50,
'exec_time': 80, 'num_parts': 0
},
"cont_img_mvp.graph": {
'algo': 'Lookahead', 'min_exec_time': 144,
'total_data_movement': 932, 'exec_time': 444, 'num_parts': 0
},
"test_grpby_gather.graph": {
'algo': 'Lookahead', 'min_exec_time': 12,
'total_data_movement': 70, 'exec_time': 47, 'num_parts': 0
},
"chiles_simple.graph": {
'algo': 'Lookahead', 'min_exec_time': 45, 'total_data_movement': 1080,
'exec_time': 285, 'num_parts': 0
}
}

def setUp(self):
self.partitionMethodLGs = [
"testLoop.graph",
Expand All @@ -168,8 +252,8 @@ def test_metis_pgtp(self):
'total_data_movement': None, 'exec_time': None,
'num_parts': 1}

for i, lgn in enumerate(self.partitionMethodLGs):
fp = get_lg_fname('logical_graphs', lgn)
for lg_names in self.partitionMethodLGs:
fp = get_lg_fpath('logical_graphs', lg_names)
lg = LG(fp)
drop_list = lg.unroll_to_tpl()
pgtp = MetisPGTP(drop_list)
Expand All @@ -191,21 +275,21 @@ def test_metis_pgtp_gen_pg(self):
"test_grpby_gather.graph": 20,
"chiles_simple.graph": 20,
}
for i, lgn in enumerate(self.partitionMethodLGs):
fp = get_lg_fname('logical_graphs', lgn)
for lg_name in self.partitionMethodLGs:
fp = get_lg_fpath('logical_graphs', lg_name)
lg = LG(fp)
drop_list = lg.unroll_to_tpl()
pgtp = MetisPGTP(drop_list, 3, merge_parts=True)
result = pgtp.result()
self.assertEqual(None,
result['total_data_movement'])
self.assertIsNone(result['total_data_movement'],
f"Incorrect partitioning for: {lg_name}")
self.assertEqual(3, result['num_parts'])
pgtp.to_gojs_json(visual=False)
pgtp.to_pg_spec(node_list)
result = pgtp.result()
self.assertEqual(total_data_movement_pgspec[lgn],
result['total_data_movement'])

self.assertEqual(total_data_movement_pgspec[lg_name],
result['total_data_movement'],
f"Incorrect partitioning for: {lg_name}")
def test_metis_pgtp_gen_pg_island(self):
"""
Regression testing to confirm that partitioning, then generating a PGT spec works
Expand All @@ -221,28 +305,33 @@ def test_metis_pgtp_gen_pg_island(self):
]
nb_islands = 2
nb_nodes = len(node_list) - nb_islands
for i, lgn in enumerate(self.partitionMethodLGs):
fp = get_lg_fname('logical_graphs', lgn)
for lg_name in self.partitionMethodLGs:
fp = get_lg_fpath('logical_graphs', lg_name)
lg = LG(fp)
drop_list = lg.unroll_to_tpl()
pgtp = MetisPGTP(drop_list, nb_nodes, merge_parts=True)
self.assertFalse('num_islands' in pgtp.result())
pgtp.to_gojs_json(visual=False)
pgtp.to_pg_spec(node_list, num_islands=nb_islands)
self.assertTrue('num_islands' in pgtp.result())
self.assertEqual(2, pgtp.result()['num_islands'])
self.assertTrue('num_islands' in pgtp.result(),
f"No islands in PG spec for: {lg_name}")
self.assertEqual(2, pgtp.result()['num_islands'],
f"Incorrect number of islands in PG spec for: {lg_name}")

def test_mysarkar_pgtp(self):
"""
Confirm that basic Sarkar paritioning has not regressed
"""

for i, lgn in enumerate(self.partitionMethodLGs):
fp = get_lg_fname('logical_graphs', lgn)
for lg_name in self.partitionMethodLGs:
fp = get_lg_fpath('logical_graphs', lg_name)
lg = LG(fp)
drop_list = lg.unroll_to_tpl()
pgtp = MySarkarPGTP(drop_list)
self.assertEqual(SARKAR_PARTITION_RESULTS[lgn], pgtp.result())
self.assertEqual(
self.SARKAR_PARTITION_RESULTS[lg_name],
pgtp.result(),
f"Partition results do not match test case for: {lg_name}")

def test_mysarkar_pgtp_gen_pg(self):
"""
Expand All @@ -254,16 +343,18 @@ def test_mysarkar_pgtp_gen_pg(self):
result in speed up.
"""
node_list = ["10.128.0.11", "10.128.0.12", "10.128.0.13"]
for i, lgn in enumerate(self.partitionMethodLGs):
fp = get_lg_fname('logical_graphs', lgn)
for lg_name in self.partitionMethodLGs:
fp = get_lg_fpath('logical_graphs', lg_name)
lg = LG(fp)
drop_list = lg.unroll_to_tpl()
pgtp = MySarkarPGTP(drop_list, 3, merge_parts=True)
pre_spec_result = pgtp.result()
pgtp.to_gojs_json(visual=False)
pgtp.to_pg_spec(node_list)
# Confirm that partitioning improves the execution speed.
self.assertGreater(pre_spec_result['exec_time'], pgtp.result()['exec_time'])
self.assertGreater(
pre_spec_result['exec_time'], pgtp.result()['exec_time'],
f"Partition of {lg_name} should cause speed up, but this did not occur.")

def test_mysarkar_pgtp_gen_pg_island(self):
"""
Expand All @@ -278,43 +369,51 @@ def test_mysarkar_pgtp_gen_pg_island(self):
"10.128.0.15",
"10.128.0.16",
]
for i, lgn in enumerate(self.partitionMethodLGs):
fp = get_lg_fname('logical_graphs', lgn)
nb_islands = 2
new_num_parts = len(node_list) - nb_islands
for lg_name in self.partitionMethodLGs:
fp = get_lg_fpath('logical_graphs', lg_name)
lg = LG(fp, ssid=TEST_SSID)
drop_list = lg.unroll_to_tpl()
pgtp = MySarkarPGTP(drop_list, None, merge_parts=True)
pgtp.to_gojs_json(visual=True, string_rep=False)
nb_islands = 2
new_num_parts = len(node_list) - nb_islands

if lgn != "cont_img_mvp.graph":
if lg_name != "cont_img_mvp.graph":
self.assertRaises(
GPGTNoNeedMergeException,
pgtp.merge_partitions,
new_num_parts=new_num_parts,
form_island=False)
new_num_parts,
False,
f"Exception was not raised for: {lg_name}")
partition_results = pgtp.result()
self.assertEqual(SARKAR_PARTITION_RESULTS_GEN_ISLAND[lgn],
self.assertEqual(self.SARKAR_PARTITION_RESULTS_GEN_ISLAND[lg_name],
partition_results)
else:
pgtp.merge_partitions(len(node_list) - nb_islands, form_island=False)
pgtp.to_pg_spec(node_list, num_islands=nb_islands)
self.assertEqual(SARKAR_PARTITION_RESULTS_GEN_ISLAND[lgn],
pgtp.result())
self.assertEqual(self.SARKAR_PARTITION_RESULTS_GEN_ISLAND[lg_name],
pgtp.result(),
f"Incorrect partition results for: {lg_name}")

def test_minnumparts_pgtp(self):
tgt_deadline = [200, 300, 90, 80, 160]
for i, lgn in enumerate(self.partitionMethodLGs):
fp = get_lg_fname('logical_graphs', lgn)
for i, lg_name in enumerate(self.partitionMethodLGs):
fp = get_lg_fpath('logical_graphs', lg_name)
lg = LG(fp)
drop_list = lg.unroll_to_tpl()
pgtp = MinNumPartsPGTP(drop_list, tgt_deadline[i])
self.assertEqual(MINPARTS_RESULTS[lgn], pgtp.result())
self.assertEqual(self.MINPARTS_RESULTS[lg_name],
pgtp.result(),
f"Incorrect partition results for: {lg_name}")


if __name__ == '__main__':
import sys
"""
Used to generate the pickle and logical graph files used for testing.
IMPORTANT: Run this _only_ when the expected output of unroll_to_tpl has been
_knowingly_ changed.
"""
try:
arg = sys.argv[1]
if arg.lower() == "test-gen":
Expand All @@ -331,12 +430,6 @@ def test_minnumparts_pgtp(self):
"regression testing for translator behaviour.")
exit()

"""
Used to generate the pickle and logical graph files used for testing.
IMPORTANT: Run this _only_ when the expected output of unroll_to_tpl has been
_knowingly_ changed.
"""
pickle_dir = "pickle"
physical_graph_spec = "pg_spec"
lgnames = [
Expand All @@ -353,13 +446,13 @@ def test_minnumparts_pgtp(self):
# "simpleMKN_update.graph", # Currently broken
]

for lgn in lgnames:
print('\t', lgn)
fp = get_lg_fname('logical_graphs', lgn)
for lg_name in lgnames:
print('\t', lg_name)
fp = get_lg_fpath('logical_graphs', lg_name)
lg = LG(fp, ssid=TEST_SSID)

lg_unroll = lg.unroll_to_tpl()
fn_pkl = lgn.split('.')[0] + '.pkl'
fn_pkl = lg_name.split('.')[0] + '.pkl'
pkl_path = pkg_resources.resource_filename(
__name__, f"{pickle_dir}/{fn_pkl}"
)
Expand All @@ -368,7 +461,7 @@ def test_minnumparts_pgtp(self):

pgt = PGT(lg_unroll)
pg_json = pgt.to_gojs_json(visual=True, string_rep=False)
fn_json = lgn.split('.')[0] + '.json'
fn_json = lg_name.split('.')[0] + '.json'
pg_path = pkg_resources.resource_filename(
__name__, f"{physical_graph_spec}/{fn_json}"
)
Expand Down

0 comments on commit 9818b30

Please sign in to comment.