ACE library.

Library for discovering and testing concept activation vectors. It contains
the ConceptDiscovery class, which discovers the concepts belonging to one
of the possible labels of a ResNet_pytorch network and calculates each
concept's TCAV score.

In [1]:
import jdc
import warnings
warnings.filterwarnings('ignore')

In [3]:
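import os, sys, inspect
# Thread-based pool: pool.map is called with lambdas in the methods below.
import multiprocessing.dummy as multiprocessing
import numpy as np
import scipy.stats as stats
import skimage.segmentation as segmentation
import sklearn.cluster as cluster
import sklearn.metrics.pairwise as metrics
import tensorflow as tf  # tf.gfile is the TensorFlow 1.x file API
from PIL import Image
from tcav import cav
# Make ace_helpers importable from the directory of this file.
curdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
sys.path.insert(0, curdir)
from ace_helpers import *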


Discovering and testing concepts of a class.

For a trained network, it first discovers the concepts as areas of the images in the class and then calculates the TCAV score of each concept. It can also transform images from pixel space into concept space.

Runs concept discovery for a given class in a trained model.

For a trained ResNet_pytorch model, the ConceptDiscovery class first performs unsupervised concept discovery using examples of one of the classes in the network.

Args:\
    model: A trained ResNet_pytorch model on which we run the concept discovery algorithm\
    target_class: Name of the one of the classes of the network\
    random_concept: A concept made of random images (used for statistical test) e.g. "random500_199"\
    bottlenecks: a list of bottleneck layers of the model for which the concept discovery stage is performed\
    sess: Model's tensorflow session\
    source_dir: The directory that contains folders with images of the network's classes.\
    activation_dir: directory to save computed activations\
    cav_dir: directory to save CAVs of discovered and random concepts\
    num_random_exp: Number of random counterparts used for calculating several CAVs and TCAVs for each concept (to make statistical testing possible).\
    channel_mean: If true, for the unsupervised concept discovery the bottleneck activations are averaged over channels instead of using the whole activation vector (reducing dimensionality)\
    max_imgs: maximum number of images in a discovered concept\
    min_imgs: minimum number of images in a discovered concept for the concept to be accepted\
    num_discovery_imgs: Number of images used for concept discovery. If None, will use max_imgs instead.\
    num_workers: if greater than zero, runs methods in parallel with num_workers parallel threads. If 0, no method is run in parallel threads.\
    average_image_value: The average value used for mean subtraction in the network's preprocessing stage.

In [11]:
class ConceptDiscovery(object):
    def __init__(self,
               model,
               target_class,
               random_concept,
               bottlenecks,
               sess,
               source_dir,
               activation_dir,
               cav_dir,
               num_random_exp=2,
               channel_mean=True,
               max_imgs=40,
               min_imgs=20,
               num_discovery_imgs=40,
               num_workers=20,
               average_image_value=117
    ):
        self.model = model
        self.sess = sess
        self.target_class = target_class
        self.num_random_exp = num_random_exp
        if isinstance(bottlenecks, str):
            bottlenecks = [bottlenecks]
        self.bottlenecks = bottlenecks
        self.source_dir = source_dir
        self.activation_dir = activation_dir
        self.cav_dir = cav_dir
        self.channel_mean = channel_mean
        self.random_concept = random_concept
        self.image_shape = model.get_image_shape()[:2]
        self.max_imgs = max_imgs
        self.min_imgs = min_imgs
        if num_discovery_imgs is None:
            num_discovery_imgs = max_imgs
        self.num_discovery_imgs = num_discovery_imgs
        self.num_workers = num_workers
        self.average_image_value = average_image_value

Loads all colored images of a concept.

Args:\
    concept: The name of the concept to be loaded\
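    max_imgs: maximum number of images to be loaded\
    compute_tcav: If true, the images are read from the folder named concept + '_50' instead of the folder named concept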

Returns:\
Images of the desired concept or class.
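
As a rough sketch of how the image folder is chosen, assuming the `_50` suffix convention used in the method's code. The helper name `concept_folder` is hypothetical and not part of the library; it mirrors only the path logic and loads no images.

```python
import os


def concept_folder(source_dir, concept, compute_tcav=False):
    """Return the folder that holds the images of `concept` (hypothetical helper)."""
    # With compute_tcav=True the images are read from "<concept>_50".
    name = concept + '_50' if compute_tcav else concept
    return os.path.join(source_dir, name)


print(concept_folder('data', 'zebra'))
print(concept_folder('data', 'zebra', compute_tcav=True))
```

The folder names `data` and `zebra` are placeholders chosen for illustration.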

In [12]:
%%add_to ConceptDiscovery
def load_concept_imgs(self, concept, max_imgs=1000, compute_tcav = False):
    if compute_tcav:
        concept_dir = os.path.join(self.source_dir, concept+'_50')
    else:
        concept_dir = os.path.join(self.source_dir, concept)

    img_paths = [
        os.path.join(concept_dir, d)
        for d in tf.gfile.ListDirectory(concept_dir)
    ]
    return load_images_from_files(
        img_paths,
        max_imgs=max_imgs,
        return_filenames=False,
        do_shuffle=False,
        run_parallel=(self.num_workers > 0),
        shape=(self.image_shape),
        num_workers=self.num_workers)

Creates a set of image patches using superpixel methods.

This method takes in the concept discovery images and transforms them into a dataset made of the patches of those images.

Args:\
    method: The superpixel method used for creating image patches. One of 'slic', 'watershed', 'quickshift', 'felzenszwalb'.\
    discovery_images: Images used for creating patches. If None, the images in the target class folder are used.\
    param_dict: Contains parameters of the superpixel method used in the form of {'param1':[a,b,...], 'param2':[z,y,x,...], ...}. For instance {'n_segments':[15,50,80], 'compactness':[10,10,10]} for slic method.\
    gradcam: If true, patches are created with _return_gradcam_superpixels, which filters superpixels with a Grad-CAM heatmap.\
    keep_percent: Passed to the Grad-CAM heatmap computation when gradcam is true.
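
The param_dict format can be read as one segmentation run per list position. The sketch below shows that reading for the slic method, using the default values that appear in the superpixel code of this notebook. The function name `expand_slic_params` is an assumption made for illustration.

```python
def expand_slic_params(param_dict=None):
    """Turn {'n_segments': [...], ...} into one kwargs dict per segmentation run."""
    param_dict = dict(param_dict or {})  # copy so the caller's dict is untouched
    n_segments = param_dict.get('n_segments', [15, 50, 80])
    n_params = len(n_segments)
    # Missing parameters fall back to one default value per run.
    compactness = param_dict.get('compactness', [20] * n_params)
    sigma = param_dict.get('sigma', [1.0] * n_params)
    return [
        {'n_segments': n, 'compactness': c, 'sigma': s}
        for n, c, s in zip(n_segments, compactness, sigma)
    ]


runs = expand_slic_params({'n_segments': [15, 50, 80], 'compactness': [10, 10, 10]})
for run in runs:
    print(run)
```

Each printed dict corresponds to one call of the segmentation routine on the same image, so three resolutions of superpixels are produced per image.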

In [13]:
%%add_to ConceptDiscovery
def create_patches(self, method='slic', discovery_images=None, param_dict=None, gradcam=False, keep_percent=80):
    if param_dict is None:
        param_dict = {}
    dataset, image_numbers, patches = [], [], []
    if discovery_images is None:
        raw_imgs = self.load_concept_imgs(self.target_class, self.num_discovery_imgs, compute_tcav=True)
        self.discovery_images = raw_imgs
    else:
        self.discovery_images = discovery_images
    if self.num_workers:
        pool = multiprocessing.Pool(self.num_workers)
        if gradcam:
            outputs = pool.map(lambda img: self._return_gradcam_superpixels(img, method, param_dict, keep_percent), self.discovery_images)
        else:
            outputs = pool.map(lambda img: self._return_superpixels(img, method, param_dict), self.discovery_images)
        for fn, sp_outputs in enumerate(outputs):
            image_superpixels, image_patches = sp_outputs
            for superpixel, patch in zip(image_superpixels, image_patches):
                dataset.append(superpixel)
                patches.append(patch)
                image_numbers.append(fn)
    else:
        for fn, img in enumerate(self.discovery_images):
            if gradcam:
                image_superpixels, image_patches = self._return_gradcam_superpixels(img, method, param_dict, keep_percent)
            else:
                image_superpixels, image_patches = self._return_superpixels(img, method, param_dict)
            for superpixel, patch in zip(image_superpixels, image_patches):
                dataset.append(superpixel)
                patches.append(patch)
                image_numbers.append(fn)
    self.dataset, self.image_numbers, self.patches = np.array(dataset), np.array(image_numbers), np.array(patches)

Returns all patches for one image using Grad-CAM.

Given an image, calculates superpixels for each of the parameter lists in param_dict and returns a set of unique superpixels by removing duplicates. Only superpixels on which the Grad-CAM heatmap of the target class equals 1 everywhere are kept. If two patches have a Jaccard similarity of more than 0.5, they are considered duplicates.

Args:

img: The input image \
method: superpixel method, one of slic, watershed, quickshift, or felzenszwalb \
param_dict: Contains parameters of the superpixel method used in the form of {'param1':[a,b,...], 'param2':[z,y,x,...], ...}. For instance {'n_segments':[15,50,80], 'compactness':[10,10,10]} for slic method. \
keep_percent: Passed to the Grad-CAM heatmap computation.

Raises:

ValueError: if the segmentation method is invalid.
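
The Grad-CAM filter in this method compares the sum of the heatmap over a superpixel with the area of the superpixel. A minimal sketch of that test follows, assuming the heatmap is a binary 0/1 array (an assumption; the heatmap computation itself is not shown in this notebook). The name `inside_heatmap` is hypothetical.

```python
import numpy as np


def inside_heatmap(cam, mask):
    """True if every pixel of `mask` falls where the binary heatmap `cam` is 1."""
    return np.sum(cam * mask) == np.sum(mask)


cam = np.zeros((4, 4))
cam[:2, :] = 1.0            # assumed binary heatmap: the top half is salient

inside = np.zeros((4, 4))
inside[0, :2] = 1.0         # superpixel fully inside the salient region

straddling = np.zeros((4, 4))
straddling[1:3, 0] = 1.0    # superpixel crossing the heatmap boundary

print(inside_heatmap(cam, inside), inside_heatmap(cam, straddling))
```

A superpixel that crosses the boundary is rejected as a whole, so the filter is strict rather than proportional.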

In [None]:
%%add_to ConceptDiscovery
def _return_gradcam_superpixels(self, img, method='slic', param_dict=None, keep_percent=80):
    if param_dict is None:
        param_dict = {}

    upsample_size = (img.shape[1], img.shape[0])
    gradCAM = self.model.gradCAM_model
    chosen_class = int(self.model.labels[self.target_class]) #####
    inputs = np.expand_dims(img, axis=0)
    # ori_img = (img * 256).astype(np.uint8)
    # gradCAM.showCAMs(ori_img, inputs, chosen_class, upsample_size)
    cam = gradCAM.compute_heatmap(inputs, chosen_class, upsample_size, keep_percent)[1][:,:,0]

    # .get() (rather than .pop()) leaves param_dict intact for the next image.
    if method == 'slic':
        n_segmentss = param_dict.get('n_segments', [15, 50, 80])
        n_params = len(n_segmentss)
        compactnesses = param_dict.get('compactness', [20] * n_params)
        sigmas = param_dict.get('sigma', [1.] * n_params)
    elif method == 'watershed':
        markerss = param_dict.get('marker', [15, 50, 80])
        n_params = len(markerss)
        compactnesses = param_dict.get('compactness', [0.] * n_params)
    elif method == 'quickshift':
        max_dists = param_dict.get('max_dist', [20, 15, 10])
        n_params = len(max_dists)
        ratios = param_dict.get('ratio', [1.0] * n_params)
        kernel_sizes = param_dict.get('kernel_size', [10] * n_params)
    elif method == 'felzenszwalb':
        scales = param_dict.get('scale', [1200, 500, 250])
        n_params = len(scales)
        sigmas = param_dict.get('sigma', [0.8] * n_params)
        min_sizes = param_dict.get('min_size', [20] * n_params)
    else:
        raise ValueError('Invalid superpixel method!')
    unique_masks = []
    for i in range(n_params):
        param_masks = []
        if method == 'slic':
            segments = segmentation.slic(
                img, n_segments=n_segmentss[i], compactness=compactnesses[i],
                sigma=sigmas[i])
        elif method == 'watershed':
            segments = segmentation.watershed(
                img, markers=markerss[i], compactness=compactnesses[i])
        elif method == 'quickshift':
            segments = segmentation.quickshift(
                img, kernel_size=kernel_sizes[i], max_dist=max_dists[i],
                ratio=ratios[i])
        elif method == 'felzenszwalb':
            segments = segmentation.felzenszwalb(
                img, scale=scales[i], sigma=sigmas[i], min_size=min_sizes[i])
        for s in range(segments.max()):
            mask = (segments == s).astype(float)
            if np.mean(mask) > 0.001:
                unique = True

                if sum(sum(cam * mask)) == sum(sum(mask)):
                    for seen_mask in unique_masks:
                        jaccard = np.sum(seen_mask * mask) / np.sum((seen_mask + mask) > 0)
                        if jaccard > 0.5:
                            unique = False
                            break
                else:
                    unique = False

                if unique:
                    param_masks.append(mask)
        unique_masks.extend(param_masks)
    superpixels, patches = [], []
    while unique_masks:
        superpixel, patch = self._extract_patch(img, unique_masks.pop())
        superpixels.append(superpixel)
        patches.append(patch)
    return superpixels, patches

Returns all patches for one image.

Given an image, calculates superpixels for each of the parameter lists in param_dict and returns a set of unique superpixels by removing duplicates. If two patches have a Jaccard similarity of more than 0.5, they are considered duplicates.

Args:

img: The input image \
method: superpixel method, one of slic, watershed, quickshift, or felzenszwalb \
param_dict: Contains parameters of the superpixel method used in the form of {'param1':[a,b,...], 'param2':[z,y,x,...], ...}. For instance {'n_segments':[15,50,80], 'compactness':[10,10,10]} for slic method.
    
Raises:

ValueError: if the segmentation method is invalid.
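
The duplicate test can be sketched on its own with plain NumPy masks. This is a simplified illustration: it compares each mask against every mask kept so far, and the helper names `jaccard` and `unique_masks` are assumptions, not functions of the library.

```python
import numpy as np


def jaccard(mask_a, mask_b):
    """Intersection over union of two binary masks."""
    intersection = np.sum(mask_a * mask_b)
    union = np.sum((mask_a + mask_b) > 0)
    return intersection / union


def unique_masks(masks, threshold=0.5):
    """Keep each mask unless it overlaps an already kept mask above `threshold`."""
    kept = []
    for mask in masks:
        if all(jaccard(seen, mask) <= threshold for seen in kept):
            kept.append(mask)
    return kept


a = np.zeros((4, 4))
a[:2, :] = 1.0   # top two rows
b = np.zeros((4, 4))
b[:3, :] = 1.0   # top three rows, overlaps a with IoU 8/12
c = np.zeros((4, 4))
c[3, :] = 1.0    # bottom row, disjoint from a

print(len(unique_masks([a, b, c])))
```

Here `b` is dropped because its overlap with `a` exceeds 0.5, while `c` is kept.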

In [None]:
%%add_to ConceptDiscovery
def _return_superpixels(self, img, method='slic',
                          param_dict=None):
    if param_dict is None:
      param_dict = {}

    # .get() (rather than .pop()) leaves param_dict intact for the next image.
    if method == 'slic':
      n_segmentss = param_dict.get('n_segments', [15, 50, 80])
      n_params = len(n_segmentss)
      compactnesses = param_dict.get('compactness', [20] * n_params)
      sigmas = param_dict.get('sigma', [1.] * n_params)
    elif method == 'watershed':
      markerss = param_dict.get('marker', [15, 50, 80])
      n_params = len(markerss)
      compactnesses = param_dict.get('compactness', [0.] * n_params)
    elif method == 'quickshift':
      max_dists = param_dict.get('max_dist', [20, 15, 10])
      n_params = len(max_dists)
      ratios = param_dict.get('ratio', [1.0] * n_params)
      kernel_sizes = param_dict.get('kernel_size', [10] * n_params)
    elif method == 'felzenszwalb':
      scales = param_dict.get('scale', [1200, 500, 250])
      n_params = len(scales)
      sigmas = param_dict.get('sigma', [0.8] * n_params)
      min_sizes = param_dict.get('min_size', [20] * n_params)
    else:
      raise ValueError('Invalid superpixel method!')
    unique_masks = []
    for i in range(n_params):
      param_masks = []
      if method == 'slic':
        segments = segmentation.slic(
            img, n_segments=n_segmentss[i], compactness=compactnesses[i],
            sigma=sigmas[i])
      elif method == 'watershed':
        segments = segmentation.watershed(
            img, markers=markerss[i], compactness=compactnesses[i])
      elif method == 'quickshift':
        segments = segmentation.quickshift(
            img, kernel_size=kernel_sizes[i], max_dist=max_dists[i],
            ratio=ratios[i])
      elif method == 'felzenszwalb':
        segments = segmentation.felzenszwalb(
            img, scale=scales[i], sigma=sigmas[i], min_size=min_sizes[i])
      for s in range(segments.max()):
        mask = (segments == s).astype(float)
        if np.mean(mask) > 0.001:
          unique = True
          for seen_mask in unique_masks:
            jaccard = np.sum(seen_mask * mask) / np.sum((seen_mask + mask) > 0)
            if jaccard > 0.5:
              unique = False
              break
          if unique:
            param_masks.append(mask)
      unique_masks.extend(param_masks)
    superpixels, patches = [], []
    while unique_masks:
      superpixel, patch = self._extract_patch(img, unique_masks.pop())
      superpixels.append(superpixel)
      patches.append(patch)
    return superpixels, patches

Extracts a patch out of an image.

    Args:
      image: The original image
      mask: The binary mask of the patch area

    Returns:
      image_resized: The resized patch such that its boundaries touch the
        image boundaries
      patch: The original patch. The rest of the image is padded with the average value
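
A minimal NumPy-only sketch of the masking and cropping steps, assuming a float image in [0, 1] with shape (H, W, C). It crops the bounding box inclusively and leaves out the resize step; the function name `extract_patch` and the toy image are assumptions for illustration.

```python
import numpy as np


def extract_patch(image, mask, average_image_value=117):
    """Return (cropped_patch, padded_patch) for a binary `mask` over `image`."""
    mask_expanded = np.expand_dims(mask, -1)
    fill = float(average_image_value) / 255
    # Pixels outside the mask are replaced by the average value.
    padded = mask_expanded * image + (1 - mask_expanded) * fill
    rows, cols = np.where(mask == 1)
    # Inclusive bounding box of the mask; no resizing is done in this sketch.
    cropped = padded[rows.min():rows.max() + 1, cols.min():cols.max() + 1]
    return cropped, padded


image = np.ones((6, 6, 3)) * 0.2
mask = np.zeros((6, 6))
mask[1:4, 2:5] = 1.0

cropped, padded = extract_patch(image, mask)
print(cropped.shape, padded.shape)
```

The padded patch keeps the original image size, while the cropped patch covers only the region where the mask is set.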

In [None]:
%%add_to ConceptDiscovery
def _extract_patch(self, image, mask):
    mask_expanded = np.expand_dims(mask, -1)
    patch = (mask_expanded * image + (
        1 - mask_expanded) * float(self.average_image_value) / 255)
    ones = np.where(mask == 1)
    h1, h2, w1, w2 = ones[0].min(), ones[0].max(), ones[1].min(), ones[1].max()
    image = Image.fromarray((patch[h1:h2, w1:w2] * 255).astype(np.uint8))
    image_resized = np.array(image.resize(self.image_shape, Image.BICUBIC)).astype(float) / 255
    return image_resized, patch

    Returns activations of a list of imgs.

    Args:
      imgs: List/array of images to calculate the activations of
      bottleneck: Name of the bottleneck layer of the model where activations
        are calculated
      bs: The batch size for calculating activations. (To control computational
        cost)
      channel_mean: If true, the activations are averaged over axes 1 and 2,
        leaving one value per channel (assuming a channels-last layout).

    Returns:
      The array of activations
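
The batching and the optional averaging can be sketched without a real model. In the example below, `fake_run_examples` is a stand-in for the model's forward pass and `batched_activations` is a hypothetical helper; both are assumptions made for illustration.

```python
import numpy as np


def fake_run_examples(batch):
    """Stand-in for the model: maps (N, 8, 8, 3) images to (N, 2, 2, 4) activations."""
    return np.ones((batch.shape[0], 2, 2, 4))


def batched_activations(imgs, run_examples, bs=100, channel_mean=True):
    """Run `run_examples` on slices of at most `bs` images and stack the results."""
    outputs = []
    for start in range(0, imgs.shape[0], bs):
        outputs.append(run_examples(imgs[start:start + bs]))
    output = np.concatenate(outputs, 0)
    if channel_mean and output.ndim > 3:
        # Average over axes 1 and 2, leaving one value per channel.
        return np.mean(output, (1, 2))
    # Otherwise flatten each activation into a single vector.
    return np.reshape(output, [output.shape[0], -1])


imgs = np.zeros((250, 8, 8, 3))
print(batched_activations(imgs, fake_run_examples).shape)
print(batched_activations(imgs, fake_run_examples, channel_mean=False).shape)
```

Stepping through the images with `range(0, N, bs)` avoids empty trailing batches when the number of images is a multiple of the batch size.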

In [None]:
%%add_to ConceptDiscovery
def _patch_activations(self, imgs, bottleneck, bs=100, channel_mean=None):
    if channel_mean is None:
      channel_mean = self.channel_mean
    if self.num_workers:
      pool = multiprocessing.Pool(self.num_workers)
      output = pool.map(
          lambda i: self.model.run_examples(imgs[i * bs:(i + 1) * bs], bottleneck),
          np.arange(int(imgs.shape[0] / bs) + 1))
    else:
      output = []
      for i in range(int(imgs.shape[0] / bs) + 1):
        if imgs[i * bs:(i + 1) * bs].shape[0] > 0:
          output.append(
              self.model.run_examples(imgs[i * bs:(i + 1) * bs], bottleneck))
    output = np.concatenate(output, 0)
    if channel_mean and len(output.shape) > 3:
      output = np.mean(output, (1, 2))
    else:
      output = np.reshape(output, [output.shape[0], -1])
    return output

    Runs unsupervised clustering algorithm on concept activations.

    Args:
      acts: Activation vectors of the data points in the bottleneck layer,
        as a 2-D array with one row per data point.
      method: clustering method. We have:
        'KM': Kmeans Clustering
        'AP': Affinity Propagation
        'SC': Spectral Clustering
        'MS': Mean Shift clustering
        'DB': DBSCAN clustering method
      param_dict: Contains the clustering method's parameters, e.g.
                 {'n_clusters': 25} for Kmeans. If an empty dict is
                 given, default parameters are used.

    Returns:
      asg: The cluster assignment label of each data point
      cost: The clustering cost of each data point
      centers: The cluster centers. For methods where no cluster center or
      clustering cost is taken from the clusterer (Mean Shift, Spectral
      Clustering and DBSCAN), it calculates the medoid as the center and
      returns the distance to the center as each data point's clustering cost.

    Raises:
      ValueError: if the clustering method is invalid.
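
The medoid fallback can be illustrated with NumPy alone. The sketch below assumes cluster labels 0..K-1 and uses a hypothetical helper name, `medoid_centers`; the toy activations are made up for the example.

```python
import numpy as np


def medoid_centers(acts, asg):
    """Return (centers, cost) using the medoid of each cluster as its center."""
    n_clusters = asg.max() + 1
    centers = np.zeros((n_clusters, acts.shape[1]))
    cost = np.zeros(len(acts))
    for label in range(n_clusters):
        idxs = np.where(asg == label)[0]
        points = acts[idxs]
        # Pairwise Euclidean distances inside the cluster.
        pw = np.linalg.norm(points[:, None, :] - points[None, :, :], axis=-1)
        # The medoid is the member with the smallest total distance to the others.
        centers[label] = points[np.argmin(pw.sum(-1))]
        cost[idxs] = np.linalg.norm(points - centers[label], axis=-1)
    return centers, cost


acts = np.array([[0.0, 0.0], [1.0, 0.0], [5.0, 0.0], [10.0, 10.0]])
asg = np.array([0, 0, 0, 1])
centers, cost = medoid_centers(acts, asg)
print(centers)
print(cost)
```

Unlike a mean, the medoid is always one of the actual data points, so the center of a cluster corresponds to a real patch.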

In [None]:
%%add_to ConceptDiscovery
def _cluster(self, acts, method='KM', param_dict=None):
    if param_dict is None:
      param_dict = {}
    centers = None
    # .get() leaves param_dict intact, so it can be shared across bottlenecks.
    if method == 'KM':
      n_clusters = param_dict.get('n_clusters', 25)
      km = cluster.KMeans(n_clusters=n_clusters)
      km.fit(acts)
      centers = km.cluster_centers_
      d = np.linalg.norm(
          np.expand_dims(acts, 1) - np.expand_dims(centers, 0), ord=2, axis=-1)
      asg, cost = np.argmin(d, -1), np.min(d, -1)
    elif method == 'AP':
      damping = param_dict.get('damping', 0.5)
      ca = cluster.AffinityPropagation(damping=damping)
      ca.fit(acts)
      centers = ca.cluster_centers_
      d = np.linalg.norm(
          np.expand_dims(acts, 1) - np.expand_dims(centers, 0), ord=2, axis=-1)
      asg, cost = np.argmin(d, -1), np.min(d, -1)
    elif method == 'MS':
      ms = cluster.MeanShift(n_jobs=self.num_workers)
      asg = ms.fit_predict(acts)
    elif method == 'SC':
      n_clusters = param_dict.get('n_clusters', 25)
      sc = cluster.SpectralClustering(
          n_clusters=n_clusters, n_jobs=self.num_workers)
      asg = sc.fit_predict(acts)
    elif method == 'DB':
      eps = param_dict.get('eps', 0.5)
      min_samples = param_dict.get('min_samples', 20)
      sc = cluster.DBSCAN(
          eps=eps, min_samples=min_samples, n_jobs=self.num_workers)
      asg = sc.fit_predict(acts)
    else:
      raise ValueError('Invalid Clustering Method!')
    if centers is None:  ## If clustering did not return cluster centers, use medoids
      centers = np.zeros((asg.max() + 1, acts.shape[1]))
      cost = np.zeros(len(acts))
      for cluster_label in range(asg.max() + 1):
        cluster_idxs = np.where(asg == cluster_label)[0]
        cluster_points = acts[cluster_idxs]
        pw_distances = metrics.euclidean_distances(cluster_points)
        centers[cluster_label] = cluster_points[np.argmin(
            np.sum(pw_distances, -1))]
        cost[cluster_idxs] = np.linalg.norm(
            acts[cluster_idxs] - np.expand_dims(centers[cluster_label], 0),
            ord=2,
            axis=-1)
    return asg, cost, centers
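
For the methods that do provide centers, the assignment and cost come from a broadcasted distance matrix. A small sketch of that step, with toy data and the hypothetical helper name `assign_to_centers`:

```python
import numpy as np


def assign_to_centers(acts, centers):
    """Return (nearest center index, distance to it) for each activation."""
    # (N, 1, D) - (1, K, D) broadcasts to (N, K, D); the norm over D gives (N, K).
    d = np.linalg.norm(
        np.expand_dims(acts, 1) - np.expand_dims(centers, 0), ord=2, axis=-1)
    return np.argmin(d, -1), np.min(d, -1)


acts = np.array([[0.0, 0.0], [0.0, 3.0], [9.0, 0.0]])
centers = np.array([[0.0, 0.0], [10.0, 0.0]])
asg, cost = assign_to_centers(acts, centers)
print(asg, cost)
```

The intermediate array has N x K x D entries, which is worth keeping in mind when the number of patches or the activation dimension is large.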

    Discovers the frequently occurring concepts in the target class.

      Calculates self.dic, a dictionary containing all the information about the
      discovered concepts in the form of {'bottleneck layer name': bn_dic} where
      bn_dic itself is in the form of {'concepts': list of concepts,
      'concept name': concept_dic} where the concept_dic is in the form of
      {'images': resized patches of concept, 'patches': original patches of the
      concepts, 'image_numbers': image id of each patch}. bn_dic also stores the
      cluster center of each concept under the key 'concept name' + '_center'.

    Args:
      method: Clustering method.
      activations: Precomputed activations, if available. If None, they are
                   calculated. Must be a dictionary in the form of {'bn':array, ...}
      param_dicts: A dictionary in the format of {'bottleneck':param_dict,...}
                   where param_dict contains the clustering method's parameters
                   in the form of {'param1':value, ...}. For instance for Kmeans
                   {'n_clusters':25}. param_dicts can also be in the format
                   of param_dict where the same parameters are used for all
                   bottlenecks.
    

In [None]:
%%add_to ConceptDiscovery
def discover_concepts(self,
                        method='KM',
                        activations=None,
                        param_dicts=None):
    if param_dicts is None:
      param_dicts = {}
    if set(param_dicts.keys()) != set(self.bottlenecks):
      param_dicts = {bn: param_dicts for bn in self.bottlenecks}
    self.dic = {}  ## The main dictionary of the ConceptDiscovery class.
    for bn in self.bottlenecks:
      bn_dic = {}
      if activations is None or bn not in activations.keys():
        bn_activations = self._patch_activations(self.dataset, bn)
      else:
        bn_activations = activations[bn]
      bn_dic['label'], bn_dic['cost'], centers = self._cluster(
          bn_activations, method, param_dicts[bn])
      concept_number, bn_dic['concepts'] = 0, []
      for i in range(bn_dic['label'].max() + 1):
        label_idxs = np.where(bn_dic['label'] == i)[0]
        if len(label_idxs) > self.min_imgs:
          concept_costs = bn_dic['cost'][label_idxs]
          concept_idxs = label_idxs[np.argsort(concept_costs)[:self.max_imgs]]
          concept_image_numbers = set(self.image_numbers[label_idxs])
          discovery_size = len(self.discovery_images)
          highly_common_concept = len(
              concept_image_numbers) > 0.5 * len(label_idxs)
          mildly_common_concept = len(
              concept_image_numbers) > 0.25 * len(label_idxs)
          mildly_populated_concept = len(
              concept_image_numbers) > 0.25 * discovery_size
          cond2 = mildly_populated_concept and mildly_common_concept
          non_common_concept = len(
              concept_image_numbers) > 0.1 * len(label_idxs)
          highly_populated_concept = len(
              concept_image_numbers) > 0.5 * discovery_size
          cond3 = non_common_concept and highly_populated_concept
          if highly_common_concept or cond2 or cond3:
            concept_number += 1
            concept = '{}_concept{}'.format(self.target_class, concept_number)
            bn_dic['concepts'].append(concept)
            bn_dic[concept] = {
                'images': self.dataset[concept_idxs],
                'patches': self.patches[concept_idxs],
                'image_numbers': self.image_numbers[concept_idxs]
            }
            # Store the cluster center of each accepted concept.
            bn_dic[concept + '_center'] = centers[i]
      bn_dic.pop('label', None)
      bn_dic.pop('cost', None)
      self.dic[bn] = bn_dic
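
The three conditions in the loop above can be read as a single predicate over three counts. The sketch below restates them as a standalone function; the name `keep_concept` and its argument names are assumptions, and the separate `min_imgs` size check is left out.

```python
def keep_concept(n_patches, n_source_images, discovery_size):
    """Decide whether a cluster is kept as a concept.

    n_patches: number of patches assigned to the cluster
    n_source_images: number of distinct discovery images those patches come from
    discovery_size: total number of discovery images
    """
    highly_common = n_source_images > 0.5 * n_patches
    mildly_common = n_source_images > 0.25 * n_patches
    non_common = n_source_images > 0.1 * n_patches
    mildly_populated = n_source_images > 0.25 * discovery_size
    highly_populated = n_source_images > 0.5 * discovery_size
    return (highly_common
            or (mildly_populated and mildly_common)
            or (non_common and highly_populated))


print(keep_concept(n_patches=30, n_source_images=20, discovery_size=40))
print(keep_concept(n_patches=100, n_source_images=5, discovery_size=40))
```

A cluster whose patches come from only a handful of images is rejected, which filters out concepts that are specific to a few discovery images rather than to the class.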

Wrapper for computing or loading activations of random concepts.

    Takes care of making, caching (if desired) and loading activations.

    Args:
      bottleneck: The bottleneck layer name
      random_concept: Name of the random concept e.g. "random500_0"

    Returns:
      The activations of the random concept at the given bottleneck, as an
      array with singleton dimensions squeezed out

In [None]:
%%add_to ConceptDiscovery
def _random_concept_activations(self, bottleneck, random_concept):
    rnd_acts_path = os.path.join(self.activation_dir, 'acts_{}_{}'.format(
        random_concept, bottleneck))
    if not tf.gfile.Exists(rnd_acts_path):
      rnd_imgs = self.load_concept_imgs(random_concept, self.max_imgs)
      acts = get_acts_from_images(rnd_imgs, self.model, bottleneck)
      with tf.gfile.Open(rnd_acts_path, 'w') as f:
        np.save(f, acts, allow_pickle=False)
      del acts
      del rnd_imgs
    return np.load(rnd_acts_path).squeeze()
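
The compute-once, load-afterwards pattern can be tried in isolation. This sketch uses the standard library's `os` and `open` in place of `tf.gfile`, and the names `cached_activations`, `compute` and the file name are assumptions for illustration.

```python
import os
import tempfile

import numpy as np


def cached_activations(path, compute_fn):
    """Load activations from `path`, computing and saving them first if missing."""
    if not os.path.exists(path):
        acts = compute_fn()
        # Writing through a file object keeps the exact file name (no '.npy' added).
        with open(path, 'wb') as f:
            np.save(f, acts, allow_pickle=False)
    return np.load(path).squeeze()


calls = []


def compute():
    calls.append(1)  # record each (expensive) computation
    return np.arange(6, dtype=float).reshape(1, 6)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'acts_random500_0_layer')
    first = cached_activations(path, compute)
    second = cached_activations(path, compute)

print(first.shape, len(calls))
```

The second call reads the saved file, so the computation runs only once per random concept and bottleneck.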