In [None]:
import os

# Move the kernel's working directory up to the folder named 'tcav' unless it
# is already there. os.path.basename understands the platform's separator, so
# the check also matches on Windows (where getcwd() contains '\\', not '/')
# and re-running this cell no longer climbs three more levels each time.
if os.path.basename(os.getcwd()) != 'tcav':
    print(os.getcwd())
    # Climb three directory levels in one step.
    # NOTE(review): assumes the notebook lives three folders below the target
    # directory - confirm against the repository layout.
    os.chdir(os.path.join('..', '..', '..'))
    print(os.getcwd())

In [None]:
from tcav.utils import create_session
from tensorflow.io import gfile
from tcav.model import ModelWrapper, KerasModelWrapper
from tcav.tcav_examples.discrete.kdd99_activation_generator import KDD99DiscreteActivationGenerator
from tcav.tcav_examples.discrete.kdd99_model_wrapper import KDD99KerasModelWrapper
import tensorflow as tf
from transformers import TFRobertaModel
from tcav.activation_generator import ActivationGeneratorBase
import tokenizers

In [None]:
# Root folder used for the model file, concept data and TCAV outputs.
# Set the TCAV_SOURCE_DIR environment variable to run on another machine;
# when it is unset the original author's local path is used unchanged.
source_dir = os.environ.get(
    "TCAV_SOURCE_DIR",
    "c://Users//daoha//Downloads//explainableAI//project//tcav//",
)
working_dir = source_dir

# Output folders; makedirs also creates missing parents and succeeds when the
# directory already exists, so this cell is safe to re-run.
acts_dir = os.path.join(working_dir, "activations")
gfile.makedirs(acts_dir)
cav_dir = os.path.join(working_dir, "cav")
gfile.makedirs(cav_dir)

In [None]:
# Fixed number of token slots per example (width of the model input arrays).
MAX_LEN = 96

# Folder holding the RoBERTa vocabulary/merges files. Derived from source_dir
# instead of repeating the absolute path, so the two cannot drift apart.
# The trailing '' keeps a trailing separator, so `PATH + name` still works.
PATH = os.path.join(source_dir, 'roBERTaFiles', '')

tokenizer = tokenizers.ByteLevelBPETokenizer(
    vocab=os.path.join(PATH, 'vocab-roberta-base.json'),
    merges=os.path.join(PATH, 'merges-roberta-base.txt'),
    lowercase=True,
    add_prefix_space=True
)

# Token id written into each encoded example for its sentiment label
# (looked up in SentimentActivationGenerator.transform_data).
# NOTE(review): presumably the RoBERTa vocabulary ids of these words - confirm.
sentiment_id = {'positive': 1313, 'negative': 2430, 'neutral': 7974}

In [None]:
class SentimentModelWrapper(ModelWrapper):
    """TCAV model wrapper around a saved Keras RoBERTa sentiment model.

    On construction the wrapper loads the Keras model, records the bottleneck
    tensors of interest, builds a loss against one-hot encoded labels and
    creates the gradient of that loss with respect to every bottleneck.

    NOTE(review): no ``labels`` attribute is set here; TCAV's base-class
    ``label_to_id``/``id_to_label`` presumably rely on one - confirm before
    running TCAV with a string target.
    """

    def __init__(self, sess, model_path, custom_objects=None):
        """Load the model and build the loss and gradient tensors.

        Args:
            sess: TensorFlow session stored on the wrapper as ``self.sess``.
            model_path: Path of the saved Keras model to load.
            custom_objects: Optional mapping passed to
                ``tf.keras.models.load_model`` to resolve custom layers.
        """
        self.sess = sess
        super(SentimentModelWrapper, self).__init__()
        self.import_keras_model(model_path, custom_objects=custom_objects)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        # Backward-compatible alias for the original, misspelled attribute.
        self.optimiezer = self.optimizer

        # Labels are fed as integer ids and one-hot encoded before being
        # handed to the model's first loss function.
        # NOTE(review): depth=96 presumably mirrors MAX_LEN - confirm.
        self.y_input = tf.compat.v1.placeholder(tf.int64, shape=[None])
        self.y_true = tf.one_hot(self.y_input, depth=96)
        self.loss = self.model.loss_functions[0](self.y_true, self.model.outputs[0])
        self._make_gradient_tensors()

    def import_keras_model(self, model_path, custom_objects=None):
        """Load the Keras model, then collect bottleneck and end tensors."""
        self.ends = {}
        self.model = tf.keras.models.load_model(model_path, custom_objects=custom_objects)
        self.get_bottleneck_tensors()
        self.get_inputs_and_outputs_and_ends()

    def get_bottleneck_tensors(self):
        """Collect the output tensors of the layers used as bottlenecks.

        The tensors are stored under ``bottlenecks_tensors`` - the attribute
        name TCAV's ``ModelWrapper`` base class reads in ``run_examples`` and
        ``get_gradient`` (see tcav/model.py) - and, for backward
        compatibility, under the original ``bottleneck_tensors`` name too.
        """
        tensors = {}
        for layer_name in ('pooler_output', 'conv1d', 'conv1d_1'):
            if layer_name == 'pooler_output':
                # The pooled output is a field of the RoBERTa layer's output
                # rather than a Keras layer of its own.
                roberta_layer = self.model.get_layer('tf_roberta_model')
                tensors[layer_name] = roberta_layer.output.pooler_output
            else:
                tensors[layer_name] = self.model.get_layer(layer_name).output
        self.bottlenecks_tensors = tensors
        self.bottleneck_tensors = tensors

    def get_inputs_and_outputs_and_ends(self):
        """Record the model's first input and first output tensors."""
        self.ends['input'] = self.model.inputs[0]
        self.ends['prediction'] = self.model.outputs[0]

    def _make_gradient_tensors(self):
        """Create d(loss)/d(bottleneck) for every recorded bottleneck tensor.

        Raises:
            ValueError: If a recorded bottleneck tensor is None.
        """
        self.bottlenecks_gradients = {}
        for bn_name, bn_tensor in self.bottleneck_tensors.items():
            if bn_tensor is None:
                raise ValueError(f"{bn_name} tensor is None. Check model architecture.")
            # tf.gradients yields None for a tensor the loss does not depend
            # on; such a None is stored unchanged, exactly as before.
            self.bottlenecks_gradients[bn_name] = tf.gradients(
                ys=self.loss,
                xs=bn_tensor
            )[0]
            


In [None]:
from tcav.activation_generator import ActivationGeneratorBase
import numpy as np
import pandas as pd

class SentimentActivationGenerator(ActivationGeneratorBase):
    """Activation generator that turns CSV text concepts into model inputs.

    A concept named ``c`` is read from ``<source_dir>/c/c.csv``; the file
    must provide ``text`` and ``sentiment`` columns.
    """

    def __init__(self, model, tokenizer, sentiment_id, source_dir, acts_dir, max_examples=500):
        """Store the tokenizer, sentiment lookup and concept root folder.

        Args:
            model: Model wrapper forwarded to the base generator.
            tokenizer: Object whose ``encode(text).ids`` yields token ids.
            sentiment_id: Mapping from sentiment label to its token id.
            source_dir: Folder containing one sub-folder per concept.
            acts_dir: Activations folder forwarded to the base generator.
            max_examples: Forwarded to the base generator.
        """
        super(SentimentActivationGenerator, self).__init__(model, acts_dir, max_examples)
        self.source_dir = source_dir
        self.tokenizer = tokenizer
        self.sentiment_id = sentiment_id

    def get_examples_for_concept(self, concept):
        """Read the concept's CSV file and return its encoded examples."""
        concept_file = os.path.join(self.source_dir, concept, concept + '.csv')
        return self.transform_data(pd.read_csv(concept_file))

    def transform_data(self, examples):
        """Encode a frame of texts into fixed-width model input arrays.

        Args:
            examples: DataFrame with ``text`` and ``sentiment`` columns.

        Returns:
            ``[input_ids, attention_mask, token_type_ids]``, each an int32
            array of shape ``(len(examples), MAX_LEN)``.

        Raises:
            KeyError: If a sentiment label is missing from ``sentiment_id``.
        """
        n_rows = len(examples)
        # input_ids is pre-filled with 1, the other two arrays with 0.
        # NOTE(review): 1/0/2 are presumably RoBERTa's <pad>/<s>/</s> ids - confirm.
        input_ids = np.ones((n_rows, MAX_LEN), dtype='int32')
        attention_mask = np.zeros((n_rows, MAX_LEN), dtype='int32')
        token_type_ids_t = np.zeros((n_rows, MAX_LEN), dtype='int32')

        # Five slots per row are taken by the fixed tokens around the text
        # ([0] ... [2, 2, sentiment, 2]). Longer texts are truncated so the
        # row assignment below can no longer fail with a shape mismatch.
        max_text_tokens = MAX_LEN - 5

        # Iterate positionally so frames without a default 0..n-1 index
        # (e.g. filtered frames) are handled as well.
        rows = zip(examples['text'], examples['sentiment'])
        for k, (raw_text, sentiment) in enumerate(rows):
            # Collapse whitespace runs and add the leading space.
            text = " " + " ".join(raw_text.split())
            text_ids = self.tokenizer.encode(text).ids[:max_text_tokens]
            s_tok = self.sentiment_id[sentiment]
            used = len(text_ids) + 5
            input_ids[k, :used] = [0] + text_ids + [2, 2] + [s_tok] + [2]
            attention_mask[k, :used] = 1

        return [input_ids, attention_mask, token_type_ids_t]


In [None]:
# Lets Keras resolve the 'TFRobertaModel' layer name when loading the file.
custom_objects = {'TFRobertaModel': TFRobertaModel}
model_path = os.path.join(source_dir, "v0-roberta-0.h5")

# Session handed to the wrapper, which loads the model and builds its tensors.
sess = create_session()
mymodel = SentimentModelWrapper(
    sess,
    model_path,
    custom_objects=custom_objects,
)

In [None]:
# Build the activation generator for the wrapped model; concept CSVs are read
# from source_dir and acts_dir is passed through to the base generator.
act_gen = SentimentActivationGenerator(
    model=mymodel,
    tokenizer=tokenizer,
    sentiment_id=sentiment_id,
    source_dir=source_dir,
    acts_dir=acts_dir,
    max_examples=200,
)

In [None]:
from tcav.utils import create_session
import tcav.utils_plot as utils_plot # utils_plot requires matplotlib
from tcav.tcav import TCAV
import absl
absl.logging.set_verbosity(0)

# Experiment definition: target class, bottleneck layers to probe, concepts
# to test and the alpha value(s) passed to TCAV.
target = "positive"
bottlenecks = ["conv1d", "conv1d_1"]
concepts = ["positive_words", "neutral_words"]
alphas = [0.01]

# TCAV takes the session object as its first argument. Pass the session the
# model wrapper was built with (`sess`) rather than the `create_session`
# factory function, which was handed over by mistake.
my_tcav = TCAV(sess,
               target,
               concepts,
               bottlenecks,
               act_gen,
               alphas,
               cav_dir=cav_dir,
               num_random_exp=5)

results = my_tcav.run()