In [2]:
import torch
import math
import torch.nn as nn

# ------------------------------
# Monotonicity Score Function
# ----------------------------

def monotonicity_score(input_tensor, epsilon=0.1, seed=None):
    """
    Compute a sublinear monotonicity score for a 1D tensor.

    Purpose: Estimates how close a sequence is to being monotone increasing by
    sampling a sublinear number of points and checking for violations (where a
    later value is less than an earlier one). The score ranges from 0 (highly
    non-monotone) to 1 (fully monotone).

    Args:
        input_tensor (torch.Tensor): 1D tensor of shape (n,) representing a sequence.
        epsilon (float): Error tolerance controlling sample size (smaller = more accurate).
                         Default is 0.1.
        seed (int, optional): Random seed for reproducibility.

    Returns:
        float: Monotonicity score between 0 and 1.

    Why: This function allows us to quantify monotonicity efficiently, which we
    can then use as a feature in a neural network.
    """
    # Validate input
    if input_tensor.dim() != 1:
        raise ValueError("Input must be a 1D tensor")

    n = input_tensor.size(0)
    if n < 2:
        return 1.0  # Sequences of length 0 or 1 are trivially monotone

    # Set seed for reproducibility
    if seed is not None:
        torch.manual_seed(seed)

    # Determine sample size: O((1/ε) log n) for sublinear complexity
    c = 2.0  # Empirical constant (can be tuned)
    num_samples = int(math.ceil(c / epsilon * math.log(n)))
    num_samples = min(num_samples, n)  # Don’t sample more than sequence length

    # Sample random indices and corresponding values
    sampled_indices = torch.randperm(n)[:num_samples]
    sampled_values = input_tensor[sampled_indices]

    # Sort by index to check monotonicity in sequence order
    sorted_indices = torch.argsort(sampled_indices)
    sampled_indices = sampled_indices[sorted_indices]
    sampled_values = sampled_values[sorted_indices]

    # Helper function to count violations using binary search
    def count_violations(value, idx, values, indices):
        """
        Count how many points to the right of idx have values less than 'value'.

        Why: Efficiently identifies monotonicity violations (later value < earlier value).
        """
        left, right = 0, len(values) - 1
        violations = 0

        while left <= right:
            mid = (left + right) // 2
            if indices[mid] > idx:  # Check only points to the right
                if values[mid] < value:  # Violation found
                    violations += 1
                    left = mid + 1  # Continue searching right
                else:
                    right = mid - 1  # Search left
            else:
                left = mid + 1  # Skip points to the left

        return violations

    # Calculate total violations
    total_violations = 0
    total_checks = 0

    for i in range(num_samples):
        value = sampled_values[i]
        idx = sampled_indices[i]
        violations = count_violations(value, idx, sampled_values, sampled_indices)
        total_violations += violations
        total_checks += (num_samples - i - 1)  # Comparisons to remaining points

    # Compute score: 1 - (fraction of violations)
    score = 1.0 if total_checks == 0 else max(0.0, 1.0 - total_violations / total_checks)
    return score

# ------------------------------
# PyTorch Model Definition
# ------------------------------

class SimpleModel(nn.Module):
    """
    A simple neural network that incorporates monotonicity scores as features.

    Why: Demonstrates how to integrate the monotonicity score into a model by
    concatenating it to each sample’s feature vector.
    """
    def __init__(self, input_dim):
        """
        Args:
            input_dim (int): Number of features per sample (excluding mono score).
        """
        super().__init__()
        # Input dim + 1 because we append the monotonicity score
        self.fc = nn.Linear(input_dim + 1, 1)

    def forward(self, x, mono_scores):
        """
        Forward pass: Concatenate mono_scores to input features and process.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, input_dim).
            mono_scores (torch.Tensor): Tensor of shape (batch_size,) with scores.

        Returns:
            torch.Tensor: Output tensor of shape (batch_size, 1).

        Why: Concatenates each sample’s mono score to its features, ensuring
        dimensional compatibility.
        """
        # Ensure mono_scores matches batch size and add feature dimension
        mono_scores = mono_scores.unsqueeze(1)  # Shape: (batch_size, 1)
        x = torch.cat([x, mono_scores], dim=1)  # Shape: (batch_size, input_dim + 1)
        return self.fc(x)

# ------------------------------
# Example Usage
# ------------------------------

if __name__ == "__main__":
    # ------------------
    # Test Monotonicity Score
    # ------------------
    # Generate synthetic sequences to test the function
    n = 1000
    monotone_series = torch.arange(n, dtype=torch.float32)  # Strictly increasing
    random_series = torch.rand(n)  # Random sequence
    decreasing_series = torch.flip(monotone_series, dims=[0])  # Strictly decreasing

    # Compute scores
    epsilon = 0.1  # Error tolerance for sampling
    seed = 42  # For reproducibility
    score_monotone = monotonicity_score(monotone_series, epsilon, seed)
    score_random = monotonicity_score(random_series, epsilon, seed)
    score_decreasing = monotonicity_score(decreasing_series, epsilon, seed)

    # Display results
    print("\nTesting Monotonicity Score Function:")
    print(f"Monotone series score: {score_monotone:.3f}")  # Should be ~1.0
    print(f"Random series score: {score_random:.3f}")      # Should be ~0.5
    print(f"Decreasing series score: {score_decreasing:.3f}")  # Should be ~0.0

    # ------------------
    # Integrate with Model
    # ------------------
    # Create a batch of data (e.g., tabular data or short sequences)
    batch_size = 32
    seq_length = 10  # Features per sample, assumed to be ordered
    data = torch.rand(batch_size, seq_length)  # Shape: (32, 10)

    # Compute monotonicity score for each sample’s feature sequence
    # Why: We want one score per sample to concatenate as a feature
    mono_scores = torch.tensor(
        [monotonicity_score(data[i], epsilon) for i in range(batch_size)],
        dtype=torch.float32
    )  # Shape: (32,)

    # Initialize and run the model
    model = SimpleModel(seq_length)
    output = model(data, mono_scores)

    # Verify output
    print("\nModel Integration:")
    print(f"Input data shape: {data.shape}")           # (32, 10)
    print(f"Monotonicity scores shape: {mono_scores.shape}")  # (32,)
    print(f"Model output shape: {output.shape}")      # (32, 1)


Testing Monotonicity Score Function:
Monotone series score: 1.000
Random series score: 0.968
Decreasing series score: 0.899

Model Integration:
Input data shape: torch.Size([32, 10])
Monotonicity scores shape: torch.Size([32])
Model output shape: torch.Size([32, 1])
