<a href="https://colab.research.google.com/github/1Ramirez7/Applied-Time-Series-Analysis-Notebook/blob/main/content_project/curri_Content.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Module 6 model

In [None]:
# cell 1
# Notebook setup: run-control flag, imports, Google Drive mount, and the
# constants shared by the cells below.

# ============================================
# Flags / Configuration
# ============================================
# Cells that load the corpus and train the model are wrapped in `if restart:`;
# set this to False to skip that work when re-running the notebook.
restart = True

# ============================================
# Imports
# ============================================
import os
import re
import io
import random
import string
import contextlib
import numpy as np
import tensorflow as tf

# Mount Google Drive (Colab only) so that project_path below is reachable
from google.colab import drive
drive.mount('/content/drive')

# List the Drive root after mounting.
# NOTE(review): this output is stored with the notebook and shows Drive file
# names; consider clearing it before sharing.
!ls /content/drive/MyDrive

# Keras / TensorFlow imports
from tensorflow.keras.layers import TextVectorization, Embedding, LSTM, GRU, Dense
from tensorflow.keras import Sequential, Model
from tensorflow.keras.losses import SparseCategoricalCrossentropy

# For text wrapping in final output
import textwrap

# Project (Google Drive) save path, used as the default save_dir for training.
# Keep the trailing slash: the inference cell builds file paths by plain
# string concatenation with this value.
project_path = "/content/drive/MyDrive/content_models/"

# Batch size passed to Dataset.batch() when adapting the TextVectorization layer
BATCH_SIZE_FOR_ADAPT = 1024


Mounted at /content/drive
'Colab Notebooks'				  'Financial accounting key terms.gdoc'
 content_models					  'Literature Review Env #1.gdoc'
'Copy of Righteous Slice Budget Activity.gsheet'  'Term question.gdoc'
'ECON 388 Homework #4.gdoc'			  'Untitled document.gdoc'
'ECON 388 Homework #5.gdoc'


The next two cells define the text-cleaning helper and load the training corpus.


In [None]:
# cell 2

# ============================================
# Preprocessing Function
# ============================================
def preprocess_text(text):
    """
    Lightly cleans raw text while preserving case and punctuation.

    Steps:
      - Removes the phrases "Project Gutenberg" and "Gutenberg"
      - Collapses every run of whitespace (spaces, tabs, newlines) into a
        single space and trims leading/trailing whitespace

    The text is NOT lowercased and punctuation is NOT removed, so tokens
    such as "Harry" and "harry," remain distinct downstream.

    Args:
        text: Raw text as a str.

    Returns:
        The cleaned str.
    """
    # The longer phrase must go first; otherwise removing "Gutenberg" alone
    # would leave a stray "Project " behind.
    for phrase in ("Project Gutenberg", "Gutenberg"):
        text = text.replace(phrase, "")

    # Replace multiple whitespace with single space
    return re.sub(r'\s+', ' ', text).strip()



In [None]:
# cell 3
# ============================================
# Single function to get text by filename + URL
# ============================================
def get_author_text(filename, file_url, local_dir='saved_files'):
    """
    Returns the preprocessed text of `filename`, downloading it first if needed.

    - Looks for `filename` inside `local_dir` (the directory is created if missing).
    - If it is not there, downloads `file_url` with tf.keras.utils.get_file and
      copies the downloaded file into `local_dir`.
    - Reads the local copy as UTF-8 and applies preprocess_text.

    Args:
        filename: Name of the file inside `local_dir`.
        file_url: URL to download from when the file is not present locally.
        local_dir: Directory holding the local copies.

    Returns:
        The cleaned text, or None if the download or the read failed
        (the error is printed rather than raised).
    """
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(local_dir, exist_ok=True)

    local_path = os.path.join(local_dir, filename)

    if os.path.exists(local_path):
        print(f"File '{filename}' found locally. Using it.")
    else:
        print(f"File '{filename}' not found locally. Downloading it.")
        # Copy under a temporary name and rename at the end, so an interrupted
        # copy never leaves a truncated file that a later call would accept
        # as a valid local copy.
        partial_path = local_path + ".part"
        try:
            downloaded_path = tf.keras.utils.get_file(filename, file_url)
            with open(downloaded_path, 'rb') as src, open(partial_path, 'wb') as dst:
                dst.write(src.read())
            os.replace(partial_path, local_path)
        except Exception as e:
            print(f"Could not download {filename} from {file_url}: {e}")
            # Best-effort cleanup of the partial copy
            try:
                if os.path.exists(partial_path):
                    os.remove(partial_path)
            except OSError:
                pass
            return None

    # Read file and preprocess
    try:
        with open(local_path, 'r', encoding='utf-8') as f:
            raw_text = f.read()
        return preprocess_text(raw_text)
    except Exception as e:
        print(f"Error reading {filename}: {e}")
        return None

# ============================================
# Load the training text (a single source: harrybookone.txt, stored locally as 'northanger.txt')
# ============================================
if restart:
    # Source text. NOTE(review): the variable and file names say "northanger",
    # but the URL points at harrybookone.txt; the names are kept because later
    # cells reference northanger_text.
    northanger_url = "https://raw.githubusercontent.com/1Ramirez7/Machine-Learning/refs/heads/main/content_project/rowling/harrybookone.txt"
    northanger_text = get_author_text('northanger.txt', northanger_url)

    # Keep only the texts that actually loaded (None / empty are dropped)
    combined_corpus = [loaded for loaded in (northanger_text,) if loaded]

    print("\n---- Summary ----")
    if northanger_text:
        print("Loaded Northanger text.")

    # Quick look at the beginning of the first loaded text
    if combined_corpus:
        print("\nSnippet of first loaded text (first 300 chars):")
        print(combined_corpus[0][:300])


File 'northanger.txt' not found locally. Downloading it.
Downloading data from https://raw.githubusercontent.com/1Ramirez7/Machine-Learning/refs/heads/main/content_project/rowling/harrybookone.txt
[1m438730/438730[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step

---- Summary ----
Loaded Northanger text.

Snippet of first loaded text (first 300 chars):
Harry Potter and the Sorcerer's Stone CHAPTER ONE THE BOY WHO LIVED Mister and Missus Dursley, of number four, Privet Drive, were proud to say that they were perfectly normal, thank you very much. They were the last people you'd expect to be involved in anything strange or mysterious, because they j


In [None]:
# cell 4
# ============================================
# Attention-based RNN (BahdanauAttention)
# + model builder (build_rnn_model)
# ============================================
class BahdanauAttention(tf.keras.layers.Layer):
    """
    Additive (Bahdanau-style) attention pooling over RNN outputs.

    The output at the last time step acts as the query: every time step is
    scored against it, the scores are softmax-normalised along the time
    axis, and the outputs are summed with those weights.
    """
    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        # W1 projects every time step, W2 projects the query,
        # V reduces the combined projection to one score per time step.
        self.W1 = Dense(units)
        self.W2 = Dense(units)
        self.V  = Dense(1)

    def call(self, hidden_states):
        """
        Pools `hidden_states` over time with attention.

        Args:
            hidden_states: RNN outputs; the caller passes them as
                (batch, seq_len, rnn_units).

        Returns:
            (context_vector, attention_weights): the weighted sum over the
            time axis, and the softmax weights with a trailing axis of size 1.
        """
        # Query = last time step, given a length-1 time axis so that it
        # broadcasts against every time step when added below.
        query = tf.expand_dims(hidden_states[:, -1, :], 1)

        projected_states = self.W1(hidden_states)
        projected_query = self.W2(query)
        score = self.V(tf.nn.tanh(projected_states + projected_query))

        # Normalise across the time dimension
        attention_weights = tf.nn.softmax(score, axis=1)

        # Weighted sum of the hidden states over time
        context_vector = tf.reduce_sum(hidden_states * attention_weights, axis=1)

        return context_vector, attention_weights

    def get_config(self):
        # Include `units` so the layer can be rebuilt when a saved model is loaded
        return {**super().get_config(), 'units': self.units}


def build_rnn_model(vocab_size,
                    embedding_dim=512,
                    rnn_units=1024,
                    attention_units=512,
                    cell_type='GRU'):
    """
    Builds a next-token model: Embedding -> one recurrent layer ->
    BahdanauAttention pooling -> Dense logits over the vocabulary.

    Args:
        vocab_size: Number of tokens; sizes both the embedding table and
            the output layer.
        embedding_dim: Width of the token embeddings.
        rnn_units: Width of the recurrent layer.
        attention_units: Width of the attention projections.
        cell_type: "LSTM" (case-insensitive) selects an LSTM layer; any
            other value selects GRU.

    Returns:
        An uncompiled tf.keras.Model mapping int64 token ids of shape
        (batch, seq_len) to logits of shape (batch, vocab_size).
    """
    if cell_type.upper() == 'LSTM':
        recurrent_cls = LSTM
    else:
        recurrent_cls = GRU

    # Variable-length sequences of token ids
    token_ids = tf.keras.Input(shape=(None,), dtype=tf.int64)

    embedded = Embedding(vocab_size, embedding_dim)(token_ids)

    # return_sequences=True keeps every time step for the attention layer
    rnn_outputs = recurrent_cls(rnn_units, return_sequences=True)(embedded)

    # The attention weights are not used by the model output
    context_vector, _ = BahdanauAttention(attention_units)(rnn_outputs)

    logits = Dense(vocab_size)(context_vector)

    return tf.keras.Model(inputs=token_ids, outputs=logits)



In [None]:
# cell 5
# ============================================
# Vectorizer loading and text-generation
# ============================================

def load_vectorizer_from_vocab(vocab_path):
    """
    Rebuilds a TextVectorization layer from a saved vocabulary file.

    The file is read as UTF-8 with one token per line (each line is
    stripped of surrounding whitespace). The layer is configured with no
    standardisation and whitespace splitting, and receives the vocabulary
    through set_vocabulary instead of being re-adapted.

    Args:
        vocab_path: Path to the vocabulary .txt file.

    Returns:
        The configured TextVectorization layer.
    """
    vocab = []
    with open(vocab_path, 'r', encoding='utf-8') as vocab_file:
        for raw_line in vocab_file:
            vocab.append(raw_line.strip())

    vectorizer_options = dict(
        standardize=None,
        split='whitespace',
        max_tokens=len(vocab),
        output_mode='int',
        output_sequence_length=None,
    )
    vectorizer = TextVectorization(**vectorizer_options)
    vectorizer.set_vocabulary(vocab)
    return vectorizer


def generate_text(model,
                  start_string,
                  vectorizer,
                  num_words=50,
                  temperature=1.5,
                  sequence_length=100):
    """
    Generates text word by word, starting from `start_string`.

    Each step feeds the most recent tokens to `model`, divides the returned
    logits by `temperature`, samples one token id, and appends it.

    Args:
        model: Callable mapping a (1, n) tensor of token ids to
            (1, vocab_size) logits.
        start_string: Prompt text; must vectorize to at least one token.
        vectorizer: Layer turning text into token ids; its get_vocabulary()
            maps ids back to words.
        num_words: Number of words to generate.
        temperature: Positive divisor applied to the logits before sampling
            (larger values flatten the distribution).
        sequence_length: Maximum number of trailing tokens fed to the model.

    Returns:
        The whitespace-split words of `start_string` followed by the
        generated words, joined by single spaces.

    Raises:
        ValueError: If `temperature` is not positive, or if `start_string`
            yields no tokens.
    """
    # Dividing logits by zero (or a negative value) would give meaningless
    # sampling, so fail early with a clear message.
    if temperature <= 0:
        raise ValueError(f"temperature must be > 0, got {temperature}")

    # Vectorize the start string
    generated_tokens = vectorizer(tf.constant([start_string]))  # shape: (1, token_count)
    if generated_tokens.shape[1] == 0:
        raise ValueError("start_string must contain at least one token")
    generated_words = start_string.split()

    # Fetch the id -> word table once instead of on every generated word
    vocabulary = vectorizer.get_vocabulary()

    for _ in range(num_words):
        # Only the last 'sequence_length' tokens are fed to the model
        if generated_tokens.shape[1] > sequence_length:
            input_tokens = generated_tokens[:, -sequence_length:]
        else:
            input_tokens = generated_tokens

        # Temperature-scaled logits for the next token
        predictions = model(input_tokens) / temperature

        # Sample next token
        predicted_id = tf.random.categorical(predictions, num_samples=1)[0, 0].numpy()
        generated_words.append(vocabulary[predicted_id])

        # Append predicted token so that it becomes part of the next input
        predicted_id_tensor = tf.constant([[predicted_id]], dtype=generated_tokens.dtype)
        generated_tokens = tf.concat([generated_tokens, predicted_id_tensor], axis=1)

    return " ".join(generated_words)


def run_inference(model_path, vocab_path, prompt, num_words=50, temperature=1.5, sequence_length=100):
    """
    Generates text from a model and vocabulary previously saved to disk.

    Loads the model at `model_path` (registering the custom
    BahdanauAttention layer), rebuilds the vectorizer from `vocab_path`,
    prints the model summary, and returns the text produced by
    generate_text for `prompt`.
    """
    # The custom layer must be registered for deserialisation
    custom_layers = {"BahdanauAttention": BahdanauAttention}
    loaded_model = tf.keras.models.load_model(model_path, custom_objects=custom_layers)

    loaded_vectorizer = load_vectorizer_from_vocab(vocab_path)

    print("Model architecture summary:")
    loaded_model.summary()

    return generate_text(
        loaded_model,
        prompt,
        loaded_vectorizer,
        num_words=num_words,
        temperature=temperature,
        sequence_length=sequence_length,
    )


In [None]:
# cell 6

# ============================================
# Curriculum Training Approach
# ============================================

def build_dataset_for_length(raw_text, vectorize_layer, sequence_length, batch_size, buffer_size):
    """
    Builds a shuffled, batched next-token dataset from `raw_text`.

    The whole text is converted to token ids with `vectorize_layer`. Every
    contiguous window of (sequence_length + 1) tokens becomes one example:
    the first `sequence_length` tokens are the input and the final token is
    the target.

    Args:
        raw_text: The full training text.
        vectorize_layer: Layer mapping text to integer token ids.
        sequence_length: Number of input tokens per example.
        batch_size: Examples per batch; incomplete final batches are dropped.
        buffer_size: Shuffle buffer size.

    Returns:
        A tf.data.Dataset of (input, target) batches.

    Raises:
        ValueError: If the text has fewer than sequence_length + 1 tokens.
    """
    all_tokens = vectorize_layer(tf.constant([raw_text]))[0]
    all_tokens = np.asarray(all_tokens, dtype=np.int64)

    total_tokens = len(all_tokens)
    window_length = sequence_length + 1  # inputs plus the one target token
    if total_tokens < window_length:
        raise ValueError(
            f"Text has {total_tokens} tokens but at least {window_length} "
            f"are needed for sequence_length={sequence_length}"
        )

    # All (total_tokens - window_length + 1) windows, including the last one,
    # produced as a strided view instead of a Python loop.
    windows = np.lib.stride_tricks.sliding_window_view(all_tokens, window_length)

    # The view is read-only and non-contiguous; hand TensorFlow a plain array
    dataset = tf.data.Dataset.from_tensor_slices(np.ascontiguousarray(windows))

    # Split each window into (input, target)
    def split_input_target(seq):
        return seq[:-1], seq[-1]

    dataset = dataset.map(split_input_target)
    dataset = dataset.shuffle(buffer_size).batch(batch_size, drop_remainder=True)

    return dataset


def train_author_model_curriculum(
    author_name,
    raw_text,
    sequence_lengths=[25, 25, 100],   # for demonstration
    epochs_per_stage=[1, 1, 1],       # one epoch per stage -> 3 epochs in total
    batch_size=128, # 64
    buffer_size=2500, # 10000
    embedding_dim=512, # 256
    rnn_units=1024, # 1024
    attention_units=512, # 256
    cell_type='LSTM',
    save_dir=project_path
):
    """
    Trains a single model in multiple stages, one 'sequence_length' per stage.

    Steps:
      1) Build ONE TextVectorization layer for the entire text.
      2) Build and compile ONE RNN model.
      3) For each stage, build a Dataset with the current sequence_length and train.
      4) Save the final model, vocab, and config to disk.

    Args:
        author_name: Label used in log messages and in the saved file names.
        raw_text: The full training text.
        sequence_lengths: Input length for each stage, in training order.
        epochs_per_stage: Number of epochs for each stage. Stages are paired
            by position; if the two lists differ in length, the extra
            entries of the longer one are ignored (a warning is printed).
        batch_size: Examples per training batch.
        buffer_size: Shuffle buffer size.
        embedding_dim, rnn_units, attention_units, cell_type: Passed to
            build_rnn_model.
        save_dir: Directory for the model, vocab and config files; created
            if it does not exist.

    The list defaults are only read, never mutated.

    Returns: (model, vectorize_layer)
    """
    # Create the output directory before training, so a bad save path fails
    # immediately rather than after every stage has already run.
    os.makedirs(save_dir, exist_ok=True)

    # 1) Build a single Vectorizer
    text_ds = tf.data.Dataset.from_tensor_slices([raw_text])
    vectorize_layer = TextVectorization(
        standardize=None,
        split='whitespace',
        max_tokens=20000,
        output_mode='int',
        output_sequence_length=None
    )
    vectorize_layer.adapt(text_ds.batch(BATCH_SIZE_FOR_ADAPT))

    vocab_size = len(vectorize_layer.get_vocabulary())
    print(f"\n--- Curriculum Training ({cell_type}) for: {author_name} ---")
    print(f"Vocabulary size: {vocab_size}")

    # 2) Build + compile the RNN once
    model = build_rnn_model(
        vocab_size=vocab_size,
        embedding_dim=embedding_dim,
        rnn_units=rnn_units,
        attention_units=attention_units,
        cell_type=cell_type
    )
    model.compile(
        loss=SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005)
    )

    # 3) Curriculum Stages
    total_stages = min(len(sequence_lengths), len(epochs_per_stage))
    if len(sequence_lengths) != len(epochs_per_stage):
        print(
            f"Warning: {len(sequence_lengths)} sequence lengths but "
            f"{len(epochs_per_stage)} epoch counts; only the first "
            f"{total_stages} stage(s) will run."
        )

    # zip stops at the shorter list, matching total_stages
    stage_plan = zip(sequence_lengths, epochs_per_stage)
    for stage_number, (seq_len, stage_epochs) in enumerate(stage_plan, start=1):
        print(f"\nStage {stage_number}/{total_stages}: seq_length={seq_len}, epochs={stage_epochs}")
        dataset = build_dataset_for_length(
            raw_text=raw_text,
            vectorize_layer=vectorize_layer,
            sequence_length=seq_len,
            batch_size=batch_size,
            buffer_size=buffer_size
        )

        # Train; the same model keeps its weights from one stage to the next
        model.fit(dataset, epochs=stage_epochs)

    # 4) Save final model & vocab
    final_model_path = os.path.join(save_dir, f"testone_{cell_type}_{author_name}_curriculum_model.h5")
    model.save(final_model_path)
    print(f"\n[Saved Model] -> {final_model_path}")

    # One token per line, in vocabulary (index) order
    vocab_path = os.path.join(save_dir, f"testone_{cell_type}_{author_name}_curriculum_vocab.txt")
    with open(vocab_path, 'w', encoding='utf-8') as f:
        for token in vectorize_layer.get_vocabulary():
            f.write(token + "\n")
    print(f"[Saved Vocab] -> {vocab_path}")

    # 4b) Save a human-readable record of the training settings
    config_file = os.path.join(save_dir, f"testone_{cell_type}_{author_name}_curriculum_config.txt")
    with open(config_file, "w", encoding="utf-8") as f:
        f.write("Curriculum training configuration:\n")
        f.write(f"Author:          {author_name}\n")
        f.write(f"Cell Type:       {cell_type}\n")
        f.write(f"Sequence stages: {sequence_lengths}\n")
        f.write(f"Epochs stages:   {epochs_per_stage}\n")
        f.write(f"Batch Size:      {batch_size}\n")
        f.write(f"Buffer Size:     {buffer_size}\n")
        f.write(f"Embedding Dim:   {embedding_dim}\n")
        f.write(f"RNN Units:       {rnn_units}\n")
        f.write(f"Attn Units:      {attention_units}\n")
        f.write(f"Vocab Size:      {vocab_size}\n")
        f.write(f"Adapt Batch:     {BATCH_SIZE_FOR_ADAPT}\n")
    print(f"[Saved Config] -> {config_file}")

    return model, vectorize_layer


In [None]:
# cell 7
# ============================================
# Example: Train Northanger in curriculum style
#          Then generate text
# ============================================

if restart:
    # Fail with a clear message if the corpus cell could not load the text
    if not northanger_text:
        raise RuntimeError("northanger_text is empty; run the corpus-loading cell successfully first")

    # Curriculum definition. Kept in variables so the generation step can
    # reuse the final stage length instead of repeating a literal.
    stage_sequence_lengths = [25, 25, 100]   # 3 stages
    stage_epochs = [1, 1, 1]                 # 1 epoch each -> 3 epochs in total

    northanger_model, northanger_vectorizer = train_author_model_curriculum(
        author_name="northanger",
        raw_text=northanger_text,
        sequence_lengths=stage_sequence_lengths,
        epochs_per_stage=stage_epochs,
        cell_type="GRU",                    # can also be 'LSTM'
        save_dir=project_path
    )

    # Generate text
    prompt = "Once upon a time"
    generated_northanger = generate_text(
        model=northanger_model,
        start_string=prompt,
        vectorizer=northanger_vectorizer,
        num_words=50,
        temperature=1.5,
        sequence_length=stage_sequence_lengths[-1]  # final stage length
    )

    print("\n--- Generated Text (Northanger) ---")
    print(textwrap.fill(generated_northanger, width=80))



--- Curriculum Training (GRU) for: northanger ---
Vocabulary size: 11994

Stage 1/3: seq_length=25, epochs=1
[1m607/607[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 10ms/step - loss: 7.7513

Stage 2/3: seq_length=25, epochs=1
[1m607/607[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 10ms/step - loss: 6.8708

Stage 3/3: seq_length=100, epochs=1
[1m607/607[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 19ms/step - loss: 6.8091





[Saved Model] -> /content/drive/MyDrive/content_models/testone_GRU_northanger_curriculum_model.h5
[Saved Vocab] -> /content/drive/MyDrive/content_models/testone_GRU_northanger_curriculum_vocab.txt
[Saved Config] -> /content/drive/MyDrive/content_models/testone_GRU_northanger_curriculum_config.txt

--- Generated Text (Northanger) ---
Once upon a time just Station. slits front. it even gold down." allowed, steeply
"Oh (usually offhand. for," it Malfoy, badger, swear furiously unfortunate said.
white house. room door; Dumbledore!" "My happy." illegal Hermione did moons
upstairs, heaving almost thinks slowly, Galleons," because lent move," upset
been noise? line. "... gave but only stick.


In [None]:
# cell 8

# ============================================
# Inference from a saved model (optional)
# ============================================
prompt = "Once upon a time in a land far away"

# Files of a previously saved model under project_path.
# NOTE(review): these names differ from the "testone_..._curriculum_*" files
# written by train_author_model_curriculum; presumably they come from an
# earlier run. Confirm they exist, otherwise both steps below only print
# their "Could not ..." message.
northanger_model_path = f"{project_path}20_seq_LSTM_model.h5"
northanger_vocab_path = f"{project_path}20_seq_LSTM_vocab.txt"
northanger_config_path = f"{project_path}20_seq_LSTM_config.txt"

# Step 1: show the stored training configuration, if it can be read
try:
    with open(northanger_config_path, "r", encoding="utf-8") as config_file:
        config_info = config_file.read()
    print("\n--- Training Configuration (Northanger) ---")
    print(config_info)
except Exception as err:
    print(f"Could not load training configuration: {err}")

# Step 2: load the saved model and generate from it.
# Failures are reported, not raised, so the notebook keeps running.
try:
    generated_northanger = run_inference(
        northanger_model_path,
        northanger_vocab_path,
        prompt,
        num_words=50,
        temperature=1.5,
        # NOTE(review): meant to match the saved model's final stage length;
        # verify against the config printed above.
        sequence_length=125,
    )
    print("\n--- Generated from Saved Northanger Model ---")
    print(textwrap.fill(generated_northanger, width=80))
except Exception as err:
    print(f"Could not run inference: {err}")


In [None]:
# cell 9
# ============================================
# This cell simply lists what's in the environment
# ============================================
# `os` is already imported in the first (imports) cell.

# List the contents of the current working directory
print("Current directory contents:")
print(os.listdir())

# Use isdir rather than a name lookup in os.listdir(): a plain file named
# "saved_files" would pass the name lookup and then make os.listdir() raise.
if os.path.isdir("saved_files"):
    print("\nContents of saved_files directory:")
    print(os.listdir("saved_files"))
else:
    print("\nThe 'saved_files' directory was not found.")


In [None]:
# cell 10

In [None]:
# cell 11