diff --git a/package-parser/package_parser/processing/migration/__init__.py b/package-parser/package_parser/processing/migration/__init__.py
index 3d1036636..9235c3315 100644
--- a/package-parser/package_parser/processing/migration/__init__.py
+++ b/package-parser/package_parser/processing/migration/__init__.py
@@ -1 +1,9 @@
 from ._differ import AbstractDiffer, SimpleDiffer
+from ._mapping import (
+    APIMapping,
+    ManyToManyMapping,
+    ManyToOneMapping,
+    Mapping,
+    OneToManyMapping,
+    OneToOneMapping,
+)
diff --git a/package-parser/package_parser/processing/migration/_mapping.py b/package-parser/package_parser/processing/migration/_mapping.py
new file mode 100644
index 000000000..d50c41f38
--- /dev/null
+++ b/package-parser/package_parser/processing/migration/_mapping.py
@@ -0,0 +1,290 @@
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import Callable, List, Optional, TypeVar, Union
+
+from package_parser.processing.api.model import (
+    API,
+    Attribute,
+    Class,
+    Function,
+    Parameter,
+    Result,
+)
+
+from ._differ import AbstractDiffer
+
+api_element = Union[Attribute, Class, Function, Parameter, Result]
+API_ELEMENTS = TypeVar("API_ELEMENTS", Attribute, Class, Function, Parameter, Result)
+
+
+@dataclass
+class Mapping(ABC):
+    """Base class of a mapping between apiv1 elements and apiv2 elements."""
+
+    similarity: float
+
+    @abstractmethod
+    def get_apiv1_elements(self) -> list[api_element]:
+        """Return the apiv1 elements of this mapping."""
+
+    @abstractmethod
+    def get_apiv2_elements(self) -> list[api_element]:
+        """Return the apiv2 elements of this mapping."""
+
+    def get_similarity(self) -> float:
+        """Return the similarity stored in this mapping."""
+        return self.similarity
+
+
+@dataclass
+class OneToOneMapping(Mapping):
+    """Mapping of one apiv1 element to one apiv2 element."""
+
+    apiv1_element: api_element
+    apiv2_element: api_element
+
+    def get_apiv1_elements(self) -> list[api_element]:
+        return [self.apiv1_element]
+
+    def get_apiv2_elements(self) -> list[api_element]:
+        return [self.apiv2_element]
+
+
+@dataclass
+class OneToManyMapping(Mapping):
+    """Mapping of one apiv1 element to several apiv2 elements."""
+
+    apiv1_element: api_element
+    apiv2_elements: list[api_element]
+
+    def get_apiv1_elements(self) -> list[api_element]:
+        return [self.apiv1_element]
+
+    def get_apiv2_elements(self) -> list[api_element]:
+        return self.apiv2_elements
+
+
+@dataclass
+class ManyToOneMapping(Mapping):
+    """Mapping of several apiv1 elements to one apiv2 element."""
+
+    apiv1_elements: list[api_element]
+    apiv2_element: api_element
+
+    def get_apiv1_elements(self) -> list[api_element]:
+        return self.apiv1_elements
+
+    def get_apiv2_elements(self) -> list[api_element]:
+        return [self.apiv2_element]
+
+
+@dataclass
+class ManyToManyMapping(Mapping):
+    """Mapping of several apiv1 elements to several apiv2 elements."""
+
+    apiv1_elements: list[api_element]
+    apiv2_elements: list[api_element]
+
+    def get_apiv1_elements(self) -> list[api_element]:
+        return self.apiv1_elements
+
+    def get_apiv2_elements(self) -> list[api_element]:
+        return self.apiv2_elements
+
+
+def merge_mappings(mapping_a: Mapping, mapping_b: Mapping) -> Mapping:
+    """
+    Merge two mappings into the smallest mapping type that covers both.
+
+    The similarity of the result is the mean of both similarities. Elements are
+    deduplicated in first-seen order (mapping_a first), so the result does not
+    depend on set iteration order.
+
+    :param mapping_a: the first mapping
+    :param mapping_b: the second mapping
+    :return: a mapping containing the apiv1 and apiv2 elements of both mappings
+    """
+    similarity = (mapping_a.similarity + mapping_b.similarity) / 2
+    # dict.fromkeys deduplicates like a set but keeps insertion order
+    domain: list[api_element] = list(
+        dict.fromkeys(mapping_a.get_apiv1_elements() + mapping_b.get_apiv1_elements())
+    )
+    codomain: list[api_element] = list(
+        dict.fromkeys(mapping_a.get_apiv2_elements() + mapping_b.get_apiv2_elements())
+    )
+    if len(domain) == 1 and len(codomain) == 1:
+        return OneToOneMapping(similarity, domain[0], codomain[0])
+    if len(domain) == 1:
+        return OneToManyMapping(similarity, domain[0], codomain)
+    if len(codomain) == 1:
+        return ManyToOneMapping(similarity, domain, codomain[0])
+    return ManyToManyMapping(similarity, domain, codomain)
+
+
+class APIMapping:
+    """Computes the mappings between the elements of two versions of an API."""
+
+    threshold_of_similarity_between_mappings: float
+    threshold_of_similarity_for_creation_of_mappings: float
+    apiv1: API
+    apiv2: API
+    differ: AbstractDiffer
+
+    def __init__(
+        self,
+        apiv1: API,
+        apiv2: API,
+        differ: AbstractDiffer,
+        threshold_of_similarity_for_creation_of_mappings: float = 0.5,
+        threshold_of_similarity_between_mappings: float = 0.05,
+    ):
+        self.apiv1 = apiv1
+        self.apiv2 = apiv2
+        self.differ = differ
+        self.threshold_of_similarity_for_creation_of_mappings = (
+            threshold_of_similarity_for_creation_of_mappings
+        )
+        self.threshold_of_similarity_between_mappings = (
+            threshold_of_similarity_between_mappings
+        )
+
+    def _get_mappings_for_api_elements(
+        self,
+        api_elementv1_list: List[API_ELEMENTS],
+        api_elementv2_list: List[API_ELEMENTS],
+        compute_similarity: Callable[[API_ELEMENTS, API_ELEMENTS], float],
+    ) -> list[Mapping]:
+        """
+        Map each apiv1 element to the apiv2 elements that are similar enough.
+
+        :param api_elementv1_list: the apiv1 elements of one kind
+        :param api_elementv2_list: the apiv2 elements of the same kind
+        :param compute_similarity: function that rates the similarity of two elements
+        :return: the resulting mappings; mappings sharing an apiv2 element are merged
+        """
+        element_mappings: list[Mapping] = []
+        for api_elementv1 in api_elementv1_list:
+            candidate_mappings: list[Mapping] = []
+            for api_elementv2 in api_elementv2_list:
+                similarity = compute_similarity(api_elementv1, api_elementv2)
+                if similarity >= self.threshold_of_similarity_for_creation_of_mappings:
+                    candidate_mappings.append(
+                        OneToOneMapping(similarity, api_elementv1, api_elementv2)
+                    )
+            candidate_mappings.sort(key=Mapping.get_similarity, reverse=True)
+            new_mapping = self._merge_similar_mappings(candidate_mappings)
+            if new_mapping is not None:
+                self._merge_mappings_with_same_elements(new_mapping, element_mappings)
+        return element_mappings
+
+    def map_api(self) -> List[Mapping]:
+        """Map all API elements; the result is sorted by decreasing similarity."""
+        mappings: List[Mapping] = []
+        mappings.extend(
+            self._get_mappings_for_api_elements(
+                list(self.apiv1.classes.values()),
+                list(self.apiv2.classes.values()),
+                self.differ.compute_class_similarity,
+            )
+        )
+        mappings.extend(
+            self._get_mappings_for_api_elements(
+                list(self.apiv1.functions.values()),
+                list(self.apiv2.functions.values()),
+                self.differ.compute_function_similarity,
+            )
+        )
+        mappings.extend(
+            self._get_mappings_for_api_elements(
+                list(self.apiv1.parameters().values()),
+                list(self.apiv2.parameters().values()),
+                self.differ.compute_parameter_similarity,
+            )
+        )
+
+        mappings.extend(
+            self._get_mappings_for_api_elements(
+                [
+                    attribute
+                    for class_ in self.apiv1.classes.values()
+                    for attribute in class_.instance_attributes
+                ],
+                [
+                    attribute
+                    for class_ in self.apiv2.classes.values()
+                    for attribute in class_.instance_attributes
+                ],
+                self.differ.compute_attribute_similarity,
+            )
+        )
+
+        mappings.extend(
+            self._get_mappings_for_api_elements(
+                [
+                    result
+                    for function in self.apiv1.functions.values()
+                    for result in function.results
+                ],
+                [
+                    result
+                    for function in self.apiv2.functions.values()
+                    for result in function.results
+                ],
+                self.differ.compute_result_similarity,
+            )
+        )
+
+        mappings.sort(key=Mapping.get_similarity, reverse=True)
+        return mappings
+
+    def _merge_similar_mappings(self, mappings: List[Mapping]) -> Optional[Mapping]:
+        """
+        Given a list of OneToOne(Many)Mappings which apiv1 element is the same, this method returns the best mapping
+        from this apiv1 element to apiv2 elements by merging the best mapping with the following ones,
+        as long as the difference in similarity is smaller than threshold_of_similarity_between_mappings.
+
+        The merge is done iteratively and the passed list is left unmodified.
+
+        :param mappings: mappings sorted by decreasing similarity, which apiv1 element is the same
+        :return: the best mapping, possibly merged with similar mappings, or None if mappings is empty
+        """
+        if not mappings:
+            return None
+        best_mapping = mappings[0]
+        for mapping in mappings[1:]:
+            if (
+                best_mapping.similarity - mapping.similarity
+                >= self.threshold_of_similarity_between_mappings
+            ):
+                break
+            best_mapping = merge_mappings(best_mapping, mapping)
+        return best_mapping
+
+    def _merge_mappings_with_same_elements(
+        self, mapping_to_be_appended: Mapping, mappings: list[Mapping]
+    ) -> None:
+        """
+        This method prevents that an apiv2 element appears in multiple mappings of a list of mappings
+        by merging the affected mappings and including the result in the list. If there is no such element,
+        the mapping will be included without any merge.
+
+        :param mapping_to_be_appended: the mapping that should be included in mappings
+        :param mappings: the list, in which mapping_to_be_appended should be appended
+        """
+        new_apiv2_elements = mapping_to_be_appended.get_apiv2_elements()
+        duplicated: list[Mapping] = [
+            mapping
+            for mapping in mappings
+            if any(
+                element in new_apiv2_elements
+                for element in mapping.get_apiv2_elements()
+            )
+        ]
+
+        for conflicted_mapping in duplicated:
+            mapping_to_be_appended = merge_mappings(
+                mapping_to_be_appended, conflicted_mapping
+            )
+            mappings.remove(conflicted_mapping)
+
+        mappings.append(mapping_to_be_appended)
diff --git a/package-parser/tests/processing/migration/test_mapping.py b/package-parser/tests/processing/migration/test_mapping.py
new file mode 100644
index 000000000..d8f671a42
--- /dev/null
+++ b/package-parser/tests/processing/migration/test_mapping.py
@@ -0,0 +1,180 @@
+from inspect import cleandoc
+
+import pytest
+from package_parser.processing.api.model import API, Class, ClassDocumentation
+from package_parser.processing.migration import (
+    AbstractDiffer,
+    APIMapping,
+    ManyToManyMapping,
+    ManyToOneMapping,
+    OneToManyMapping,
+    OneToOneMapping,
+)
+from test_differ import differ_list
+
+
+@pytest.mark.parametrize(
+    "differ",
+    differ_list,
+)
+def test_one_to_one_mapping(differ: AbstractDiffer):
+    apiv1 = API("test", "test", "1.0")
+    apiv2 = API("test", "test", "2.0")
+    class_1 = Class(
+        "test/test.Test",
+        "Test",
+        [],
+        [],
+        True,
+        [],
+        ClassDocumentation("This is a test", "This is a test"),
+        "",
+        [],
+    )
+    apiv1.add_class(class_1)
+    apiv2.add_class(class_1)
+    mappings = APIMapping(apiv1, apiv2, differ).map_api()
+
+    assert len(mappings) == 1
+    assert isinstance(mappings[0], OneToOneMapping)
+    assert mappings[0].get_apiv1_elements() == mappings[0].get_apiv2_elements()
+    assert mappings[0].get_apiv1_elements() == [class_1]
+
+
+@pytest.mark.parametrize(
+    "differ",
+    differ_list,
+)
+def test_one_to_many_and_many_to_one_mappings(differ: AbstractDiffer):
+    apiv1, apiv2, class_1, class_2, class_3 = create_apis()
+
+    mappings = APIMapping(apiv1, apiv2, differ).map_api()
+    assert len(mappings) == 1
+    assert isinstance(mappings[0], OneToManyMapping)
+    assert mappings[0].get_apiv1_elements()[0] == class_1
+    assert len(mappings[0].get_apiv2_elements()) == 2
+    assert set(mappings[0].get_apiv2_elements()) == {class_2, class_3}
+
+    apiv1, apiv2 = apiv2, apiv1
+    mappings = APIMapping(apiv1, apiv2, differ).map_api()
+    assert len(mappings) == 1
+    assert isinstance(mappings[0], ManyToOneMapping)
+    assert len(mappings[0].get_apiv1_elements()) == 2
+    assert set(mappings[0].get_apiv1_elements()) == {class_2, class_3}
+    assert mappings[0].get_apiv2_elements()[0] == class_1
+
+
+@pytest.mark.parametrize(
+    "differ",
+    differ_list,
+)
+def test_many_to_many_mapping(differ: AbstractDiffer):
+    apiv1, apiv2, class_1, class_2, class_3 = create_apis()
+    class_4 = Class(
+        "test/test.TestC",
+        "TestC",
+        [],
+        [],
+        True,
+        [],
+        ClassDocumentation("This is a test", "This is a test"),
+        "",
+        [],
+    )
+    apiv1.add_class(class_4)
+    mappings = APIMapping(apiv1, apiv2, differ).map_api()
+    assert len(mappings) == 1
+    assert isinstance(mappings[0], ManyToManyMapping)
+    assert len(mappings[0].get_apiv1_elements()) == 2
+    assert len(mappings[0].get_apiv2_elements()) == 2
+    assert set(mappings[0].get_apiv1_elements()) == {class_1, class_4}
+    assert set(mappings[0].get_apiv2_elements()) == {class_2, class_3}
+
+
+@pytest.mark.parametrize(
+    "differ",
+    differ_list,
+)
+def test_too_different_mapping(differ: AbstractDiffer):
+    apiv1 = API("test", "test", "1.0")
+    class_1 = Class(
+        "test/test.Test",
+        "Test",
+        [],
+        [],
+        True,
+        [],
+        ClassDocumentation("This is a test", "This is a test"),
+        "",
+        [],
+    )
+    apiv1.add_class(class_1)
+    apiv2 = API("test", "test", "2.0")
+    class_2 = Class(
+        "test/test.NotSimilarClass",
+        "NotSimilarClass",
+        [],
+        [],
+        True,
+        [],
+        ClassDocumentation(
+            "not similar to the other class", "not similar to the other class"
+        ),
+        cleandoc(
+            """
+    class NotSimilar:
+        pass
+    """
+        ),
+        [],
+    )
+    apiv2.add_class(class_2)
+    api_mapping = APIMapping(apiv1, apiv2, differ)
+    mappings = api_mapping.map_api()
+    assert (
+        differ.compute_class_similarity(class_1, class_2)
+        < api_mapping.threshold_of_similarity_for_creation_of_mappings
+    )
+    assert len(mappings) == 0
+
+
+def create_apis():
+    class_1 = Class(
+        "test/test.Test",
+        "Test",
+        [],
+        [],
+        True,
+        [],
+        ClassDocumentation("This is a test", "This is a test"),
+        "",
+        [],
+    )
+    apiv1 = API("test", "test", "1.0")
+    apiv1.add_class(class_1)
+    class_2 = Class(
+        "test/test.TestA",
+        "TestA",
+        [],
+        [],
+        True,
+        [],
+        ClassDocumentation("This is a test", "This is a test"),
+        "",
+        [],
+    )
+    class_3 = Class(
+        "test/test.TestB",
+        "TestB",
+        [],
+        [],
+        True,
+        [],
+        ClassDocumentation("This is a test", "This is a test"),
+        "",
+        [],
+    )
+    apiv2 = API("test", "test", "2.0")
+    apiv2.add_class(class_2)
+    apiv2.add_class(class_3)
+    return apiv1, apiv2, class_1, class_2, class_3