# Test case: ESSnet MNO-MINDS
Contains all graphs, data sets and models for the ESSnet MNO-MINDS test case.

In [None]:
import copy
import metadata_analysis as md
import itertools
import pandas as pd
from IPython.display import display, Markdown
import ipywidgets as widgets

# Variables and granularities
Declare all variables and granularities that exist in the case study in `legend_variables`.  

In [None]:
# Shared template for every variable that is expressed as a geographic region
# (reused below by Destination "d", Location "l" and Origin "o").
# Only Neighbourhood -> Municipality is declared as an aggregation here.
geo_region = {
    "granularities": {0: "Neighbourhood", 1: "Municipality", 2: "Cell tower"},
    "conversion_edges": [(0, 0)],
    "aggregation_edges": [(0, 1)],
}

# Legend of all variables in the case study, keyed by their single-letter code.
# Each entry holds the readable name, the granularities (index -> label) and the
# edge lists from which the conversion and aggregation graphs are built.
legend_variables = {
    "a": {
        "name": "MNOOperator",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    "b": {
        "name": "BackgroundCharacteristics",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    "c": {
        "name": "VehicleCount",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    # Geographic variable: shares the geo_region objects (not copies).
    "d": {
        "name": "Destination",
        "granularities": geo_region["granularities"],
        "conversion_edges": geo_region["conversion_edges"],
        "aggregation_edges": geo_region["aggregation_edges"],
    },
    "e": {
        "name": "SampleInclusion",
        "granularities": {0: "NTS sampling design"},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    "f": {
        "name": "HasSensor",
        "granularities": {0: "Has traffic loop sensor"},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    # Geographic variable: shares the geo_region objects (not copies).
    "l": {
        "name": "Location",
        "granularities": geo_region["granularities"],
        "conversion_edges": geo_region["conversion_edges"],
        "aggregation_edges": geo_region["aggregation_edges"],
    },
    "m": {
        "name": "Modality",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    "n": {
        "name": "SimCount",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    # Geographic variable: shares the geo_region objects (not copies).
    "o": {
        "name": "Origin",
        "granularities": geo_region["granularities"],
        "conversion_edges": geo_region["conversion_edges"],
        "aggregation_edges": geo_region["aggregation_edges"],
    },
    "p": {
        "name": "Persons",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    "q": {
        "name": "TripPurpose",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    "r": {
        "name": "Route",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    "s": {
        "name": "RoadSegment",
        "granularities": {0: ""},
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [(0, 0)],
    },
    # Time is the only variable with a non-trivial aggregation graph.
    "t": {
        "name": "Time",
        "granularities": {
            0: "Minute",
            1: "5 minute interval",
            2: "Day part",
            3: "Hour",
            4: "Day",
            5: "Year",
        },
        "conversion_edges": [(0, 0)],
        "aggregation_edges": [
            (0, 1),  # Minute -> 5 minute interval
            (0, 2),  # Minute -> Day part
            (0, 3),  # Minute -> Hour
            (3, 2),  # Hour -> Day part
            (3, 4),  # Hour -> Day
            (4, 5),  # Day -> Year
        ],
    },
}

In [None]:
# Loop over all declared variables and create conversion and aggregation graphs
# based on the edges specified in `legend_variables`.

for var_name, var_details in legend_variables.items():
    # Creating the ConversionGraph and AggregationGraph objects here
    # will add them to the list of instances kept globally in the memory
    # So there is no need to save the graphs here separately.
    # (cg_tmp / ag_tmp are overwritten on every iteration and not used afterwards.)
    cg_tmp = md.ConversionGraph(variable_name=var_name,  # single-letter variable key, e.g. "a"
                                granularities=var_details["granularities"],
                                conversion_edges=var_details["conversion_edges"])
    ag_tmp = md.AggregationGraph(variable_name=var_name,  # single-letter variable key, e.g. "a"
                                 granularities=var_details["granularities"],
                                 aggregation_edges=var_details["aggregation_edges"])

# Sets of included units
These are required to define the data sets. The sets of included units are defined as follows:
- I: all people in the population
- II: people included in the sample used for the survey
- III: people that have a single provider
- XI: all road segments in a country
- XII: all road segments where a traffic loop sensor is located

For the scenario, we assume that all data sets are available for the same country and the same year.

In [None]:
# Sets of included units (I, II, III describe persons; XI, XII describe road segments).

# I: the whole population; unit type is persons ("p", granularity 0).
soiu1 = md.SetOfIncludedUnits(
    name="I",
    unit_type_var=md.Variable("p", 0),
)

# II: persons for which SampleInclusion ("e") equals 1,
# i.e. only people that responded to the survey.
soiu2 = md.SetOfIncludedUnits(
    name="II",
    unit_type_var=md.Variable("p", 0),
    specifying_variables=[md.VariableSpec("e", 0, {1})],
)

# III: persons restricted to MNOOperator ("a") value 0, i.e. a single provider.
soiu3 = md.SetOfIncludedUnits(
    name="III",
    unit_type_var=md.Variable("p", 0),
    specifying_variables=[md.VariableSpec("a", 0, {0})],
)

# XI: all road segments ("s", granularity 0).
soiu11 = md.SetOfIncludedUnits(
    name="XI",
    unit_type_var=md.Variable("s", 0),
)

# XII: road segments that have a traffic loop sensor ("f" equals 1), limited to
# the transport modes that can be observed on road segments.
soiu12 = md.SetOfIncludedUnits(
    name="XII",
    unit_type_var=md.Variable("s", 0),
    specifying_variables=[
        md.VariableSpec("f", 0, {1}),
        md.VariableSpec("m", 0, {"car", "motorbike"}),
    ],
)

In [None]:
# Every set of included units, in legend order (used when printing the legend).
all_soius = [
    soiu1,
    soiu2,
    soiu3,
    soiu11,
    soiu12,
]

# Input data sets

The following data sets are pre-defined and can be selected by the user:
- Travel survey: Background information and transportation mode choice for a sample of the population. (M0|P0, T1)II 
- Administrative: Administrative data on individuals containing origin, destination, age and income. (B0, OD0|P0, T2)III 
- OSM and OTP: Open street map and OpenTripPlanner route information. (I0|OD0, M0)I 
- Route information: Segments used in routes between origin-destination pairs. (I0|I1, OD0)I 
- Traffic loop counts: Counts of travellers on road segment per minute. (C0|I0, T0)IV
- MNO data: Counts of observed sim cards per cell tower region. The user may choose optional variables and the included units.


In [None]:
# Define the input data sets.
# Note: naming the md.Data objects is not required, but it helps in understanding
# the use case, because these named data are used in the data sets as well as in
# the model definitions.

# Transportation survey (set of included units II)
data_transport_survey = md.Data(
    name="NTS survey",
    left_variables=[
        md.Variable("b", 0),
        md.Variable("m", 0),
        md.Variable("q", 0),
    ],
    right_variables=[
        md.Variable("p", 0),
        md.Variable("r", 0),
        md.Variable("t", 1),
    ],
    set_of_units=soiu2,
)

# Administrative: Population Register (abbreviation: TPR), all people
data_pop_register = md.Data(
    name="Population Register",
    left_variables=[
        md.Variable("b", 0),
        md.Variable("d", 0),
        md.Variable("o", 0),
    ],
    right_variables=[
        md.Variable("p", 0),
        md.Variable("t", 2),
    ],
    set_of_units=soiu1,
)

# Census (aggregated to OD), all people
data_census = md.Data(
    name="Census",
    left_variables=[
        md.Variable("b", 0),
        md.Variable("p", 0),
    ],
    right_variables=[
        md.Variable("o", 0),
        md.Variable("d", 0),
    ],
    set_of_units=soiu1,
)

# Traffic loop counts: road segments with traffic loops (set of included units XII)
data_traffic_loops = md.Data(
    name="Traffic Loops",
    left_variables=[md.Variable("c", 0)],
    right_variables=[
        md.Variable("s", 0),
        md.Variable("t", 0),
        md.Variable("m", 0),
    ],
    set_of_units=soiu12,
)

# Route information (set of included units I)
data_routes = md.Data(
    left_variables=[md.Variable("s", 0)],
    right_variables=[
        md.Variable("o", 0),
        md.Variable("d", 0),
        md.Variable("r", 0),
    ],
    set_of_units=soiu1,
    name="Route data",
)

In [None]:
# Data sets the user can choose from as starting data
# (the MNO data is added later, once the user options are known).
start_set_potential = [
    data_transport_survey,
    data_pop_register,
    data_census,
    data_traffic_loops,
    data_routes,
]

### MNO data
Many variants of MNO data can exist. Aspects such as coverage and included variables influence the output. Please specify below the aspects you wish to analyse.

In [None]:
# Dropdown widgets keyed by option name; create_mno_data() reads their `.value`.
dropdowns = {
    # Which provider(s) the MNO data covers
    "mno_variant_provider": md.create_dropdown(
        "Select available provider:",
        ["Single provider", "All providers"],
    ),
    # Whether the MNO data contains the home location
    "mno_variant_home_location": md.create_dropdown(
        "Is home location available?",
        ["Home location available", "No home location"],
    ),
}

In [None]:
def create_mno_data(dropdowns):
    # compose MNO data based on user input from dropdowns
    mno_left_variables = [md.Variable("n", 0)]
    mno_right_variables = [md.Variable("l", 2),
                        md.Variable("t", 3)]

    # Based on user input in the dropdowns, adjust 
    match dropdowns["mno_variant_provider"].value:
        case "Single provider":
            mno_set_of_units = soiu3
        case "All providers":
            mno_set_of_units = soiu1
            mno_right_variables.append(md.Variable("a", 0))  # The providers are known in the combined MNO data set
          
    match dropdowns["mno_variant_home_location"].value:
        case "Home location available":
            mno_right_variables.append(md.Variable("o", 2))  # Home location (origin) is available
        case "No home location":
            pass  # no need to add a variable

    return md.Data(left_variables=mno_left_variables,
                   right_variables=mno_right_variables,
                   set_of_units=mno_set_of_units,
                   name="MNO data")

# Goal definition
This is the target output, defined in terms of a dataset.

In [None]:
# Goal 1: persons per location (municipality) and day part, all providers (units I)
goal_mno_1 = md.Data(
    left_variables=[md.Variable("p", 0)],
    right_variables=[
        md.Variable("l", 1),
        md.Variable("t", 2),
    ],
    set_of_units=soiu1,
    name="Commuters location all providers per day-part",
)

# Goal 2: persons per origin, location and day part, single provider (units III)
goal_mno_2 = md.Data(
    left_variables=[md.Variable("p", 0)],
    right_variables=[
        md.Variable("o", 1),
        md.Variable("l", 1),
        md.Variable("t", 2),
    ],
    set_of_units=soiu3,
    name="Commuters origin-location single provider per day-part",
)

# Goal 3: persons per origin, location and hour, single provider (units III)
goal_mno_3 = md.Data(
    left_variables=[md.Variable("p", 0)],
    right_variables=[
        md.Variable("o", 1),
        md.Variable("l", 1),
        md.Variable("t", 3),
    ],
    set_of_units=soiu3,
    name="Commuters origin-location single provider per hour",
)

In [None]:
# Target outputs the user can pick from.
goal_options = [
    goal_mno_1,
    goal_mno_2,
    goal_mno_3,
]

# Models
The following models are available:
- Modality choice model
- Shortest path model
- Calibration model (of people to observed cars)
- Sim card to person calibration model

In [None]:
# Modality choice model

class ModelModalityChoice(md.Model):
    """Modality choice model.

    Requires two sources: background/origin/destination per person and day part
    (placeholder units "Y") and modality per person and day part (placeholder
    units "X"). It produces persons per origin, destination, modality and day
    part for units "Y".
    """

    def __init__(self):
        self.name = "Modality Choice model"
        # Source 0: (b0, o0, d0 | p0, t2) on placeholder units Y
        background_source = md.Data(
            left_variables=[md.Variable("b", 0),
                            md.Variable("o", 0),
                            md.Variable("d", 0)],
            right_variables=[md.Variable("p", 0),
                             md.Variable("t", 2)],
            set_of_units=md.SetOfIncludedUnits(name="Y"))
        # Source 1: (m0 | p0, t2) on placeholder units X
        modality_source = md.Data(
            left_variables=[md.Variable("m", 0)],
            right_variables=[md.Variable("p", 0),
                             md.Variable("t", 2)],
            set_of_units=md.SetOfIncludedUnits(name="X"))
        self.input_data = [background_source, modality_source]
        self.output_data = md.Data(
            left_variables=[md.Variable("p", 0)],
            right_variables=[md.Variable("o", 0),
                             md.Variable("d", 0),
                             md.Variable("m", 0),
                             md.Variable("t", 2)],
            set_of_units=md.SetOfIncludedUnits(name="Y"))
        self.units_rule = "custom"  # X is a subset of Y

    def apply(self, potential_input):
        """Return the outputs this model can derive from `potential_input`.

        A data set counts as a candidate for a source when
        `shrink_variables_only` against that source is truthy; the set of
        units is ignored at that stage because the model is declared with
        placeholder units. For every pair (units of a source-0 candidate,
        units of a source-1 candidate) where the source-1 units pass
        `is_subset` against the source-0 units, a copy of the output data
        carrying the source-0 units is produced.

        Returns:
            A set of output data copies, or False when no combination applies.
        """
        required_0, required_1 = self.input_data[0], self.input_data[1]

        # Variables-only availability check for both sources
        has_source_0 = any([ds.shrink_variables_only(required_0) for ds in potential_input])
        has_source_1 = any([ds.shrink_variables_only(required_1) for ds in potential_input])
        if not (has_source_0 and has_source_1):
            return False

        # Collect the sets of units of every candidate per source
        candidate_units_y = [ds.set_of_units for ds in potential_input
                             if ds.shrink_variables_only(required_0)]
        candidate_units_x = [ds.set_of_units for ds in potential_input
                             if ds.shrink_variables_only(required_1)]

        results = []
        for units_y, units_x in itertools.product(candidate_units_y, candidate_units_x):
            if not units_x.is_subset(units_y):  # X must be a subset of Y
                continue
            candidate = copy.deepcopy(self.output_data)
            candidate.set_of_units = units_y  # the output is defined on Y
            candidate.reset_score()
            results.append(candidate)

        return set(results) if results else False
            

In [None]:
# Shortest path model
class ModelShortestPath(md.Model):
    """Shortest path model.

    Source 0 holds persons per origin, destination and day part (placeholder
    units "X"); source 1 holds road segments per origin, destination and route
    (placeholder units "Y"). The output is persons per road segment and day
    part, defined on the units of source 1.
    """

    def __init__(self):
        self.name = "Shortest Path model"
        self.input_data = [
            md.Data(left_variables=[md.Variable("p", 0)],  # source 0
                    right_variables=[md.Variable("o", 0),
                                     md.Variable("d", 0),
                                     md.Variable("t", 2)],
                    set_of_units=md.SetOfIncludedUnits(name="X"),
                    name="data_in_1"),
            md.Data(left_variables=[md.Variable("s", 0)],  # source 1
                    right_variables=[md.Variable("o", 0),
                                     md.Variable("d", 0),
                                     md.Variable("r", 0)],
                    set_of_units=md.SetOfIncludedUnits(name="Y")),
        ]
        self.output_data = md.Data(left_variables=[md.Variable("p", 0)],
                                   right_variables=[md.Variable("s", 0),
                                                    md.Variable("t", 2)],
                                   set_of_units=md.SetOfIncludedUnits(name="Y"))
        self.units_rule = "custom"

    def apply(self, potential_input):
        """Return the outputs this model can derive from `potential_input`.

        Both sources must be present, judged on variables only via
        `shrink_variables_only`. No subset condition is checked between the
        two sets of units: every data set that matches source 1 yields one
        output copy carrying that data set's set of units.

        Returns:
            A set of output data copies, or False when a source is missing.
        """
        od_source, route_source = self.input_data

        has_od_source = any(ds.shrink_variables_only(od_source) for ds in potential_input)
        # The units of every source-1 match; each becomes the units of one output.
        route_units = [ds.set_of_units for ds in potential_input
                       if ds.shrink_variables_only(route_source)]

        if not (has_od_source and route_units):
            # Always return False explicitly (never an implicit None),
            # consistent with the other models in this file.
            return False

        outputs = []
        for units in route_units:
            output_data_temp = copy.deepcopy(self.output_data)
            output_data_temp.set_of_units = units  # overwrite the placeholder units
            output_data_temp.reset_score()
            outputs.append(output_data_temp)
        return set(outputs)
            

In [None]:
# Calibration model
class ModelCalibration(md.Model):
    """Calibration model from persons to observed vehicles.

    Source 0 holds vehicle counts and persons per road segment and day part
    (placeholder units "X"); source 1 holds persons per road segment and day
    part (placeholder units "Y"). The output is vehicle counts per road
    segment and day part, defined on the units of source 1.

    In general, X is expected to be a smaller set than Y. X is used for
    training. X does not necessarily need to be a subset of Y, though that
    would probably be better for model performance.

    The variable t_2 can be selected as t_2 == "morning rush hour" and be
    constant. It is then put in the set_of_units.
    """

    def __init__(self):
        self.name = "Calibration Vehicle to Person"
        self.input_data = [
            md.Data(left_variables=[md.Variable("c", 0),  # source 0
                                    md.Variable("p", 0)],
                    right_variables=[md.Variable("s", 0),
                                     md.Variable("t", 2)],
                    set_of_units=md.SetOfIncludedUnits(name="X")),
            md.Data(left_variables=[md.Variable("p", 0)],  # source 1
                    right_variables=[md.Variable("s", 0),
                                     md.Variable("t", 2)],
                    set_of_units=md.SetOfIncludedUnits(name="Y")),
        ]
        self.output_data = md.Data(left_variables=[md.Variable("c", 0)],
                                   right_variables=[md.Variable("s", 0),
                                                    md.Variable("t", 2)],
                                   set_of_units=md.SetOfIncludedUnits(name="Y"))
        self.units_rule = "custom"

    def apply(self, potential_input):
        """Return the outputs this model can derive from `potential_input`.

        Both sources must be present, judged on variables only via
        `shrink_variables_only`. No subset condition is checked between the
        two sets of units: every data set that matches source 1 yields one
        output copy carrying that data set's set of units.

        Returns:
            A set of output data copies, or False when a source is missing.
        """
        training_source, target_source = self.input_data

        has_training_source = any(ds.shrink_variables_only(training_source)
                                  for ds in potential_input)
        # The units of every source-1 match; each becomes the units of one output.
        target_units = [ds.set_of_units for ds in potential_input
                        if ds.shrink_variables_only(target_source)]

        if not (has_training_source and target_units):
            # Always return False explicitly (never an implicit None),
            # consistent with the other models in this file.
            return False

        outputs = []
        for units in target_units:
            output_data_temp = copy.deepcopy(self.output_data)
            # NOTE(review): the other models assign set_of_units before calling
            # reset_score(); the original order is kept here - confirm whether
            # the score depends on the set of units.
            output_data_temp.reset_score()
            output_data_temp.set_of_units = units  # overwrite the placeholder units
            outputs.append(output_data_temp)
        return set(outputs)
            

In [None]:
# Sim to person

class ModelCalibrateSimtoPerson(md.Model):
    """Calibration model from sim card counts to persons.

    Source 0 is the MNO data: sim counts per origin, location and day part
    (placeholder units "Y"). Source 1 is the expected persons per origin and
    day part (placeholder units "X"). The output is persons per origin,
    location and day part, defined on X.
    """

    def __init__(self):
        self.name = "Calibration Sim to Person model"
        # Source 0 (MNO): (n0 | o1, l1, t2) on placeholder units Y
        sim_source = md.Data(
            left_variables=[md.Variable("n", 0)],
            right_variables=[md.Variable("o", 1),
                             md.Variable("l", 1),
                             md.Variable("t", 2)],
            set_of_units=md.SetOfIncludedUnits(name="Y"),
            name="MNO data (sim)")
        # Source 1 (admin): (p0 | o1, t2) on placeholder units X
        persons_source = md.Data(
            left_variables=[md.Variable("p", 0)],
            right_variables=[md.Variable("o", 1),
                             md.Variable("t", 2)],
            set_of_units=md.SetOfIncludedUnits(name="X"),
            name="expected persons")
        self.input_data = [sim_source, persons_source]
        self.output_data = md.Data(
            left_variables=[md.Variable("p", 0)],
            right_variables=[md.Variable("o", 1),
                             md.Variable("l", 1),
                             md.Variable("t", 2)],
            set_of_units=md.SetOfIncludedUnits(name="X"),
            name="MNO data (persons)")
        self.units_rule = "custom"

    def apply(self, potential_input):
        """Return the outputs this model can derive from `potential_input`.

        A data set counts as a candidate for a source when
        `shrink_variables_only` against that source is truthy; the set of
        units is ignored at that stage because the model is declared with
        placeholder units. For every pair (units of a source-0 candidate,
        units of a source-1 candidate) where the source-0 units (Y) pass
        `is_subset` against the source-1 units (X), a copy of the output data
        carrying the source-1 units (X) is produced.

        Returns:
            A set of output data copies, or False when no combination applies.
        """
        required_sim, required_persons = self.input_data[0], self.input_data[1]

        # Variables-only availability check for both sources
        has_sim_source = any([ds.shrink_variables_only(required_sim)
                              for ds in potential_input])
        has_persons_source = any([ds.shrink_variables_only(required_persons)
                                  for ds in potential_input])
        if not (has_sim_source and has_persons_source):
            return False

        # Collect the sets of units of every candidate per source
        candidate_units_y = [ds.set_of_units for ds in potential_input
                             if ds.shrink_variables_only(required_sim)]
        candidate_units_x = [ds.set_of_units for ds in potential_input
                             if ds.shrink_variables_only(required_persons)]

        results = []
        for units_y, units_x in itertools.product(candidate_units_y, candidate_units_x):
            if not units_y.is_subset(units_x):  # Y must be a subset of X
                continue
            candidate = copy.deepcopy(self.output_data)
            candidate.set_of_units = units_x  # the output is defined on X
            candidate.reset_score()
            results.append(candidate)

        return set(results) if results else False

             
        

In [None]:
class ModelCreateOD(md.Model):
    """Model that turns admin data into an origin-destination data set.

    The single source holds background, origin and destination per person and
    day part; the output holds persons per origin, destination and day part.
    Both use the placeholder units "X". No apply() is defined here, so the
    behaviour for units_rule "equal" comes from md.Model
    (NOTE(review): presumably output units equal input units - confirm).
    """

    def __init__(self):
        self.name = "Create OD matrix"
        # Source 0 (admin data): (b0, o0, d0 | p0, t2)
        admin_source = md.Data(
            left_variables=[md.Variable("b", 0),
                            md.Variable("o", 0),
                            md.Variable("d", 0)],
            right_variables=[md.Variable("p", 0),
                             md.Variable("t", 2)],
            set_of_units=md.SetOfIncludedUnits(name="X"),
            name="admin data")
        self.input_data = [admin_source]
        self.output_data = md.Data(
            left_variables=[md.Variable("p", 0)],
            right_variables=[md.Variable("o", 0),
                             md.Variable("d", 0),
                             md.Variable("t", 2)],
            set_of_units=md.SetOfIncludedUnits(name="X"),
            name="admin-based OD")
        self.units_rule = "equal"

Model: Location estimation data for MNO

The location estimation model for MNO data allows variables that are based on geographic region to be transformed from G_2 to G_1. Given enough accuracy of the model and available data on cell towers, the model may potentially allow for a transformation from G_2 to G_0. For now, we assume the model is available for G_2 to G_1. Given this assumption, the most logical implementation is to update the aggregation graph of all variables based on geographic region. We want this model to be optional, however, so it has been implemented as a single-use model, which is applied once before the path search starts (if it is available).


In [None]:
class ModelLocationEstimation(md.ModelSingleUse):
    """Single-use model that adds one aggregation edge to a list of variables."""

    def __init__(self, name, variables_to_alter, node_from, node_to):
        # name: display name of the model
        # variables_to_alter: variable keys whose aggregation graph is extended
        # node_from / node_to: granularity indices of the edge that is added
        self.name = name
        self.variables_to_alter = variables_to_alter
        self.node_from = node_from
        self.node_to = node_to

    def apply(self):
        """
        The location estimation model is applicable to all variables based on geographic region.
        Introducing some uncertainty, it provides a way to translate cell tower coverage areas
        (self.node_from = 2) to municipalities (self.node_to = 1) or neighbourhoods (self.node_to = 0).

        For every variable in self.variables_to_alter, the aggregation edge
        (self.node_from, self.node_to) is added to that variable's aggregation graph,
        with this model attached to the edge.

        Assumption: this model is trained for all providers of the MNO data.
        """
        new_edge_source, new_edge_target = self.node_from, self.node_to
        for variable_key in self.variables_to_alter:
            # Look up the existing graph for this variable and add the edge to it
            md.AggregationGraph.get(variable_key).add_aggregation_edge(
                (new_edge_source, new_edge_target), model=self)
        

In [None]:
# Instantiate each model once.
m1 = ModelModalityChoice()
m2 = ModelShortestPath()
m3 = ModelCalibration()
m4 = ModelCalibrateSimtoPerson()
m5 = ModelCreateOD()

# Two location estimation variants for the geographic variables d, l and o:
# both add an aggregation edge from node 2, to node 1 (crude) or node 0 (detailed).
m6_1 = ModelLocationEstimation(
    name="Location estimation (crude)",
    variables_to_alter=["d", "l", "o"],
    node_from=2,
    node_to=1,
)
m6_2 = ModelLocationEstimation(
    name="Location estimation (detailed)",
    variables_to_alter=["d", "l", "o"],
    node_from=2,
    node_to=0,
)

# All models offered to the user in the model checkbox.
potential_models_mno = [
    m1,
    m2,
    m3,
    m4,
    m5,
    m6_1,
    m6_2,
]

#models_mno = [m4, m5]

# Show legends

In [None]:
# Show the legend of variables and granularities declared in `legend_variables`
md.legend_print(legend_variables)

In [None]:
# Sets of included units
# The header is wrapped in ANSI escape codes (bold on / reset).
print("\033[1m----------Sets of included units (legend)----------\033[0m")

# Print every set of included units, one per line
for soiu in all_soius:
    print(soiu)


In [None]:
# Data sets
# The header is wrapped in ANSI escape codes (bold on / reset).
print("\033[1m----------Data sets (legend)----------\033[0m")

# Print a descriptive string for every data set currently in start_set_potential
for ds in start_set_potential:
    print(ds.str_descriptive(legend_variables=legend_variables))

# User choices

In [None]:
# Display all dropdown widgets stored in `dropdowns`
# (the dictionary key `name` is not used in the loop body)
for name, dropdown in dropdowns.items():
    display(dropdown)


In [None]:
# automatically generate the mno data based on the user options
# (reads the current dropdown values, so re-run this cell after changing a dropdown)
data_mno = create_mno_data(dropdowns)
# Rebuild the full list (instead of appending) so that re-running this cell
# does not add the MNO data more than once.
start_set_potential = [data_transport_survey, data_pop_register, data_census, data_traffic_loops,
                       data_routes, data_mno]  # ensure mno data is in potential set of starting data sources

In [None]:
# Create the checkbox group for selecting the input data sets
# (single_option=False: presumably several data sets may be ticked - confirm in md.CheckboxData)
input_data_checkbox = md.CheckboxData(
    start_set_potential, "Select input data sets:", print_full_name=False, single_option=False)

# Display the checkbox group
input_data_checkbox.display()

In [None]:
# Create the checkbox group for selecting the available models
# (single_option=False: presumably several models may be ticked - confirm in md.CheckboxModel)
input_model_checkbox = md.CheckboxModel(
    potential_models_mno, "Select input models:", print_full_name=False, single_option=False)

# Display the checkbox group
input_model_checkbox.display()

In [None]:
# Create the checkbox group for selecting the target output
# (single_option=True: presumably exactly one goal can be ticked - confirm in md.CheckboxData)
goal_data_checkbox = md.CheckboxData(goal_options,
                                       "Select target output:", 
                                       print_full_name=False, 
                                       single_option=True)

# Display the checkbox group
goal_data_checkbox.display()

# Confirm and inspect test case
Create the test object and display it when the button is clicked.

In [None]:
# Button that (re)builds the test case from the current widget selections;
# everything the handler prints is captured in button_settings_output.
button_settings = widgets.Button(
    description="Proceed to next step",
    layout={'width': 'max-content'},
)
button_settings_output = widgets.Output()

test_mno_1 = None  # module-level placeholder, filled in by the click handler


def button_settings_clicked(b):
    """Build the test case from the selected goal, data sets and models and print a summary.

    `b` is the clicked button (unused). The result is stored in the
    module-level `test_mno_1`.
    """
    global test_mno_1

    button_settings_output.clear_output()
    with button_settings_output:
        # Reload test case from the current checkbox selections
        selected_goal = goal_data_checkbox.get_selected()
        selected_sources = md.SetOfSources(
            start_set=input_data_checkbox.get_selected())
        selected_models = input_model_checkbox.get_selected()
        test_mno_1 = md.TestCase(goal=selected_goal,
                                 start_set=selected_sources,
                                 models=selected_models)

        # Inspect test case
        print("Available data: \n" + test_mno_1.start_set.str_nameonly())
        print("Available Models: \n " + "\n ".join(str(model.name) for model in test_mno_1.models))
        print("Target output: " + str(test_mno_1.goal))
        display(Markdown("Would you like to analyse the above scenario?"))


display(button_settings, button_settings_output)
button_settings.on_click(button_settings_clicked)

# Solve MNO test case

In [None]:
# Button that runs the path search for the confirmed scenario;
# everything the handler displays is captured in button_solve_output.
button_solve = widgets.Button(
    description="Yes, analyse scenario", layout={'width': 'max-content'})
button_solve_output = widgets.Output()

# Clear the notebook output before displaying new widgets
button_solve_output.clear_output()

def button_solve_clicked(b):
    """Run the A* search for the confirmed test case and display the outcome.

    `b` is the clicked button (unused). Reads the module-level `test_mno_1`,
    which is only set once the scenario has been confirmed with the
    "Proceed to next step" button.
    """
    button_solve_output.clear_output()
    with button_solve_output:
        if test_mno_1 is None:
            # Guard: without a confirmed scenario there is nothing to analyse;
            # show a hint instead of failing on `None.start_set`.
            display("No scenario has been confirmed yet. "
                    "Click 'Proceed to next step' first.")
            return

        result = md.a_star(start_set=test_mno_1.start_set,
                           goal=test_mno_1.goal,
                           models=test_mno_1.models,
                           max_iteration=25,
                           similarity_choice="topsum",  # sum
                           score_function_parameter=3,  # for topsum: multiplier
                           prints=False,
                           preprocess_rhs=True,
                           find_multiple_paths=False,
                           shedding=True,
                           shedding_n=50,
                           variant="individual")  # base, normalized_basic, normalized_coupled, individual

        # display results
        if isinstance(result, md.SetOfSources):
            # one path was found
            # md.path_print presumably returns a pandas DataFrame (it is used via `.style` below)
            path_table = md.path_print(result.path)
            display("Target output can be created by the following path:")

            styled_table = path_table.style.format()
            styled_table.hide()  # modifies the Styler in place
            display(styled_table)
        elif isinstance(result, list):
            # multiple paths were found
            display(str(len(result))+" paths were found")
        else:
            # no path was found
            display(result)

display(button_solve, button_solve_output)
button_solve.on_click(button_solve_clicked)

# Testing

In [None]:
# Disabled debugging cell: the `if False:` guard means this search never runs.
# Change the condition to True to run the search without the button (for testing);
# test_mno_1 must have been created first.
if False:
    result = md.a_star(start_set=test_mno_1.start_set,
                    goal=test_mno_1.goal,
                    models=test_mno_1.models,
                    max_iteration=15,
                    similarity_choice="topsum",  # sum
                    score_function_parameter=3,  # for topsum: multiplier
                    prints=False,
                    preprocess_rhs=True,
                    find_multiple_paths=False,
                    shedding=True,
                    shedding_n=50,
                    variant="individual")  # base, normalized_basic, normalized_coupled, individual

In [None]:
#agg = md.AggregationGraph.get("o")
#agg.plot_graph()

In [None]:
# result.tree

In [None]:
#for s in result.path:
  #  print(s.method_detail)

In [None]:
#md.path_print(result.path)

In [None]:
#for r in result.set_of_sources:
  #  print(r.tree)