In [None]:
import numpy as np
import matplotlib.pyplot as plt

from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier

from utility import umap_picture, show_images, show_sample
from load_data import load_mnist, load_electro, load_sklearn_digits, create_MNIST_stream, load_har_data, sea_fd, stagger

import drift_counterfactuals as dcf

# Seed NumPy's global (legacy) random generator once, before any cell draws random numbers.
np.random.seed(42)

# A Simple Use Case Based on the Electricity Market Dataset

The following example is based on the Electricity Market dataset. It illustrates how to apply the method directly to the dataset. ...

In [None]:
## Load and Compose Dataset
# The loader returns two parts; they are stacked into one matrix and
# labelled 0 (rows of X_0) and 1 (rows of X_1).
X_0, X_1 = load_electro()

X = np.vstack( (X_0,X_1) )
y = np.array( X_0.shape[0]*[0]+X_1.shape[0]*[1] )

## Perform Drift Localization and find Characteristic Samples
samps,y_s, p, n,_ = dcf.select_samples(X,y, select_from_samples=True)

## Train and test compressed Model for Drift Localization
model = dcf.simple_model(X,y,p, background=2.5,model = RandomForestClassifier())
# Predict every part once and reuse the labels for both diagnostics of that
# part instead of running the model twice over the same rows.
pred_0 = model.predict(X_0)
print("Wrong timed samples before", (pred_0==1).mean(), "Background confusion before", (pred_0==3).mean() )
pred_1 = model.predict(X_1)
print("Wrong timed samples after ", (pred_1==0).mean(), "Background confusion after ", (pred_1==3).mean() )

## Compute Counterfactuals
cfs = dcf.compute_counterfactual_explanation(samps, y_s, model, method="assignment", X=X)
print("Classefied CS",model.predict(samps),"Classified CF",model.predict(cfs))

## Plot Results
umap_picture(X,p,n, model, samps,cfs)
plt.show()
show_sample(samps,cfs)
plt.show()

In [None]:
## Load and Compose Dataset
# The loader returns two parts; they are stacked into one matrix and
# labelled 0 (rows of X_0) and 1 (rows of X_1).
X_0, X_1 = load_sklearn_digits()

X = np.vstack( (X_0,X_1) )
y = np.array( X_0.shape[0]*[0]+X_1.shape[0]*[1] )

## Perform Drift Localization and find Characteristic Samples
samps,y_s, p, n,_ = dcf.select_samples(X,y, select_from_samples=True)

## Train and test compressed Model for Drift Localization
model = dcf.simple_model(X,y,p, background=2.5,model = RandomForestClassifier())
# Predict every part once and reuse the labels for both diagnostics of that
# part instead of running the model twice over the same rows.
pred_0 = model.predict(X_0)
print("Wrong timed samples before", (pred_0==1).mean(), "Background confusion before", (pred_0==3).mean() )
pred_1 = model.predict(X_1)
print("Wrong timed samples after ", (pred_1==0).mean(), "Background confusion after ", (pred_1==3).mean() )

## Compute Counterfactuals
cfs = dcf.compute_counterfactual_explanation(samps, y_s, model, method="assignment", X=X)
print("Classefied CS",model.predict(samps),"Classified CF",model.predict(cfs))

## Plot Results
umap_picture(X,p,n, model, samps,cfs)
plt.show()
# Each row is shown as an 8x8 image.
show_images(samps,cfs, decoder=lambda x:x.reshape((8,8)))
plt.show()

In [None]:
## Load and Compose Dataset
# The loader returns two parts; they are stacked into one matrix and
# labelled 0 (rows of X_0) and 1 (rows of X_1).
X_0, X_1 = load_mnist()

X = np.vstack( (X_0,X_1) )
y = np.array( X_0.shape[0]*[0]+X_1.shape[0]*[1] )

## Perform Drift Localization and find Characteristic Samples
samps,y_s, p, n,_ = dcf.select_samples(X,y, select_from_samples=True)

## Train and test compressed Model for Drift Localization
# Alternative base learner left by the author: KNeighborsClassifier(n_neighbors=25)
model = dcf.simple_model(X,y,p, background=2.5,
                         model = RandomForestClassifier())
# Predict every part once and reuse the labels for both printed ratios
# instead of running the model twice over the same rows.
# Printed per part: share of rows predicted with the other part's label,
# then share of rows predicted as class 3.
pred_0 = model.predict(X_0)
print( (pred_0==1).mean(), (pred_0==3).mean() )
pred_1 = model.predict(X_1)
print( (pred_1==0).mean(), (pred_1==3).mean() )

## Compute Counterfactuals
cfs = dcf.compute_counterfactual_explanation(samps, y_s, model, method="assignment", X=X)
print(model.predict(samps),model.predict(cfs))

## Plot Results
umap_picture(X,p,n, model, samps,cfs)
plt.show()
# Each row is shown as a 28x28 image.
show_images(samps,cfs, decoder=lambda x:x.reshape((28,28)))
plt.show()

In [None]:
## Load datastream
X, y, _ = create_MNIST_stream()
# Keep the first axis and flatten all remaining axes: one row per stream element.
X = X.reshape((X.shape[0], -1))

## Localize Drift and find Characteristic Samples
samps, y_s, p, n, _ = dcf.select_samples(
    X,
    y,
    select_from_samples=True,
)

## Train Simple Localization Model
model = dcf.simple_model(
    X,
    y,
    p,
    background=2.5,
    model=RandomForestClassifier(),
)

## Compute Counterfactuals
cfs = dcf.compute_counterfactual_explanation(
    samps,
    y_s,
    model,
    method="assignment",
    X=X,
)
# Model output on the characteristic samples, then on their counterfactuals.
print(model.predict(samps), model.predict(cfs))


## Plot Results
def as_mnist_image(x):
    """Reshape one flat row into a 28x28 array for display."""
    return x.reshape((28, 28))


umap_picture(X, p, n, model, samps, cfs)
plt.show()
show_images(samps, cfs, decoder=as_mnist_image)
plt.show()

In [None]:
from vae import load_model

## Load Dataset
X, y, _ = create_MNIST_stream()

## Obtain latent representation
vae = load_model()
# Append a trailing axis, cast to float32 and divide by 255 before encoding.
encoder_input = np.expand_dims(X, -1).astype("float32") / 255
# The encoder returns three outputs; only the first one is kept as representation.
X, _, _ = vae.encoder.predict(encoder_input)

## Localize Drift and find CS in Latent Space
samps, y_s, p, n, _ = dcf.select_samples(
    X,
    y,
    select_from_samples=True,
)

## Train simplified model in Latent Space
model = dcf.simple_model(
    X,
    y,
    p,
    background=2.5,
    model=RandomForestClassifier(),
)

## Compute Counterfactuals in Latent Space
cfs = dcf.compute_counterfactual_explanation(
    samps,
    y_s,
    model,
    method="assignment",
    X=X,
)
# Model output on the characteristic samples, then on their counterfactuals.
print(model.predict(samps), model.predict(cfs))


## Plot Results
def decode_latent(x):
    """Run one latent vector through the VAE decoder and return the result as a 28x28 array."""
    single_row_batch = x.reshape((1, -1))
    return vae.decoder.predict(single_row_batch).reshape((28, 28))


show_images(samps, cfs, decoder=decode_latent)
plt.show()

In [None]:
from ceml.sklearn import generate_counterfactual
import torch
from pytorch_pretrained_biggan import (BigGAN, one_hot_from_names, truncated_noise_sample, convert_to_images)

def do_one_hot(X, one_hot):
    """Return a copy of ``X`` whose ``one_hot`` columns hold at most one non-zero entry per row.

    For every row the column of the group with the largest value wins. Its
    value is clipped to the interval [0, 1] and rounded; every other column of
    the group is set to zero. Columns outside the group are left untouched.

    Parameters
    ----------
    X : 2-D numpy array
        Input rows; the array itself is not modified.
    one_hot : indexable sequence of int (e.g. list or range)
        Column indices that form the one-hot group.

    Returns
    -------
    numpy array
        Modified copy of ``X``.
    """
    result = X.copy()
    winners = result[:, one_hot].argmax(axis=1)
    # Each `row` is a view into `result`, so the assignments below edit the copy in place.
    for row, winner in zip(result, winners):
        column = one_hot[winner]
        # Read the winning value before the group is wiped.
        kept = round(max(0, min(row[column], 1)))
        row[one_hot] = 0
        row[column] = kept
    return result

# Width of the class (one-hot) part of a latent row, and the matching column indices.
image_net__one_hot=1000
image_net__one_hot_index = range(image_net__one_hot)
def generate_image_net_stream(truncation = 0.4, n=1500, pre_drift=[277], post_drift=[280], both=[253,281,333, 279,269,270,271]):
    """Build a synthetic stream of ``2*n`` latent rows with a class change after ``n`` rows.

    Every row is a one-hot class block of width ``image_net__one_hot`` followed
    by a noise block drawn with ``truncated_noise_sample``. The active class of
    each of the first ``n`` rows is drawn (with replacement) from
    ``pre_drift + both``; for the last ``n`` rows it is drawn from
    ``post_drift + both``.

    Parameters
    ----------
    truncation : float
        Forwarded to ``truncated_noise_sample``.
    n : int
        Number of rows in each half of the stream.
    pre_drift, post_drift, both : list of int
        Class indices available only in the first half, only in the second
        half, and in both halves. The lists are only read, never modified.

    Returns
    -------
    data : numpy array with ``2*n`` rows (class block, then noise block)
    y : numpy array of length ``2*n``; 0 for the first half, 1 for the second
    """
    total = 2 * n
    class_block = np.zeros((total, image_net__one_hot))
    noise_block = truncated_noise_sample(truncation=truncation, batch_size=total)
    data = np.concatenate((class_block, noise_block), axis=1)

    # Draw the active class per row: first half, then second half.
    first_half = np.random.choice(pre_drift + both, size=n, replace=True)
    second_half = np.random.choice(post_drift + both, size=n, replace=True)
    active = np.hstack((first_half, second_half))
    data[np.arange(total), active] = 1

    y = np.array(n * [0] + n * [1])
    return data, y

## Transforms Latent Vector Into Image
def gen_image(model, data, n_noise=128, n_class=1000, truncation=0.4):
    """Turn latent rows into images with a generator model.

    Parameters
    ----------
    model : callable
        Called as ``model(noise_vector, class_vector, truncation)``.
    data : 2-D numpy array
        Rows whose first ``n_class`` columns are the class part and whose
        remaining ``n_noise`` columns are the noise part.
    n_noise, n_class : int
        Widths of the two parts; their sum must equal ``data.shape[1]``.
    truncation : float
        Passed through to ``model`` unchanged.

    Returns
    -------
    Whatever ``convert_to_images`` returns for the model output.

    Raises
    ------
    AssertionError
        If the row width of ``data`` is not ``n_class + n_noise``.
    """
    expected_width = n_class + n_noise
    assert data.shape[1] == expected_width

    class_part, noise_part = data[:, :n_class], data[:, n_class:]
    class_vector = torch.from_numpy(class_part).float()
    noise_vector = torch.from_numpy(noise_part).float()

    # Inference only: no gradient tracking.
    with torch.no_grad():
        generated = model(noise_vector, class_vector, truncation)
    return convert_to_images(generated.to('cpu'))
def plot_result(model, orig, clfs,n_noise=128, n_class=1000):
    """Show every original next to its counterfactual, one figure per pair.

    Both inputs are converted to images with ``gen_image`` and paired in order.
    Each pair is drawn as a 1x2 panel (original left, counterfactual right)
    without axis ticks, and displayed with ``plt.show()``.

    Parameters
    ----------
    model : generator passed on to ``gen_image``
    orig, clfs : 2-D numpy arrays
        Latent rows of the originals and of the counterfactuals.
    n_noise, n_class : int
        Widths of the noise and class parts of a latent row. They are
        forwarded to ``gen_image`` so that non-default layouts are honoured.
    """
    originals = gen_image(model, orig, n_noise=n_noise, n_class=n_class)
    counterfactuals = gen_image(model, clfs, n_noise=n_noise, n_class=n_class)
    for pair in zip(originals, counterfactuals):
        # 121 = left panel, 122 = right panel of a 1x2 grid
        for position, image in zip((121, 122), pair):
            plt.subplot(position)
            plt.imshow(np.array(image))
            plt.xticks([],[])
            plt.yticks([],[])
        plt.show()

## Generate Counterfactual With One Hot Encoding
def generate_counterfactual_one_hot(model, x, y_target, one_hots, lam=None, max_itr=100):
    """Iteratively search a counterfactual of ``x`` whose one-hot groups stay valid.

    In each iteration ``generate_counterfactual`` is asked for a counterfactual
    of the current candidate with target ``y_target``. For every one-hot group
    the candidate's group entries are then shifted by a multiple of the
    reported ``delta`` and snapped back to a one-hot pattern with
    ``do_one_hot``. The search stops as soon as ``model`` predicts ``y_target``
    for the candidate, or after ``max_itr`` iterations.

    Parameters
    ----------
    model : fitted model with ``predict``
    x : 1-D numpy array
        Sample to explain; not modified.
    y_target : label the counterfactual should be classified as
    one_hots : sequence of index sequences
        One entry per one-hot group, each listing the group's feature indices.
    lam : float or None
        If given, ``lam - 1`` is the multiple of ``delta`` that is subtracted.
        If ``None``, the multiple is ``2 / max(|delta|)`` over the group.
    max_itr : int
        Maximum number of iterations.

    Returns
    -------
    dict or None
        ``{"x_cf", "y_cf", "delta"}`` with ``delta = x - x_cf`` on success.
        ``None`` (after printing a warning) if the target was not reached or
        a group's absolute values do not sum to exactly 1.
    """
    x_cf = x.copy()
    prediction = None
    for _ in range(max_itr):
        # Always aim for this call's own target; never depend on a module-level variable.
        cf = generate_counterfactual(model, x_cf, y_target=y_target, features_whitelist=None)
        x_cf = cf["x_cf"].copy()
        for oh in one_hots:
            delta_oh = cf["delta"][oh]
            if lam is None:
                peak = np.abs(delta_oh).max()
                # With no change inside the group, 2/peak would be infinite and
                # inf*0 would fill the group with NaN; apply no shift instead.
                factor = 2/peak if peak > 0 else 0
            else:
                factor = lam-1
            x_cf[oh] -= factor*delta_oh
            x_cf = do_one_hot(x_cf.reshape(1,-1),oh)[0]
        prediction = model.predict(x_cf.reshape(1,-1))[0]
        if prediction == y_target:
            break
    if prediction != y_target or any(sum(abs(x_cf[oh])) != 1 for oh in one_hots):
        print("WARNING: No counter factual found")
        return None
    return {"x_cf": x_cf, "y_cf": prediction, "delta": x-x_cf}

## Setup
# p_thr is passed to the sample selection and reused below to mask the rows of X.
p_thr = 0.1
decoder = BigGAN.from_pretrained('biggan-deep-256')

## Generate Stream
X,y = generate_image_net_stream()

## Select Samples
original_samples,y_s, p, n0,n1 = dcf.select_samples(X, y, p_thr=p_thr)
sel = p < p_thr
# n0 entries labelled 0 followed by n1 entries labelled 1
original_labels = n0*[0]+n1*[1]

# Snap the class part of every selected sample to a clean one-hot pattern.
original_samples = do_one_hot(original_samples, image_net__one_hot_index)

## Train Decision Model
# Rows picked by `sel` keep their label from y; all other rows get label 2.
y_ = 2*np.ones(y.shape)
y_[sel] = y[sel]
# Refit up to 100 times and keep the tree that scores best on the selected
# samples; stop early once a tree classifies all of them correctly.
best_score, best_model = -1, None
for _ in range(100):
    model = DecisionTreeClassifier()
    model.fit(X, y_)
    score = model.score(original_samples,original_labels)
    if score > best_score:
        best_score, best_model = score, model
        if best_score == 1:
            break
model = best_model
if best_score < 1:
    print("WARNING: No valid model found")
print("train score: %.2f, \t sample score: %.2f"%(model.score(X,y_), model.score(original_samples,original_labels)))

## Generate Counterfactuals
# Only pairs whose counterfactual reached the opposite label are collected.
orig, cfs = [], []
for i in range(original_samples.shape[0]):
    if original_labels[i] != 2:
        x,y_t = original_samples[i,:], original_labels[i]
        y_cf = 1-y_t
        print("True label on x: {1}, Prediction on x: {0}, Aiming for label: {2}".format(model.predict([x])[0], y_t, y_cf), end="")
        cf = generate_counterfactual_one_hot(model, x, y_target=y_cf, one_hots=[image_net__one_hot_index])
        if cf is not None:
            print(", Found: {0}, Succsess: {1}".format(cf["y_cf"], cf["y_cf"]==y_cf))
            if cf["y_cf"]==y_cf:
                orig.append(x)
                cfs.append(cf["x_cf"])

## Plot Results
if orig:
    plot_result(decoder, np.array(orig).reshape(len(orig),-1), np.array(cfs).reshape(len(cfs),-1))
else:
    # An empty result cannot be reshaped to (0, -1) and there is nothing to draw.
    print("WARNING: No counterfactuals to plot")