In [1]:
from __future__ import absolute_import, division, print_function
import collections
import json 
import random
import re 

import modeling
import six 
import tensorflow as tf 

In [5]:
import modeling
import copy


class BertModel(object):
    """BERT model ("Bidirectional Encoder Representations from Transformers").

    Example usage:

        ```python
        # Already been converted into WordPiece token ids
        input_ids = tf.constant([[31, 51, 99], [15, 5, 0]])
        input_mask = tf.constant([[1, 1, 1], [1, 1, 1]])
        token_type_ids = tf.constant([[0, 0, 1], [0, 2, 0]])

        config = modeling.BertConfig(vocab_size=32000, hidden_size=512,
            num_hidden_layers=8, num_attention_heads=8, intermediate_size=1024)

        model = BertModel(config=config, is_training=True,
            input_ids=input_ids, input_mask=input_mask,
            token_type_ids=token_type_ids)

        label_embeddings = tf.compat.v1.get_variable(...)
        pooled_output = model.get_pooled_output()
        logits = tf.matmul(pooled_output, label_embeddings)

        ...
        ```
    """

    def __init__(self,
                 config,
                 is_training,
                 input_ids,
                 input_mask=None,
                 token_type_ids=None,
                 use_one_hot_embeddings=False,
                 scope=None) -> None:
        """Constructor for BertModel.

        Args:
            config: `BertConfig` instance. It is deep-copied, so the caller's
                object is never modified.
            is_training: bool. True for training model, false for eval model.
                Controls whether dropout will be applied.
            input_ids: int32 Tensor of shape [batch_size, seq_length].
            input_mask: (optional) int32 Tensor of shape
                [batch_size, seq_length]. Defaults to all ones.
            token_type_ids: (optional) int32 Tensor of shape
                [batch_size, seq_length]. Defaults to all zeros.
            use_one_hot_embeddings: (optional) bool. Whether to use one-hot
                word embeddings or tf.embedding_lookup() for the word
                embeddings.
            scope: (optional) variable scope. Defaults to "bert".

        Raises:
            ValueError: The config is invalid or one of the input tensor
                shapes is invalid.
        """
        # Work on a private copy so zeroing the dropout rates below does not
        # leak into the caller's config.
        config = copy.deepcopy(config)
        if not is_training:
            config.hidden_dropout_prob = 0.0
            config.attention_probs_dropout_prob = 0.0

        input_shape = modeling.get_shape_list(input_ids, expected_rank=2)
        print("input shape", input_shape)

        batch_size = input_shape[0]
        seq_length = input_shape[1]
        print(f"Batch size: {batch_size}, sequence length: {seq_length}")

        if input_mask is None:
            input_mask = tf.ones(shape=[batch_size, seq_length], dtype=tf.int32)

        # The embedding post-processor below is always called with
        # use_token_type=True, so it needs real token type ids. Default to a
        # single segment (all zeros), as the reference BERT implementation
        # does. NOTE(review): upstream `embedding_postprocessor` raises
        # ValueError on None here; presumably the local `modeling` port does
        # the same -- confirm.
        if token_type_ids is None:
            token_type_ids = tf.zeros(shape=[batch_size, seq_length], dtype=tf.int32)

        with tf.compat.v1.variable_scope(scope, default_name="bert"):
            with tf.compat.v1.variable_scope("embeddings"):

                print(config.vocab_size)

                # Perform embedding lookup on the word ids.
                (self.embedding_output, self.embedding_table) = modeling.embedding_lookup(
                    input_ids=input_ids,
                    vocab_size=config.vocab_size,
                    embedding_size=config.hidden_size,
                    initializer_range=config.initializer_range,
                    word_embedding_name="word_embeddings",
                    use_one_hot_embeddings=use_one_hot_embeddings
                )

                print(self.embedding_output)
                print(self.embedding_table)

                # Add positional embeddings and token type embeddings, then
                # layer normalize and perform dropout.
                self.embedding_output = modeling.embedding_postprocessor(
                    input_tensor=self.embedding_output,
                    use_token_type=True,
                    token_type_ids=token_type_ids,
                    token_type_vocab_size=config.type_vocab_size,
                    token_type_embedding_name="token_type_embeddings",
                    use_position_embeddings=True,
                    position_embedding_name="position_embeddings",
                    initializer_range=config.initializer_range,
                    max_position_embeddings=config.max_position_embeddings,
                    dropout_prob=config.hidden_dropout_prob
                )

                print("Embedding Output", self.embedding_output)

            with tf.compat.v1.variable_scope("encoder"):

                # This converts a 2D mask of shape [batch_size, seq_length] to
                # a 3D mask of shape [batch_size, seq_length, seq_length]
                # which is used for the attention scores.
                attention_mask = modeling.create_attention_mask_from_input_mask(
                    from_tensor=input_ids,
                    to_mask=input_mask
                )

                print("attention_mask", attention_mask)

                # Run the stacked transformer. All layers are requested so
                # that callers can inspect intermediate outputs.
                # `sequence_output` shape = [batch_size, seq_length, hidden_size]
                self.all_encoder_layers = modeling.transformer_model(
                    input_tensor=self.embedding_output,
                    attention_mask=attention_mask,
                    hidden_size=config.hidden_size,
                    num_hidden_layers=config.num_hidden_layers,
                    num_attention_heads=config.num_attention_heads,
                    intermediate_size=config.intermediate_size,
                    intermediate_act_fn=modeling.get_activation(config.hidden_act),
                    hidden_dropout_prob=config.hidden_dropout_prob,
                    attention_probs_dropout_prob=config.attention_probs_dropout_prob,
                    initializer_range=config.initializer_range,
                    do_return_all_layers=True)

            # The final encoder layer is the model's sequence output.
            self.sequence_output = self.all_encoder_layers[-1]

            # The "pooler" converts the encoded sequence tensor of shape
            # [batch_size, seq_length, hidden_size] to a tensor of shape
            # [batch_size, hidden_size]. This is necessary for segment-level
            # (or segment-pair-level) classification tasks where we need a
            # fixed dimensional representation of the segment.
            with tf.compat.v1.variable_scope("pooler"):

                # We "pool" the model by simply taking the hidden state
                # corresponding to the first token. We assume that this has
                # been pre-trained.
                first_token_tensor = tf.squeeze(self.sequence_output[:, 0:1, :], axis=1)
                self.pooled_output = tf.keras.layers.Dense(
                    units=config.hidden_size,
                    activation=tf.tanh,
                    kernel_initializer=modeling.create_initializer(config.initializer_range)
                )(first_token_tensor)

    def get_pooled_output(self):
        """Gets the pooled output.

        Returns:
            The first token's final hidden state passed through a
            tanh-activated dense layer with `hidden_size` units.
        """
        return self.pooled_output

    def get_sequence_output(self):
        """Gets final hidden layer of encoder.

        Returns:
            float Tensor of shape [batch_size, seq_length, hidden_size]
            corresponding to the final hidden layer of the transformer encoder.
        """
        return self.sequence_output

    def get_all_encoder_layers(self):
        """Gets the outputs of every encoder layer.

        Returns:
            The value returned by `modeling.transformer_model` when called
            with `do_return_all_layers=True`; its last element is the
            sequence output.
        """
        return self.all_encoder_layers

    def get_embedding_output(self):
        """Gets output of the embedding lookup (i.e., input to the transformer).

        Returns:
            float Tensor of shape [batch_size, seq_length, hidden_size]
            corresponding to the output of the embedding layer, after summing
            the word embeddings with the positional embeddings and the token
            type embeddings, then performing layer normalization. This is the
            input to the transformer.
        """
        return self.embedding_output

    def get_embedding_table(self):
        """Gets the word embedding table returned by `modeling.embedding_lookup`."""
        return self.embedding_table




In [6]:
# Example usage: build a small BertModel from constant inputs.
input_ids = tf.constant([[31, 51, 99], [15, 5, 0]])
input_mask = tf.constant([[1, 1, 1], [1, 1, 1]])
token_type_ids = tf.constant([[0, 0, 1], [0, 2, 0]])

config = modeling.BertConfig(
    vocab_size=32000,
    hidden_size=512,
    num_hidden_layers=8,
    num_attention_heads=8,
    intermediate_size=1024,
)
model = BertModel(
    config=config,
    is_training=True,
    input_ids=input_ids,
    input_mask=input_mask,
    token_type_ids=token_type_ids,
)



input shape [2, 3]
Batch size: 2, sequence length: 3
32000
tf.Tensor(
[[[-0.03295476 -0.01691835  0.00876137 ... -0.0124014  -0.01919353
   -0.01375115]
  [-0.01643722 -0.0030208  -0.00078725 ... -0.02569843  0.00738917
    0.01281514]
  [-0.00244974  0.01686387  0.01088041 ... -0.0246888  -0.00314107
    0.00057836]]

 [[ 0.00209959 -0.0097736  -0.00558968 ... -0.00036225  0.01715203
    0.00660457]
  [ 0.01441314 -0.00921718 -0.01999765 ...  0.00740988 -0.01414874
    0.0012064 ]
  [-0.00656144  0.00375049 -0.00575585 ... -0.01238892 -0.03380669
    0.00518762]]], shape=(2, 3, 512), dtype=float32)
<tf.Variable 'bert_1/embeddings/word_embeddings:0' shape=(32000, 512) dtype=float32, numpy=
array([[-0.00656144,  0.00375049, -0.00575585, ..., -0.01238892,
        -0.03380669,  0.00518762],
       [ 0.02966835,  0.01244527, -0.00564474, ...,  0.02363148,
        -0.00556675,  0.02400868],
       [ 0.01303717, -0.01035753,  0.01226201, ..., -0.02942312,
         0.03174744,  0.00731598],
 

In [80]:
import tensorflow as tf
from absl import flags

# Define a dummy flag to avoid the 'unknown command line flag' error 
# flags.DEFINE_string('f', '', 'kernel')

class BertModelTest(tf.test.TestCase):
    """Smoke test that builds random input ids for a BERT model."""

    @staticmethod
    def ids_tensor(shape, vocab_size):
        """Creates a random tensor of the given shape and vocab size."""
        return tf.random.uniform(shape, maxval=vocab_size, dtype=tf.int32)

    class BertModelTester(object):
        """Holds the hyper-parameters used to build a test model."""

        def __init__(self,
                     parent,
                     batch_size=13,
                     seq_length=7,
                     is_training=True,
                     use_input_mask=True,
                     use_token_type_ids=True,
                     vocab_size=99,
                     hidden_size=32,
                     num_hidden_layers=5,
                     num_attention_heads=4,
                     intermediate_size=37,
                     hidden_act="gelu",
                     hidden_dropout_prob=0.1,
                     attention_probs_dropout_prob=0.1,
                     max_position_embeddings=512,
                     type_vocab_size=16,
                     initializer_range=0.02,
                     scope=None) -> None:
            """Stores every argument on the instance under the same name."""
            self.parent = parent

            # Input dimensions and input-building switches.
            self.batch_size, self.seq_length = batch_size, seq_length
            self.is_training = is_training
            self.use_input_mask, self.use_token_type_ids = (
                use_input_mask, use_token_type_ids)

            # Model size.
            self.vocab_size, self.hidden_size = vocab_size, hidden_size
            self.num_hidden_layers = num_hidden_layers
            self.num_attention_heads = num_attention_heads
            self.intermediate_size = intermediate_size

            # Activation, regularisation and initialisation settings.
            self.hidden_act = hidden_act
            self.hidden_dropout_prob = hidden_dropout_prob
            self.attention_probs_dropout_prob = attention_probs_dropout_prob
            self.max_position_embeddings = max_position_embeddings
            self.type_vocab_size = type_vocab_size
            self.initializer_range = initializer_range
            self.scope = scope

        def create_model(self):
            """Builds random input ids of shape [batch_size, seq_length] and prints them.

            NOTE: `use_input_mask` is stored on the tester, but no input mask
            is built here yet.
            """
            ids_shape = [self.batch_size, self.seq_length]
            input_ids = BertModelTest.ids_tensor(ids_shape, self.vocab_size)
            print("input_ids", input_ids)

    def test_create_model(self):
        """Runs `create_model` with the tester's default settings."""
        self.BertModelTester(parent=self).create_model()

        

# Run the test cases when this code is executed as a script.
# NOTE(review): the recorded output below ends in `SystemExit`, so
# tf.test.main() appears to exit the interpreter once the tests finish;
# that is expected when run from an interactive session.
if __name__ == "__main__":
    tf.test.main()


Running tests under Python 3.12.0: c:\Users\Manjusha Kumari\AppData\Local\Programs\Python\Python312\python.exe
[ RUN      ] BertModelTest.test_create_model


input_ids Tensor("random_uniform:0", shape=(13, 7), dtype=int32)
INFO:tensorflow:time(__main__.BertModelTest.test_create_model): 0.0s


I1006 18:44:36.937553 14964 test_util.py:2634] time(__main__.BertModelTest.test_create_model): 0.0s
[       OK ] BertModelTest.test_create_model
[ RUN      ] BertModelTest.test_session
[  SKIPPED ] BertModelTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.003s

OK (skipped=1)


SystemExit: False

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [77]:
import six

class Flattener:
    """Namespace for helpers that flatten nested Python containers."""

    @classmethod
    def flatten_recursive(cls, item):
        """Flattens a (potentially nested) tuple/dictionary/list to a list.

        Lists and tuples contribute their elements, dictionaries contribute
        their values (keys are dropped), and anything else is treated as a
        leaf. Leaves are returned in depth-first, left-to-right order.
        """
        if isinstance(item, (list, tuple)):
            children = list(item)
        elif isinstance(item, dict):
            children = list(item.values())
        else:
            # Not a container: a leaf flattens to a one-element list.
            return [item]

        leaves = []
        for child in children:
            leaves += cls.flatten_recursive(child)
        return leaves

# Example usage: a list that nests a tuple and a dict whose value is a list.
nested_structure = [1, (2, 3), dict(a=4, b=[5, 6]), 7]
flattened = Flattener.flatten_recursive(item=nested_structure)
print(flattened)  # Output: [1, 2, 3, 4, 5, 6, 7]


[1, 2, 3, 4, 5, 6, 7]


In [None]:

def get_unreachable_ops(cls, graph, outputs):
    """Finds all of the ops in `graph` that are unreachable from `outputs`.

    Starting from the names of the output tensors, the graph is walked
    backwards: each visited tensor leads to the op that produced it, and
    every input and output tensor of that op is visited in turn. Tensors
    linked through an "Assign" op are treated as one group, so reaching one
    of them reaches the others.

    Args:
        cls: object exposing `flatten_recursive(item)`, used to flatten
            `outputs` into a flat list.
        graph: object exposing `get_operations()`; each op must provide
            `name`, `type`, `inputs` and `outputs`, and each input/output
            must provide `name`.
        outputs: (possibly nested) structure of tensors with a `name`.

    Returns:
        A list, in graph order, of the ops that have at least one input or
        output tensor that was never visited.
    """
    outputs = cls.flatten_recursive(outputs)

    output_to_op = collections.defaultdict(list)      # tensor name -> producing op names
    op_to_all = collections.defaultdict(list)         # op name -> its input + output tensor names
    assign_out_to_in = collections.defaultdict(list)  # Assign output -> Assign inputs

    for op in graph.get_operations():
        input_names = [x.name for x in op.inputs]
        output_names = [y.name for y in op.outputs]

        op_to_all[op.name].extend(input_names)
        op_to_all[op.name].extend(output_names)
        for out_name in output_names:
            output_to_op[out_name].append(op.name)

        if str(op.type) == "Assign":
            for out_name in output_names:
                assign_out_to_in[out_name].extend(input_names)

    # Every tensor involved in an Assign op is linked to the assign output and
    # to the op's other inputs.
    assign_groups = collections.defaultdict(list)
    for out_name, name_group in assign_out_to_in.items():
        for n1 in name_group:
            assign_groups[n1].append(out_name)
            for n2 in name_group:
                if n1 != n2:
                    assign_groups[n1].append(n2)

    seen_tensors = set()
    stack = [x.name for x in outputs]
    while stack:
        name = stack.pop()
        if name in seen_tensors:
            continue
        seen_tensors.add(name)

        # Follow the tensor to the op(s) that produced it and queue every
        # tensor that op touches. The lookup must use the producing op's own
        # name (`op_name`), not a variable left over from an earlier loop.
        for op_name in output_to_op.get(name, ()):
            for tensor_name in op_to_all.get(op_name, ()):
                if tensor_name not in seen_tensors:
                    stack.append(tensor_name)

        # Also queue everything tied to this tensor through an Assign op.
        for assign_name in assign_groups.get(name, ()):
            if assign_name not in seen_tensors:
                stack.append(assign_name)

    unreachable_ops = []
    for op in graph.get_operations():
        all_names = [x.name for x in op.inputs] + [x.name for x in op.outputs]
        if any(name not in seen_tensors for name in all_names):
            unreachable_ops.append(op)

    return unreachable_ops




                





    









# Define a simple graph: c = a + b, d = c * b.
graph = tf.Graph()

with graph.as_default():
    a = tf.constant(1, name='a')
    b = tf.constant(2, name='b')
    c = tf.add(a, b, name='c')
    d = tf.multiply(c, b, name='d')


# Example usage: every op feeds `d`, the only requested output.
outputs = [d]
unreachable_ops = get_unreachable_ops(Flattener, graph, outputs)
# print("Unreachable operations:", [op.name for op in unreachable_ops])
unreachable_ops






In [51]:
# Scratch experiment: pop a name off a stack and record it as seen in a dict.
# (Despite its name, `this_is_list` is a dict.)
# stack =  ['d:0']
this_is_list = {}
stack = ['we']
a = stack.pop()
this_is_list[a] = True
b = this_is_list[a]
print(b)

True


In [73]:
import tensorflow as tf
import re



def assert_all_tensors_reachable(sess, outputs):
    """Checks that all the tensors in the graph are reachable from outputs.

    Ops whose names match one of the ignore patterns below are not reported.

    Args:
        sess: session whose `graph` attribute is the graph to inspect.
        outputs: (possibly nested) structure of output tensors from which
            reachability is computed.

    Raises:
        AssertionError: if any non-ignored op is unreachable from `outputs`.
    """
    graph = sess.graph

    ignore_strings = [
        "^.*/assert_less_equal/.*$",
        "^.*/dilation_rate$",
        "^.*/Tensordot/concat$",
        "^.*/Tensordot/concat/axis$",
        "^testing/.*$",
    ]

    ignore_regexes = [re.compile(x) for x in ignore_strings]

    # get_unreachable_ops calls `cls.flatten_recursive`, so it needs a real
    # flattener; passing None would fail before any graph work is done.
    unreachable = get_unreachable_ops(cls=Flattener, graph=graph, outputs=outputs)
    print("unreachable", unreachable)

    filtered_unreachable = []
    for x in unreachable:
        if any(r.match(x.name) is not None for r in ignore_regexes):
            continue

        filtered_unreachable.append(x)
        print("filtered_unreachable", filtered_unreachable)

    unreachable = filtered_unreachable

    # Debugging print statement
    print("Unreachable ops:", [x.name for x in unreachable])

    # This is a free function, so there is no TestCase `self` to assert on;
    # raise the exception a failed assertEqual would have raised.
    if unreachable:
        raise AssertionError(
            "the followeing ops are unreachable: %s" %
            (" ".join([x.name for x in unreachable]))
        )


    















# Define a simple graph in which both constants feed the single output `c`.
graph = tf.Graph()
with graph.as_default():
    a = tf.constant(2, name='a')
    b = tf.constant(3, name='b')
    c = tf.add(a, b, name='c')

# Create a session bound to that graph and run the reachability check.
with tf.compat.v1.Session(graph=graph) as sess:
    outputs = [c]
    assert_all_tensors_reachable(sess=sess, outputs=outputs)


unreachable []
Unreachable ops: []


In [81]:
import tensorflow as tf
import random

def ids_tensor(cls, shape, vocab_size, rng=None, name=None):
    """Creates a random int32 tensor of the shape within the vocab size.

    Args:
        cls: unused by this function.
        shape: iterable of dimension sizes; their product is the number of
            ids generated.
        vocab_size: ids are drawn from the inclusive range [0, vocab_size - 1].
        rng: (optional) object providing `randint(a, b)`. A fresh
            `random.Random()` is created when omitted.
        name: (optional) name forwarded to `tf.constant`.

    Returns:
        The result of `tf.constant` built from the generated ids with
        dtype `tf.int32` and the requested shape.
    """
    generator = random.Random() if rng is None else rng

    # The number of ids to draw is the product of all dimension sizes.
    num_elements = 1
    for extent in shape:
        num_elements = num_elements * extent

    largest_id = vocab_size - 1
    flat_ids = []
    for _ in range(num_elements):
        flat_ids.append(generator.randint(0, largest_id))

    return tf.constant(value=flat_ids, dtype=tf.int32, shape=shape, name=name)

# Example usage: ids_tensor never reads `cls`, so any placeholder class works.
class DummyClass:
    """Placeholder passed as the `cls` argument of ids_tensor."""


shape = (2, 3)
vocab_size = 10
tensor = ids_tensor(cls=DummyClass, shape=shape, vocab_size=vocab_size)

print(tensor)


Tensor("Const:0", shape=(2, 3), dtype=int32)
