In [1]:
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers as layer




In [28]:
def ischar(token):
    """
    Tell whether 'token' is a single character.

    Returns True when 'token' is a str whose length is exactly 1, and False for
    every other value (non-strings, the empty string, longer strings).
    """
    return isinstance(token, str) and len(token) == 1

In [29]:
def list_len_trim_to(data:list, length:int):
    """
    Return a copy of 'data' holding at most 'length' leading elements.

    If the list is longer than 'length', the copy is cut down to its first 'length' elements.
    If the list is shorter or equal in length, the copy has the same contents as 'data'.
    A negative 'length' is treated as 0 and yields an empty list.
    """
    # Clamp at 0: a negative slice bound would count from the END of the list
    # (e.g. data[:-1] drops only the last element) instead of trimming to a length.
    return data[:max(length, 0)]

def list_len_pad_to(data:list, length:int, pad):
    """
    Return a new list made of 'data' followed by as many 'pad' entries as are
    needed to reach 'length' elements.

    When 'data' already has 'length' or more elements nothing is appended and
    the returned list has the same contents as 'data'.
    """
    # A zero or negative repeat count produces an empty list, so no explicit
    # length comparison is needed.
    missing = length - len(data)
    filler = [pad] * missing
    return data + filler

def list_len_force_to(data:list, length:int, pad):
    """
    Return 'data' forced to 'length' elements.

    Longer lists are trimmed with list_len_trim_to; shorter lists are then
    extended with the 'pad' token by list_len_pad_to. A list that already has
    'length' elements comes back with the same contents.
    """
    # Trim first, then pad: at most one of the two steps changes the contents.
    return list_len_pad_to(list_len_trim_to(data, length), length, pad)

In [76]:
class LanguageTokenSet():
    """
    An alphabet of single-character tokens plus one pad token, with helpers that
    convert between Python strings and integer tensors.

    Encoding and decoding are delegated to two Keras StringLookup layers that
    share the same vocabulary; the pad token is registered as their OOV token.
    """
    def __init__(self, alphabet_tokens:list, pad_token:str):
        """
        Validate the alphabet and build the encoder/decoder lookup layers.

        Parameters:
            alphabet_tokens: The tokens of the language, given as a list, a set or
                a string (one token per character). Each token must be a
                single-character string and must be unique.
                NOTE: a set has no defined order, so the token -> integer mapping
                built from a set follows the set's iteration order.
            pad_token: Single-character string used for padding. Must not be part
                of the alphabet.

        Raises:
            ValueError: If 'pad_token' is not a single character, if
                'alphabet_tokens' has an unsupported type, contains duplicates,
                contains the pad token, or contains an entry that is not a single
                character.
        """
        # Ensure the pad_token is a single character.
        if not ischar(pad_token):
            raise ValueError(f"'pad_token' must be a single character string. Got {pad_token}.")
        self.pad_token = pad_token

        # Get the alphabet_tokens as both a list and a set.
        # The list will maintain the order of the tokens. (Later stored.)
        # The set will allow for fast error checking. (Discarded after.)
        if isinstance(alphabet_tokens, list):
            alph_list = list(alphabet_tokens) # shallow copy, so the caller's list is never aliased
            alph_set = set(alphabet_tokens)
        elif isinstance(alphabet_tokens, set):
            alph_list = list(alphabet_tokens)
            alph_set = alphabet_tokens
        elif isinstance(alphabet_tokens, str):
            alph_list = [*alphabet_tokens]
            alph_set = set(alph_list)
        else:
            raise ValueError(f"'alphabet_tokens' should be provided as either a list, set or string. Got type '{type(alphabet_tokens)}'.")

        # Check for errors in alphabet.

        # List and set size should be the same. If different, this indicates a duplicate token.
        if len(alph_list) != len(alph_set):
            raise ValueError(f"{len(alph_list)-len(alph_set)} duplicate tokens detected in 'alphabet_tokens'.")
        # Pad token cannot be in alphabet.
        if self.pad_token in alph_set:
            raise ValueError(f"'pad_token' {self.pad_token} cannot be in alphabet.")
        # Check that each token is a single character.
        for x in alph_list:
            if not ischar(x):
                raise ValueError(f"Each entry of 'alphabet_tokens' must be a single character. Got '{x}'.")

        # Alphabet tokens validated. Store.
        self.alphabet_tokens = alph_list

        # Instantiate encoder & decoder.
        # Both layers share the vocabulary; 'invert=True' makes the decoder map integers back to tokens.
        self._encoder = layer.StringLookup(vocabulary=self.alphabet_tokens, oov_token=self.pad_token, output_mode="int", invert=False)
        self._decoder = layer.StringLookup(vocabulary=self.alphabet_tokens, oov_token=self.pad_token, output_mode="int", invert=True)

    @property
    def token_count(self):
        """
        The number of tokens in the language, including the pad token.
        """
        # The lookup layer is stored as '_encoder'; there is no public 'encoder' attribute.
        return self._encoder.vocabulary_size()

    def encode(self, data:str, shape:tuple):
        """
        Encodes a Python string into a TensorFlow tensor.

        Parameters:
            data: The string to encode.
            shape: Target shape of the result, of rank 1 or 2.
                Rank 1 -> every character of 'data' is one token; the sequence is
                    trimmed or padded to shape[0] tokens.
                Rank 2 -> 'data' is split on whitespace into words; each word is
                    trimmed or padded to shape[1] tokens and the list of words is
                    trimmed or padded (with all-pad words) to shape[0] words.

        Returns:
            The output of the encoder lookup layer applied to the padded token list(s).

        Raises:
            ValueError: If 'shape' has a rank other than 1 or 2.
        """
        # Different rank shapes need to be handled differently.
        if len(shape) == 1:
            # Just need to ensure that the resulting list is of the proper size.
            data = list_len_force_to([*data], shape[0], self.pad_token)
        elif len(shape) == 2:
            # Split the input by whitespace.
            data = [list(x) for x in data.split()]
            # Ensure each word is the proper length.
            data = [list_len_force_to(x, shape[1], self.pad_token) for x in data]
            # Ensure there are the proper number of "words".
            data = list_len_force_to(data, shape[0], [self.pad_token]*shape[1])
        else:
            raise ValueError(f"Unsupported shape rank={len(shape)}.")
        # Send to encoder.
        return self._encoder(data)

    def _join_tokens(self, tokens):
        """
        Joins a sequence of byte-string tokens into a str and strips trailing pad tokens.
        """
        return b''.join(tokens).decode("utf-8").rstrip(self.pad_token)

    def decode(self, data):
        """
        Decodes a TensorFlow tensor, NumPy array or Python array to a Python string.

        Parameters:
            data: Integer token data of rank 1 (a single token sequence) or
                rank 2 (one row per word).

        Returns:
            For rank 1, the decoded tokens joined into one string with trailing
            pad tokens removed. For rank 2, each row is decoded the same way,
            rows that end up empty are dropped, and the remaining words are
            joined with single spaces.

        Raises:
            ValueError: If the decoded data has a rank other than 1 or 2.
        """
        # Send to decoder.
        data = self._decoder(data).numpy()
        # Different rank shapes need to be handled differently.
        if len(data.shape) == 1:
            data = self._join_tokens(data)
        elif len(data.shape) == 2:
            # Join letters.
            data = [self._join_tokens(x) for x in data]
            # Join words.
            data = ' '.join([x for x in data if len(x) > 0])
        else:
            raise ValueError(f"Unsupported shape rank={len(data.shape)}.")
        return data

In [77]:
# Build a language whose alphabet is the four letters C, H, A, T, padded with '-'.
chat = LanguageTokenSet("CHAT", '-')
# Encode a sentence as 3 words of 5 tokens each; shorter words are padded on the right.
a = chat.encode("CATCH THAT CAT", shape=(3,5))
print(a)
# Round-trip check: decoding the encoded tensor should give back the original sentence.
chat.decode(a)

tf.Tensor(
[[1 3 4 1 2]
 [4 2 3 4 0]
 [1 3 4 0 0]], shape=(3, 5), dtype=int64)


'CATCH THAT CAT'

In [89]:
# Step-by-step walkthrough of the denoising logic, printing every intermediate tensor.
# The input deliberately contains pad tokens ('-') in the middle of a word,
# at the start of a word, and at the end of a word.
data = chat._encoder([[*"TA-HA"], [*"-TACA"], [*"AT---"]])
print(data)

# Create a binary mask for the data where:
# - 0 represents pad tokens.
# - 1 represents any other token.
# Pad tokens are always represented by 0 and all other tokens are positive so:
mask = tf.math.sign(data)
print(mask)

# Apply column-wise denoising.

# Gather the starting values of each row of the mask.
# This will highlight all rows starting with pad tokens.
col = mask[:, 0]
print(col)
# Apply the cumulative product to the isolated column.
# Since 1*1=1, 1*0=0, 0*0=0, the column is left with a sequence of 1's followed only by 0's.
# This will allow us to remove all values after first empty word.
col = tf.math.cumprod(col, axis=-1)
print(col)
# Multiply this isolated column back over the mask.
# The new trailing axis lets the per-row value broadcast across every token of that row.
mask = tf.multiply(mask, col[..., tf.newaxis])
print(mask)

# Apply row-wise denoising.
# Use cumulative product for the same reasoning as in column-wise denoising.
# Within each row, everything after the first pad token is zeroed.
mask = tf.math.cumprod(mask, axis=-1)
print(mask)

# Multiply the (binary) mask back over the data.
# Tokens under a 1 are kept, tokens under a 0 become the pad index 0.
result = tf.math.multiply(data, mask)
print(result)


tf.Tensor(
[[4 3 0 2 3]
 [0 4 3 1 3]
 [3 4 0 0 0]], shape=(3, 5), dtype=int64)
tf.Tensor(
[[1 1 0 1 1]
 [0 1 1 1 1]
 [1 1 0 0 0]], shape=(3, 5), dtype=int64)
tf.Tensor([1 0 1], shape=(3,), dtype=int64)
tf.Tensor([1 0 0], shape=(3,), dtype=int64)
tf.Tensor(
[[1 1 0 1 1]
 [0 0 0 0 0]
 [0 0 0 0 0]], shape=(3, 5), dtype=int64)
tf.Tensor(
[[1 1 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]], shape=(3, 5), dtype=int64)
tf.Tensor(
[[4 3 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]], shape=(3, 5), dtype=int64)


In [97]:
def denoise_language(data):
    """
    Denoises language data when it is represented as a tensor of integers.

    A pad token (integer 0) ends a word, and a word that starts with a pad token
    ends the sentence. Every token after such an end is replaced by 0.

    Parameters:
        data: Integer tensor of token indices where 0 is the pad token and every
            other token is positive. The last axis indexes the tokens of a word.
            If the rank is 2 or more, the second-to-last axis indexes the words of
            a sentence and any leading axes are treated as independent batch axes.
            NOTE: a rank-2 input is always interpreted as (words, tokens), never
            as a batch of single words.

    Returns:
        A tensor with the same shape as 'data' in which, per word, all tokens from
        the first pad token onward are 0 and, per sentence, all words from the
        first empty word onward are entirely 0.
    """
    # Create a binary mask for the data where:
    # - 0 represents pad tokens.
    # - 1 represents any other token.
    # Pad tokens are always represented by 0 and all other tokens are positive so:
    mask = tf.math.sign(data)

    # Apply word-wise denoising (only when there is a word axis).
    if len(data.shape) > 1:
        # Gather the first token of every word.
        # This will highlight all words starting with pad tokens.
        # '...' keeps any leading batch axes intact; for rank-2 data it equals mask[:, 0].
        word_mask = mask[..., 0]
        # Apply the cumulative product along the word axis.
        # Since 1*1=1, 1*0=0, 0*0=0, each sentence is left with a sequence of 1's followed only by 0's.
        # This will allow us to remove all values after first empty word.
        word_mask = tf.math.cumprod(word_mask, axis=-1)
        # Broadcast the per-word value back over every token of that word.
        mask = tf.multiply(mask, word_mask[..., tf.newaxis])

    # Apply token-wise denoising.
    # Use cumulative product for the same reasoning as in word-wise denoising.
    mask = tf.math.cumprod(mask, axis=-1)

    # Multiply the (binary) mask back over the data.
    return tf.math.multiply(data, mask)

tf.Tensor(
[[4 3 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]], shape=(3, 5), dtype=int64)
tf.Tensor([4 3 0 0 0], shape=(5,), dtype=int64)


In [None]:
@tf.keras.utils.register_keras_serializable(package="CGAEL", name="ArgmaxLayer")
class ArgmaxLayer(layer.Layer):
    """
    Keras layer that returns the index of the largest value along an axis.
    """
    def __init__(self, **kwargs):
        # Forward the standard layer arguments (name, trainable, dtype, ...) to the
        # base class. Deserialization rebuilds the layer from its config by passing
        # those entries as keyword arguments, so they must be accepted here.
        super().__init__(**kwargs)

    def call(self, data, axis=-1):
        """
        Returns tf.math.argmax of 'data' along 'axis' (the last axis by default).
        """
        return tf.math.argmax(data, axis=axis)

In [None]:
@tf.keras.utils.register_keras_serializable(package="CGAEL", name="LanguageDenoiseLayer")
class LanguageDenoiseLayer(layer.Layer):
    """
    Keras layer that applies denoise_language to its input.
    """
    def __init__(self, **kwargs):
        # Zero-argument super() resolves to this class. Naming a different class
        # here (of which this layer is not a subclass) raises TypeError on construction.
        # **kwargs forwards the standard layer arguments (name, trainable, dtype, ...),
        # which deserialization passes back in from the layer config.
        super().__init__(**kwargs)

    def call(self, data):
        """
        Returns 'data' with every token after a word/sentence end replaced by the pad index.
        """
        return denoise_language(data)