In [1]:
# %matplotlib widget

In [2]:
# IMPORTS
import random
import numpy as np
import pandas as pd

import networkx as nx
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from scipy.spatial.distance import pdist, squareform
from sklearn.manifold import MDS

from deap import algorithms
from deap import base
from deap import creator
from deap import tools

from functools import partial
from ProblemScripts import load_problem_KP

In [3]:
n_items = 30

In [4]:
def OneMax_fitness(sol):
    fit = sum(sol)
    return fit

def generate_zero_solution(length):
    return np.zeros(length, dtype=int)

In [5]:
def MutationOnlyEA(NGEN, popsize, tournsize, MUTPB, indpb, starting_solution=None):
    if not hasattr(creator, "OneMax_fitness"):
        creator.create("OneMax_fitness", base.Fitness, weights=(1.0,))
    if not hasattr(creator, "Individual"):
        creator.create("Individual", list, fitness=creator.OneMax_fitness)

    # Define the toolbox
    toolbox = base.Toolbox()
    toolbox.register("attr_bool", random.randint, 0, 1)
    toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=10)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", lambda ind: (sum(ind),))
    toolbox.register("mutate", tools.mutFlipBit, indpb=indpb)
    toolbox.register("select", tools.selTournament, tournsize=tournsize)

    # Create an initial population
    population = toolbox.population(n=popsize)

    # Set starting solution if provided
    if starting_solution is not None:
        for ind in population:
            ind[:] = starting_solution[:]

    # Recording every generation's population
    all_generations = []
    best_solutions = []
    best_fitnesses = []

    # Evaluate the initial population
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    # Record initial population and best solution
    all_generations.append([ind[:] for ind in population])
    best_individual = max(population, key=lambda ind: ind.fitness.values)
    best_solutions.append(toolbox.clone(best_individual))
    best_fitnesses.append(best_individual.fitness.values[0])

    # Evolutionary loop
    for gen in range(NGEN):
        # Select the offspring using tournament selection
        offspring = [toolbox.clone(toolbox.select(population, 1)[0]) for _ in range(len(population))]

        # Apply mutation on the offspring
        for mutant in offspring:
            if random.random() < MUTPB:
                toolbox.mutate(mutant)
                del mutant.fitness.values

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        # Replace the weakest individuals in the population with the offspring regardless of fitness value
        for mutant in offspring:
            weakest_idx = min(range(len(population)), key=lambda idx: population[idx].fitness.values)
            population[weakest_idx] = mutant

        # Record current population and best solution
        all_generations.append([ind[:] for ind in population])
        best_individual = max(population, key=lambda ind: ind.fitness.values)
        best_solutions.append(toolbox.clone(best_individual))
        best_fitnesses.append(best_individual.fitness.values[0])

    return all_generations, best_solutions, best_fitnesses


In [6]:
def extract_trajectory_data(best_solutions, best_fitnesses):
    # Extract unique solutions and their corresponding fitness values
    unique_solutions = []
    unique_fitnesses = []
    solution_iterations = []
    seen_solutions = {}

    for solution, fitness in zip(best_solutions, best_fitnesses):
        # Convert solution to a tuple to make it hashable
        solution_tuple = tuple(solution)
        if solution_tuple not in seen_solutions:
            seen_solutions[solution_tuple] = 1
            unique_solutions.append(solution)
            unique_fitnesses.append(fitness)
        else:
            seen_solutions[solution_tuple] += 1

    # Create a list of iteration counts for each unique solution
    for solution in unique_solutions:
        solution_tuple = tuple(solution)
        solution_iterations.append(seen_solutions[solution_tuple])

    return unique_solutions, unique_fitnesses, solution_iterations

In [7]:
def extract_transitions(unique_solutions):
    # Extract transitions between solutions over generations
    transitions = []

    for i in range(1, len(unique_solutions)):
        prev_solution = tuple(unique_solutions[i - 1])
        current_solution = tuple(unique_solutions[i])
        transitions.append((prev_solution, current_solution))

    return transitions

In [8]:
def run_multiple_ea_runs(num_runs, NGEN, popsize, tournsize, MUTPB, indpb, starting_solution=None):
    all_run_trajectories = []
    for run in range(num_runs):
        _, best_solutions, best_fitnesses = MutationOnlyEA(NGEN, popsize, tournsize, MUTPB, indpb, starting_solution)
        unique_solutions, unique_fitnesses, solution_iterations = extract_trajectory_data(best_solutions, best_fitnesses)
        transitions = extract_transitions(unique_solutions)
        all_run_trajectories.append((unique_solutions, unique_fitnesses, solution_iterations, transitions))
    return all_run_trajectories

In [9]:
def umda_update_full(population, pop_size, select_size, replace_size, toolbox):
    # select from population
    selected_population = tools.selBest(population, select_size)

    # Calculate marginal propabilities
    probabilities = np.mean(selected_population, axis=0)

    new_solutions = []
    for _ in range(pop_size):
        new_solution = np.random.rand(n_items) < probabilities
        new_solution = creator.Individual(new_solution.astype(int).tolist())  # Create as DEAP Individual
        new_solutions.append(new_solution)
    
    return new_solutions

def UMDA(probsize, NGEN, popsize, selectsize, starting_solution=None):
    if not hasattr(creator, "OneMax_fitness"):
        creator.create("OneMax_fitness", base.Fitness, weights=(1.0,))
    if not hasattr(creator, "Individual"):
        creator.create("Individual", list, fitness=creator.OneMax_fitness)

    # Define the toolbox
    toolbox = base.Toolbox()
    toolbox.register("attr_bool", random.randint, 0, 1)
    toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=probsize)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", lambda ind: (sum(ind),))

    # Create an initial population
    population = toolbox.population(n=popsize)

    # Set starting solution if provided
    if starting_solution is not None:
        for ind in population:
            ind[:] = starting_solution[:]

    # Recording every generation's population
    all_generations = []
    best_solutions = []
    best_fitnesses = []

    # Evaluate the initial population
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    # Record initial population and best solution
    all_generations.append([ind[:] for ind in population])
    best_individual = max(population, key=lambda ind: ind.fitness.values)
    best_solutions.append(toolbox.clone(best_individual))
    best_fitnesses.append(best_individual.fitness.values[0])

    # Evolutionary loop
    for gen in range(NGEN):
        population = umda_update_full(population, popsize, selectsize, 0, toolbox)

        fitnesses = list(map(toolbox.evaluate, population))
        for ind, fit in zip(population, fitnesses):
            ind.fitness.values = fit

        # Record current population and best solution
        all_generations.append([ind[:] for ind in population])
        best_individual = max(population, key=lambda ind: ind.fitness.values)
        best_solutions.append(toolbox.clone(best_individual))
        best_fitnesses.append(best_individual.fitness.values[0])

    return all_generations, best_solutions, best_fitnesses

def run_multiple_UMDA_runs(num_runs, probsize, NGEN, popsize, selectsize, starting_solution=None):
    all_run_trajectories = []
    for run in range(num_runs):
        _, best_solutions, best_fitnesses = UMDA(probsize, NGEN, popsize, selectsize, starting_solution)
        unique_solutions, unique_fitnesses, solution_iterations = extract_trajectory_data(best_solutions, best_fitnesses)
        transitions = extract_transitions(unique_solutions)
        all_run_trajectories.append((unique_solutions, unique_fitnesses, solution_iterations, transitions))
    return all_run_trajectories

In [None]:
ss = generate_zero_solution(n_items)
all_run_trajectories1 = run_multiple_ea_runs(num_runs=5, NGEN=1000, popsize=100, tournsize=30, MUTPB=1, indpb=0.2, starting_solution=ss)
all_run_trajectories2 = run_multiple_UMDA_runs(num_runs=5, probsize=n_items, NGEN=1000, popsize=100, selectsize=50, starting_solution=None)

In [20]:
all_trajectories_list = [all_run_trajectories1, all_run_trajectories2]

In [24]:
import pickle
with open('data.pkl', 'wb') as f:
    pickle.dump(all_trajectories_list, f)

In [22]:
import plotly.graph_objs as go
import networkx as nx
import numpy as np
import ipywidgets as widgets
from IPython.display import display, clear_output
from sklearn.manifold import MDS as MDS_sklearn

plot_output = widgets.Output()

def plot_TN(all_trajectories_list, show_labels=False, hide_nodes=False, plot_3D=False, use_solution_iterations=False, MDS=False):
    G = nx.DiGraph()

    # Colors for different sets of trajectories
    edge_colors = ['blue', 'orange', 'purple', 'green', 'brown', 'cyan', 'magenta']
    node_color_shared = 'green'

    # Add nodes and edges for each set of trajectories
    node_mapping = {}  # To ensure unique solutions map to the same node
    start_nodes = set()
    end_nodes = set()
    overall_best_node = None

    def hamming_distance(sol1, sol2):
        return sum(el1 != el2 for el1, el2 in zip(sol1, sol2))

    def add_trajectories_to_graph(all_run_trajectories, edge_color):
        for run_idx, (unique_solutions, unique_fitnesses, solution_iterations, transitions) in enumerate(all_run_trajectories):
            for i, solution in enumerate(unique_solutions):
                solution_tuple = tuple(solution)
                if solution_tuple not in node_mapping:
                    node_label = f"Solution {len(node_mapping) + 1}"
                    node_mapping[solution_tuple] = node_label
                    G.add_node(node_label, solution=solution, fitness=unique_fitnesses[i], iterations=solution_iterations[i], run_idx=run_idx, step=i)
                else:
                    node_label = node_mapping[solution_tuple]

                if i == 0:
                    start_nodes.add(node_label)
                if i == len(unique_solutions) - 1:
                    end_nodes.add(node_label)

            for prev_solution, current_solution in transitions:
                if prev_solution in node_mapping and current_solution in node_mapping:
                    G.add_edge(node_mapping[prev_solution], node_mapping[current_solution], color=edge_color)

    for idx, all_run_trajectories in enumerate(all_trajectories_list):
        edge_color = edge_colors[idx % len(edge_colors)]
        add_trajectories_to_graph(all_run_trajectories, edge_color)

    overall_best_fitness = max(
        max(best_fitnesses) for all_run_trajectories in all_trajectories_list for _, best_fitnesses, _, _ in all_run_trajectories
    )
    for node, data in G.nodes(data=True):
        if data['fitness'] == overall_best_fitness:
            overall_best_node = node
            break

    node_colors = []
    for node in G.nodes():
        if node == overall_best_node:
            node_colors.append('red')
        elif node in start_nodes:
            node_colors.append('yellow')
        elif node in end_nodes:
            node_colors.append('grey')
        else:
            solution_tuple = next(key for key, value in node_mapping.items() if value == node)
            count_in_sets = 0
            for all_run_trajectories in all_trajectories_list:
                for unique_solutions, _, _, _ in all_run_trajectories:
                    if solution_tuple in set(tuple(sol) for sol in unique_solutions):
                        count_in_sets += 1
            node_colors.append(node_color_shared if count_in_sets > 1 else 'skyblue')

    node_sizes = (
        [0 for node in G.nodes()] if hide_nodes
        else [50 + G.nodes[node]['iterations'] * 20 for node in G.nodes()] if use_solution_iterations
        else [50 + G.in_degree(node) * 50 for node in G.nodes()]
    )

    if MDS:
        solutions = [data['solution'] for _, data in G.nodes(data=True)]
        n = len(solutions)
        dissimilarity_matrix = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                dissimilarity_matrix[i][j] = hamming_distance(solutions[i], solutions[j])

        mds = MDS_sklearn(n_components=2, dissimilarity='precomputed', random_state=42)
        positions_2d = mds.fit_transform(dissimilarity_matrix)
        pos = {node: positions_2d[i] for i, node in enumerate(G.nodes())}
    else:
        pos = nx.kamada_kawai_layout(G, dim=2 if not plot_3D else 3)

    if plot_3D:
        x_coords = [pos[node][0] for node in G.nodes()]
        y_coords = [pos[node][1] for node in G.nodes()]
        z_coords = [data['fitness'] for node, data in G.nodes(data=True)]

        node_trace = go.Scatter3d(
            x=x_coords,
            y=y_coords,
            z=z_coords,
            mode='markers+text' if show_labels else 'markers',
            marker=dict(size=node_sizes, color=node_colors, opacity=0.8),
            text=[node if show_labels else '' for node in G.nodes()],
        )

        edge_traces = []
        for edge in G.edges():
            x = [pos[edge[0]][0], pos[edge[1]][0], None]
            y = [pos[edge[0]][1], pos[edge[1]][1], None]
            z = [G.nodes[edge[0]]['fitness'], G.nodes[edge[1]]['fitness'], None]
            edge_color = G.edges[edge]['color']
            edge_trace = go.Scatter3d(
                x=x, y=y, z=z,
                mode='lines',
                line=dict(color=edge_color, width=2),
                opacity=0.5
            )
            edge_traces.append(edge_trace)

        fig = go.Figure(data=[node_trace] + edge_traces)
        fig.update_layout(
            title="3D Trajectory Network Plot",
            scene=dict(
                xaxis_title='X axis',
                yaxis_title='Y axis',
                zaxis_title='Fitness'
            )
        )
    else:
        x_coords = [pos[node][0] for node in G.nodes()]
        y_coords = [pos[node][1] for node in G.nodes()]

        node_trace = go.Scatter(
            x=x_coords,
            y=y_coords,
            mode='markers+text' if show_labels else 'markers',
            marker=dict(size=node_sizes, color=node_colors, opacity=0.8),
            text=[node if show_labels else '' for node in G.nodes()],
        )

        edge_traces = []
        for edge in G.edges():
            x = [pos[edge[0]][0], pos[edge[1]][0], None]
            y = [pos[edge[0]][1], pos[edge[1]][1], None]
            edge_color = G.edges[edge]['color']
            edge_trace = go.Scatter(
                x=x, y=y,
                mode='lines',
                line=dict(color=edge_color, width=2),
                opacity=0.5
            )
            edge_traces.append(edge_trace)

        fig = go.Figure(data=[node_trace] + edge_traces)
        fig.update_layout(
            title="Trajectory Network Plot",
            xaxis_title="X axis",
            yaxis_title="Y axis"
        )
    
    # Display the figure within the output widget
    plot_output.clear_output(wait=True)
    with plot_output:
        fig.show()

# Widgets for the dashboard
show_labels_toggle = widgets.Checkbox(value=False, description='Show Labels')
hide_nodes_toggle = widgets.Checkbox(value=False, description='Hide Nodes')
plot_3D_toggle = widgets.Checkbox(value=False, description='3D Plot')
use_solution_iterations_toggle = widgets.Checkbox(value=False, description='Node Size by Iterations')
MDS_toggle = widgets.Checkbox(value=False, description='Use MDS Layout')

def update_dashboard(change=None):
    plot_TN(
        all_trajectories_list,
        show_labels=show_labels_toggle.value,
        hide_nodes=hide_nodes_toggle.value,
        plot_3D=plot_3D_toggle.value,
        use_solution_iterations=use_solution_iterations_toggle.value,
        MDS=MDS_toggle.value
    )

# Attach listeners
for toggle in [show_labels_toggle, hide_nodes_toggle, plot_3D_toggle, use_solution_iterations_toggle, MDS_toggle]:
    toggle.observe(update_dashboard, names='value')

# Display the widgets and output container
display(show_labels_toggle, hide_nodes_toggle, plot_3D_toggle, use_solution_iterations_toggle, MDS_toggle)
display(plot_output)

# Initial plot
update_dashboard()


Checkbox(value=False, description='Show Labels')

Checkbox(value=False, description='Hide Nodes')

Checkbox(value=False, description='3D Plot')

Checkbox(value=False, description='Node Size by Iterations')

Checkbox(value=False, description='Use MDS Layout')

Output()

In [12]:
# n_items = 30
# ss = generate_zero_solution(n_items)

# all_run_trajectories1 = run_multiple_ea_runs(num_runs=3, NGEN=1000, popsize=10, tournsize=3, MUTPB=1, indpb=0.2, starting_solution=ss)
# all_run_trajectories2 = run_multiple_ea_runs(num_runs=3, NGEN=1000, popsize=10, tournsize=3, MUTPB=1, indpb=0.4, starting_solution=ss)
# all_run_trajectories3 = run_multiple_ea_runs(num_runs=3, NGEN=1000, popsize=10, tournsize=3, MUTPB=1, indpb=0.8, starting_solution=ss)
# all_trajectories_list = [all_run_trajectories1, all_run_trajectories2, all_run_trajectories3]

In [23]:
import dash
from dash import dcc, html, Input, Output
import plotly.graph_objs as go
import networkx as nx
import numpy as np
from sklearn.manifold import MDS as MDS_sklearn
from jupyter_dash import JupyterDash

# Sample data
all_trajectories_list = []  # Placeholder for your data

# Dash app setup
app = JupyterDash(__name__)
app.layout = html.Div([
    html.H1("Trajectory Network Dashboard"),
    dcc.Checklist(
        id='options',
        options=[
            {'label': 'Show Labels', 'value': 'show_labels'},
            {'label': 'Hide Nodes', 'value': 'hide_nodes'},
            {'label': '3D Plot', 'value': 'plot_3D'},
            {'label': 'Use Solution Iterations', 'value': 'use_solution_iterations'},
            {'label': 'MDS Layout', 'value': 'MDS'}
        ],
        value=[]
    ),
    dcc.Graph(id='trajectory-plot')
])


@app.callback(
    Output('trajectory-plot', 'figure'),
    Input('options', 'value')
)
def update_plot(options):
    show_labels = 'show_labels' in options
    hide_nodes = 'hide_nodes' in options
    plot_3D = 'plot_3D' in options
    use_solution_iterations = 'use_solution_iterations' in options
    MDS = 'MDS' in options

    G = nx.DiGraph()

    # Colors for different sets of trajectories
    edge_colors = ['blue', 'orange', 'purple', 'green', 'brown', 'cyan', 'magenta']
    node_color_shared = 'green'

    # Add nodes and edges for each set of trajectories (placeholder logic)
    node_mapping = {}  # To ensure unique solutions map to the same node
    start_nodes = set()
    end_nodes = set()
    overall_best_node = None

    # Function to calculate Hamming distance
    def hamming_distance(sol1, sol2):
        return sum(el1 != el2 for el1, el2 in zip(sol1, sol2))

    # Placeholder: Add nodes and edges to the graph
    # You would replace this with your actual logic to add trajectories

    # Prepare node positions
    if MDS:
        # Use MDS to position nodes based on dissimilarity (Hamming distance)
        solutions = [data['solution'] for _, data in G.nodes(data=True)]
        n = len(solutions)
        dissimilarity_matrix = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                dissimilarity_matrix[i][j] = hamming_distance(solutions[i], solutions[j])

        mds = MDS_sklearn(n_components=2, dissimilarity='precomputed', random_state=42)
        positions_2d = mds.fit_transform(dissimilarity_matrix)
        pos = {node: positions_2d[i] for i, node in enumerate(G.nodes())}
    else:
        # Use default spring layout
        pos = nx.spring_layout(G, dim=2 if not plot_3D else 3)

    # Prepare Plotly traces
    edge_trace = []
    for edge in G.edges(data=True):
        x0, y0 = pos[edge[0]]
        x1, y1 = pos[edge[1]]
        trace = go.Scatter(
            x=[x0, x1, None],
            y=[y0, y1, None],
            mode='lines',
            line=dict(width=2, color=edge[2].get('color', 'black')),
            hoverinfo='none'
        )
        edge_trace.append(trace)

    node_trace = go.Scatter(
        x=[],
        y=[],
        text=[],
        mode='markers+text' if show_labels else 'markers',
        hoverinfo='text',
        marker=dict(
            showscale=True,
            colorscale='YlGnBu',
            size=[],
            color=[],
            line_width=2
        )
    )

    for node in G.nodes(data=True):
        x, y = pos[node[0]]
        node_trace['x'] += tuple([x])
        node_trace['y'] += tuple([y])
        node_trace['text'] += tuple([str(node[1].get('fitness', ''))])
        node_trace['marker']['color'] += tuple(['skyblue'])  # Placeholder for color logic
        node_trace['marker']['size'] += tuple([10])  # Placeholder for size logic

    if plot_3D:
        # Placeholder for 3D plot logic
        fig = go.Figure()
        fig.update_layout(title="3D Trajectory Network Plot")
    else:
        fig = go.Figure(data=edge_trace + [node_trace],
                        layout=go.Layout(
                            title='Trajectory Network Plot',
                            showlegend=False,
                            hovermode='closest',
                            margin=dict(b=20, l=5, r=5, t=40),
                            xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                            yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)))
    return fig


if __name__ == '__main__':
    app.run_server(mode='inline', debug=True)


ModuleNotFoundError: No module named 'jupyter_dash'