<a href="https://colab.research.google.com/github/LiYuan199701/AMIC/blob/document-level-BAMIC/Model_only_document_level_BAMIC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Model code for document-level BAMIC

# Load basic packages and import PyTorch

In [None]:
import pyximport
import os
import IPython
import Cython
import sys
import os
import numpy as np
import pandas as pd
import copy
import scipy
import pickle
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import seaborn as sns
import random, math, sys, string, re
%matplotlib inline
# Set inline backend and resolution
%config InlineBackend.figure_format = 'retina'  # For Retina displays
plt.rcParams['figure.dpi'] = 100  # Figure dpi is 100; raise it (e.g. to 200) for higher resolution

# Import keras and tensorflow
#from keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
#from tensorflow.keras.preprocessing.sequence import pad_sequences

# Split dataset into train, val, test
from sklearn.model_selection import train_test_split

# Import Torch Package and its functions
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn import Parameter
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.autograd import Variable
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
torch.__version__

'2.8.0+cu126'

# Bayesian AMIC structure

## Bayesian last-layer class

In [None]:
#%%writefile models/models.py
import numpy as np
import pandas as pd
import copy
import scipy
import pickle
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import seaborn as sns
import random, math, sys, string, re
# Import Torch Package and its functions
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn import Parameter
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.autograd import Variable
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

import torch
import torch.nn as nn
import torch.nn.functional as F
# =========================
# 2) Bayesian last-layer pieces
# =========================
class BayesLinear(nn.Module):
    """Linear layer with an independent Gaussian posterior per weight/bias.

    Each parameter has posterior N(mu, sigma^2) with
    sigma = softplus(rho) + 1e-8, and prior N(0, prior_sigma^2).

    Args:
        in_features: size of each input vector.
        out_features: size of each output vector.
        prior_sigma: standard deviation of the zero-mean prior.
        init_rho: value every rho parameter starts at.
    """

    def __init__(self, in_features, out_features, prior_sigma=1.0, init_rho=-3.0):
        super().__init__()
        weight_shape = (out_features, in_features)
        bias_shape = (out_features,)
        rho_start = float(init_rho)

        # Means start at zero; rho starts at init_rho for every entry.
        self.weight_mu = nn.Parameter(torch.zeros(weight_shape))
        self.weight_rho = nn.Parameter(torch.full(weight_shape, rho_start))
        self.bias_mu = nn.Parameter(torch.zeros(bias_shape))
        self.bias_rho = nn.Parameter(torch.full(bias_shape, rho_start))

        # Stored as a buffer so it follows the module across devices.
        self.register_buffer("prior_sigma", torch.tensor(float(prior_sigma)))

    @staticmethod
    def _rho_to_sigma(rho):
        """Map an unconstrained rho to a strictly positive std deviation."""
        return F.softplus(rho) + 1e-8

    @property
    def weight_sigma(self):
        """Posterior standard deviation of the weights."""
        return self._rho_to_sigma(self.weight_rho)

    @property
    def bias_sigma(self):
        """Posterior standard deviation of the biases."""
        return self._rho_to_sigma(self.bias_rho)

    def _kl_to_prior(self, mu, sigma):
        """Element-wise KL( N(mu, sigma^2) || N(0, prior_sigma^2) )."""
        prior = self.prior_sigma
        return torch.log(prior / sigma) + (sigma**2 + mu**2) / (2 * prior**2) - 0.5

    def kl(self) -> torch.Tensor:
        """Return the KL divergence summed over all weights and biases."""
        weight_term = self._kl_to_prior(self.weight_mu, self.weight_sigma).sum()
        bias_term = self._kl_to_prior(self.bias_mu, self.bias_sigma).sum()
        return weight_term + bias_term

    def forward(self, x, sample: bool = True):
        """Apply the layer to ``x``.

        Parameters are drawn from the posterior when the module is in
        training mode or ``sample`` is true; otherwise the posterior means
        are used.
        """
        if not (self.training or sample):
            return F.linear(x, self.weight_mu, self.bias_mu)

        # Reparameterisation: mu + sigma * eps, weights drawn before biases.
        weight = self.weight_mu + self.weight_sigma * torch.randn_like(self.weight_mu)
        bias = self.bias_mu + self.bias_sigma * torch.randn_like(self.bias_mu)
        return F.linear(x, weight, bias)


## create_emb_layer function

In [None]:
# -------------------------------------------------------------------
# create_emb_layer
# -------------------------------------------------------------------
def create_emb_layer(weights_matrix, non_trainable=False):
    """Build an ``nn.Embedding`` initialised from a pretrained weight matrix.

    Args:
        weights_matrix: 2-D matrix of shape (num_embeddings, embedding_dim).
            A tensor is used as-is (after copying); anything else is
            converted to a float32 tensor.
        non_trainable: if True the embedding weights are frozen
            (``requires_grad=False``); if False they are trainable.

    Returns:
        Tuple ``(emb_layer, num_embeddings, embedding_dim)``.

    Raises:
        ValueError: if ``weights_matrix`` is not 2-dimensional.
    """
    if not torch.is_tensor(weights_matrix):
        weights_matrix = torch.as_tensor(weights_matrix, dtype=torch.float32)
    if weights_matrix.dim() != 2:
        raise ValueError(
            "weights_matrix must be 2-dimensional, got shape "
            f"{tuple(weights_matrix.shape)}"
        )

    num_embeddings, embedding_dim = weights_matrix.size()

    # from_pretrained wraps the tensor it is given without copying, so work on
    # a private copy: training must not modify the caller's matrix in place.
    weights = weights_matrix.detach().clone()

    # from_pretrained freezes by default (freeze=True); pass the flag
    # explicitly so non_trainable=False really yields trainable weights.
    emb_layer = nn.Embedding.from_pretrained(weights, freeze=non_trainable)

    return emb_layer, num_embeddings, embedding_dim

## Self‐Attention

In [None]:
# -------------------------------------------------------------------
# Self‐Attention
# -------------------------------------------------------------------
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class SelfAttention(nn.Module):
    """
    Multi-head scaled dot-product self-attention.

    The embedding of size E is split into H heads of size S = E // H and the
    attention scores are divided by sqrt(S).

    Args:
        emb (int): token embedding size E (must be divisible by ``heads``).
        heads (int): number of attention heads H.
        mask (bool): if True, a causal mask hides future positions.
        reduced_dim (int|None): if given, a final Linear maps E -> reduced_dim.
        attn_dropout (float): dropout probability on the attention weights.
        proj_dropout (float): dropout probability after the head-mixing layer.
    """

    def __init__(self, emb, heads=8, mask=False, reduced_dim=None,
                 attn_dropout=0.10, proj_dropout=0.20):
        super().__init__()
        assert emb % heads == 0, (
            f'Embedding dimension ({emb}) should be divisible by nr. of heads ({heads})'
        )

        self.emb = emb
        self.heads = heads
        self.mask = mask

        # Bias-free E -> E maps producing keys, queries and values.
        self.tokeys = nn.Linear(emb, emb, bias=False)
        self.toqueries = nn.Linear(emb, emb, bias=False)
        self.tovalues = nn.Linear(emb, emb, bias=False)

        # Mixes the concatenated head outputs.
        self.unifyheads = nn.Linear(emb, emb, bias=True)

        # Optional output projection; stays None when not requested.
        if reduced_dim is None:
            self.projection = None
        else:
            self.projection = nn.Linear(emb, reduced_dim, bias=True)

        # A non-positive probability disables the corresponding dropout.
        self.attn_drop = nn.Dropout(attn_dropout) if attn_dropout > 0 else nn.Identity()
        self.proj_drop = nn.Dropout(proj_dropout) if proj_dropout > 0 else nn.Identity()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: tensor of shape (B, T, E).

        Returns:
            Tensor of shape (B, T, E), or (B, T, reduced_dim) when the output
            projection is configured.
        """
        batch, seq_len, width = x.size()
        n_heads = self.heads
        assert width == self.emb, f'Input embedding dim ({width}) should match layer embedding dim ({self.emb})'
        head_dim = width // n_heads

        def split_heads(t):
            # (B, T, E) -> (B, H, T, S)
            return t.view(batch, seq_len, n_heads, head_dim).transpose(1, 2).contiguous()

        keys = split_heads(self.tokeys(x))
        queries = split_heads(self.toqueries(x))
        values = split_heads(self.tovalues(x))

        # (B, H, T, T) similarity scores, scaled by sqrt of the head size.
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / math.sqrt(head_dim)

        if self.mask:
            # True strictly above the diagonal, i.e. at future positions.
            future = torch.triu(
                torch.ones(seq_len, seq_len, device=x.device, dtype=torch.bool),
                diagonal=1,
            )
            scores = scores.masked_fill(future, float('-inf'))

        # Normalise over the key axis, then drop attention weights.
        weights = self.attn_drop(F.softmax(scores, dim=-1))

        # (B, H, T, S) -> (B, T, E)
        context = torch.matmul(weights, values)
        context = context.transpose(1, 2).contiguous().view(batch, seq_len, width)

        mixed = self.proj_drop(self.unifyheads(context))

        if self.projection is None:
            return mixed
        return self.projection(mixed)

## tiedLinear

In [None]:
# ------------------------------------------------------------
# tiedLinear
# ------------------------------------------------------------
class tiedLinear(nn.Module):
    """Square linear layer whose weight rows are all the same learnable vector.

    A single vector of length ``in_features`` is stacked ``in_features`` times
    to form the weight matrix, so every output feature equals the same dot
    product of the input with that vector (plus the scalar bias, if any).

    Args:
        in_features: size of the last input dimension (and of the output).
        bias: when truthy, a single learnable bias value is added.
    """

    def __init__(self, in_features, bias=True):
        super().__init__()
        self.in_features = in_features
        self.weight = nn.Parameter(torch.empty(in_features))
        if not bias:
            self.register_parameter('bias', None)
        else:
            self.bias = nn.Parameter(torch.empty(1))
        self.reset_parameters()

    def reset_parameters(self):
        """Draw weight and bias uniformly from [-1/sqrt(n), 1/sqrt(n)]."""
        bound = 1. / math.sqrt(self.weight.size(0))
        with torch.no_grad():
            self.weight.uniform_(-bound, bound)
            if self.bias is not None:
                self.bias.uniform_(-bound, bound)

    def forward(self, input):
        """Apply the tied weight matrix to the last dimension of ``input``."""
        # Stack the shared row in_features times -> (in_features, in_features).
        tied_matrix = self.weight.unsqueeze(0).repeat(self.in_features, 1)
        return F.linear(input, tied_matrix, self.bias)


## PositionalEncoding class

In [None]:
# -------------------------------------------------------------------
# PositionalEncoding
# -------------------------------------------------------------------
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to the input, then apply dropout.

    Even feature columns hold sin(pos * freq) and odd columns hold
    cos(pos * freq). Both even and odd ``d_model`` are supported.

    The table is stored as a buffer ``pe`` of shape (max_len, 1, d_model) and
    is sliced with ``x.size(0)``, i.e. dimension 0 of the input is treated as
    the position axis and the table broadcasts over dimension 1.

    Args:
        d_model: feature size of the inputs.
        dropout: dropout probability applied after adding the encoding.
        max_len: number of positions precomputed in the table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # only the first d_model // 2 frequencies get a cosine column.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # (max_len, d_model) -> (max_len, 1, d_model)
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + pe[:x.size(0)])``.

        ``x.size(0)`` must not exceed ``max_len``; otherwise the addition
        fails with a shape mismatch.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


## Mask_block Class

In [None]:
# -------------------------------------------------------------------
# Mask_block
# -------------------------------------------------------------------
class Mask_block(nn.Module):
    """
    Learned per-token soft mask.

    Pipeline implemented by ``forward``:
      1) SelfAttention features -> dropout(0.1) -> LayerNorm
      2) Linear(reduced_dim, 1) -> one raw logit per token
      3) Division by ``temperature`` when it differs from 1.0
      4) Sigmoid -> values in (0,1), then multiplication by the pad mask

    The logit layer's weight and bias are zero-initialised, so before any
    training every logit is 0 and the sigmoid yields 0.5.

    ``pivot``, ``clip_logits`` and ``clip_value`` are stored on the instance
    but are not read by ``forward``. ``vocab_size``, ``weight_matrix``,
    ``max_relative_position`` and ``drop_prob`` are accepted but unused.

    Returns (from ``forward``):
      mask_out:  [B,T] sigmoid output, zeroed where ``mask_1`` is False
      p1:        [B]   sum of mask_out per sequence
      p3:        [B]   ((m*(1-m))^2).sum per sequence (largest at m = 0.5)
      mean_mask: Python float, mean of mask_out over all B*T entries
    """

    def __init__(self, vocab_size, weight_matrix, hidden_dim, n_layers,
                 max_relative_position=2, pivot=0.5, drop_prob=0.5,
                 num_heads=1, reduced_dim=100, temperature=1.0,
                 clip_logits: bool = False, clip_value: float = 8.0):
        super().__init__()

        # Plain configuration values.
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        self.pivot = float(pivot)
        self.temperature = float(temperature)
        self.clip_logits = bool(clip_logits)
        self.clip_value = float(clip_value)

        # Feature extractor: [B,T,hidden_dim] -> [B,T,reduced_dim].
        self.attention1 = SelfAttention(emb=hidden_dim, heads=num_heads, reduced_dim=reduced_dim)
        self.dropout10 = nn.Dropout(0.1)

        # Normalises the features fed to the logit layer.
        self.mask_norm = nn.LayerNorm(reduced_dim, elementwise_affine=True)

        # One unconstrained logit per token.
        self.mask_head = nn.Linear(reduced_dim, 1, bias=True)

        # Start the logit layer at zero so all initial logits are 0.
        with torch.no_grad():
            for param in (self.mask_head.weight, self.mask_head.bias):
                param.zero_()

        # Activations kept as attributes; forward() does not use them.
        self.relu = nn.ReLU()
        self.leakyrelu = nn.LeakyReLU()
        self.tanh = nn.Tanh()

    def forward(self, embeds, mask_1, digits):
        """
        embeds: [B,T,E]
        mask_1: [B,T] bool (True for real tokens)
        digits: accepted for signature compatibility; not used here
        """
        # Features -> per-token logits [B,T].
        feats = self.mask_norm(self.dropout10(self.attention1(embeds)))
        logits = self.mask_head(feats).squeeze(-1)

        # Temperature is only applied when it is not exactly 1.0.
        if self.temperature != 1.0:
            logits = logits / self.temperature
        gate = torch.sigmoid(logits)

        # Multiply (rather than index) so padded positions become exactly 0.
        mask_out = gate * mask_1.float()

        # Per-sequence regularisation terms.
        p1 = mask_out.sum(dim=1)
        spread = mask_out * (1.0 - mask_out)
        p3 = (spread ** 2).sum(dim=1)

        # Scalar for logging; the mean runs over every entry of mask_out.
        mean_mask = float(mask_out.mean().detach().item())
        return mask_out, p1, p3, mean_mask


## Sentiment_block

In [None]:
# -------------------------------------------------------------------
# Sentiment_block
# -------------------------------------------------------------------
class Sentiment_block(nn.Module):
    """Per-token score head: hard-masked embeddings -> attention -> tied linear.

    ``forward`` zeroes the embeddings of tokens whose mask value is below
    0.001, runs ``attention1`` + LayerNorm + dropout(0.1), applies the tied
    linear layer ``fc2`` and averages over the feature dimension.

    ``fc1``, ``fc3``, ``pos_enc``, ``attention2``, ``dropout`` and the
    activation modules are constructed but not used by ``forward``; they are
    kept so that existing checkpoints (state_dict keys) still load.

    Args:
        vocab_size, weight_matrix, max_relative_position, pivot: accepted for
            signature compatibility; not used here.
        hidden_dim: embedding size E of the incoming token vectors.
        n_layers: stored on the instance only.
        drop_prob: probability of the (unused in forward) ``dropout`` layer.
        num_heads: number of attention heads.
        reduced_dim: output size of the attention projection.
        digits_dim: number of extra "digits" features; sizes ``fc3``. When
            None, a module-level ``digits_dim`` is used if one is defined
            (the previous implicit behaviour), otherwise 0.
    """

    def __init__(self, vocab_size, weight_matrix, hidden_dim, n_layers,
                 max_relative_position=2, pivot=0.5, drop_prob=0.5,
                 num_heads=1, reduced_dim=100, digits_dim=None):
        super().__init__()

        # This class used to read a bare global named digits_dim, which raises
        # NameError when no such global exists. Keep honouring the global for
        # backward compatibility, but fall back to 0 instead of crashing.
        if digits_dim is None:
            digits_dim = globals().get('digits_dim', 0)

        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        embedding_dim = hidden_dim

        # Dropout layers.
        self.dropout = nn.Dropout(drop_prob)
        self.dropout10 = nn.Dropout(0.1)

        # Layer construction order is unchanged so that seeded initialisation
        # and parameter ordering match earlier runs.
        self.fc1 = tiedLinear(reduced_dim, 1)  # second positional arg is the (truthy) bias flag
        self.fc3 = nn.Linear(1 + digits_dim, 1)
        self.pos_enc = PositionalEncoding(d_model=embedding_dim)
        self.attention1 = SelfAttention(emb=embedding_dim, heads=self.num_heads, reduced_dim=reduced_dim)
        self.attention2 = SelfAttention(emb=embedding_dim, heads=self.num_heads, reduced_dim=reduced_dim)
        self.fc2 = tiedLinear(reduced_dim)

        self.relu = torch.nn.ReLU()
        self.leakyrelu = torch.nn.LeakyReLU()
        self.sig = nn.Sigmoid()
        self.tanh = nn.Tanh()

        self.layer_norm = nn.LayerNorm(reduced_dim)

    def forward(self, embeds, mask_1, mask, digits):
        """
        Args:
            embeds: [B,T,E] token embeddings.
            mask_1: not used here (kept for signature compatibility).
            mask:   [B,T] mask values; tokens with value < 0.001 are zeroed.
            digits: not used here (kept for signature compatibility).

        Returns:
            out:       [B,T] mean over the feature dimension of ``save_out``.
            save_out:  output of ``fc2`` before averaging.
            save_out0: features after attention, LayerNorm and dropout.
        """
        # Hard-threshold the mask and zero the embeddings of dropped tokens.
        # (The result was previously assigned to a misspelled name and so was
        # silently discarded.)
        keep = mask.ge(0.001).unsqueeze(2)
        embeds = embeds * keep

        embeds = self.attention1(embeds)
        embeds = self.layer_norm(embeds)
        embeds = self.dropout10(embeds)
        save_out0 = embeds

        out = self.fc2(embeds)
        save_out = out

        # Average out the feature dimension -> one score per token.
        out = torch.mean(out, 2)

        return out, save_out, save_out0


## Synthesizer class

In [None]:
# -------------------------------------------------------------------
# Synthesizer
# -------------------------------------------------------------------
class SynthesizerVIvec(nn.Module):
    """
    Bayesian classification head over a sentence vector.

    The input vector [B, S] goes through dropout, is optionally concatenated
    with extra "digits" features, and is mapped to a single logit by a
    ``BayesLinear`` layer.

    Args:
        feature_dim: size S of the sentence vector.
        digits_dim: number of extra features appended (0 disables them).
        prior_sigma: prior standard deviation passed to ``BayesLinear``.
        dropout_p: dropout probability applied to the sentence vector.
    """

    def __init__(self, feature_dim, digits_dim=0, prior_sigma=1.0, dropout_p=0.1):
        super().__init__()
        self.feature_dim = feature_dim
        self.digits_dim = int(digits_dim)
        self.dropout = nn.Dropout(dropout_p)
        head_inputs = feature_dim + self.digits_dim
        self.bayes_head = BayesLinear(head_inputs, 1, prior_sigma=prior_sigma)

    def forward(self, h_vec, digits=None, sample=True):
        """Return ``(probs, logits)``, each of shape [B].

        ``digits`` is only concatenated when it is given and the head was
        built with ``digits_dim > 0``.
        """
        features = self.dropout(h_vec)  # [B, S]
        append_digits = digits is not None and self.digits_dim > 0
        if append_digits:
            features = torch.cat([features, digits], dim=1)  # [B, S + Dg]
        logits = self.bayes_head(features, sample=sample).squeeze(-1)
        return torch.sigmoid(logits), logits

    def kl(self):
        """KL divergence of the Bayesian head's posterior from its prior."""
        return self.bayes_head.kl()

class Synthesizer(nn.Module):
    """Aggregate per-token scores into one sigmoid output per sequence.

    ``forward`` optionally multiplies the scores by a mask, applies dropout,
    averages over dimension 1 and passes the result through a sigmoid.

    ``fc1``, ``fc2``, ``fc3``, ``attention`` and the ReLU/LeakyReLU/Tanh
    modules are constructed but not used by ``forward``; they are kept so
    that existing checkpoints (state_dict keys) still load.

    Args:
        vocab_size, weight_matrix, max_relative_position, pivot: accepted for
            signature compatibility; not used here.
        hidden_dim: size used for the tied linear layers and the attention.
        n_layers: stored on the instance only.
        drop_prob: dropout probability applied to the token scores.
        num_heads: number of attention heads.
        reduced_dim: output size of the attention projection.
        digits_dim: number of extra "digits" features; sizes ``fc3``. When
            None, a module-level ``digits_dim`` is used if one is defined
            (the previous implicit behaviour), otherwise 0.
    """

    def __init__(self, vocab_size, weight_matrix, hidden_dim, n_layers,
                 max_relative_position=2, pivot=0.5, drop_prob=0.2,
                 num_heads=1, reduced_dim=100, digits_dim=None):
        super().__init__()

        # This class used to read a bare global named digits_dim, which raises
        # NameError when no such global exists. Keep honouring the global for
        # backward compatibility, but fall back to 0 instead of crashing.
        if digits_dim is None:
            digits_dim = globals().get('digits_dim', 0)

        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads

        self.dropout = nn.Dropout(drop_prob)

        # NOTE: fc2 is constructed twice (here and after the attention layer),
        # the second assignment replacing the first. The redundant construction
        # is kept on purpose so that random-number consumption (seeded
        # initialisation) and parameter registration order are unchanged.
        self.fc2 = tiedLinear(hidden_dim)
        self.fc1 = tiedLinear(hidden_dim, 1)  # second positional arg is the (truthy) bias flag
        self.attention = SelfAttention(emb=hidden_dim, heads=self.num_heads, reduced_dim=reduced_dim)
        self.fc2 = tiedLinear(hidden_dim)
        self.fc3 = nn.Linear(1 + digits_dim, 1)

        self.relu = torch.nn.ReLU()
        self.leakyrelu = torch.nn.LeakyReLU()
        self.sig = nn.Sigmoid()
        self.tanh = nn.Tanh()

    def forward(self, sent, digits, mask, use_mask=False):
        """
        Args:
            sent: per-token scores; dimension 0 is the batch and dimension 1
                is averaged away (assumed [B,T] - TODO confirm with caller).
            digits: not used here (kept for signature compatibility).
            mask: multiplied element-wise with ``sent`` when ``use_mask``.
            use_mask: whether to apply ``mask``.

        Returns:
            sig_out:  [B] sigmoid of the mean score per sequence.
            p2:       the unmodified ``sent`` input.
            save_out: the (optionally masked) scores before dropout.
        """
        # Derive the batch size from the input; it was previously an undefined
        # free name, which fails (or mis-shapes the last partial batch when a
        # same-named global happens to exist).
        batch_size = sent.size(0)

        p2 = sent
        out = sent * mask if use_mask else sent
        save_out = out

        out = self.dropout(out)

        # Mean over dimension 1, reshaped to [B,1]; the reshape also rejects
        # inputs whose remaining dimensions do not collapse to one value.
        out = torch.mean(out, 1)
        out = out.reshape((batch_size, 1))
        out = torch.mean(out, 1)

        sig_out = self.sig(out)
        # Batch-first view, then take the last (only) column -> [B].
        sig_out = sig_out.view(batch_size, -1)[:, -1]

        return sig_out, p2, save_out


## Embeds Class

In [None]:
# -------------------------------------------------------------------
# Embeds
# -------------------------------------------------------------------
class Embeds(nn.Module):
    """Token-id -> embedding lookup that also reports which ids are >= 0.1.

    Args:
        vocab_size: accepted for signature compatibility; not used here.
        weight_matrix: pretrained embedding matrix handed to
            ``create_emb_layer``.
        hidden_dim: feature size; stored and used to size ``pos_enc``.
    """

    def __init__(self, vocab_size, weight_matrix, hidden_dim):
        super().__init__()

        self.hidden_dim = hidden_dim

        # non_trainable=False asks create_emb_layer for a trainable layer;
        # the returned sizes are not needed here.
        emb_layer, _num_embeddings, _embedding_dim = create_emb_layer(weight_matrix, False)
        self.embedding = emb_layer

        # Constructed but not applied in forward().
        self.pos_enc = PositionalEncoding(d_model=hidden_dim)

    def forward(self, x):
        """Look up embeddings for ``x``.

        Returns:
            embeds: ``self.embedding(x)``.
            mask_1: boolean tensor shaped like ``x``, True where x >= 0.1
                (presumably marking non-padding ids when 0 is the pad id -
                verify against the tokenizer).
        """
        mask_1 = x.ge(0.1)
        token_vectors = self.embedding(x)
        return token_vectors, mask_1