In [1]:
import warnings
# Silence only FutureWarnings whose message starts with "Passing"; every other warning
# stays visible. This must run before the imports in the next cell to take effect for them.
warnings.filterwarnings("ignore", message=r"Passing", category=FutureWarning)

In [2]:
import json
import matplotlib.pyplot as plt
import matplotlib as mtl
import numpy as np
import operator
import pandas as pd
import imageio
import re

from carla import log
from functools import reduce
from os import listdir, rename
from os.path import isfile, join

[INFO] Using Python-MIP package version 1.12.0 [model.py <module>]


Using TensorFlow backend.


In [3]:
def plot_distribution(data, initial_data, model, output_directory, generator_name,
                      plot_type, plot_id, cfs=None, show_plot=True):
    """
    Generates a plot of a two-class dataset and saves it to a file.

    The classifier's positive-class probability is drawn as a filled contour in the
    background. Train and test samples are scattered on top, coloured by their true
    class and outlined in crimson when the classifier mislabels them.

    Args:
        data:
            Dataset object exposing the DataFrames `_df`, `_df_train` and `_df_test`.
            In each of them the first two columns are read as features and the
            third column as the binary label.
        initial_data (pandas.DataFrame):
            Frame that is looked up with `cfs.index` to obtain the factual origin of
            every counterfactual. Only read when `cfs` is given.
        model (MLModelCatalog):
            Classifier implementing a `predict_proba()` method.
        output_directory (str):
            Name of the directory where images are saved.
        generator_name (str):
            Name of the applied recourse generator.
        plot_type (str):
            Type of the created plot.
        plot_id (int):
            ID for the generated plot (e.g. consecutive numbers for different
            distributions); zero-padded to six digits in the file name.
        cfs (pandas.DataFrame, optional):
            Counterfactuals to emphasize. Their first two columns are plotted and
            their index is dropped from the train samples before those are drawn.
        show_plot (Boolean):
            If True the plot will also be outputted directly to the notebook.
    """
    train = data._df_train
    test = data._df_test

    fig, ax = plt.subplots()
    try:
        fig.set_dpi(150)
        ax.set_xlim([-0.25, 1.25])
        ax.set_ylim([-0.25, 1.25])
        ax.set_xlabel('$feature1$')
        ax.set_ylabel('$feature2$')

        # Background: probability of the positive class over the feature plane
        x0, x1, z = calculate_boundary(data._df, model)
        ax.contourf(x0, x1, z, cmap='plasma', levels=10, alpha=0.8)

        # Emphasize which samples have been turned into counterfactuals
        if cfs is not None:
            ax.scatter(cfs.iloc[:, 0], cfs.iloc[:, 1], s=30, facecolor='darkorange',
                       linewidth=1.2, edgecolor='limegreen', label='counterfactual')

            fs = initial_data.loc[cfs.index]
            ax.scatter(fs.iloc[:, 0], fs.iloc[:, 1], s=30, facecolor='None',
                       linewidth=1.2, edgecolor='limegreen', label='factual (origin)')

            # The counterfactuals were just drawn; do not draw them again as train samples
            train = train.drop(cfs.index)

        # Test samples use an 'X' marker, train samples the default marker
        _scatter_split(ax, test, model, 'test', marker='X')
        _scatter_split(ax, train, model, 'train')

        ax.legend(bbox_to_anchor=(1.01, 1.0), loc='upper left')

        # Save first: showing a figure may release it depending on the backend
        fig.savefig(f"{output_directory}/{generator_name}_{plot_type}_{plot_id:06}.png",
                    bbox_inches='tight')
        if show_plot:
            plt.show()
    finally:
        # Release the figure so repeated calls (one per epoch) do not accumulate figures
        plt.close(fig)


def _scatter_split(ax, samples, model, split_name, marker=None):
    """Scatter one data split, coloured by true class and outlined by prediction outcome."""
    samples = samples.to_numpy()
    if len(samples) == 0:
        # Nothing to draw, and the classifier is not asked to predict on an empty batch
        return

    features = samples[:, :2]
    y_true = samples[:, 2].reshape(-1)
    y_pred = np.argmax(model.predict_proba(features), axis=1)

    # (true class, predicted class, face colour, edge colour, legend suffix)
    groups = [
        (1, 1, 'darkorange', 'black', '+, correct'),
        (1, 0, 'darkorange', 'crimson', '+, misclassified'),
        (0, 0, 'cornflowerblue', 'black', '-, correct'),
        (0, 1, 'cornflowerblue', 'crimson', '-, misclassified'),
    ]
    for true_class, predicted_class, face, edge, suffix in groups:
        mask = (y_pred == predicted_class) & (y_true == true_class)
        ax.scatter(features[mask, 0], features[mask, 1], s=30, c=face, linewidth=0.6,
                   edgecolor=edge, marker=marker, label=f'{split_name}, {suffix}')

In [4]:
def calculate_boundary(data, model, margin=1, step=0.01):
    """
    Evaluates the classifier on a regular grid spanning the first two features.

    Args:
        data (pandas.DataFrame):
            Records whose first two columns are the features that span the grid.
            Any further columns (e.g. the label) are ignored.
        model:
            Classifier implementing a `predict_proba()` method that returns one
            row per sample with the probability of class 1 in column 1.
        margin (float):
            Padding added below the minimum and above the maximum of each feature.
        step (float):
            Spacing between neighbouring grid points; must be positive.

    Returns:
        Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]:
            The two coordinate matrices produced by `numpy.meshgrid` and, in the
            same shape, the predicted probability of class 1 at every grid point.

    Raises:
        ValueError: If `step` is not positive.
    """
    if step <= 0:
        raise ValueError("step must be positive.")

    # Only the two plotted features define the extent of the grid
    features = data.to_numpy()[:, :2]
    lower = np.min(features, axis=0) - margin
    upper = np.max(features, axis=0) + margin

    x0, x1 = np.meshgrid(np.arange(lower[0], upper[0], step),
                         np.arange(lower[1], upper[1], step))

    # One row per grid point: (feature 1, feature 2)
    grid_points = np.column_stack([x0.ravel(), x1.ravel()])

    z = model.predict_proba(grid_points)[:, 1].reshape(x0.shape)

    return x0, x1, z

In [5]:
def get_by_path(root, items):
    """
    Access a dictionary based on a set of keys in the provided order.

    Args:
        root (dict):
            Top-level of the nested dictionary.
        items (List[str]):
            List of strings specifying the consecutive keys. An empty list
            returns `root` itself.

    Returns:
        object: Value corresponding to the last key in the `items` list.

    Raises:
        ValueError: If any key along the path is missing or an intermediate
            value cannot be indexed. The original lookup error is attached
            as the cause.

    """
    try:
        return reduce(operator.getitem, items, root)
    except (KeyError, IndexError, TypeError) as error:
        # Only lookup failures are translated; interrupts and unrelated errors propagate
        raise ValueError("Specified path does not exist in the dictionary.") from error


def plot_experiment_data(output_directory, generators, plot_type, dict_path,
                         file_name='measurements.json', show_plot=False):
    """
    Plots a specified component of the experiment data gathered over all epochs of recourse.

    One line is drawn per generator; the x-axis is the position of the epoch in
    chronological order. The figure is written to `<output_directory>/<plot_type>.png`.

    Args:
        output_directory (str):
            Name of the directory that holds the experiment data and where the image is saved.
        generators (List[str]):
            List of the names of all generators which should be plotted.
        plot_type (str):
            Type of the created plot; used as the name of the image file.
        dict_path (List[str]):
            Location of the measurements of interest within the dictionary of experiment data.
        file_name (str):
            Name of the file containing the experiment data dictionary.
        show_plot (Boolean):
            If True the plot will also be outputted directly to the notebook.

    Raises:
        ValueError: If `generators` is empty, a generator has no measurements in the
            file, or `dict_path` does not exist for one of the epochs.

    """
    if not generators:
        raise ValueError('At least one generator must be specified.')

    # The file is only needed for loading; close it before any plotting starts
    with open(f'{output_directory}/{file_name}') as data_file:
        data = json.load(data_file)

    # Collect every series up front so invalid input fails before a figure is created
    series = []
    for g in generators:
        # Check if the generators have been correctly specified
        if g not in data:
            raise ValueError(f'No measurements available for {g}')

        # JSON keys are strings; convert to int so the epochs sort chronologically
        by_epoch = {int(k): v for k, v in data[g].items()}
        series.append([get_by_path(by_epoch[epoch], dict_path) for epoch in sorted(by_epoch)])

    fig, ax = plt.subplots(dpi=150)
    try:
        ax.grid(True)

        # Apply consistent theme over all plots generated for the project
        colormap = plt.cm.plasma
        colors = [colormap(int(g * colormap.N / len(generators))) for g in range(len(generators))]

        for index, (g, result) in enumerate(zip(generators, series)):
            ax.plot(range(len(result)), result, linewidth=2,
                    label=f'{g.capitalize()}', color=colors[index])

        # Format the plot: the limits must cover every generator, not only the last one
        all_values = [value for result in series for value in result]
        if all_values:
            longest = max(len(result) for result in series)
            peak = max(all_values)
            ax.set_xlim([0, longest - 1])
            ax.set_ylim([0 - 0.2 * peak, 1.2 * peak])
        ax.legend()
        fig.savefig(f"{output_directory}/{plot_type}.png", bbox_inches='tight')

        # Only show if asked
        if show_plot:
            plt.show()
    finally:
        # Release the figure; without this every call leaves an open figure behind
        plt.close(fig)

In [6]:
# Directory of the experiment run to visualise. The commented-out calls in this notebook
# pass it to plot_experiment_data() and generate_gif().
experiment_path = '../experiment_data/20220514215330_p_value'


# config = {'type': 'MMD', 'dict_path': ['MMD', 'value']}
# config = {'type': 'disagreement', 'dict_path': ['disagreement']}
# config = {'type': 'num_clusters', 'dict_path': ['distribution', 'num_clusters']}

# plot_experiment_data(experiment_path, ['DICE_1', 'DICE_5', 'Wachter'], config['type'], config['dict_path'])

In [7]:
def generate_gif(experiment_path, generator_name, frames_per_image=3):
    """
    Collect the images of data distribution over time and process them into a .gif file.

    The frames are the `.png` files in `experiment_path` whose name starts with
    `generator_name`, taken in lexicographic order of their file names. The result
    is written to `<experiment_path>/<generator_name>.gif`.

    Args:
        experiment_path (str): path to the experiment directory where images are stored
        generator_name (str): name of the generator (e.g. 'DICE_1' or 'Wachter') which should be processed
        frames_per_image (int): how many consecutive frames every image occupies in the GIF

    Raises:
        ValueError: If no matching image is found in `experiment_path`.
    """
    # Find corresponding filenames. Restricting to .png keeps a GIF written by an
    # earlier run (which shares the prefix) from being picked up as a frame.
    filenames = sorted(
        name for name in listdir(experiment_path)
        if name.startswith(generator_name)
        and name.lower().endswith('.png')
        and isfile(join(experiment_path, name))
    )
    if not filenames:
        raise ValueError(f"No images found for '{generator_name}' in '{experiment_path}'.")

    # Generate the GIF and save to the original directory
    images = []
    for filename in filenames:
        # Read each image once and repeat the decoded frame instead of re-reading the file
        frame = imageio.imread(join(experiment_path, filename))
        images.extend([frame] * frames_per_image)
    imageio.mimsave(join(experiment_path, f'{generator_name}.gif'), images)

In [8]:
# experiment_path = '../experiment_data/20220514215330_p_value'

# generate_gif(experiment_path, 'DICE_1')
# generate_gif(experiment_path, 'DICE_5')
# generate_gif(experiment_path, 'Wachter')