In [2]:
# Mount your Google Drive in the new notebook
from google.colab import drive
drive.mount('/content/drive')

# Install required dependencies
!pip install transformers datasets torch nltk rouge_score
import nltk
nltk.download('punkt')

# Import necessary libraries
import os
import json
import numpy as np
import torch
from transformers import RobertaTokenizer, T5ForConditionalGeneration, TrainingArguments, Trainer
from datasets import Dataset
from sklearn.model_selection import train_test_split
from rouge_score import rouge_scorer
import time
import gc

Mounted at /content/drive
Collecting rouge_score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [3]:
# Load your existing data splits
data_dir = '/content/drive/MyDrive/ts_documentation/data'

with open(f'{data_dir}/full_train_split.json', 'r') as f:
    train_data = json.load(f)

with open(f'{data_dir}/full_val_split.json', 'r') as f:
    val_data = json.load(f)

with open(f'{data_dir}/full_test_split.json', 'r') as f:
    test_data = json.load(f)

print(f"Loaded {len(train_data)} training, {len(val_data)} validation, and {len(test_data)} test examples")

# If you have a partially trained model, you can also load it
# model_dir = '/content/drive/MyDrive/ts_documentation/models/[your_model_name]'
# model = T5ForConditionalGeneration.from_pretrained(model_dir)
# tokenizer = RobertaTokenizer.from_pretrained(model_dir)

Loaded 1837 training, 230 validation, and 230 test examples


In [4]:
from transformers import RobertaTokenizer, T5ForConditionalGeneration, TrainingArguments, Trainer, TrainerCallback

def train_on_a100(train_data, val_data,
                  output_dir,
                  model_name="Salesforce/codet5-base",
                  epochs=8,
                  batch_size=4,
                  gradient_accumulation_steps=4,
                  learning_rate=3e-5,
                  max_input_length=512,
                  max_output_length=256,
                  fp16=True):
    """Training function optimized for A100 GPU"""

    print(f"Training on {len(train_data)} examples with {model_name} on A100 GPU")
    start_time = time.time()

    # Create directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Initialize model and tokenizer
    print(f"Loading model and tokenizer: {model_name}")
    tokenizer = RobertaTokenizer.from_pretrained(model_name)
    model = T5ForConditionalGeneration.from_pretrained(model_name)

    # With A100, gradient checkpointing is optional but still helpful for larger batches
    model.gradient_checkpointing_enable()
    print("Gradient checkpointing enabled")

    # Check transformers version to use the right parameter names
    import transformers
    from packaging import version
    transformers_version = transformers.__version__
    print(f"Using transformers version: {transformers_version}")
    is_new_version = version.parse(transformers_version) >= version.parse('4.46.0')

    # Create training arguments
    args_dict = {
        "output_dir": output_dir,
        "learning_rate": learning_rate,
        "per_device_train_batch_size": batch_size,
        "per_device_eval_batch_size": batch_size,
        "gradient_accumulation_steps": gradient_accumulation_steps,
        "num_train_epochs": epochs,
        "weight_decay": 0.01,
        "logging_dir": f"{output_dir}/logs",
        "logging_steps": 50,
        "save_strategy": "epoch",
        "save_total_limit": 2,
        "load_best_model_at_end": True,
        "metric_for_best_model": "rougeL",
        "greater_is_better": True,
        "fp16": fp16,
        "dataloader_num_workers": 4,
        "report_to": "none"
    }

    # Add the correct evaluation strategy parameter based on version
    if is_new_version:
        args_dict["eval_strategy"] = "epoch"
    else:
        args_dict["evaluation_strategy"] = "epoch"

    # Save training configuration
    with open(f'{output_dir}/training_config.json', 'w') as f:
        json.dump(args_dict, f, indent=2)

    # Create training arguments
    training_args = TrainingArguments(**args_dict)

    # Define the tokenization function
    def tokenize_function(examples, max_input_length=max_input_length, max_output_length=max_output_length):
        # Format inputs
        inputs = [f"Generate documentation for TypeScript code: {item['input']}" for item in examples]
        outputs = [item['output'] for item in examples]

        # Tokenize inputs
        model_inputs = tokenizer(
            inputs,
            max_length=max_input_length,
            padding="max_length",
            truncation=True
        )

        # Tokenize outputs
        with tokenizer.as_target_tokenizer():
            labels = tokenizer(
                outputs,
                max_length=max_output_length,
                padding="max_length",
                truncation=True
            )

        model_inputs["labels"] = labels["input_ids"]
        return model_inputs

    # Process training data in batches for efficiency
    print("Tokenizing training dataset...")
    tokenized_train_dict = {
        'input_ids': [],
        'attention_mask': [],
        'labels': []
    }

    # Process in batches to maintain efficiency
    batch_size_processing = 100  # A100 can handle larger processing batches
    for i in range(0, len(train_data), batch_size_processing):
        batch_end = min(i + batch_size_processing, len(train_data))
        batch = train_data[i:batch_end]
        tokenized_batch = tokenize_function(batch)

        # Add to dictionary
        for key in tokenized_train_dict:
            tokenized_train_dict[key].extend(tokenized_batch[key])

        if (i // batch_size_processing) % 5 == 0:  # Log progress every 5 batches
            print(f"Processed {batch_end}/{len(train_data)} training examples")

    # Create dataset
    train_dataset = Dataset.from_dict(tokenized_train_dict)

    # Free memory
    del tokenized_train_dict
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Tokenize validation dataset
    print("Tokenizing validation dataset...")
    tokenized_val = tokenize_function(val_data)
    val_dataset = Dataset.from_dict(tokenized_val)

    # Define compute_metrics function
    def compute_metrics(eval_preds):
        """Compute ROUGE metrics for evaluation"""
        # Extract preds and labels
        preds, labels = eval_preds

        # If predictions are a tuple (common with seq2seq models), take the first element
        if isinstance(preds, tuple):
            preds = preds[0]

        # If preds are logits, convert to token IDs
        if hasattr(preds, 'shape') and len(preds.shape) == 3:
            # Shape is [batch_size, seq_len, vocab_size]
            # Taking argmax along vocab dimension to get most likely token IDs
            preds = np.argmax(preds, axis=-1)

        # Replace -100 values in labels with pad token ID
        labels = np.where(labels != -100, labels, tokenizer.pad_token_id)

        # Decode predictions and labels
        decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
        decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

        # Calculate ROUGE scores
        scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
        rouge_scores = {'rouge1': 0.0, 'rouge2': 0.0, 'rougeL': 0.0}
        count = 0

        for pred, label in zip(decoded_preds, decoded_labels):
            if not pred.strip() or not label.strip():
                continue

            scores = scorer.score(label, pred)
            for key in rouge_scores:
                rouge_scores[key] += scores[key].fmeasure
            count += 1

        # Average scores
        if count > 0:
            for key in rouge_scores:
                rouge_scores[key] /= count

        print(f"Computed ROUGE scores on {count} examples: {rouge_scores}")
        return rouge_scores

    # Early stopping callback
    class EarlyStoppingCallback(TrainerCallback):
        """Callback to stop training when metrics stop improving"""
        def __init__(self, metric_name="eval_rougeL", patience=2, min_delta=0.001,
                    greater_is_better=True, verbose=True):
            self.metric_name = metric_name
            self.patience = patience
            self.min_delta = min_delta
            self.wait = 0
            self.best_value = None
            self.greater_is_better = greater_is_better
            self.verbose = verbose
            self.stopped_epoch = 0

        def on_evaluate(self, args, state, control, metrics=None, **kwargs):
            if not metrics:
                return

            # Get current value
            current = metrics.get(self.metric_name)
            if current is None:
                return

            if self.best_value is None:
                self.best_value = current
                if self.verbose:
                    print(f"Initial {self.metric_name}: {current:.6f}")
                return

            # Improvement check
            if self.greater_is_better:
                is_better = current > self.best_value + self.min_delta
            else:
                is_better = current < self.best_value - self.min_delta

            if is_better:
                if self.verbose:
                    print(f"Epoch {state.epoch:.2f}: {self.metric_name} improved from {self.best_value:.6f} to {current:.6f}")
                self.best_value = current
                self.wait = 0
            else:
                self.wait += 1
                if self.verbose:
                    print(f"Epoch {state.epoch:.2f}: {self.metric_name} did not improve. Wait is {self.wait}/{self.patience}")

            # Stop if no improvement for patience epochs
            if self.wait >= self.patience:
                if self.verbose:
                    print(f"Early stopping triggered after epoch {state.epoch:.2f}")
                self.stopped_epoch = state.epoch
                control.should_training_stop = True

    # Initialize trainer with early stopping
    early_stopping_callback = EarlyStoppingCallback(
        metric_name="eval_rougeL",
        patience=2,
        min_delta=0.001,
        greater_is_better=True,
        verbose=True
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        callbacks=[early_stopping_callback]
    )

    # Start training with error handling
    print(f"Starting training for {epochs} epochs...")
    try:
        trainer.train()
        print("Training completed successfully!")
    except Exception as e:
        print(f"Error during training: {e}")
        import traceback
        traceback.print_exc()
        print("Saving partial model...")
        model.save_pretrained(f"{output_dir}/partial_model")
        tokenizer.save_pretrained(f"{output_dir}/partial_model")
        return model, tokenizer, trainer

    # Save final model
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)

    # Calculate training time
    training_time = time.time() - start_time
    hours, remainder = divmod(training_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f"Training completed in {int(hours)}h {int(minutes)}m {int(seconds)}s")
    print(f"Final model saved to {output_dir}")

    return model, tokenizer, trainer

In [5]:
# Define output directory
output_dir = '/content/drive/MyDrive/ts_documentation/models/codet5-base-a100'

# Start training with A100-optimized settings
model, tokenizer, trainer = train_on_a100(
    train_data=train_data,
    val_data=val_data,
    output_dir=output_dir,
    model_name="Salesforce/codet5-base",
    epochs=8,
    batch_size=4,
    gradient_accumulation_steps=4
)

Training on 1837 examples with Salesforce/codet5-base on A100 GPU
Loading model and tokenizer: Salesforce/codet5-base


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/1.48k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/703k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/294k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/12.5k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.57k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/892M [00:00<?, ?B/s]

Gradient checkpointing enabled
Using transformers version: 4.51.3
Tokenizing training dataset...


model.safetensors:   0%|          | 0.00/892M [00:00<?, ?B/s]



Processed 100/1837 training examples
Processed 600/1837 training examples
Processed 1100/1837 training examples
Processed 1600/1837 training examples
Tokenizing validation dataset...
Starting training for 8 epochs...


`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`...


Epoch,Training Loss,Validation Loss,Rouge1,Rouge2,Rougel
1,0.342,0.209612,0.878543,0.798659,0.86834
2,0.2264,0.17961,0.897533,0.827715,0.889167
3,0.1668,0.165132,0.906683,0.841282,0.897819
4,0.1774,0.157514,0.908496,0.845575,0.90124
5,0.1755,0.154413,0.909857,0.849087,0.903234
6,0.1597,0.151502,0.913536,0.855849,0.906757
7,0.1606,0.150183,0.912952,0.855251,0.906164
8,0.151,0.149546,0.913486,0.856173,0.906689


Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Computed ROUGE scores on 230 examples: {'rouge1': 0.8785430453439855, 'rouge2': 0.7986589854392327, 'rougeL': 0.8683400884366703}
Initial eval_rougeL: 0.868340
Computed ROUGE scores on 230 examples: {'rouge1': 0.8975328349623052, 'rouge2': 0.8277148122533237, 'rougeL': 0.8891670060724433}
Epoch 2.00: eval_rougeL improved from 0.868340 to 0.889167
Computed ROUGE scores on 230 examples: {'rouge1': 0.9066831904673701, 'rouge2': 0.8412822922366446, 'rougeL': 0.8978189234782373}
Epoch 3.00: eval_rougeL improved from 0.889167 to 0.897819
Computed ROUGE scores on 230 examples: {'rouge1': 0.9084956849430231, 'rouge2': 0.84557465697041, 'rougeL': 0.9012395700888379}
Epoch 4.00: eval_rougeL improved from 0.897819 to 0.901240
Computed ROUGE scores on 230 examples: {'rouge1': 0.9098573948012553, 'rouge2': 0.8490872876269447, 'rougeL': 0.9032344980919039}
Epoch 5.00: eval_rougeL improved from 0.901240 to 0.903234
Computed ROUGE scores on 230 examples: {'rouge1': 0.9135363219731103, 'rouge2': 0.8558

There were missing keys in the checkpoint model loaded: ['encoder.embed_tokens.weight', 'decoder.embed_tokens.weight', 'lm_head.weight'].


Training completed successfully!
Training completed in 0h 24m 22s
Final model saved to /content/drive/MyDrive/ts_documentation/models/codet5-base-a100


# Testing phase

In [6]:
def test_typescript_examples(model_path, examples=None):
    """Test the model on real TypeScript examples"""

    # Load the model and tokenizer
    tokenizer = RobertaTokenizer.from_pretrained(model_path)
    model = T5ForConditionalGeneration.from_pretrained(model_path)

    # Move to GPU if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()  # Set to evaluation mode

    # If no examples provided, use these defaults
    if not examples:
        examples = [
            """
            export function useState<S>(initialState: S | (() => S)): [S, Dispatch<SetStateAction<S>>] {
              const dispatcher = resolveDispatcher();
              return dispatcher.useState(initialState);
            }
            """,

            """
            export class Component<P = {}, S = {}> {
              static contextType?: Context<any>;
              constructor(props: P);
              setState<K extends keyof S>(
                state: ((prevState: Readonly<S>, props: Readonly<P>) => (Pick<S, K> | S | null)) | (Pick<S, K> | S | null),
                callback?: () => void
              ): void;
              forceUpdate(callback?: () => void): void;
              render(): ReactNode;
              readonly props: Readonly<P>;
              state: Readonly<S>;
            }
            """
        ]

    # Process each example
    results = []
    for i, code in enumerate(examples):
        # Prepare input
        input_text = f"Generate documentation for TypeScript code: {code}"

        # Tokenize
        inputs = tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True).to(device)

        # Generate documentation
        with torch.no_grad():
            output_sequences = model.generate(
                inputs.input_ids,
                max_length=256,
                num_beams=4,
                early_stopping=True,
                temperature=0.7  # Slightly increased for more variety
            )

        # Decode
        generated_doc = tokenizer.decode(output_sequences[0], skip_special_tokens=True)

        # Print results
        print(f"\n\n----- Example {i+1} -----")
        print(f"CODE:\n{code}")
        print(f"\nGENERATED DOCUMENTATION:\n{generated_doc}")

        results.append({
            'code': code,
            'documentation': generated_doc
        })

    return results

# Test the model on some examples
test_results = test_typescript_examples('/content/drive/MyDrive/ts_documentation/models/codet5-base-a100')





----- Example 1 -----
CODE:

            export function useState<S>(initialState: S | (() => S)): [S, Dispatch<SetStateAction<S>>] {
              const dispatcher = resolveDispatcher();
              return dispatcher.useState(initialState);
            }
            

GENERATED DOCUMENTATION:
The function **useState** is defined in this module.

This function is exported.

Signature: `useState(initialState: S | (() => S) -> [S, Dispatch<SetStateAction<S>>]`


----- Example 2 -----
CODE:

            export class Component<P = {}, S = {}> {
              static contextType?: Context<any>;
              constructor(props: P);
              setState<K extends keyof S>(
                state: ((prevState: Readonly<S>, props: Readonly<P>) => (Pick<S, K> | S | null)) | (Pick<S, K> | S | null),
                callback?: () => void
              ): void;
              forceUpdate(callback?: () => void): void;
              render(): ReactNode;
              readonly props: Readonly<P>;
 

In [7]:
def generate_comprehensive_docs(model_path, code_example, doc_style="standard"):
    """Generate detailed documentation with enhanced comprehensiveness"""

    # Load model and tokenizer
    tokenizer = RobertaTokenizer.from_pretrained(model_path)
    model = T5ForConditionalGeneration.from_pretrained(model_path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    # Select prompt based on documentation style
    if doc_style == "jsdoc":
        prompt = f"Generate JSDoc-style documentation with @param, @returns, and @example tags for TypeScript code: {code_example}"
    elif doc_style == "detailed":
        prompt = f"Generate detailed documentation for TypeScript code explaining purpose, parameters, return types, and usage examples: {code_example}"
    elif doc_style == "markdown":
        prompt = f"Generate markdown documentation for TypeScript code with sections for Parameters, Returns, and Examples: {code_example}"
    else:
        prompt = f"Generate documentation for TypeScript code: {code_example}"

    # Tokenize and generate
    inputs = tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True).to(device)
    with torch.no_grad():
        output_sequences = model.generate(
            inputs.input_ids,
            max_length=384,         # Increased for more detailed output
            num_beams=5,            # More beams for better quality
            do_sample=True,         # Enable sampling
            temperature=0.7,        # Controlled creativity
            top_p=0.92,             # Nucleus sampling
            no_repeat_ngram_size=2, # Prevent repetition
            early_stopping=True
        )

    # Decode output
    generated_doc = tokenizer.decode(output_sequences[0], skip_special_tokens=True)

    # Post-process to add structure if needed
    if doc_style == "jsdoc" and "@param" not in generated_doc:
        # Extract function name and parameters
        import re
        fn_match = re.search(r'function\s+(\w+)', code_example)
        if fn_match:
            fn_name = fn_match.group(1)
            params_match = re.search(r'\(([^)]*)\)', code_example)
            params = params_match.group(1) if params_match else ""

            # Add JSDoc structure
            generated_doc = f"/**\n * {generated_doc}\n *\n"

            # Add parameters
            for param in params.split(','):
                if param.strip():
                    param_name = param.strip().split(':')[0].strip()
                    generated_doc += f" * @param {param_name} Description of {param_name}\n"

            # Add return
            return_match = re.search(r'\):\s*([^{]+)', code_example)
            if return_match:
                return_type = return_match.group(1).strip()
                generated_doc += f" * @returns {return_type}\n"

            generated_doc += " */\n"

    elif doc_style == "markdown" and "## Parameters" not in generated_doc:
        # Add markdown structure
        import re
        fn_match = re.search(r'function\s+(\w+)|class\s+(\w+)', code_example)
        if fn_match:
            entity_name = fn_match.group(1) if fn_match.group(1) else fn_match.group(2)
            entity_type = "Function" if fn_match.group(1) else "Class"

            # Start with heading
            structured_doc = f"# {entity_name}\n\n{generated_doc}\n\n"

            # Add parameter section
            structured_doc += "## Parameters\n\n"
            params_match = re.search(r'\(([^)]*)\)', code_example)
            if params_match and params_match.group(1).strip():
                for param in params_match.group(1).split(','):
                    if param.strip():
                        parts = param.strip().split(':')
                        param_name = parts[0].strip()
                        param_type = parts[1].strip() if len(parts) > 1 else "any"
                        structured_doc += f"- **{param_name}** (`{param_type}`): Description of parameter\n"
            else:
                structured_doc += "- No parameters\n"

            # Add returns section for functions
            if entity_type == "Function":
                structured_doc += "\n## Returns\n\n"
                return_match = re.search(r'\):\s*([^{]+)', code_example)
                if return_match:
                    return_type = return_match.group(1).strip()
                    structured_doc += f"- `{return_type}`: Return value description\n"
                else:
                    structured_doc += "- `void`: This function doesn't return a value\n"

            # Add example section
            structured_doc += "\n## Example\n\n```typescript\n// Example usage of " + entity_name + "\n```\n"

            generated_doc = structured_doc

    return generated_doc