# Annex A Execution

A notebook implementing the execution rules from KerML Annex A with PyMBE.

In [1]:
import json
import pymbe.api as pm

from pathlib import Path

from typing import Any, Collection, Dict, List, Tuple, Union

from pymbe.model import Model, Element
from pymbe.model_modification import build_from_classifier_pattern, \
                                    new_element_ownership_pattern, \
                                    build_from_binary_relationship_pattern, \
                                    build_unioning_superset_classifier, \
                                    build_from_feature_pattern, \
                                    build_from_binary_assoc_pattern, \
                                    build_from_binary_connector_pattern, \
                                    apply_covered_feature_pattern

from pymbe.query.metamodel_navigator import is_type_undefined_mult, \
                                    is_multiplicity_one, \
                                    is_multiplicity_specific_finite, \
                                    get_finite_multiplicity_types, \
                                    identify_connectors_one_side, \
                                    get_lower_multiplicty, \
                                    get_upper_multiplicty

from uuid import uuid4

## Atom Metadata Load

Bring up the Atom metadata.

In [2]:
filename = "A-2-Atoms"

if not filename.endswith(".json"):
    filename += ".json"

json_file = Path(Path.cwd()) / "annex_a_data" / filename

atoms_data = pm.Model.load_from_post_file(json_file)
atoms_data

  warn(f"Cannot process {expression._metatype} elements yet!")
  warn(f"Cannot process {expression._metatype} elements yet!")


<SysML v2 Model (C:\Users\Bjorn Cole\Documents\GitHub\pyMBE\dev_docs\core_algos\annex_a_data\A-2-Atoms.json)>

## Annex A.3.2 Without Connectors Case

In [3]:
filename = "A-3-2-WithoutConnectors"

if not filename.endswith(".json"):
    filename += ".json"

json_file = Path(Path.cwd()) / "annex_a_data" / filename

without_connectors_data = pm.Model.load_from_post_file(json_file)
without_connectors_data

<SysML v2 Model (C:\Users\Bjorn Cole\Documents\GitHub\pyMBE\dev_docs\core_algos\annex_a_data\A-3-2-WithoutConnectors.json)>

In [4]:
packages = [ele for ele in without_connectors_data.elements.values() if ele._metatype == 'Package']
packages

[WithoutConnectorsModelToBeExecuted «Package»,
 WithoutConnectorsExecution «Package»]

In [5]:
without_connectors_to_execute_classifiers = \
    [ele for ele in packages[0].throughOwningMembership if ele._metatype == 'Classifier']

In [6]:
without_connectors_to_execute_classifiers

[Bicycle «Classifier», Wheel «Classifier», BikeFork «Classifier»]

### Check for Connectors to Features

In [7]:
def is_feature_connected(feature):
    print(f"...Inspecting {feature.declaredName} for connector references.")
    if hasattr(feature, "reverseReferenceSubsetting"):
        print(f"...Found link to connector end {feature.reverseReferenceSubsetting[0]}")
        return True
    else:
        print(f"...Found no reverse edge outgoing to connector end.")
        return False

In [8]:
is_feature_connected(without_connectors_to_execute_classifiers[0].throughFeatureMembership[0])

...Inspecting rollsOn for connector references.
...Found no reverse edge outgoing to connector end.


False

### Identify Important Metatypes

In [9]:
def feature_metas():
    return {'Feature', 'Step'}

def classifier_metas():
    return {'Classifier', 'Behavior', 'Structure'}

def assoc_metas():
    return {'Association'}

def connector_metas():
    return {'Connector', 'Succession'}

### Print atoms as if in KerML Files

In [10]:
def serialize_kerml_atom(atom_to_print):
    
    atom_string = "#atom\n"
    
    if hasattr(atom_to_print, "throughUnioning"):
        atom_string = ""
    
    if atom_to_print._metatype == "Classifier":
        atom_string = atom_string + "classifier "
    elif atom_to_print._metatype == "Behavior":
        atom_string = atom_string + "behavior "
    elif atom_to_print._metatype == "Association":
        atom_string = atom_string + "association "
    elif atom_to_print._metatype == "Succession":
        atom_string = atom_string + "succession "
    
    atom_string = atom_string + atom_to_print.declaredName
    
    if hasattr(atom_to_print, "throughSubclassification"):
        atom_string = atom_string + " specializes " + \
            ", ".join([sub.declaredName for sub in atom_to_print.throughSubclassification])
        
    if hasattr(atom_to_print, "throughUnioning"):
        atom_string = atom_string + " unions " + \
            ", ".join([sub.declaredName for sub in atom_to_print.throughUnioning])
    
    if hasattr(atom_to_print, "throughFeatureMembership"):
        atom_string = atom_string + " {\n"
        for feature in atom_to_print.throughFeatureMembership:
            if feature._metatype == "Feature":
                atom_string = atom_string + "   feature "
            elif feature._metatype == "Step":
                atom_string = atom_string + "   step "
            elif feature._metatype == "Connector":
                atom_string = atom_string + "   connector "
                
            if hasattr(feature, "throughFeatureTyping"):
                atom_string = atom_string + feature.declaredName + " : " + feature.throughFeatureTyping[0].declaredName + ";\n"
            else:
                atom_string = atom_string + feature.declaredName + ";\n"
        atom_string = atom_string + "}\n"
    elif hasattr(atom_to_print, "throughEndFeatureMembership"):
        atom_string = atom_string + " {\n"
        for feature in atom_to_print.throughEndFeatureMembership:
            if feature._metatype == "Feature":
                atom_string = atom_string + "   end feature "
            elif feature._metatype == "Step":
                atom_string = atom_string + "   step "
            elif feature._metatype == "Connector":
                atom_string = atom_string + "   connector "
                
            if hasattr(feature, "throughFeatureTyping"):
                atom_string = atom_string + feature.declaredName + " : " + feature.throughFeatureTyping[0].declaredName + ";\n"
            else:
                atom_string = atom_string + feature.declaredName + ";\n"
        atom_string = atom_string + "}\n"
    else:
        atom_string = atom_string + ";\n"
        
    return atom_string

### Atom Generation Procedure for Steps 1 through 5

In [11]:
def atom_generation_annex_a(input_model, package_to_execute, package_to_populate):
    
    # TODO: Need to make this recursive for deeply nested features
    
    bound_features_to_atom_values_dict = {}
    
    package_to_execute_classifiers = \
        [ele for ele in package_to_execute.throughOwningMembership if ele._metatype in classifier_metas()]
    
    for classifier in package_to_execute_classifiers:
        # Step 1 - create new atom from the classifier
        base_name = classifier.declaredName
        new_classifier = build_from_classifier_pattern(
            owner=package_to_populate,
            name=f"My{base_name}",
            model=input_model,
            specific_fields={},
            metatype=classifier._metatype,
            superclasses=[classifier]
        )

        print(f"Executing step 1. Working from {base_name} to create {new_classifier.declaredName}")

        # Step 2 - find Features with lower multiplicity > 0 that are not connectors

        if hasattr(classifier, "throughFeatureMembership"):
            candidate_features = classifier.throughFeatureMembership
            
            # Step 3 - create Atoms to go with the Features

            for cf in candidate_features:
                # TODO: Want two passes here - do only non-connectors, then a whole separate loop for connectors rather than
                # interleave as we have currently
                if cf._metatype not in connector_metas():
                    lm = get_lower_multiplicty(cf)
                    if lm > -1:
                        # need to test multiplicity
                        print(f"Executing step 2. Identified {cf.declaredName} as a non-connector Feature. Lower multiplicity is {lm}")

                        feature_value_atoms = []
                        
                        used_typ = None

                        for i in range(0, lm):

                            typ = cf.throughFeatureTyping

                            if len(typ) > 0:
                                used_typ = typ[0]
                                new_ft_classifier = build_from_classifier_pattern(
                                    owner=package_to_populate,
                                    name=f"My{used_typ.declaredName}{i + 1}",
                                    model=input_model,
                                    specific_fields={},
                                    metatype=used_typ._metatype,
                                    superclasses=[used_typ]
                                )

                                print(f"Executing step 3a. Creating atom #{i + 1} to be value for {cf.declaredName} and specializing " + 
                                          f"{used_typ.declaredName}")

                                feature_value_atoms.append(new_ft_classifier)
                                
                                if is_feature_connected(cf):
                                    if cf._id in bound_features_to_atom_values_dict:
                                        bound_features_to_atom_values_dict[cf._id].append(new_ft_classifier)
                                    else:
                                        bound_features_to_atom_values_dict.update({cf._id: [new_ft_classifier]})

                        # use the covering pattern on the Feature

                        print(f"...(Atom style)Creating a unioned classifier to line up to feature values for {cf.declaredName} called " +
                              f"Values for {cf.declaredName}")
                        
                        redefined_feature = apply_covered_feature_pattern(
                            one_member_classifiers=feature_value_atoms,
                            feature_to_cover=cf,
                            type_to_apply_pattern_on=new_classifier,
                            model=input_model,
                            new_types_owner=package_to_populate,
                            covering_classifier_prefix="Values for ",
                            covering_classifier_suffix="",
                            redefining_feature_prefix="",
                            redefining_feature_suffix=" (Closed)",
                        )

                        #unioning_class = build_unioning_superset_classifier(
                        #    classes=feature_value_atoms,
                        #    super_name=f"Values for {cf.declaredName}",
                        #    owner=package_to_populate,
                        #    model=input_model,
                        #    added_fields={},
                        #    unioned=True
                        #)

                        #build_from_binary_relationship_pattern(
                        #    source=unioning_class,
                        #    target=used_typ,
                        #    model=input_model,
                        #    metatype='Subclassification',
                        #    owned_by_source=True,
                        #    owns_target=False,
                        #    alternative_owner=None,
                        #    specific_fields={}
                        #)
                        
            for cf in candidate_features:
                # Step 4 - find Connectors with appropriate end multiplicities
                
                if cf._metatype in connector_metas():
                    print(f"Executing step 4. Identified {cf.declaredName} as a Connector")
                    
                    # check the feature ends of the connector
                    
                    connector_ends = cf.throughEndFeatureMembership
                    
                    lm_end1 = get_lower_multiplicty(connector_ends[0])
                    lm_end2 = get_lower_multiplicty(connector_ends[1])
                    
                    if lm_end1 == 1 and lm_end2 == 1:
                        print(f"Executing step 5. Identified {cf} as a Connector with 1-to-1 ends")
                        
                        end1_connected_mult = get_lower_multiplicty(connector_ends[0].throughReferenceSubsetting[0])
                        end2_connected_mult = get_lower_multiplicty(connector_ends[1].throughReferenceSubsetting[0])
                        
                        # check to see if the already built instances have a finite value
                        
                        if end1_connected_mult == -1:
                            if connector_ends[0].throughReferenceSubsetting[0] in bound_features_to_atom_values_dict:
                                end1_connected_mult = len(bound_features_to_atom_values_dict[connector_ends[0].throughReferenceSubsetting[0]])
                                print(f"...Using already created atoms to set number of values for {connector_ends[0].throughReferenceSubsetting[0]}")
                        if end2_connected_mult == -1:
                            if connector_ends[1].throughReferenceSubsetting[0] in bound_features_to_atom_values_dict:
                                end2_connected_mult = len(bound_features_to_atom_values_dict[connector_ends[1].throughReferenceSubsetting[0]])
                                print(f"...Using already created atoms to set number of values for {connector_ends[1].throughReferenceSubsetting[0]}")
                        
                        print(f"...End 1 is linked to Feature {connector_ends[0].throughReferenceSubsetting[0]}" + \
                             f" with multiplicity {end1_connected_mult}")
                        print(f"...End 2 is linked to Feature {connector_ends[1].throughReferenceSubsetting[0]}" + \
                             f" with multiplicity {end2_connected_mult}")
                        
                        number_to_make = max(end1_connected_mult, end2_connected_mult)
                        
                        feature_value_atoms = []
                        
                        used_typ = None
                        
                        for i in range(0, number_to_make):
                            print(f"Executing step 5b. Creating atom #{i + 1} to be value for {cf}")
                            
                            typ = []
                            used_typ = None
                            used_name = cf.declaredName
                            used_metatype = "Association"
                            
                            if hasattr(cf, "throughFeatureTyping"):
                                typ = cf.throughFeatureTyping
                            
                            if len(typ) > 0:
                                used_typ = typ[0]
                                used_name = used_typ.declaredName
                                used_metatype = used_typ._metatype
                                
                            feature_value_atoms.append(new_ft_classifier)

                            # need to do this for Association types and also nested end features

                            ends_to_process = []

                            if hasattr(cf, "throughEndFeatureMembership"):
                                for cf_ele in cf.throughEndFeatureMembership:
                                    ends_to_process.append(cf_ele)
                            if used_typ is not None and hasattr(used_typ, "throughEndFeatureMembership"):
                                for cf_ele in used_typ.throughEndFeatureMembership:
                                    ends_to_process.append(cf_ele)

                            for con_end in ends_to_process:
                                print(f"...Inspecting {con_end} for connected features.")
                                if hasattr(con_end, "throughReferenceSubsetting"):
                                    bound_feature = con_end.throughReferenceSubsetting[0]
                                    bound_feature_type = bound_feature.throughFeatureTyping

                                    print(f"...Found connected feature {bound_feature}.")

                                    if len(bound_feature_type) > 0:
                                        feature_used_type = bound_feature_type[0]
                                        
                                        if bound_feature._id in bound_features_to_atom_values_dict:
                                            if (i + 1) > len(bound_features_to_atom_values_dict[bound_feature._id]):
                                                print(f"Executing step 5b (1-to-1 variant). Creating atom #{i + 1} to " +
                                                      f"be value for {con_end} and also {bound_feature}")
                                                new_ft_classifier = build_from_classifier_pattern(
                                                    owner=package_to_populate,
                                                    name=f"My{feature_used_type.declaredName}{i + 1}",
                                                    model=input_model,
                                                    specific_fields={},
                                                    metatype=feature_used_type._metatype,
                                                    superclasses=[used_typ]
                                                )
                                                bound_features_to_atom_values_dict[bound_feature._id].append(new_ft_classifier)
                                        else:
                                            print(f"Executing step 5b (1-to-1 variant). Creating atom #{i + 1} to " +
                                                f"be value for {con_end} and also {bound_feature}")
                                            new_ft_classifier = build_from_classifier_pattern(
                                                owner=package_to_populate,
                                                name=f"My{feature_used_type.declaredName}{i + 1}",
                                                model=input_model,
                                                specific_fields={},
                                                metatype=feature_used_type._metatype,
                                                superclasses=[used_typ]
                                            )
                                            bound_features_to_atom_values_dict.update({bound_feature._id: [new_ft_classifier]})

                            if len(ends_to_process) == 2:
                                source_end = ends_to_process[0]
                                target_end = ends_to_process[1]

                                source_bound_feature = source_end.throughReferenceSubsetting[0]
                                target_bound_feature = target_end.throughReferenceSubsetting[0]

                                source_atom = bound_features_to_atom_values_dict[source_bound_feature._id][i]
                                target_atom = bound_features_to_atom_values_dict[target_bound_feature._id][i]

                                print(f"...Typing atom association ends from {source_atom} to {target_atom} under " + 
                                     f"{used_name}{i + 1}")

                                new_cn_classifier = build_from_binary_assoc_pattern(
                                    name=f"{used_name}{i + 1}",
                                    source_role_name=connector_ends[0].throughReferenceSubsetting[0].declaredName,
                                    target_role_name=connector_ends[1].throughReferenceSubsetting[0].declaredName,
                                    source_type=source_atom,
                                    target_type=target_atom,
                                    model=input_model,
                                    metatype=used_metatype,
                                    owner=package_to_populate,
                                    specific_fields={}
                                )
                                                
    for bound_feature_id in bound_features_to_atom_values_dict.keys():
        bound_feature = input_model.get_element(element_id=bound_feature_id)
        print(f"...Working to connect to the bound feature {bound_feature} to generated types via covering pattern.")
        if classifier in bound_feature.reverseFeatureMembership:
            redefined_bound_feature = apply_covered_feature_pattern(
                one_member_classifiers=bound_features_to_atom_values_dict[bound_feature_id],
                feature_to_cover=bound_feature,
                type_to_apply_pattern_on=new_classifier,
                model=input_model,
                new_types_owner=package_to_populate,
                covering_classifier_prefix="Values for ",
                covering_classifier_suffix="",
                redefining_feature_prefix="",
                redefining_feature_suffix=" (Closed)",
            )
                                
    #print(f"Bound features dict: \n{bound_features_to_atom_values_dict}")

In [12]:
def atom_generation_annex_a(input_model, package_to_execute, package_to_populate):
    
    # TODO: Need to make this recursive for deeply nested features
    
    bound_features_to_atom_values_dict = {}
    
    package_to_execute_classifiers = \
        [ele for ele in package_to_execute.throughOwningMembership if ele._metatype in classifier_metas()]
    
    for classifier in package_to_execute_classifiers:
        bound_features_to_atom_values_dict_update = \
            atom_generation_annex_a_one_classifier(input_model,
                                                   package_to_execute,
                                                   package_to_populate,
                                                   classifier)
        
        for k, values in bound_features_to_atom_values_dict_update.items():
            if k in bound_features_to_atom_values_dict:
                for v in values:
                    bound_features_to_atom_values_dict_update[k].append(v)
            else:
                bound_features_to_atom_values_dict.update({k: values})
                                                
    #for bound_feature_id in bound_features_to_atom_values_dict.keys():
    #    bound_feature = input_model.get_element(element_id=bound_feature_id)
    #    print(f"...Working to connect to the bound feature {bound_feature} to generated types via covering pattern.")
    #    if classifier in bound_feature.reverseFeatureMembership:
    #        redefined_bound_feature = apply_covered_feature_pattern(
    #            one_member_classifiers=bound_features_to_atom_values_dict[bound_feature_id],
    #            feature_to_cover=bound_feature,
    #            type_to_apply_pattern_on=new_classifier,
    #            model=input_model,
    #            new_types_owner=package_to_populate,
    #            covering_classifier_prefix="Values for ",
    #            covering_classifier_suffix="",
    #            redefining_feature_prefix="",
    #            redefining_feature_suffix=" (Closed)",
    #        )
                                
    #print(f"Bound features dict: \n{bound_features_to_atom_values_dict}")

In [13]:
def inspect_type_for_atom(input_model, package_to_execute, package_to_populate, atom_index, cf, considered_type):
    
    bound_features_to_atom_values_dict_update = {}
    
    type_name = "None"
    if considered_type is not None:
        type_name = considered_type.declaredName
    
    if not hasattr(considered_type, "throughFeatureMembership"):
        print(f"...Atom for {cf.declaredName} has type {type_name} that has no further features to make atoms for.")

        new_ft_classifier = build_from_classifier_pattern(
            owner=package_to_populate,
            name=f"My{considered_type.declaredName}{atom_index + 1}",
            model=input_model,
            specific_fields={},
            metatype=considered_type._metatype,
            superclasses=[considered_type]
        )

        print(f"Executing step 3a. Creating atom #{atom_index + 1} to be value for {cf.declaredName} and specializing " + 
                  f"{type_name}")

        #if is_feature_connected(cf):
        if cf._id in bound_features_to_atom_values_dict_update:
            bound_features_to_atom_values_dict_update[cf._id].append(new_ft_classifier)
        else:
            bound_features_to_atom_values_dict_update.update({cf._id: [new_ft_classifier]})
    else:
        print(f"...Atom for {cf.declaredName} has type {type_name} that needs further elaboration for atoms.")

        bound_features_to_atom_values_dict_update = atom_generation_annex_a_one_classifier(input_model,
                                                                                           package_to_execute,
                                                                                           package_to_populate,
                                                                                           considered_type)
        
    return bound_features_to_atom_values_dict_update

In [14]:
def atom_generation_annex_a_one_classifier(input_model, package_to_execute, package_to_populate, base_classifier):
    
    print(f"Applying atom algorithm to {base_classifier.declaredName}")
    
    bound_features_to_atom_values_dict = {}
    
    classifier = base_classifier
    
    # Step 1 - create new atom from the classifier
    base_name = classifier.declaredName
    new_classifier = build_from_classifier_pattern(
        owner=package_to_populate,
        name=f"My{base_name}",
        model=input_model,
        specific_fields={},
        metatype=classifier._metatype,
        superclasses=[classifier]
    )

    print(f"Executing step 1. Working from {base_name} to create {new_classifier.declaredName}")

    # Step 2 - find Features with lower multiplicity > 0 that are not connectors

    if hasattr(classifier, "throughFeatureMembership"):
        candidate_features = classifier.throughFeatureMembership

        # Step 3 - create Atoms to go with the Features

        for cf in candidate_features:
            # TODO: Want two passes here - do only non-connectors, then a whole separate loop for connectors rather than
            # interleave as we have currently
            if cf._metatype not in connector_metas():
                lm = get_lower_multiplicty(cf)
                if lm > -1:
                    # need to test multiplicity
                    print(f"Executing step 2. Identified {cf.declaredName} as a non-connector Feature. Lower multiplicity is {lm}")

                    feature_value_atoms = []

                    used_typ = None

                    for i in range(0, lm):

                        typ = cf.throughFeatureTyping

                        if len(typ) > 0:
                            used_typ = typ[0]
                            
                            bound_features_to_atom_values_dict_update = inspect_type_for_atom(input_model,
                                                                                              package_to_execute,
                                                                                              package_to_populate,
                                                                                              i,
                                                                                              cf,
                                                                                              used_typ
                                                                                             )
                            

                            for k, values in bound_features_to_atom_values_dict_update.items():
                                if k in bound_features_to_atom_values_dict:
                                    for v in values:
                                        bound_features_to_atom_values_dict[k].append(v)
                                        feature_value_atoms.append(v)
                                else:
                                    bound_features_to_atom_values_dict.update({k: values})
                                    for v in values:
                                        feature_value_atoms.append(v)

                    # use the covering pattern on the Feature
                    

        for cf in candidate_features:
            # Step 4 - find Connectors with appropriate end multiplicities

            if cf._metatype in connector_metas():
                print(f"Executing step 4. Identified {cf.declaredName} as a Connector")

                # check the feature ends of the connector

                connector_ends = cf.throughEndFeatureMembership

                lm_end1 = get_lower_multiplicty(connector_ends[0])
                lm_end2 = get_lower_multiplicty(connector_ends[1])

                if lm_end1 == 1 and lm_end2 == 1:
                    print(f"Executing step 5. Identified {cf} as a Connector with 1-to-1 ends")

                    end1_connected_mult = get_lower_multiplicty(connector_ends[0].throughReferenceSubsetting[0])
                    end2_connected_mult = get_lower_multiplicty(connector_ends[1].throughReferenceSubsetting[0])

                    # check to see if the already built instances have a finite value

                    if end1_connected_mult == -1:
                        if connector_ends[0].throughReferenceSubsetting[0] in bound_features_to_atom_values_dict:
                            end1_connected_mult = len(bound_features_to_atom_values_dict[connector_ends[0].throughReferenceSubsetting[0]])
                            print(f"...Using already created atoms to set number of values for {connector_ends[0].throughReferenceSubsetting[0]}")
                    if end2_connected_mult == -1:
                        if connector_ends[1].throughReferenceSubsetting[0] in bound_features_to_atom_values_dict:
                            end2_connected_mult = len(bound_features_to_atom_values_dict[connector_ends[1].throughReferenceSubsetting[0]])
                            print(f"...Using already created atoms to set number of values for {connector_ends[1].throughReferenceSubsetting[0]}")

                    print(f"...End 1 is linked to Feature {connector_ends[0].throughReferenceSubsetting[0]}" + \
                         f" with multiplicity {end1_connected_mult}")
                    print(f"...End 2 is linked to Feature {connector_ends[1].throughReferenceSubsetting[0]}" + \
                         f" with multiplicity {end2_connected_mult}")

                    number_to_make = max(end1_connected_mult, end2_connected_mult)

                    feature_value_atoms = []

                    used_typ = None

                    for i in range(0, number_to_make):
                        print(f"Executing step 5b. Creating atom #{i + 1} to be value for {cf}")

                        typ = []
                        used_typ = None
                        used_name = cf.declaredName
                        used_metatype = "Association"
                        
                        elaboration_end = False

                        if hasattr(cf, "throughFeatureTyping"):
                            typ = cf.throughFeatureTyping

                        if len(typ) > 0:
                            used_typ = typ[0]
                            used_name = used_typ.declaredName
                            used_metatype = used_typ._metatype

                        #feature_value_atoms.append(new_ft_classifier)

                        # need to do this for Association types and also nested end features

                        ends_to_process = []

                        if hasattr(cf, "throughEndFeatureMembership"):
                            for cf_ele in cf.throughEndFeatureMembership:
                                ends_to_process.append(cf_ele)
                        if used_typ is not None and hasattr(used_typ, "throughEndFeatureMembership"):
                            for cf_ele in used_typ.throughEndFeatureMembership:
                                ends_to_process.append(cf_ele)

                        for con_end in ends_to_process:
                            print(f"...Inspecting {con_end} for connected features.")
                            if hasattr(con_end, "throughReferenceSubsetting"):
                                bound_feature = con_end.throughReferenceSubsetting[0]
                                bound_feature_type = bound_feature.throughFeatureTyping

                                print(f"...Found connected feature {bound_feature}.")

                                if len(bound_feature_type) > 0:
                                    feature_used_type = bound_feature_type[0]

                                    if bound_feature._id in bound_features_to_atom_values_dict:
                                        if (i + 1) > len(bound_features_to_atom_values_dict[bound_feature._id]):
                                            print(f"Executing step 5b (1-to-1 variant). Creating atom #{i + 1} to " +
                                                  f"be value for {con_end} and also {bound_feature}")
                                            
                                            bound_features_to_atom_values_dict_update = inspect_type_for_atom(input_model,
                                                                                              package_to_execute,
                                                                                              package_to_populate,
                                                                                              i,
                                                                                              bound_feature,
                                                                                              feature_used_type
                                                                                             )
                                            
                                            for k, values in bound_features_to_atom_values_dict_update.items():
                                                if k in bound_features_to_atom_values_dict:
                                                    for v in values:
                                                        bound_features_to_atom_values_dict[k].append(v)
                                                else:
                                                    bound_features_to_atom_values_dict.update({k: values})
                                            
                                    else:
                                        print(f"Executing step 5b (1-to-1 variant). Creating atom #{i + 1} to " +
                                            f"be value for {con_end} and also {bound_feature}")
                                        
                                        bound_features_to_atom_values_dict_update = inspect_type_for_atom(input_model,
                                                                                              package_to_execute,
                                                                                              package_to_populate,
                                                                                              i,
                                                                                              bound_feature,
                                                                                              feature_used_type
                                                                                             )
                                            
                                        for k, values in bound_features_to_atom_values_dict_update.items():
                                            if k in bound_features_to_atom_values_dict:
                                                for v in values:
                                                    bound_features_to_atom_values_dict[k].append(v)
                                            else:
                                                bound_features_to_atom_values_dict.update({k: values})
                                        

                        if len(ends_to_process) == 2:
                            source_end = ends_to_process[0]
                            target_end = ends_to_process[1]

                            source_bound_feature = source_end.throughReferenceSubsetting[0]
                            target_bound_feature = target_end.throughReferenceSubsetting[0]

                            source_atom = bound_features_to_atom_values_dict[source_bound_feature._id][i]
                            target_atom = bound_features_to_atom_values_dict[target_bound_feature._id][i]

                            print(f"...Typing atom association ends from {source_atom} to {target_atom} under " + 
                                 f"{used_name}{i + 1}")

                            new_cn_classifier = build_from_binary_assoc_pattern(
                                name=f"{used_name}{i + 1}",
                                source_role_name=connector_ends[0].throughReferenceSubsetting[0].declaredName,
                                target_role_name=connector_ends[1].throughReferenceSubsetting[0].declaredName,
                                source_type=source_atom,
                                target_type=target_atom,
                                model=input_model,
                                metatype=used_metatype,
                                owner=package_to_populate,
                                specific_fields={}
                            )
        
        
    for bound_feature_id in bound_features_to_atom_values_dict.keys():
        bound_feature = input_model.get_element(element_id=bound_feature_id)
        print(f"(Atom style)...Working to connect the feature {bound_feature} to generated types via " + \
              f"covering pattern with values {bound_features_to_atom_values_dict[bound_feature_id]}.")
        if classifier in bound_feature.reverseFeatureMembership:
            redefined_bound_feature = apply_covered_feature_pattern(
                one_member_classifiers=bound_features_to_atom_values_dict[bound_feature_id],
                feature_to_cover=bound_feature,
                type_to_apply_pattern_on=new_classifier,
                model=input_model,
                new_types_owner=package_to_populate,
                covering_classifier_prefix="Values for ",
                covering_classifier_suffix="",
                redefining_feature_prefix="",
                redefining_feature_suffix=" (Closed)",
            )
            
    
    return bound_features_to_atom_values_dict
    
    #print(f"Bound features dict: \n{bound_features_to_atom_values_dict}")

In [15]:
packages[0].throughOwningMembership

[fd3d0478-00fa-4dbb-be30-185bd3fb746a «Documentation»,
 Bicycle «Classifier»,
 Wheel «Classifier»,
 BikeFork «Classifier»]

In [16]:
atom_generation_annex_a(without_connectors_data, packages[0], packages[1])

Applying atom algorithm to Bicycle
Executing step 1. Working from Bicycle to create MyBicycle
Executing step 2. Identified rollsOn as a non-connector Feature. Lower multiplicity is 2
...Atom for rollsOn has type Wheel that has no further features to make atoms for.
Executing step 3a. Creating atom #1 to be value for rollsOn and specializing Wheel
...Atom for rollsOn has type Wheel that has no further features to make atoms for.
Executing step 3a. Creating atom #2 to be value for rollsOn and specializing Wheel
(Atom style)...Working to connect the feature rollsOn: Wheel «Feature» to generated types via covering pattern with values [MyWheel1 «Classifier», MyWheel2 «Classifier»].
Applying atom algorithm to Wheel
Executing step 1. Working from Wheel to create MyWheel
Applying atom algorithm to BikeFork
Executing step 1. Working from BikeFork to create MyBikeFork


In [17]:
for item in packages[1].throughOwningMembership:
    print(serialize_kerml_atom(item))

#atom
classifier MyBicycle specializes Bicycle {
   feature rollsOn (Closed) : Values for rollsOn;
}

#atom
classifier MyWheel1 specializes Wheel, Values for rollsOn;

#atom
classifier MyWheel2 specializes Wheel, Values for rollsOn;

classifier Values for rollsOn unions MyWheel1, MyWheel2;

#atom
classifier MyWheel specializes Wheel;

#atom
classifier MyBikeFork specializes BikeFork;



## Annex A.3.3 One-To-One Connectors Case

In [18]:
filename = "A-3-3-OneToOneConnectors"

if not filename.endswith(".json"):
    filename += ".json"

json_file = Path(Path.cwd()) / "annex_a_data" / filename

one_2_one_connectors_data = pm.Model.load_from_post_file(json_file)
one_2_one_connectors_data

<SysML v2 Model (C:\Users\Bjorn Cole\Documents\GitHub\pyMBE\dev_docs\core_algos\annex_a_data\A-3-3-OneToOneConnectors.json)>

In [19]:
packages = [ele for ele in one_2_one_connectors_data.elements.values() if ele._metatype == 'Package']
packages

[Atoms «Package»,
 WithoutConnectorsModelToBeExecuted «Package»,
 OneToOneConnectorsModelToBeExecuted «Package»,
 OneToOneConnectorsExecution «Package»]

In [20]:
atom_generation_annex_a(one_2_one_connectors_data, packages[2], packages[3])

Applying atom algorithm to Bicycle
Executing step 1. Working from Bicycle to create MyBicycle
Executing step 2. Identified rollsOn as a non-connector Feature. Lower multiplicity is 2
...Atom for rollsOn has type Wheel that has no further features to make atoms for.
Executing step 3a. Creating atom #1 to be value for rollsOn and specializing Wheel
...Atom for rollsOn has type Wheel that has no further features to make atoms for.
Executing step 3a. Creating atom #2 to be value for rollsOn and specializing Wheel
Executing step 4. Identified fixWheel as a Connector
Executing step 5. Identified <Connector([rollsOn: Wheel «Feature»] ←→ [holdsWheel: BikeFork «Feature»])> as a Connector with 1-to-1 ends
...End 1 is linked to Feature rollsOn: Wheel «Feature» with multiplicity 2
...End 2 is linked to Feature holdsWheel: BikeFork «Feature» with multiplicity -1
Executing step 5b. Creating atom #1 to be value for <Connector([rollsOn: Wheel «Feature»] ←→ [holdsWheel: BikeFork «Feature»])>
...Inspect

In [21]:
for item in packages[3].throughOwningMembership:
    print(serialize_kerml_atom(item))

#atom
classifier MyBicycle specializes Bicycle {
   feature rollsOn (Closed) : Values for rollsOn;
   feature holdsWheel (Closed) : Values for holdsWheel;
}

#atom
classifier MyWheel1 specializes Wheel, Values for rollsOn;

#atom
classifier MyWheel2 specializes Wheel, Values for rollsOn;

#atom
classifier MyBikeFork1 specializes BikeFork, Values for holdsWheel;

#atom
association BikeWheelFixed1 {
   end feature rollsOn : MyWheel1;
   end feature holdsWheel : MyBikeFork1;
}

#atom
classifier MyBikeFork2 specializes BikeFork, Values for holdsWheel;

#atom
association BikeWheelFixed2 {
   end feature rollsOn : MyWheel2;
   end feature holdsWheel : MyBikeFork2;
}

classifier Values for rollsOn unions MyWheel1, MyWheel2;

classifier Values for holdsWheel unions MyBikeFork1, MyBikeFork2;



## Annex A.3.6 Timing for Behaviors, Sequences

In [22]:
filename = "A-3-6-Sequences"

if not filename.endswith(".json"):
    filename += ".json"

json_file = Path(Path.cwd()) / "annex_a_data" / filename

sequences_data = pm.Model.load_from_post_file(json_file)
sequences_data

<SysML v2 Model (C:\Users\Bjorn Cole\Documents\GitHub\pyMBE\dev_docs\core_algos\annex_a_data\A-3-6-Sequences.json)>

In [23]:
packages = [ele for ele in sequences_data.elements.values() if ele._metatype == 'Package']
packages

[SequencesModelToBeExecuted «Package», SequencesExecution «Package»]

In [24]:
packages[0].ownedElement

[38b62785-6ce9-40d6-87a8-e0f556581f56 «Documentation»,
 Manufacture «Behavior»,
 Paint «Behavior»,
 Dry «Behavior»,
 Ship «Behavior»]

In [25]:
atom_generation_annex_a(sequences_data, packages[0], packages[1])

Applying atom algorithm to Manufacture
Executing step 1. Working from Manufacture to create MyManufacture
Executing step 2. Identified paint as a non-connector Feature. Lower multiplicity is 1
...Atom for paint has type Paint that has no further features to make atoms for.
Executing step 3a. Creating atom #1 to be value for paint and specializing Paint
Executing step 4. Identified p_before_d as a Connector
Executing step 5. Identified <Succession([paint: Paint «Step»] ←→ [dry: Dry «Step»])> as a Connector with 1-to-1 ends
...End 1 is linked to Feature paint: Paint «Step» with multiplicity 1
...End 2 is linked to Feature dry: Dry «Step» with multiplicity -1
Executing step 5b. Creating atom #1 to be value for <Succession([paint: Paint «Step»] ←→ [dry: Dry «Step»])>
...Inspecting earlierOccurrence: Paint «Feature» for connected features.
...Found connected feature paint: Paint «Step».
...Inspecting laterOccurrence: Dry «Feature» for connected features.
...Found connected feature dry: Dry 

In [26]:
for item in packages[1].throughOwningMembership:
    print(serialize_kerml_atom(item))

#atom
behavior MyManufacture specializes Manufacture {
   feature paint (Closed) : MyPaint1;
   feature dry (Closed) : MyDry1;
   feature ship (Closed) : MyShip1;
}

#atom
behavior MyPaint1 specializes Paint;

#atom
behavior MyDry1 specializes Dry;

#atom
association p_before_d1 {
   end feature paint : MyPaint1;
   end feature dry : MyDry1;
}

#atom
behavior MyShip1 specializes Ship;

#atom
association d_before_s1 {
   end feature dry : MyDry1;
   end feature ship : MyShip1;
}

#atom
behavior MyPaint specializes Paint;

#atom
behavior MyDry specializes Dry;

#atom
behavior MyShip specializes Ship;



In [27]:
packages[1].throughOwningMembership[0].throughFeatureMembership

[paint (Closed): MyPaint1 «Feature»,
 dry (Closed): MyDry1 «Feature»,
 ship (Closed): MyShip1 «Feature»]

## Annex A.3.8 Feature Value Changes

In [28]:
filename = "A-3-8-ChangingFeatureValues"

if not filename.endswith(".json"):
    filename += ".json"

json_file = Path(Path.cwd()) / "annex_a_data" / filename

values_data = pm.Model.load_from_post_file(json_file)
values_data

<SysML v2 Model (C:\Users\Bjorn Cole\Documents\GitHub\pyMBE\dev_docs\core_algos\annex_a_data\A-3-8-ChangingFeatureValues.json)>

In [29]:
packages = [ele for ele in values_data.elements.values() if ele._metatype == 'Package']
packages

[ChangingFeatureValuesModelToBeExecuted «Package»,
 ChangingFeatureValuesExecution «Package»]

In [30]:
#atom_generation_annex_a(values_data, packages[0], packages[1])

In [31]:
packages[0].throughOwningMembership

[230ac55e-dda8-4697-825d-25eebf939e89 «Documentation»,
 Manufacture «Behavior»,
 Product «Structure»,
 Paint «Behavior»,
 Dry «Behavior»,
 Ship «Behavior»]

In [32]:
atom_generation_annex_a_one_classifier(values_data, packages[0], packages[1], packages[0].throughOwningMembership[1])

Applying atom algorithm to Manufacture
Executing step 1. Working from Manufacture to create MyManufacture
Executing step 2. Identified objectToFinish as a non-connector Feature. Lower multiplicity is 1
...Atom for objectToFinish has type Product that needs further elaboration for atoms.
Applying atom algorithm to Product
Executing step 1. Working from Product to create MyProduct
Executing step 2. Identified isPainted as a non-connector Feature. Lower multiplicity is 1
...Atom for isPainted has type Boolean that has no further features to make atoms for.
Executing step 3a. Creating atom #1 to be value for isPainted and specializing Boolean
Executing step 2. Identified isDry as a non-connector Feature. Lower multiplicity is 1
...Atom for isDry has type Boolean that has no further features to make atoms for.
Executing step 3a. Creating atom #1 to be value for isDry and specializing Boolean
Executing step 2. Identified isShipped as a non-connector Feature. Lower multiplicity is 1
...Atom f

KeyError: '0886abcc-c5b1-4400-8cae-2a5e10570923'

In [33]:
for item in packages[1].throughOwningMembership:
    print(serialize_kerml_atom(item))

#atom
behavior MyManufacture specializes Manufacture;

#atom
MyProduct specializes Product {
   feature isPainted (Closed) : MyBoolean1;
   feature isDry (Closed) : MyBoolean1;
   feature isShipped (Closed) : MyBoolean1;
}

#atom
MyBoolean1 specializes Boolean;

#atom
MyBoolean1 specializes Boolean;

#atom
MyBoolean1 specializes Boolean;

#atom
behavior MyPaint specializes Paint {
   feature painting (Closed) : MyFeatureWritePerformance1;
   feature painted (Closed) : MyFeatureWritePerformance1;
}

#atom
MyProduct specializes Product {
   feature isPainted (Closed) : MyBoolean1;
   feature isDry (Closed) : MyBoolean1;
   feature isShipped (Closed) : MyBoolean1;
}

#atom
MyBoolean1 specializes Boolean;

#atom
MyBoolean1 specializes Boolean;

#atom
MyBoolean1 specializes Boolean;

#atom
behavior MyFeatureWritePerformance1 specializes FeatureWritePerformance;

#atom
behavior MyFeatureWritePerformance1 specializes FeatureWritePerformance;

#atom
association p_before_p1 {
   end feature pai