In [None]:
# Progress marker: printed once when this notebook starts executing.
print("starting 2_occlusion_activations_in_Inception_V1")

# Investigate Occlusion Stimuli

## Imports

In [None]:
# general imports
import numpy as np
import math
import random
import pandas as pd
import tensorflow as tf
import torch
import os
import csv
import time
import copy
import matplotlib.pyplot as plt
import argparse

In [None]:
# lucid imports
import lucid.modelzoo.vision_models as models
from render import import_model

In [None]:
# custom imports
import occlusion_utils as ut

## Load model

In [None]:
# import InceptionV1 from the Lucid modelzoo
# `model` is later handed to `import_model` together with an input placeholder
# whenever the activation graph is (re)built inside the main loop.
model = models.InceptionV1()
model.load_graphdef()

## Parameters

In [None]:
# Number of occluded image copies pushed through the network per forward pass.
# It is also the fixed batch dimension of the TensorFlow input placeholder, so
# every forward pass is fed exactly this many images.
batch_size_forward_pass = 128

In [None]:
# setting seeds
# Seeds TensorFlow's graph-level RNG as well as Python's and NumPy's global RNGs.
# NOTE(review): torch is imported by this notebook but not seeded here — confirm
# that the data loader does not rely on torch randomness.
tf.set_random_seed(1234)
random.seed(0)
np.random.seed(0)

In [None]:
# Command-line interface: both options are mandatory, so argparse aborts with a
# usage message when either one is missing.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-s",
    "--stimuli-dir",
    required=True,
    help="Path to save stimuli to.",
)
parser.add_argument(
    "-t",
    "--trial-type",
    required=True,
    help="instruction_practice_catch or sampled_trials.",
)
args = parser.parse_args()
print(args)

In [None]:
# Unpack the parsed command-line options into the names used by the cells below.
stimuli_dir = args.stimuli_dir
trial_type = args.trial_type

## Load experiment specification

In [None]:
# read in unit specifications from csv into pandas dataframe
# The file name depends on the trial type. header=1 makes pandas take the column
# names from the second line of the file (zero-indexed row 1) and ignore the
# line before it.
path_to_csv_file = os.path.join(stimuli_dir, f"layer_folder_mapping_{trial_type}.csv")
unit_specs_df = pd.read_csv(path_to_csv_file, header=1)

In [None]:
# Display the loaded unit specifications for inspection.
unit_specs_df

In [None]:
# Main loop. For every unit listed in the experiment specification and every
# stimulus batch, the image returned by the data loader is replicated, each copy
# is occluded at one position (the patch is replaced by its per-channel mean),
# the copies are run through InceptionV1, and the unit's activation for every
# occlusion is saved to a .npy file next to the stimuli. This is repeated for
# every (heatmap size, occlusion size, percentage side length) triple from `ut`.
for _, row in unit_specs_df.iterrows():

    # load unit specification
    layer_number = row["layer_number"]
    kernel_size_number = row["kernel_size_number"]
    channel_number = row["channel_number"]
    feature_map_number = row["feature_map_number"]
    layer_name = row["layer_name"]
    pre_post_relu = row["pre_post_relu"]

    print(row)

    for batch in range(ut.n_batches):
        print(f"batch {batch}")

        start = time.time()

        # dataloader
        data_dir = os.path.join(
            stimuli_dir,
            ut.objective,
            trial_type,
            f"layer_{layer_number}",
            f"kernel_size_{kernel_size_number}",
            f"channel_{channel_number}",
            "natural_images",
            f"batch_{batch}",
        )
        data_loader = ut.get_data_loader(os.path.join(data_dir, "val"))

        # Fetch the image once per stimulus batch. The loader's first item does
        # not depend on the heatmap size or on the forward-pass iteration, so
        # building a fresh iterator inside the innermost loop only repeated the
        # same read.
        images, targets, paths = next(iter(data_loader))
        # channels-first -> channels-last; expected to be (1, 224, 224, 3) so
        # that tiling below matches the placeholder shape
        images_np_transformed = images.numpy().transpose(0, 2, 3, 1)

        for heatmap_size_i, occlusion_size_i, percentage_side_length_i in zip(
            ut.heatmap_sizes_list, ut.occlusion_sizes_list, ut.percentage_side_length_list
        ):
            print(heatmap_size_i, occlusion_size_i, percentage_side_length_i)

            list_of_positions = ut.get_list_of_occlusion_positions(heatmap_size_i, occlusion_size_i)

            # `session` is not created anywhere in this notebook; this guard only
            # closes a session left over in the kernel from earlier interactive work.
            if 'session' in locals() and session is not None:
                print('Close interactive session')
                session.close()

            # A fresh graph and session per heatmap size; both are released when
            # the with-block exits.
            with tf.Graph().as_default() as graph, tf.Session() as sess:

                image = tf.placeholder(tf.float32, shape=(batch_size_forward_pass, 224, 224, 3))
                print("image.shape", image.shape)
                model_instance = import_model(model, image)
                tf_activations_list, unique_layer_str_list = ut.get_tf_activations_list(model_instance, layer_name, pre_post_relu)

                # iterate over all occlusion positions in batches
                list_of_activations_for_occlusions = []
                n_iterations = math.ceil(len(list_of_positions) / batch_size_forward_pass)
                for iteration_i in range(n_iterations):
                    chunk_start = iteration_i * batch_size_forward_pass
                    cur_list_of_positions = list_of_positions[chunk_start:chunk_start + batch_size_forward_pass]

                    # np.tile allocates a new array, so occluding the copies never
                    # modifies the cached image; no extra deepcopy is needed.
                    # Result shape: (batch_size_forward_pass, 224, 224, 3)
                    images_np_transformed_ready_for_occlusion = np.tile(images_np_transformed, (batch_size_forward_pass, 1, 1, 1))

                    # loop through occlusion positions: copy `idx` gets its patch
                    # replaced by the patch's mean colour. When the last chunk is
                    # shorter than the batch size, the remaining copies stay
                    # unoccluded and merely pad the fixed-size forward pass.
                    for idx, (x_start, x_end, y_start, y_end) in enumerate(cur_list_of_positions):
                        patch = images_np_transformed_ready_for_occlusion[idx, x_start:x_end, y_start:y_end, :]
                        images_np_transformed_ready_for_occlusion[idx, x_start:x_end, y_start:y_end, :] = np.mean(np.mean(patch, axis=0), axis=0)

                    # To visually check an occluded copy while debugging:
                    #   plt.imshow(images_np_transformed_ready_for_occlusion[idx, :, :, :]); plt.show()

                    # forward pass
                    activations_list = sess.run(tf_activations_list, {image: images_np_transformed_ready_for_occlusion})

                    # unpack single list item
                    activations_np = activations_list[0]
                    unit_activations = ut.get_activation_according_to_objective(ut.objective, activations_np, feature_map_number)
                    list_of_activations_for_occlusions.append(list(unit_activations))

                # after having calculated all occlusions: save activations
                final_list_of_activations_for_occlusions = [item for sublist in list_of_activations_for_occlusions for item in sublist]
                destination_dir = os.path.join(data_dir, f"{percentage_side_length_i}_percent_side_length")
                os.makedirs(destination_dir, exist_ok=True)
                filename = os.path.join(destination_dir, f"activations_for_occlusions_of_{percentage_side_length_i}_percent.npy")
                # NOTE(review): this slice keeps len(list_of_positions) + 1 entries,
                # i.e. one entry beyond the occlusion positions whenever the last
                # forward pass was padded (that entry belongs to an unoccluded copy).
                # Confirm whether readers of this file rely on the extra element
                # before changing the bound.
                np.save(filename, final_list_of_activations_for_occlusions[:len(list_of_positions)+1])
                print(f"activations saved under{filename}")

                print('Done!!!')
        end = time.time()
        print(f"       time for one batch: {end-start}")
print("Completely done!")

In [None]:
# Progress marker: printed once when the notebook has run to the end.
print("done with 2_occlusion_activations_in_Inception_V1")