From d08c26777e6b7fd89d9f74c80c4f104ab9dc2288 Mon Sep 17 00:00:00 2001 From: swong3 Date: Thu, 11 Sep 2025 20:22:08 +0000 Subject: [PATCH 1/5] WIP for re-register --- python/gigl/distributed/dist_partitioner.py | 24 +++ .../distributed_partitioner_test.py | 149 ++++++++++++++++++ 2 files changed, 173 insertions(+) diff --git a/python/gigl/distributed/dist_partitioner.py b/python/gigl/distributed/dist_partitioner.py index d1eb4b7ac..074cb7be7 100644 --- a/python/gigl/distributed/dist_partitioner.py +++ b/python/gigl/distributed/dist_partitioner.py @@ -361,6 +361,10 @@ def register_node_ids( self._assert_and_get_rpc_setup() + # Check if node data has already been registered + if self._node_ids is not None or len(self._node_types) > 0: + raise ValueError("Node IDs have already been registered. Cannot re-register node data.") + logger.info("Registering Nodes ...") input_node_ids = self._convert_node_entity_to_heterogeneous_format( input_node_entity=node_ids @@ -424,6 +428,10 @@ def register_edge_index( self._assert_and_get_rpc_setup() + # Check if edge data has already been registered + if self._edge_index is not None or len(self._edge_types) > 0: + raise ValueError("Edge indices have already been registered. Cannot re-register edge data.") + logger.info("Registering Edge Indices ...") input_edge_index = self._convert_edge_entity_to_heterogeneous_format( @@ -495,6 +503,10 @@ def register_node_features( self._assert_and_get_rpc_setup() + # Check if node features have already been registered + if self._node_feat is not None or self._node_feat_dim is not None: + raise ValueError("Node features have already been registered. Cannot re-register node feature data.") + logger.info("Registering Node Features ...") input_node_features = self._convert_node_entity_to_heterogeneous_format( @@ -526,6 +538,10 @@ def register_edge_features( self._assert_and_get_rpc_setup() + # Check if edge features have already been registered + if self._edge_feat is not None or self._edge_feat_dim is not None: + raise ValueError("Edge features have already been registered. Cannot re-register edge feature data.") + logger.info("Registering Edge Features ...") input_edge_features = self._convert_edge_entity_to_heterogeneous_format( @@ -561,6 +577,14 @@ def register_labels( self._assert_and_get_rpc_setup() + # Check if labels have already been registered + if is_positive: + if self._positive_label_edge_index is not None: + raise ValueError("Positive labels have already been registered. Cannot re-register positive label data.") + else: + if self._negative_label_edge_index is not None: + raise ValueError("Negative labels have already been registered. Cannot re-register negative label data.") + input_label_edge_index = self._convert_edge_entity_to_heterogeneous_format( input_edge_entity=label_edge_index ) diff --git a/python/tests/unit/distributed/distributed_partitioner_test.py b/python/tests/unit/distributed/distributed_partitioner_test.py index 18358259a..5f1b3548b 100644 --- a/python/tests/unit/distributed/distributed_partitioner_test.py +++ b/python/tests/unit/distributed/distributed_partitioner_test.py @@ -1075,6 +1075,155 @@ def test_partitioning_invalid_node_ids( with self.assertRaises(ValueError): partitioner.partition_node_features(node_pb) + def test_register_node_ids_re_registration_prevention(self) -> None: + """Test that re-registering node IDs raises an error.""" + master_port = glt.utils.get_free_port(self._master_ip_address) + + init_worker_group(world_size=1, rank=0, group_name=get_process_group_name(0)) + init_rpc( + master_addr=self._master_ip_address, + master_port=master_port, + num_rpc_threads=4, + ) + + partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + + # First registration should work + node_ids = torch.tensor([0, 1, 2]) + partitioner.register_node_ids(node_ids=node_ids) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Node IDs have already been registered"): + partitioner.register_node_ids(node_ids=node_ids) + + def test_register_edge_index_re_registration_prevention(self) -> None: + """Test that re-registering edge indices raises an error.""" + master_port = glt.utils.get_free_port(self._master_ip_address) + + init_worker_group(world_size=1, rank=0, group_name=get_process_group_name(0)) + init_rpc( + master_addr=self._master_ip_address, + master_port=master_port, + num_rpc_threads=4, + ) + + partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + + # First registration should work + edge_index = torch.tensor([[0, 1], [1, 2]]) + partitioner.register_edge_index(edge_index=edge_index) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Edge indices have already been registered"): + partitioner.register_edge_index(edge_index=edge_index) + + def test_register_node_features_re_registration_prevention(self) -> None: + """Test that re-registering node features raises an error.""" + master_port = glt.utils.get_free_port(self._master_ip_address) + + init_worker_group(world_size=1, rank=0, group_name=get_process_group_name(0)) + init_rpc( + master_addr=self._master_ip_address, + master_port=master_port, + num_rpc_threads=4, + ) + + partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + + # First registration should work + node_features = torch.ones(3, 5) + partitioner.register_node_features(node_features=node_features) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Node features have already been registered"): + partitioner.register_node_features(node_features=node_features) + + def test_register_edge_features_re_registration_prevention(self) -> None: + """Test that re-registering edge features raises an error.""" + master_port = glt.utils.get_free_port(self._master_ip_address) + + init_worker_group(world_size=1, rank=0, group_name=get_process_group_name(0)) + init_rpc( + master_addr=self._master_ip_address, + master_port=master_port, + num_rpc_threads=4, + ) + + partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + + # First registration should work + edge_features = torch.ones(2, 10) + partitioner.register_edge_features(edge_features=edge_features) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Edge features have already been registered"): + partitioner.register_edge_features(edge_features=edge_features) + + def test_register_labels_re_registration_prevention(self) -> None: + """Test that re-registering labels raises an error.""" + master_port = glt.utils.get_free_port(self._master_ip_address) + + init_worker_group(world_size=1, rank=0, group_name=get_process_group_name(0)) + init_rpc( + master_addr=self._master_ip_address, + master_port=master_port, + num_rpc_threads=4, + ) + + partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + + # First registration should work + pos_labels = torch.tensor([[0, 1], [1, 2]]) + partitioner.register_labels(label_edge_index=pos_labels, is_positive=True) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Positive labels have already been registered"): + partitioner.register_labels(label_edge_index=pos_labels, is_positive=True) + + # Test negative labels separately + partitioner2 = DistPartitioner(should_assign_edges_by_src_node=True) + neg_labels = torch.tensor([[0, 1], [1, 2]]) + partitioner2.register_labels(label_edge_index=neg_labels, is_positive=False) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Negative labels have already been registered"): + partitioner2.register_labels(label_edge_index=neg_labels, is_positive=False) + + def test_register_heterogeneous_re_registration_prevention(self) -> None: + """Test re-registration prevention for heterogeneous data.""" + master_port = glt.utils.get_free_port(self._master_ip_address) + + init_worker_group(world_size=1, rank=0, group_name=get_process_group_name(0)) + init_rpc( + master_addr=self._master_ip_address, + master_port=master_port, + num_rpc_threads=4, + ) + + partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + + # Test heterogeneous node IDs + node_ids = { + USER_NODE_TYPE: torch.tensor([0, 1, 2]), + ITEM_NODE_TYPE: torch.tensor([0, 1, 2]) + } + partitioner.register_node_ids(node_ids=node_ids) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Node IDs have already been registered"): + partitioner.register_node_ids(node_ids=node_ids) + + # Test heterogeneous edge indices + partitioner2 = DistPartitioner(should_assign_edges_by_src_node=True) + edge_index = { + USER_TO_USER_EDGE_TYPE: torch.tensor([[0, 1], [1, 2]]) + } + partitioner2.register_edge_index(edge_index=edge_index) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Edge indices have already been registered"): + partitioner2.register_edge_index(edge_index=edge_index) + if __name__ == "__main__": unittest.main() From f70cb256422b1dc8b21e0fb23ff0813cf7b0ddcf Mon Sep 17 00:00:00 2001 From: swong3 Date: Fri, 12 Sep 2025 22:12:31 +0000 Subject: [PATCH 2/5] Added re-registration checks for partitioner and unit tests --- .../distributed_partitioner_test.py | 38 +++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/python/tests/unit/distributed/distributed_partitioner_test.py b/python/tests/unit/distributed/distributed_partitioner_test.py index 5f1b3548b..57df20ee1 100644 --- a/python/tests/unit/distributed/distributed_partitioner_test.py +++ b/python/tests/unit/distributed/distributed_partitioner_test.py @@ -1075,7 +1075,7 @@ def test_partitioning_invalid_node_ids( with self.assertRaises(ValueError): partitioner.partition_node_features(node_pb) - def test_register_node_ids_re_registration_prevention(self) -> None: + def test_node_ids_re_registration(self) -> None: """Test that re-registering node IDs raises an error.""" master_port = glt.utils.get_free_port(self._master_ip_address) @@ -1096,7 +1096,7 @@ def test_register_node_ids_re_registration_prevention(self) -> None: with self.assertRaisesRegex(ValueError, "Node IDs have already been registered"): partitioner.register_node_ids(node_ids=node_ids) - def test_register_edge_index_re_registration_prevention(self) -> None: + def test_edge_index_re_registration(self) -> None: """Test that re-registering edge indices raises an error.""" master_port = glt.utils.get_free_port(self._master_ip_address) @@ -1109,6 +1109,9 @@ def test_register_edge_index_re_registration_prevention(self) -> None: partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + # This would be set during registration of node ids + partitioner._is_input_homogeneous = True + # First registration should work edge_index = torch.tensor([[0, 1], [1, 2]]) partitioner.register_edge_index(edge_index=edge_index) @@ -1117,7 +1120,7 @@ def test_register_edge_index_re_registration_prevention(self) -> None: with self.assertRaisesRegex(ValueError, "Edge indices have already been registered"): partitioner.register_edge_index(edge_index=edge_index) - def test_register_node_features_re_registration_prevention(self) -> None: + def test_node_features_re_registration(self) -> None: """Test that re-registering node features raises an error.""" master_port = glt.utils.get_free_port(self._master_ip_address) @@ -1138,7 +1141,7 @@ def test_register_node_features_re_registration_prevention(self) -> None: with self.assertRaisesRegex(ValueError, "Node features have already been registered"): partitioner.register_node_features(node_features=node_features) - def test_register_edge_features_re_registration_prevention(self) -> None: + def test_edge_features_re_registration(self) -> None: """Test that re-registering edge features raises an error.""" master_port = glt.utils.get_free_port(self._master_ip_address) @@ -1151,6 +1154,9 @@ def test_register_edge_features_re_registration_prevention(self) -> None: partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + # This would be set during registration of node ids + partitioner._is_input_homogeneous = True + # First registration should work edge_features = torch.ones(2, 10) partitioner.register_edge_features(edge_features=edge_features) @@ -1159,7 +1165,7 @@ def test_register_edge_features_re_registration_prevention(self) -> None: with self.assertRaisesRegex(ValueError, "Edge features have already been registered"): partitioner.register_edge_features(edge_features=edge_features) - def test_register_labels_re_registration_prevention(self) -> None: + def test_labels_re_registration(self) -> None: """Test that re-registering labels raises an error.""" master_port = glt.utils.get_free_port(self._master_ip_address) @@ -1172,16 +1178,23 @@ def test_register_labels_re_registration_prevention(self) -> None: partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + # This would be set during registration of node ids + partitioner._is_input_homogeneous = True + # First registration should work pos_labels = torch.tensor([[0, 1], [1, 2]]) partitioner.register_labels(label_edge_index=pos_labels, is_positive=True) - # Second registration should raise an error + # # Second registration should raise an error with self.assertRaisesRegex(ValueError, "Positive labels have already been registered"): partitioner.register_labels(label_edge_index=pos_labels, is_positive=True) - # Test negative labels separately + # Negative labels test partitioner2 = DistPartitioner(should_assign_edges_by_src_node=True) + + # This would be set during registration of node ids + partitioner2._is_input_homogeneous = True + neg_labels = torch.tensor([[0, 1], [1, 2]]) partitioner2.register_labels(label_edge_index=neg_labels, is_positive=False) @@ -1189,7 +1202,7 @@ def test_register_labels_re_registration_prevention(self) -> None: with self.assertRaisesRegex(ValueError, "Negative labels have already been registered"): partitioner2.register_labels(label_edge_index=neg_labels, is_positive=False) - def test_register_heterogeneous_re_registration_prevention(self) -> None: + def test_heterogeneous_re_registration(self) -> None: """Test re-registration prevention for heterogeneous data.""" master_port = glt.utils.get_free_port(self._master_ip_address) @@ -1202,7 +1215,7 @@ def test_register_heterogeneous_re_registration_prevention(self) -> None: partitioner = DistPartitioner(should_assign_edges_by_src_node=True) - # Test heterogeneous node IDs + # Heterogeneous node IDs test node_ids = { USER_NODE_TYPE: torch.tensor([0, 1, 2]), ITEM_NODE_TYPE: torch.tensor([0, 1, 2]) @@ -1213,8 +1226,12 @@ def test_register_heterogeneous_re_registration_prevention(self) -> None: with self.assertRaisesRegex(ValueError, "Node IDs have already been registered"): partitioner.register_node_ids(node_ids=node_ids) - # Test heterogeneous edge indices + # Heterogeneous edge indices test partitioner2 = DistPartitioner(should_assign_edges_by_src_node=True) + + # This would be set during registration of node ids + partitioner2._is_input_homogeneous = False + edge_index = { USER_TO_USER_EDGE_TYPE: torch.tensor([[0, 1], [1, 2]]) } @@ -1224,6 +1241,5 @@ def test_register_heterogeneous_re_registration_prevention(self) -> None: with self.assertRaisesRegex(ValueError, "Edge indices have already been registered"): partitioner2.register_edge_index(edge_index=edge_index) - if __name__ == "__main__": unittest.main() From 2736448a4cbd6a9a145da54cc95cc53ddd5402c8 Mon Sep 17 00:00:00 2001 From: swong3 Date: Sat, 13 Sep 2025 00:45:56 +0000 Subject: [PATCH 3/5] fixes for reregistration --- python/gigl/distributed/dist_partitioner.py | 8 ++-- .../distributed_partitioner_test.py | 39 ++++++++++++------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/python/gigl/distributed/dist_partitioner.py b/python/gigl/distributed/dist_partitioner.py index 074cb7be7..f67e946d7 100644 --- a/python/gigl/distributed/dist_partitioner.py +++ b/python/gigl/distributed/dist_partitioner.py @@ -362,7 +362,7 @@ def register_node_ids( self._assert_and_get_rpc_setup() # Check if node data has already been registered - if self._node_ids is not None or len(self._node_types) > 0: + if self._node_ids is not None: raise ValueError("Node IDs have already been registered. Cannot re-register node data.") logger.info("Registering Nodes ...") @@ -429,7 +429,7 @@ def register_edge_index( self._assert_and_get_rpc_setup() # Check if edge data has already been registered - if self._edge_index is not None or len(self._edge_types) > 0: + if self._edge_index is not None: raise ValueError("Edge indices have already been registered. Cannot re-register edge data.") logger.info("Registering Edge Indices ...") @@ -504,7 +504,7 @@ def register_node_features( self._assert_and_get_rpc_setup() # Check if node features have already been registered - if self._node_feat is not None or self._node_feat_dim is not None: + if self._node_feat is not None: raise ValueError("Node features have already been registered. Cannot re-register node feature data.") logger.info("Registering Node Features ...") @@ -539,7 +539,7 @@ def register_edge_features( self._assert_and_get_rpc_setup() # Check if edge features have already been registered - if self._edge_feat is not None or self._edge_feat_dim is not None: + if self._edge_feat is not None: raise ValueError("Edge features have already been registered. Cannot re-register edge feature data.") logger.info("Registering Edge Features ...") diff --git a/python/tests/unit/distributed/distributed_partitioner_test.py b/python/tests/unit/distributed/distributed_partitioner_test.py index 57df20ee1..bee22e4bd 100644 --- a/python/tests/unit/distributed/distributed_partitioner_test.py +++ b/python/tests/unit/distributed/distributed_partitioner_test.py @@ -1109,8 +1109,8 @@ def test_edge_index_re_registration(self) -> None: partitioner = DistPartitioner(should_assign_edges_by_src_node=True) - # This would be set during registration of node ids - partitioner._is_input_homogeneous = True + # In order to set the _is_input_homogeneous flag to True + partitioner.register_node_ids(torch.tensor([0, 1, 2])) # First registration should work edge_index = torch.tensor([[0, 1], [1, 2]]) @@ -1154,8 +1154,8 @@ def test_edge_features_re_registration(self) -> None: partitioner = DistPartitioner(should_assign_edges_by_src_node=True) - # This would be set during registration of node ids - partitioner._is_input_homogeneous = True + # In order to set the _is_input_homogeneous flag to True + partitioner.register_node_features(torch.ones(3, 5)) # First registration should work edge_features = torch.ones(2, 10) @@ -1165,7 +1165,7 @@ def test_edge_features_re_registration(self) -> None: with self.assertRaisesRegex(ValueError, "Edge features have already been registered"): partitioner.register_edge_features(edge_features=edge_features) - def test_labels_re_registration(self) -> None: + def test_positive_labels_re_registration(self) -> None: """Test that re-registering labels raises an error.""" master_port = glt.utils.get_free_port(self._master_ip_address) @@ -1176,10 +1176,11 @@ def test_labels_re_registration(self) -> None: num_rpc_threads=4, ) + # Positive labels test partitioner = DistPartitioner(should_assign_edges_by_src_node=True) - # This would be set during registration of node ids - partitioner._is_input_homogeneous = True + # In order to set the _is_input_homogeneous flag to True + partitioner.register_node_ids(torch.tensor([0, 1, 2])) # First registration should work pos_labels = torch.tensor([[0, 1], [1, 2]]) @@ -1189,18 +1190,29 @@ def test_labels_re_registration(self) -> None: with self.assertRaisesRegex(ValueError, "Positive labels have already been registered"): partitioner.register_labels(label_edge_index=pos_labels, is_positive=True) + def test_negative_labels_re_registration(self) -> None: + """Test that re-registering labels raises an error.""" + master_port = glt.utils.get_free_port(self._master_ip_address) + + init_worker_group(world_size=1, rank=0, group_name=get_process_group_name(0)) + init_rpc( + master_addr=self._master_ip_address, + master_port=master_port, + num_rpc_threads=4, + ) + # Negative labels test - partitioner2 = DistPartitioner(should_assign_edges_by_src_node=True) + partitioner = DistPartitioner(should_assign_edges_by_src_node=True) - # This would be set during registration of node ids - partitioner2._is_input_homogeneous = True + # In order to set the _is_input_homogeneous flag to True + partitioner.register_node_ids(torch.tensor([0, 1, 2])) neg_labels = torch.tensor([[0, 1], [1, 2]]) - partitioner2.register_labels(label_edge_index=neg_labels, is_positive=False) + partitioner.register_labels(label_edge_index=neg_labels, is_positive=False) # Second registration should raise an error with self.assertRaisesRegex(ValueError, "Negative labels have already been registered"): - partitioner2.register_labels(label_edge_index=neg_labels, is_positive=False) + partitioner.register_labels(label_edge_index=neg_labels, is_positive=False) def test_heterogeneous_re_registration(self) -> None: """Test re-registration prevention for heterogeneous data.""" @@ -1229,9 +1241,6 @@ def test_heterogeneous_re_registration(self) -> None: # Heterogeneous edge indices test partitioner2 = DistPartitioner(should_assign_edges_by_src_node=True) - # This would be set during registration of node ids - partitioner2._is_input_homogeneous = False - edge_index = { USER_TO_USER_EDGE_TYPE: torch.tensor([[0, 1], [1, 2]]) } From 7c6d7d5d8aa2ef2caf2f19cb8d392f67a605f191 Mon Sep 17 00:00:00 2001 From: swong3 Date: Mon, 15 Sep 2025 20:04:36 +0000 Subject: [PATCH 4/5] Adding support for node labels registration --- python/gigl/distributed/dist_partitioner.py | 4 +++ .../distributed_partitioner_test.py | 33 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/python/gigl/distributed/dist_partitioner.py b/python/gigl/distributed/dist_partitioner.py index a13a3c2ce..c60b0cacb 100644 --- a/python/gigl/distributed/dist_partitioner.py +++ b/python/gigl/distributed/dist_partitioner.py @@ -558,6 +558,10 @@ def register_node_labels( self._assert_and_get_rpc_setup() + # Check if node labels have already been registered + if self._node_labels is not None: + raise ValueError("Node labels have already been registered. Cannot re-register node label data.") + logger.info("Registering Node Labels ...") input_node_labels = self._convert_node_entity_to_heterogeneous_format( diff --git a/python/tests/unit/distributed/distributed_partitioner_test.py b/python/tests/unit/distributed/distributed_partitioner_test.py index 3b62dd3f4..05d151ecc 100644 --- a/python/tests/unit/distributed/distributed_partitioner_test.py +++ b/python/tests/unit/distributed/distributed_partitioner_test.py @@ -1219,6 +1219,27 @@ def test_node_features_re_registration(self) -> None: with self.assertRaisesRegex(ValueError, "Node features have already been registered"): partitioner.register_node_features(node_features=node_features) + def test_node_labels_re_registration(self) -> None: + """Test that re-registering node labels raises an error.""" + master_port = glt.utils.get_free_port(self._master_ip_address) + + init_worker_group(world_size=1, rank=0, group_name=get_process_group_name(0)) + init_rpc( + master_addr=self._master_ip_address, + master_port=master_port, + num_rpc_threads=4, + ) + + partitioner = DistPartitioner(should_assign_edges_by_src_node=True) + + # First registration should work + node_labels = torch.tensor([[0, 1], [1, 0], [0, 1]]) + partitioner.register_node_labels(node_labels=node_labels) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Node labels have already been registered"): + partitioner.register_node_labels(node_labels=node_labels) + def test_edge_features_re_registration(self) -> None: """Test that re-registering edge features raises an error.""" master_port = glt.utils.get_free_port(self._master_ip_address) @@ -1328,5 +1349,17 @@ def test_heterogeneous_re_registration(self) -> None: with self.assertRaisesRegex(ValueError, "Edge indices have already been registered"): partitioner2.register_edge_index(edge_index=edge_index) + # Heterogeneous node labels test + partitioner3 = DistPartitioner(should_assign_edges_by_src_node=True) + node_labels = { + USER_NODE_TYPE: torch.tensor([[0, 1], [1, 0]]), + ITEM_NODE_TYPE: torch.tensor([[1, 0], [0, 1], [1, 1]]) + } + partitioner3.register_node_labels(node_labels=node_labels) + + # Second registration should raise an error + with self.assertRaisesRegex(ValueError, "Node labels have already been registered"): + partitioner3.register_node_labels(node_labels=node_labels) + if __name__ == "__main__": unittest.main() From 1cd857b98977da6814f108f64b1e5bd6974f8ff6 Mon Sep 17 00:00:00 2001 From: swong3 Date: Mon, 15 Sep 2025 20:39:21 +0000 Subject: [PATCH 5/5] Fixing formatting issues --- python/gigl/distributed/dist_partitioner.py | 28 ++++++++--- .../distributed_partitioner_test.py | 49 +++++++++++++------ 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/python/gigl/distributed/dist_partitioner.py b/python/gigl/distributed/dist_partitioner.py index c60b0cacb..7f86f18df 100644 --- a/python/gigl/distributed/dist_partitioner.py +++ b/python/gigl/distributed/dist_partitioner.py @@ -370,7 +370,9 @@ def register_node_ids( # Check if node data has already been registered if self._node_ids is not None: - raise ValueError("Node IDs have already been registered. Cannot re-register node data.") + raise ValueError( + "Node IDs have already been registered. Cannot re-register node data." + ) logger.info("Registering Nodes ...") input_node_ids = self._convert_node_entity_to_heterogeneous_format( @@ -437,7 +439,9 @@ def register_edge_index( # Check if edge data has already been registered if self._edge_index is not None: - raise ValueError("Edge indices have already been registered. Cannot re-register edge data.") + raise ValueError( + "Edge indices have already been registered. Cannot re-register edge data." + ) logger.info("Registering Edge Indices ...") @@ -517,7 +521,9 @@ def register_node_features( # Check if node features have already been registered if self._node_feat is not None: - raise ValueError("Node features have already been registered. Cannot re-register node feature data.") + raise ValueError( + "Node features have already been registered. Cannot re-register node feature data." + ) logger.info("Registering Node Features ...") @@ -560,7 +566,9 @@ def register_node_labels( # Check if node labels have already been registered if self._node_labels is not None: - raise ValueError("Node labels have already been registered. Cannot re-register node label data.") + raise ValueError( + "Node labels have already been registered. Cannot re-register node label data." + ) logger.info("Registering Node Labels ...") @@ -596,7 +604,9 @@ def register_edge_features( # Check if edge features have already been registered if self._edge_feat is not None: - raise ValueError("Edge features have already been registered. Cannot re-register edge feature data.") + raise ValueError( + "Edge features have already been registered. Cannot re-register edge feature data." + ) logger.info("Registering Edge Features ...") @@ -636,10 +646,14 @@ def register_labels( # Check if labels have already been registered if is_positive: if self._positive_label_edge_index is not None: - raise ValueError("Positive labels have already been registered. Cannot re-register positive label data.") + raise ValueError( + "Positive labels have already been registered. Cannot re-register positive label data." + ) else: if self._negative_label_edge_index is not None: - raise ValueError("Negative labels have already been registered. Cannot re-register negative label data.") + raise ValueError( + "Negative labels have already been registered. Cannot re-register negative label data." + ) input_label_edge_index = self._convert_edge_entity_to_heterogeneous_format( input_edge_entity=label_edge_index diff --git a/python/tests/unit/distributed/distributed_partitioner_test.py b/python/tests/unit/distributed/distributed_partitioner_test.py index 05d151ecc..62ce3074a 100644 --- a/python/tests/unit/distributed/distributed_partitioner_test.py +++ b/python/tests/unit/distributed/distributed_partitioner_test.py @@ -1171,7 +1171,9 @@ def test_node_ids_re_registration(self) -> None: partitioner.register_node_ids(node_ids=node_ids) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Node IDs have already been registered"): + with self.assertRaisesRegex( + ValueError, "Node IDs have already been registered" + ): partitioner.register_node_ids(node_ids=node_ids) def test_edge_index_re_registration(self) -> None: @@ -1195,7 +1197,9 @@ def test_edge_index_re_registration(self) -> None: partitioner.register_edge_index(edge_index=edge_index) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Edge indices have already been registered"): + with self.assertRaisesRegex( + ValueError, "Edge indices have already been registered" + ): partitioner.register_edge_index(edge_index=edge_index) def test_node_features_re_registration(self) -> None: @@ -1216,7 +1220,9 @@ def test_node_features_re_registration(self) -> None: partitioner.register_node_features(node_features=node_features) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Node features have already been registered"): + with self.assertRaisesRegex( + ValueError, "Node features have already been registered" + ): partitioner.register_node_features(node_features=node_features) def test_node_labels_re_registration(self) -> None: @@ -1237,7 +1243,9 @@ def test_node_labels_re_registration(self) -> None: partitioner.register_node_labels(node_labels=node_labels) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Node labels have already been registered"): + with self.assertRaisesRegex( + ValueError, "Node labels have already been registered" + ): partitioner.register_node_labels(node_labels=node_labels) def test_edge_features_re_registration(self) -> None: @@ -1261,7 +1269,9 @@ def test_edge_features_re_registration(self) -> None: partitioner.register_edge_features(edge_features=edge_features) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Edge features have already been registered"): + with self.assertRaisesRegex( + ValueError, "Edge features have already been registered" + ): partitioner.register_edge_features(edge_features=edge_features) def test_positive_labels_re_registration(self) -> None: @@ -1286,7 +1296,9 @@ def test_positive_labels_re_registration(self) -> None: partitioner.register_labels(label_edge_index=pos_labels, is_positive=True) # # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Positive labels have already been registered"): + with self.assertRaisesRegex( + ValueError, "Positive labels have already been registered" + ): partitioner.register_labels(label_edge_index=pos_labels, is_positive=True) def test_negative_labels_re_registration(self) -> None: @@ -1310,7 +1322,9 @@ def test_negative_labels_re_registration(self) -> None: partitioner.register_labels(label_edge_index=neg_labels, is_positive=False) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Negative labels have already been registered"): + with self.assertRaisesRegex( + ValueError, "Negative labels have already been registered" + ): partitioner.register_labels(label_edge_index=neg_labels, is_positive=False) def test_heterogeneous_re_registration(self) -> None: @@ -1329,37 +1343,42 @@ def test_heterogeneous_re_registration(self) -> None: # Heterogeneous node IDs test node_ids = { USER_NODE_TYPE: torch.tensor([0, 1, 2]), - ITEM_NODE_TYPE: torch.tensor([0, 1, 2]) + ITEM_NODE_TYPE: torch.tensor([0, 1, 2]), } partitioner.register_node_ids(node_ids=node_ids) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Node IDs have already been registered"): + with self.assertRaisesRegex( + ValueError, "Node IDs have already been registered" + ): partitioner.register_node_ids(node_ids=node_ids) # Heterogeneous edge indices test partitioner2 = DistPartitioner(should_assign_edges_by_src_node=True) - edge_index = { - USER_TO_USER_EDGE_TYPE: torch.tensor([[0, 1], [1, 2]]) - } + edge_index = {USER_TO_USER_EDGE_TYPE: torch.tensor([[0, 1], [1, 2]])} partitioner2.register_edge_index(edge_index=edge_index) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Edge indices have already been registered"): + with self.assertRaisesRegex( + ValueError, "Edge indices have already been registered" + ): partitioner2.register_edge_index(edge_index=edge_index) # Heterogeneous node labels test partitioner3 = DistPartitioner(should_assign_edges_by_src_node=True) node_labels = { USER_NODE_TYPE: torch.tensor([[0, 1], [1, 0]]), - ITEM_NODE_TYPE: torch.tensor([[1, 0], [0, 1], [1, 1]]) + ITEM_NODE_TYPE: torch.tensor([[1, 0], [0, 1], [1, 1]]), } partitioner3.register_node_labels(node_labels=node_labels) # Second registration should raise an error - with self.assertRaisesRegex(ValueError, "Node labels have already been registered"): + with self.assertRaisesRegex( + ValueError, "Node labels have already been registered" + ): partitioner3.register_node_labels(node_labels=node_labels) + if __name__ == "__main__": unittest.main()