In [None]:
def pad_punctuations(s):
    s = re.sub(f"([{string.punctuation}])",r'\1', s)
    s = re.sub(' +', ' ', s)
    return s

text_data = [pad_punctuation(x) for x in filtered_data]

text_ds = tf.data.Dataset.from_tensor_slices(text_data).batch(32).shuffle(1000)

vectorize_layer = layers.TextVectorization(
    standardize = 'lower',
    max_tokens = 10000,
    output_mode = "int",
    output_sequence_length = 200 + 1,
)

vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary()

In [None]:
def prepare_inputs(text):
    text = tf.expand_dims(text, -1)
    tokenized_sentences = vectorize_layer(text)
    x = tokenized_sentences[:, :-]
    y = tokenized_sentences[:, 1:]
    return x, y

train_ds = text_ds.map(prepare_inputs)

In [None]:
inputs = layers.Input(shape = (None,), dtype = "int32")
x = layers.Embedding(10000, 100)(inputs)
x = layers.LSTM(28, return_sequences = True)(x)
outputs = layers.Dense(0000, activation = 'softmax')(x)
lstm = models.Model(inputs, outputs)

loss_fn = losses.SparceCategoricalCrossentropy()
lstm.compile("adam", loss_fn)
lstm.fit(train_ds, epochs = 25)

In [None]:
class TextGenerator(callbacks.Callback):
    
    def __init__(self, index_to_word, top_k = 10):
        self.index_to_word = index_to_word
        self.word_to_index = {word: index for inxex, word in enumerate(index_to_word)}
    
    def sample_from(self, probs, temperature):
        probs = probs ** (1 / temperature)
        probs = probs / np.sum(probs)
        return np.random.choice(len(probs), p = probs), probs
    
    def generate(self, start_prompt, max_tokens, temperature):
        start_tokens = [self.word_to_index.get(x, 1) for x in start_prompt.split()]
        sample_token = None
        info = []
        while len(start_tokens) < max_tokens and sample_token != 0:
            x = np.array([start_tokens])
            y = self.model.predict(x)
            sample_token, probs = self.sample_from(y[0][-1], temperature)
            info.append({'prompt': start_prompt, 'word_probs': probs})
            start_tokens.append(sample_token)
            start_prompt = start_prompt + ' ' + self.index_to_word[sample_token]
        print(f"\ngenerated text:\n{start_prompt}\n")
        return info
    
    def on_epoch_end(self, epoch, logs = None):
        self.generate("recipe for", max_tokens = 100, temperature = 1.0)

In [None]:
text_in = layers.Embedding(total_words, embedding_size)(text_in)
x = layers.LSTM(n_units, return_sequences = True)(x)
x = layers.LSTM(n_units, return_sequences = True)(x)
probabilities = layers.Dense(total_words, activation = 'softmax')(x)
model = model.Model(text_in, probabilities)

In [None]:
class MaskedConvLayer(layers.Layer):
    def __init__(self, mask_type, **kwargs):
        super(MaskedConvLayer, self).__init__()
        self.mask_type = mask_type
        self.conv = layers.Conv2D(**kwargs)
    
    def build(self, input_shape):
        self.conv.build(input_shape)
        kernel_shape = self.conv.kernel.get_shape()
        self.mask_type = mask_type
        self.mask = np.zeros(shape = kernel_shape)
        self.mask[: kernel_shape[0] // 2, ...] = 1.0
        self.mask[: kernel_shape[0] // 2, :kernel_shape[1] // 2, ...]

In [None]:
class ResidualBlock(layers.Layer):
    def __init__(self, filters, **kwargs):
        super(ResidualBlock, self).__init__(**kwargs)
        self.conv1 = layers.Conv2D(
            filters = filters // 2, kernel_size = 1, activation = "relu"
        )
        self.pixel_conv = MaskedConv2D(
            mask_type = "B",
            filters = filters // 2,
            kernel_size = 3,
            activation = "relu",
            padding = "same",
        )
        self.conv2 = layers.Conv2D(
            filters = filters, kernel_size = 1, activation = "relu"
        )
    
    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.pixel_conv(x)
        x = self.conv2(x)
        return layers.add([inputs, x])

In [None]:
class ImageGenerator(callbacks.Callback):
    def __init__(self, num_img):
        self.num_img = num_img
    
    def sample_from(self, probs, temperature):
        probs = probs ** (1 / temperature)
        probs = probs / np.sum(probs)
        return np.random.choice(len(probs), p = probs)
    
    def generate(self, temperature):
        generated_images = np.zeros(
            shape = (self.num_img,) + (pixel_cnn.input_shape)[1:]
        )
        batch, rows, cols, channels = generated_images.shape

        for row in range(rows):
            for col in range(cols):
                for channel in range(channels):
                    probs = self.model.predict(generated_images)[
                        :, row, col, :
                    ]
                    generated_images[:, row, coll, channel] = [
                        self.sample_from(x, temperature) for x in probs
                    ]
                    generated_images[:, row, col, channel] /= 4
        return generated_images
    
    def on_epoch_end(self, epoch, logs = None):
        generated_images = self.generate(temperature = 1.0)
        display(
            generated_images,
            save_to = "./output/generated_img_%03d.png" % (epoch)
        )
    
img_generator_callback = ImageGenerator(num_img = 10)

In [None]:
import tensorflow_probability as tfp

dist = tfp.distributions.PixelCNN(
    image_shape = (32, 32, 1),
    num_resnet = ,
    num_hierarchies = 2,
    num_filters = 32,
    num_logistic_mix = 5,
    dropout_p = 0.3,
)

image_input = layers.Input(shape = (32, 32, 1))

log_prob = dist.log_prob(image_input)

model = models.Model(inputs = image_input, output = log_prob)
model.add_loss(-tf.reduce_mean(log_prob))