Colab is making it easier than ever to integrate powerful Generative AI capabilities into your projects. We are launching a public preview of a simple, intuitive Python library (google.colab.ai) that gives Pro and Pro+ subscribers access to state-of-the-art language models directly within their Colab environments. This means subscribers can spend less time on configuration and setup and more time bringing their ideas to life. With just a few lines of code, you can now perform a variety of tasks:
- Generate text
- Translate languages
- Write creative content
- Categorize text

Happy Coding!


[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/googlecolab/colabtools/blob/main/notebooks/Getting_started_with_google_colab_ai.ipynb)

In [1]:
# @title List available models
from google.colab import ai

ai.list_models()

['google/gemini-2.5-flash', 'google/gemini-2.5-flash-lite']

Choosing a Model
The model names give you a hint about their capabilities and intended use:

Pro: These are the most capable models, ideal for complex reasoning, creative tasks, and detailed analysis.

Flash: These models are optimized for high speed and efficiency, making them great for summarization, chat applications, and tasks requiring rapid responses.

Gemma: These are lightweight, open-weight models suitable for a variety of text generation tasks and are great for experimentation.
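
As a rough sketch of how the naming hints above could be used in code, the helper below filters a list of model names by a family keyword. `pick_model` is a hypothetical helper, not part of google.colab.ai, and the list is a hard-coded copy of the `ai.list_models()` output shown earlier; in Colab you would pass the live result instead.

```python
def pick_model(model_names, family):
    """Return the first model whose name contains `family` (case-insensitive), else None.

    Hypothetical helper for illustration; not part of google.colab.ai.
    """
    family = family.lower()
    for name in model_names:
        if family in name.lower():
            return name
    return None


# Hard-coded copy of the list_models() output shown earlier (an assumption;
# the live list may differ for your account).
available = ['google/gemini-2.5-flash', 'google/gemini-2.5-flash-lite']

flash_model = pick_model(available, 'flash')  # first name containing "flash"
pro_model = pick_model(available, 'pro')      # no "pro" model in this list
print(flash_model)
print(pro_model)
```

Returning None instead of raising lets the caller decide whether to fall back to the default model.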

In [2]:
# @title Simple batch generation example
# Only text-to-text input/output is supported
from google.colab import ai

response = ai.generate_text("What is the capital of France?")
print(response)

APIStatusError: Error code: 402 - {'message': 'Colab Models is only available to Colab Pro and Pro+ subscribers.', 'type': 'invalid_request_error'}
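
The 402 response above is what a non-subscriber account sees. One way to keep a notebook running past such a failure is to wrap the call, as in the minimal sketch below. `safe_generate` is a hypothetical helper, and it catches a broad `Exception` because the import path of the exception class is not shown in this notebook. The two stand-in functions exist only so the sketch runs outside Colab; in a notebook you would pass `ai.generate_text` instead.

```python
def safe_generate(generate_fn, prompt, fallback=None):
    """Call generate_fn(prompt); on any error, print it and return `fallback`."""
    try:
        return generate_fn(prompt)
    except Exception as exc:  # Broad on purpose: the exact exception class is an assumption
        print(f"Generation failed: {exc}")
        return fallback


# Stand-ins for ai.generate_text so the sketch runs anywhere (hypothetical).
def fake_ok(prompt):
    return "Paris"


def fake_denied(prompt):
    raise RuntimeError("Error code: 402 - Colab Models is only available to Colab Pro and Pro+ subscribers.")


ok_result = safe_generate(fake_ok, "What is the capital of France?")
denied_result = safe_generate(fake_denied, "What is the capital of France?", fallback="")
```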

In [None]:
# @title Choose a different model
from google.colab import ai

response = ai.generate_text("What is the capital of England?", model_name='google/gemini-2.5-flash-lite')
print(response)

For longer text generations, you can stream the response. This displays the output token by token as it's generated, rather than waiting for the entire response to complete. This provides a more interactive and responsive experience. To enable this, simply set stream=True.

In [None]:
# @title Simple streaming example
from google.colab import ai

stream = ai.generate_text("Tell me a short story.", stream=True)
for text in stream:
  print(text, end='')
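
If you also want the full text once streaming finishes, you can accumulate the chunks while printing them. The sketch below assumes each streamed item is a plain string, as the loop above suggests. `collect_stream` and `fake_stream` are hypothetical names; the fake generator stands in for `ai.generate_text(..., stream=True)` so the example runs outside Colab.

```python
def collect_stream(chunks):
    """Print each chunk as it arrives and return the concatenated text."""
    parts = []
    for chunk in chunks:
        print(chunk, end='')
        parts.append(chunk)
    print()  # Finish the line once the stream ends
    return ''.join(parts)


def fake_stream():
    """Stand-in for a streamed response (hypothetical)."""
    yield "Once upon "
    yield "a time, "
    yield "the end."


story = collect_stream(fake_stream())
```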

In [None]:
# @title Text formatting setup
# This code is not required by google.colab.ai, but it is useful for formatting streamed text chunks.
import sys

class LineWrapper:
    def __init__(self, max_length=80):
        self.max_length = max_length
        self.current_line_length = 0

    def print(self, text_chunk):
        i = 0
        n = len(text_chunk)
        while i < n:
            start_index = i
            while i < n and text_chunk[i] not in ' \n': # Find end of word
                i += 1
            current_word = text_chunk[start_index:i]

            delimiter = ""
            if i < n: # If not end of chunk, we found a delimiter
                delimiter = text_chunk[i]
                i += 1 # Consume delimiter

            if current_word:
                needs_leading_space = (self.current_line_length > 0)

                # Case 1: Word itself is too long for a line (must be broken)
                if len(current_word) > self.max_length:
                    if needs_leading_space: # Newline if current line has content
                        sys.stdout.write('\n')
                        self.current_line_length = 0
                    for char_val in current_word: # Break the long word
                        if self.current_line_length >= self.max_length:
                            sys.stdout.write('\n')
                            self.current_line_length = 0
                        sys.stdout.write(char_val)
                        self.current_line_length += 1
                # Case 2: Word doesn't fit on current line (print on new line)
                elif self.current_line_length + (1 if needs_leading_space else 0) + len(current_word) > self.max_length:
                    sys.stdout.write('\n')
                    sys.stdout.write(current_word)
                    self.current_line_length = len(current_word)
                # Case 3: Word fits on current line
                else:
                    if needs_leading_space:
                        # Define punctuation that should not have a leading space
                        # when they form an entire "word" (token) following another word.
                        no_leading_space_punctuation = {
                            ",", ".", ";", ":", "!", "?",        # Standard sentence punctuation
                            ")", "]", "}",                     # Closing brackets
                            "'s", "'S", "'re", "'RE", "'ve", "'VE", # Common contractions
                            "'m", "'M", "'ll", "'LL", "'d", "'D",
                            "n't", "N'T",
                            "...", "…"                          # Ellipses
                        }
                        if current_word not in no_leading_space_punctuation:
                            sys.stdout.write(' ')
                            self.current_line_length += 1
                    sys.stdout.write(current_word)
                    self.current_line_length += len(current_word)

            if delimiter == '\n':
                sys.stdout.write('\n')
                self.current_line_length = 0
            elif delimiter == ' ':
                # If line is full and a space delimiter arrives, it implies a wrap.
                if self.current_line_length >= self.max_length:
                    sys.stdout.write('\n')
                    self.current_line_length = 0

        sys.stdout.flush()
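
For responses that are not streamed, a simpler option is the standard library's `textwrap` module, which wraps a complete string in one call. This is a minimal sketch of that alternative, not a replacement for the streaming wrapper above; `wrap_response` is a hypothetical helper name.

```python
import textwrap


def wrap_response(text, width=80):
    """Wrap each line of `text` to at most `width` columns, keeping existing line breaks."""
    wrapped = [textwrap.fill(line, width=width) if line else '' for line in text.split('\n')]
    return '\n'.join(wrapped)


sample = ("word " * 30).strip()
wrapped_sample = wrap_response(sample, width=40)
print(wrapped_sample)
```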


To use the Gemini API, you'll need an API key. If you don't already have one, create a key in Google AI Studio.
In Colab, add the key to the secrets manager under the "🔑" in the left panel. Give it the name `GOOGLE_API_KEY`. Then pass the key to the SDK:

In [None]:
# Import the Python SDK
import google.generativeai as genai
# Used to securely store your API key
from google.colab import userdata

GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
genai.configure(api_key=GOOGLE_API_KEY)

Before you can make any API calls, you need to initialize the Generative Model.

In [None]:
# Initialize the Gemini API
gemini_model = genai.GenerativeModel('gemini-2.5-flash-preview-04-17')

Now you can make API calls. For example, to generate a poem:

In [None]:
response = gemini_model.generate_content('Write a poem about the moon.')
print(response.text)

In [None]:
# @title Formatted streaming example
from google.colab import ai

wrapper = LineWrapper()
for chunk in ai.generate_text('Give me a long-winded description of the evolution of the Roman Empire.', model_name='google/gemini-2.5-flash', stream=True):
  wrapper.print(chunk)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
from math import sqrt
import requests
import io

# Set a random seed for reproducibility
np.random.seed(42)

# --- 1. Decision Tree Node and Classifier (Modified from previous assignment) ---

class Node:
    """Represents a single node in the Decision Tree."""
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, value=None):
        self.feature_index = feature_index
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

class DecisionTreeClassifier:
    """Custom Decision Tree Classifier using Gini Impurity, now supporting feature subsetting."""
    def __init__(self, max_depth=None, min_samples_split=2, n_features_subset=None):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None
        self.n_features_subset = n_features_subset
        self.feature_importance = None # To store feature importance (Gini reduction)

    def _gini_impurity(self, y):
        """Calculates the Gini Impurity of a set of labels y."""
        if len(y) == 0: return 0.0
        m = len(y)
        gini = 1.0
        for count in Counter(y).values():
            gini -= (count / m)**2
        return gini

    def _information_gain(self, y_parent, y_left, y_right):
        """Calculates Information Gain based on Gini impurity."""
        gini_parent = self._gini_impurity(y_parent)
        n_parent = len(y_parent)

        # Avoid division by zero
        if n_parent == 0: return 0

        weighted_impurity = (len(y_left) / n_parent) * self._gini_impurity(y_left) + \
                            (len(y_right) / n_parent) * self._gini_impurity(y_right)

        return gini_parent - weighted_impurity

    def _best_split(self, X, y):
        """Finds the feature and threshold that yield the highest Information Gain."""
        m, n = X.shape
        best_gain = -1
        best_feature_index = None
        best_threshold = None

        # Implement Feature Randomness: select a random subset of features
        if self.n_features_subset is None or self.n_features_subset >= n:
            features_to_check = range(n)
        else:
            # Randomly select self.n_features_subset indices without replacement
            features_to_check = np.random.choice(n, self.n_features_subset, replace=False)

        for feature_index in features_to_check:
            X_column = X[:, feature_index]
            # Consider all unique values in the feature column as potential thresholds
            possible_thresholds = np.unique(X_column)

            for threshold in possible_thresholds:
                y_left = y[X_column <= threshold]
                y_right = y[X_column > threshold]

                if len(y_left) == 0 or len(y_right) == 0: continue

                gain = self._information_gain(y, y_left, y_right)

                if gain > best_gain:
                    best_gain = gain
                    best_feature_index = feature_index
                    best_threshold = threshold

        return best_feature_index, best_threshold, best_gain

    def _most_common_label(self, y):
        """Returns the most frequent class label in the subset y."""
        return Counter(y).most_common(1)[0][0]

    def _update_feature_importance(self, feature_index, gain, n_node_samples, n_features):
        """Updates feature importance based on the split's Gini reduction (used only for internal nodes)."""
        if self.feature_importance is None:
            # One slot per feature (not per sample), so the forest can sum arrays across trees
            self.feature_importance = np.zeros(n_features)

        # Weight the gain by the proportion of samples at this node
        self.feature_importance[feature_index] += gain * (n_node_samples / self.X_train_size)

    def _build_tree(self, X, y, depth=0):
        """Recursive function to build the decision tree."""
        m, n = X.shape
        num_labels = len(np.unique(y))

        # --- Stopping Criteria ---
        if self.max_depth is not None and depth >= self.max_depth:
            return Node(value=self._most_common_label(y))
        if m < self.min_samples_split:
            return Node(value=self._most_common_label(y))
        if num_labels == 1:
            return Node(value=y[0])

        # --- Find the best split ---
        feature_index, threshold, gain = self._best_split(X, y)

        if gain <= 0:
            return Node(value=self._most_common_label(y))

        # Update feature importance for this tree (used only for Random Forest aggregation)
        if self.X_train_size is not None:
            self._update_feature_importance(feature_index, gain, m, n)

        # --- Perform the split ---
        X_column = X[:, feature_index]
        left_indices = X_column <= threshold

        X_left, y_left = X[left_indices], y[left_indices]
        X_right, y_right = X[~left_indices], y[~left_indices]

        left_child = self._build_tree(X_left, y_left, depth + 1)
        right_child = self._build_tree(X_right, y_right, depth + 1)

        return Node(feature_index, threshold, left_child, right_child)

    def fit(self, X, y):
        """Initiates the recursive tree-building process."""
        # Store size for feature importance calculation weighting
        self.X_train_size = len(X)
        self.root = self._build_tree(X, y, depth=0)
        return self

    def _traverse_tree(self, x, node):
        """Helper to traverse the tree for a single sample x."""
        if node.value is not None:
            return node.value

        if x[node.feature_index] <= node.threshold:
            return self._traverse_tree(x, node.left)
        else:
            return self._traverse_tree(x, node.right)

    def predict(self, X):
        """Predicts the class labels for a dataset X."""
        predictions = [self._traverse_tree(x, self.root) for x in X]
        return np.array(predictions)

# --- 2. Random Forest Classifier Implementation (Part A, Task 4) ---

class RandomForestClassifier:
    """Ensemble of Decision Trees using Bagging and Feature Randomness."""
    def __init__(self, n_trees=50, max_depth=None, min_samples_split=2, n_features=None):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.n_features = n_features
        self.trees = []
        self.total_features = 0
        self.oob_indices = [] # Bonus Challenge

    def _bootstrap_sample(self, X, y):
        """Creates a bootstrap sample (sampling with replacement) and records OOB indices."""
        m = len(X)
        # Select m indices with replacement
        sample_indices = np.random.choice(m, m, replace=True)

        # Determine Out-of-Bag (OOB) indices
        all_indices = np.arange(m)
        oob_indices = np.setdiff1d(all_indices, np.unique(sample_indices))
        self.oob_indices.append(oob_indices) # Store for OOB error calculation

        return X[sample_indices], y[sample_indices]

    def fit(self, X, y):
        """Trains n_trees Decision Trees using bagging and feature randomness."""
        self.total_features = X.shape[1]
        self.trees = []
        self.oob_indices = []  # Reset so OOB indices stay aligned with self.trees on refit

        # Determine the number of features to use at each split (default: sqrt(total_features))
        if self.n_features is None:
            self.n_features = int(sqrt(self.total_features))

        for _ in range(self.n_trees):
            # 1. Create a bootstrap sample (Bagging)
            X_sample, y_sample = self._bootstrap_sample(X, y)

            # 2. Train a Decision Tree with Feature Randomness
            tree = DecisionTreeClassifier(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                n_features_subset=self.n_features # Pass n_features to control randomness
            )
            tree.fit(X_sample, y_sample)
            self.trees.append(tree)

        # Calculate combined feature importance after all trees are grown
        self.feature_importances_ = self._calculate_feature_importance()

        return self

    def _calculate_feature_importance(self):
        """Aggregates feature importance (Gini reduction) across all trees."""
        total_importance = np.zeros(self.total_features)

        for tree in self.trees:
            if tree.feature_importance is not None:
                total_importance += tree.feature_importance

        # Normalize the importance scores
        if np.sum(total_importance) > 0:
            return total_importance / np.sum(total_importance)
        return total_importance

    def predict(self, X):
        """Aggregates predictions from all trees using majority vote."""
        # Get predictions from every tree
        tree_predictions = np.array([tree.predict(X) for tree in self.trees])

        # Transpose to get (n_samples, n_trees)
        tree_predictions = tree_predictions.T

        final_predictions = np.array([
            Counter(pred).most_common(1)[0][0]
            for pred in tree_predictions
        ])
        return final_predictions

    # --- Bonus Challenge: Out-of-Bag (OOB) Error Estimation ---
    def oob_error(self, X_train, y_train):
        """Estimates generalization error using Out-of-Bag samples."""
        m = len(X_train)
        oob_predictions = np.full((m, self.n_trees), np.nan)  # NaN marks trees for which the sample was not OOB

        for i, tree in enumerate(self.trees):
            # Get the indices that were NOT used for training this tree
            oob_indices = self.oob_indices[i]

            if len(oob_indices) > 0:
                X_oob = X_train[oob_indices]
                y_oob = y_train[oob_indices]

                # Get predictions for the OOB samples
                predictions = tree.predict(X_oob)

                # Store predictions in the full matrix at the correct indices
                for idx, pred in zip(oob_indices, predictions):
                    oob_predictions[idx, i] = pred

        # For each sample, find the majority vote among trees where it was OOB
        final_oob_preds = []
        for i in range(m):
            # Predictions from trees where sample i was OOB (non-NaN values)
            valid_preds = oob_predictions[i][~np.isnan(oob_predictions[i])]

            if len(valid_preds) > 0:
                # Majority vote for that sample
                final_oob_preds.append(Counter(valid_preds).most_common(1)[0][0])
            else:
                # If a sample was in every bootstrap (rare), assign majority class of the full training set
                final_oob_preds.append(Counter(y_train).most_common(1)[0][0])

        # Calculate error
        final_oob_preds = np.array(final_oob_preds)

        # Compare OOB prediction with actual training label
        return 1.0 - calculate_accuracy(y_train, final_oob_preds)

# --- 3. Evaluation Metrics (Part B, Task 1) ---

def confusion_matrix(y_true, y_pred, labels=(0, 1)):
    """Calculates the confusion matrix components (TP, FP, FN, TN)."""
    TP = np.sum((y_true == 1) & (y_pred == 1))
    FP = np.sum((y_true == 0) & (y_pred == 1))
    FN = np.sum((y_true == 1) & (y_pred == 0))
    TN = np.sum((y_true == 0) & (y_pred == 0))
    return TP, FP, FN, TN

def calculate_accuracy(y_true, y_pred):
    """Calculates classification accuracy."""
    return np.sum(y_true == y_pred) / len(y_true)

def precision_score(y_true, y_pred):
    """Precision: TP / (TP + FP)"""
    TP, FP, _, _ = confusion_matrix(y_true, y_pred)
    # Handle division by zero
    return TP / (TP + FP) if (TP + FP) > 0 else 0.0

def recall_score(y_true, y_pred):
    """Recall: TP / (TP + FN)"""
    TP, _, FN, _ = confusion_matrix(y_true, y_pred)
    # Handle division by zero
    return TP / (TP + FN) if (TP + FN) > 0 else 0.0

def f1_score(y_true, y_pred):
    """F1-Score: 2 * (Precision * Recall) / (Precision + Recall)"""
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    # Handle division by zero
    return 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

# --- 4. Data Loading and Preprocessing (Part A, Task 2) ---

def load_wine_data(url):
    """Loads the Wine Quality (Red) dataset and converts it to binary classification."""
    try:
        # Use requests to fetch data, then pandas to read
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Treat HTTP errors (e.g. 404) as failures
        df = pd.read_csv(io.StringIO(response.content.decode('utf-8')), sep=';')
    except Exception as e:
        print(f"Error loading data from URL ({e}). Falling back to random mock data.")
        # Fallback for demonstration purposes if the external connection fails
        columns = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol', 'quality']
        df = pd.DataFrame(np.random.rand(100, 12), columns=columns)
        # Give the mock target integer scores so both classes appear after binarization
        df['quality'] = np.random.randint(3, 9, size=len(df))

    # Features and target
    X = df.drop('quality', axis=1).values
    y_raw = df['quality'].values

    # Binary Classification: quality > 5 is 'good' (1), <= 5 is 'bad' (0)
    y_binary = (y_raw > 5).astype(int)

    feature_names = df.drop('quality', axis=1).columns.to_list()

    # Simple standardization for demonstration. Tree-based models do not require it,
    # but it keeps the features reusable with scale-sensitive models later.
    X = (X - np.mean(X, axis=0)) / np.std(X, axis=0)

    return X, y_binary, feature_names

WINE_URL_RED = 'https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv'
X, y, feature_names = load_wine_data(WINE_URL_RED)

# Split the dataset (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# --- 5. Model Training and Comparison (Part B, Task 2) ---

# --- A. Single Overfitting Decision Tree (Max Depth = high) ---
single_tree = DecisionTreeClassifier(max_depth=100) # Ensure it grows deep
single_tree.fit(X_train, y_train)
y_pred_single = single_tree.predict(X_test)

metrics_single = {
    'Accuracy': calculate_accuracy(y_test, y_pred_single),
    'Precision': precision_score(y_test, y_pred_single),
    'Recall': recall_score(y_test, y_pred_single),
    'F1-Score': f1_score(y_test, y_pred_single),
}

# --- B. Random Forest Classifier ---
# Use the common sqrt heuristic for features per split: int(sqrt(11)) = 3
N_FEATURES_SUBSET = int(sqrt(X_train.shape[1]))

rf_model = RandomForestClassifier(
    n_trees=100,
    max_depth=10,
    min_samples_split=2,
    n_features=N_FEATURES_SUBSET # Using 3 features per split
)
rf_model.fit(X_train, y_train)
y_pred_rf = rf_model.predict(X_test)

metrics_rf = {
    'Accuracy': calculate_accuracy(y_test, y_pred_rf),
    'Precision': precision_score(y_test, y_pred_rf),
    'Recall': recall_score(y_test, y_pred_rf),
    'F1-Score': f1_score(y_test, y_pred_rf),
}

# --- C. Bonus Challenge: OOB Error ---
oob_error_rate = rf_model.oob_error(X_train, y_train)
test_error_rate = 1.0 - metrics_rf['Accuracy']

# Print Comparison Table
print("--------------------------------------------------------------------------------")
print("Model Performance Comparison (Binary Wine Classification)")
print("--------------------------------------------------------------------------------")
print(f"{'Metric':<10} | {'Single Deep Tree':<20} | {'Random Forest (100 Trees)':<25}")
print("-" * 60)
for metric in ['Accuracy', 'Precision', 'Recall', 'F1-Score']:
    print(f"{metric:<10} | {metrics_single[metric]:<20.4f} | {metrics_rf[metric]:<25.4f}")
print("--------------------------------------------------------------------------------")
print(f"Bonus: OOB Error Estimate: {oob_error_rate:.4f} (Test Error: {test_error_rate:.4f})")
print("--------------------------------------------------------------------------------")

# --- 6. Visualization: Accuracy vs. Number of Trees (Part B, Task 3) ---

n_trees_range = [1, 5, 10, 25, 50, 100]
rf_accuracy_over_trees = []

print("\nEvaluating Accuracy vs. Number of Trees...")
for n in n_trees_range:
    # Use consistent depth and features for fair comparison
    temp_rf = RandomForestClassifier(n_trees=n, max_depth=10, n_features=N_FEATURES_SUBSET)
    temp_rf.fit(X_train, y_train)
    y_pred_temp = temp_rf.predict(X_test)
    rf_accuracy_over_trees.append(calculate_accuracy(y_test, y_pred_temp))

plt.figure(figsize=(10, 6))
plt.plot(n_trees_range, rf_accuracy_over_trees, marker='o', linestyle='-', color='darkgreen')
plt.title('Random Forest Test Accuracy vs. Number of Trees')
plt.xlabel('Number of Trees (n_trees)')
plt.ylabel('Test Set Accuracy')
plt.xticks(n_trees_range)
plt.grid(True, linestyle='--', alpha=0.7)
plt.show()


# --- 7. Visualization: Feature Importance (Part B, Task 3) ---

importance_scores = rf_model.feature_importances_
# Sort features by importance
sorted_indices = np.argsort(importance_scores)[::-1]
sorted_importances = importance_scores[sorted_indices]
sorted_feature_names = [feature_names[i] for i in sorted_indices]

plt.figure(figsize=(12, 7))
plt.bar(sorted_feature_names, sorted_importances, color='teal')
plt.xlabel('Feature')
plt.ylabel('Normalized Importance Score (Gini Reduction)')
plt.title('Random Forest Feature Importance')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()
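
To make the split criterion used by `_gini_impurity` and `_information_gain` concrete, here is a small worked example on four labels. It is a standalone sketch that mirrors the formulas in the cell above rather than reusing the classes; `gini` and `gini_gain` are illustrative names.

```python
from collections import Counter


def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    if len(labels) == 0:
        return 0.0
    total = len(labels)
    return 1.0 - sum((count / total) ** 2 for count in Counter(labels).values())


def gini_gain(parent, left, right):
    """Parent impurity minus the size-weighted impurity of the two children."""
    n = len(parent)
    weighted = (len(left) / n) * gini(left) + (len(right) / n) * gini(right)
    return gini(parent) - weighted


parent = [0, 0, 1, 1]                              # balanced node: impurity 0.5
perfect_gain = gini_gain(parent, [0, 0], [1, 1])   # children are pure
useless_gain = gini_gain(parent, [0, 1], [0, 1])   # children are as mixed as the parent
print(perfect_gain, useless_gain)
```

A split that separates the classes completely recovers the full parent impurity as gain, while a split that leaves both children as mixed as the parent gains nothing, which is why `_build_tree` stops when the best gain is not positive.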
