# Hierarchical Quantum Text Encoding Pipeline (notebook)

## Imports and Logging Setup

In [None]:
# Fetch the NLTK 'punkt' tokenizer data quietly before anything tokenizes text
# (word_tokenize is imported further down in this cell).
import nltk
nltk.download('punkt', quiet=True)

import logging
import warnings
import numpy as np
# nltk was already imported at the top of this cell; the repeat is harmless.
import nltk
from nltk.tokenize import word_tokenize

# Hugging Face Transformers for pre-trained embeddings
from transformers import AutoTokenizer, AutoModel

# Dimensionality reduction
from sklearn.decomposition import PCA

# Qiskit imports
from qiskit import QuantumCircuit
# Mock backend, left disabled; enable this import when IBM job time runs out.
# from qiskit.providers.fake_provider import FakeAthens
from qiskit.primitives import Sampler
from qiskit.circuit import Parameter
from qiskit.circuit.library import RZGate, CRZGate

# Silence RuntimeWarning and DeprecationWarning globally for the notebook.
# NOTE(review): this also hides NumPy warnings such as "mean of empty slice".
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Root logger at INFO so the pipeline's progress messages are shown.
logging.basicConfig(level=logging.INFO)


: 

## Transformer-Based Embedding Extraction

In [16]:
class TransformerEmbedder:
    """
    Wraps a pre-trained Hugging Face Transformer (e.g., BERT, DistilBERT)
    to produce one embedding per subword token of a text.

    Steps performed by ``get_token_embeddings``:
      1. Tokenize the input text with the model's own tokenizer.
      2. Run the model with gradient tracking disabled.
      3. Return the final layer's hidden state for every subword token.
    """

    def __init__(self, model_name="distilbert-base-uncased", device="cpu"):
        """
        :param model_name: A valid Hugging Face model name.
        :param device: 'cpu' or 'cuda' if GPU is available.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        # Inference only: switch the model to evaluation mode.
        self.model.eval()
        self.device = device
        self.model.to(self.device)

    def tokenize(self, text: str):
        """
        Lower-case ``text`` and split it into word tokens with NLTK.

        This is a human-readable tokenization only; it does not use the
        model's subword tokenizer, so its length generally differs from the
        number of rows returned by ``get_token_embeddings``.

        :param text: Input string.
        :return: List of lower-cased word tokens.
        """
        return word_tokenize(text.lower())

    def get_token_embeddings(self, text: str):
        """
        Returns an array of shape (num_subword_tokens, hidden_size),
        i.e., the last hidden state for each subword token in the text.
        Input longer than 128 subword tokens is truncated.

        You could also:
         - Return only the [CLS] embedding.
         - Average all subword embeddings per token or per sentence.

        :param text: Input string.
        :return: np.ndarray with one row per subword token produced by the
                 model's tokenizer.
        """
        # Imported here because the notebook's import cell does not import
        # torch, while the "pt" tensors and the model below require it.
        import torch

        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=128)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        # torch.no_grad (NumPy has no such context manager) skips autograd
        # bookkeeping, which is not needed for pure inference.
        with torch.no_grad():
            outputs = self.model(**inputs)
        # (batch_size=1, seq_len, hidden_dim) -> (seq_len, hidden_dim)
        last_hidden_state = outputs.last_hidden_state
        embeddings = last_hidden_state.squeeze(0).detach().cpu().numpy()
        return embeddings


## Dimensionality Reduction

In [17]:
class DimensionalityReducer:
    """
    PCA-based approach to reduce embeddings to a dimension = 2^n_qubits (when possible),
    enabling direct amplitude encoding.
    """
    def __init__(self, target_power=2):
        """
        :param target_power: target dimension = 2^(target_power).
                             e.g. target_power=2 => dimension=4
        """
        self.target_dim = 2 ** target_power
        self.pca = PCA(n_components=self.target_dim)

    def fit_transform(self, embeddings):
        """
        Fit PCA on ``embeddings`` and return the projected vectors.

        PCA cannot produce more components than min(num_tokens, embed_dim),
        so the number of components is recomputed on every call as
        min(target_dim, num_tokens, embed_dim). A warning is logged whenever
        that is smaller than target_dim.

        :param embeddings: np.array of shape (num_tokens, embed_dim)
        :return: np.array of shape (num_tokens, n_components), where
                 n_components == target_dim unless the input is too small.
        :raises ValueError: if ``embeddings`` is not a non-empty 2-D array.
        """
        embeddings = np.asarray(embeddings)
        if embeddings.ndim != 2 or 0 in embeddings.shape:
            raise ValueError(
                f"embeddings must be a non-empty 2-D array, got shape {embeddings.shape}"
            )

        num_samples, num_features = embeddings.shape
        n_components = min(self.target_dim, num_samples, num_features)
        if n_components < self.target_dim:
            logging.warning(
                "Reduced PCA dimension to %d due to input shape %s.",
                n_components,
                embeddings.shape,
            )
        # Always (re)set the value so that a reduction forced by one small
        # input does not persist for later, larger inputs.
        self.pca.n_components = n_components
        return self.pca.fit_transform(embeddings)


## Quantum Encoder (Amplitude + Sinusoidal Positional Encoding)

In [18]:
class QuantumEncoder:
    """
    Encodes a single vector (dimension 2^n_qubits) into an n_qubit amplitude-encoded state.
    Adds sinusoidal-based phase shifts to capture positional information.
    """
    def __init__(self, n_qubits):
        """
        :param n_qubits: Number of qubits per encoded vector; vectors are
                         padded/truncated to length 2^n_qubits.
        """
        self.n_qubits = n_qubits
        # Optional mock backend for demonstration. It is imported lazily and
        # falls back to None so that constructing the encoder does not fail
        # on installations that do not ship FakeAthens in this module.
        try:
            from qiskit.providers.fake_provider import FakeAthens
        except ImportError:
            self.backend = None
        else:
            self.backend = FakeAthens()
        self.sampler = Sampler()

    def amplitude_encode(self, vector):
        """
        Pads/truncates 'vector' to length 2^n_qubits, then normalizes.

        The input is never modified: the result is always a new array.
        Lists and integer arrays are accepted and converted to floating point.

        :param vector: 1-D sequence of amplitudes (multi-dimensional input is
                       flattened).
        :return: np.ndarray of length 2^n_qubits with unit L2 norm. If the
                 padded/truncated vector has (near-)zero norm, the uniform
                 vector is returned instead.
        """
        target_len = 2 ** self.n_qubits

        values = np.asarray(vector)
        # Promote to at least float64 (complex stays complex) on a private copy.
        values = values.astype(np.result_type(values.dtype, np.float64)).ravel()

        encoded = np.zeros(target_len, dtype=values.dtype)
        keep = min(values.size, target_len)
        encoded[:keep] = values[:keep]

        norm = np.linalg.norm(encoded)
        if norm < 1e-9:
            # fallback uniform
            return np.ones(target_len) / np.sqrt(target_len)
        return encoded / norm

    def prepare_state(self, vector):
        """
        Builds a circuit that amplitude-encodes the vector using 'initialize()'.
        For large n_qubits, consider more efficient state-prep methods.

        :param vector: Vector to encode; normalized via ``amplitude_encode``.
        :return: A new QuantumCircuit on ``n_qubits`` qubits.
        """
        qc = QuantumCircuit(self.n_qubits)
        encoded_vec = self.amplitude_encode(vector)
        qc.initialize(encoded_vec, range(self.n_qubits))
        return qc

    def sinusoidal_position_encoding(self, qc, position):
        """
        Applies a set of RZ gates whose angles are derived from sin/cos of 'position'
        to mimic Transformer-like positional embedding in the phase.

        The circuit is modified in place; nothing is returned.

        :param qc: Circuit with at least ``n_qubits`` qubits.
        :param position: Numeric position (may be fractional, e.g. a mean).
        """
        for q_idx in range(self.n_qubits):
            # Two frequencies of the position, offset per qubit, scaled so
            # the rotation angle stays within [-pi/2, pi/2].
            angle = np.sin(position * 0.1 + q_idx) + np.cos(position * 0.05 + q_idx)
            angle *= (np.pi / 4.0)
            qc.rz(angle, q_idx)

    def encode_with_position(self, vector, position):
        """
        :param vector: Vector to amplitude-encode.
        :param position: Position used for the phase shifts.
        :return: A QuantumCircuit encoding 'vector' + position-based phase shifts.
        """
        qc = self.prepare_state(vector)
        self.sinusoidal_position_encoding(qc, position)
        return qc


## Parametric Entangling Block + Hierarchical Encoder

In [19]:
class ParametricEntangler:
    """
    Demonstrates a parametric entangling pattern (CRZ) among qubits in a block.
    The parameters can be learned or randomly assigned.
    """
    def __init__(self, n_qubits):
        """
        :param n_qubits: Number of qubits in the block; one parameter is
                         created per pair of adjacent qubits.
        """
        self.n_qubits = n_qubits
        # One parameter per pair of adjacent qubits
        self.params = [Parameter(f"theta_{i}_{i+1}") for i in range(n_qubits - 1)]

    def _angle_for(self, param_values, index):
        """Look up the angle for the adjacent pair (index, index + 1)."""
        if isinstance(param_values, dict):
            param = self.params[index]
            # Accept the Parameter itself, its name, or the pair index as key.
            for key in (param, getattr(param, "name", None), index):
                if key is not None and key in param_values:
                    return param_values[key]
            raise KeyError(f"No angle supplied for parameter {param!r}")
        return param_values[index]

    def apply(self, qc: "QuantumCircuit", param_values=None):
        """
        Applies CRZ gates between adjacent qubits.

        The gates are added to a copy of ``qc`` with their numeric angles
        already in place, so the circuit passed in is left untouched and the
        result carries no unbound parameters.

        :param qc: Circuit with at least ``n_qubits`` qubits.
        :param param_values: sequence of angles (one per adjacent pair), or a
                             dict keyed by Parameter, parameter name, or pair
                             index. If None, random angles in [0, pi/2] are
                             used.
        :return: A new circuit equal to ``qc`` followed by the CRZ chain.
        :raises IndexError: if a sequence has fewer angles than adjacent pairs.
        :raises KeyError: if a dict lacks an entry for some pair.
        """
        if param_values is None:
            param_values = [np.random.uniform(0, np.pi / 2) for _ in self.params]

        entangled = qc.copy()
        for i in range(self.n_qubits - 1):
            entangled.crz(self._angle_for(param_values, i), i, i + 1)
        return entangled


class HierarchicalQuantumEncoder:
    """
    Organizes embeddings into groups -> builds amplitude-encoded subcircuits
    -> optionally merges groups at multiple levels -> parametric entangling between groups.
    """
    def __init__(self, base_encoder: "QuantumEncoder", group_size=2, multi_level=True):
        """
        :param base_encoder: Encoder used for each group's subcircuit; its
                             ``n_qubits`` and ``sampler`` are reused here.
        :param group_size: Number of consecutive vectors merged into one group.
        :param multi_level: If True, groups are themselves grouped once more.
        :raises ValueError: if ``group_size`` is smaller than 1.
        """
        if group_size < 1:
            raise ValueError(f"group_size must be >= 1, got {group_size}")
        self.base_encoder = base_encoder
        self.n_qubits = base_encoder.n_qubits
        self.group_size = group_size
        self.multi_level = multi_level
        self.entangler = ParametricEntangler(n_qubits=self.n_qubits)
        self.sampler = base_encoder.sampler

    def _merge_vectors(self, vectors):
        """
        Merge the vectors in a single group.
        E.g., a simple average or a more advanced approach.

        :return: Element-wise mean of ``vectors``, or None for an empty group.
        """
        if len(vectors) == 0:
            return None
        return np.mean(vectors, axis=0)

    def _iter_groups(self, vectors, positions):
        """Yield (vectors, positions) slices of ``group_size`` consecutive items."""
        for start_idx in range(0, len(vectors), self.group_size):
            end_idx = start_idx + self.group_size
            yield vectors[start_idx:end_idx], positions[start_idx:end_idx]

    def _sample_distribution(self, qc):
        """
        Measure every qubit of ``qc`` (in place), run the sampler and return
        the outcome distribution as a dense array of length 2^num_qubits,
        renormalized to sum to 1 when the total is positive.
        """
        qc.measure_all()
        quasi = self.sampler.run(qc).result().quasi_dists[0]
        probs = np.array([quasi.get(i, 0.0) for i in range(2 ** qc.num_qubits)])
        total = np.sum(probs)
        return probs / total if total > 0 else probs

    def build_group_circuit(self, group_vectors, group_positions):
        """
        Merges the group into a single vector,
        amplitude-encodes it, adds positional encoding based on average position.

        :return: The group's QuantumCircuit on ``n_qubits`` qubits.
        """
        merged_vec = self._merge_vectors(group_vectors)
        avg_pos = np.mean(group_positions)
        sub_qc = self.base_encoder.encode_with_position(merged_vec, avg_pos)
        return sub_qc

    def build_level_circuit(self, vectors, positions):
        """
        1) Partition embeddings into groups.
        2) For each group, build subcircuit.
        3) Combine subcircuits into a single circuit.
        4) Entangle the group-circuits in a chain using CRZ gates.

        :param vectors: Sequence of vectors (one per item).
        :param positions: Sequence of positions, same length as ``vectors``.
        :return: (level circuit on num_groups * n_qubits qubits,
                  list of the per-group subcircuits).
        :raises ValueError: if ``vectors`` is empty or the lengths differ.
        """
        if len(vectors) == 0:
            raise ValueError("at least one vector is required")
        if len(vectors) != len(positions):
            raise ValueError(
                f"got {len(vectors)} vectors but {len(positions)} positions"
            )

        group_circuits = [
            self.build_group_circuit(sub_vecs, sub_pos)
            for sub_vecs, sub_pos in self._iter_groups(vectors, positions)
        ]

        num_groups = len(group_circuits)
        level_qc = QuantumCircuit(num_groups * self.n_qubits)

        # Place each group subcircuit in a block
        for i, sub_qc in enumerate(group_circuits):
            offset = i * self.n_qubits
            level_qc.compose(sub_qc, qubits=range(offset, offset + self.n_qubits), inplace=True)

        # Entangle consecutive groups with a single CRZ across each boundary:
        # last qubit of group g controls the first qubit of group g + 1.
        # The random angle is applied directly, so no unbound parameter (and
        # no per-boundary circuit copy) is needed, and n_qubits == 1 works.
        for g_idx in range(num_groups - 1):
            ctrl_q = (g_idx + 1) * self.n_qubits - 1
            targ_q = ctrl_q + 1
            level_qc.crz(np.random.uniform(0, np.pi / 2), ctrl_q, targ_q)

        return level_qc, group_circuits

    def encode_multilevel(self, vectors, positions):
        """
        Builds a multi-level circuit. For demonstration, we do at most two levels.
        If multi_level is False or there's only one group, we finalize at one level.

        At the second level each first-level group is represented by its
        merged (mean) vector and positioned by its group index, so the result
        depends on the input vectors.

        NOTE(review): RZ and CRZ are diagonal in the computational basis, so
        the measured distribution is presumably insensitive to the positional
        and entangling phases - confirm this is intended.

        :param vectors: Sequence of vectors (one per token).
        :param positions: Sequence of positions, same length as ``vectors``.
        :return: np.ndarray of outcome probabilities for the final circuit.
        :raises ValueError: if ``vectors`` is empty or the lengths differ.
        """
        if len(vectors) == 0:
            raise ValueError("at least one vector is required")
        if len(vectors) != len(positions):
            raise ValueError(
                f"got {len(vectors)} vectors but {len(positions)} positions"
            )

        # ----------------------  First Level  ----------------------
        group_level_vectors = [
            self._merge_vectors(sub_vecs)
            for sub_vecs, _ in self._iter_groups(vectors, positions)
        ]
        if (not self.multi_level) or len(group_level_vectors) <= 1:
            level1_qc, _ = self.build_level_circuit(vectors, positions)
            return self._sample_distribution(level1_qc)

        # ----------------------  Second Level  ----------------------
        # The first-level circuit is not needed here: only the merged group
        # vectors feed the second level, so it is not built at all.
        group_positions = list(range(len(group_level_vectors)))
        level2_qc, _ = self.build_level_circuit(group_level_vectors, group_positions)
        return self._sample_distribution(level2_qc)


## End-to-End Demo Function and Example Usage

In [None]:
pip install torch torchvision torchaudio 

Note: you may need to restart the kernel to use updated packages.


In [1]:
import torch

# Report the installed PyTorch build and whether a CUDA device can be used.
torch_version = torch.__version__
print("PyTorch version:", torch_version)
cuda_ready = torch.cuda.is_available()
print("CUDA available:", cuda_ready)


ModuleNotFoundError: No module named 'torch'

In [20]:
def demo_transformer_based_encoding(
    text: str,
    hf_model_name="distilbert-base-uncased",
    pca_power=2,            # 2^2 = 4 dims after PCA
    n_qubits=2,             # must match pca_power => amplitude encoding dimension=4
    group_size=2,
    multi_level=True
):
    """
    Run the full text -> quantum-distribution pipeline.

    1. Extract embeddings from a Hugging Face Transformer.
    2. Reduce dimensionality to 2^pca_power via PCA.
    3. Encode them hierarchically in a quantum circuit.
    4. Return final probability distribution.

    :param text: Text to encode.
    :param hf_model_name: Hugging Face model used for the embeddings.
    :param pca_power: PCA target dimension is 2^pca_power.
    :param n_qubits: Qubits per encoded vector (state dimension 2^n_qubits).
                     Should equal ``pca_power``; a mismatch is allowed but
                     logged, because the encoder then pads or truncates.
    :param group_size: Number of consecutive vectors merged per group.
    :param multi_level: Whether to add a second grouping level.
    :return: np.ndarray of outcome probabilities from the final circuit.
    """
    if pca_power != n_qubits:
        # The comment on n_qubits asks for a match; without it the PCA output
        # is silently zero-padded or cut to the amplitude-encoding length.
        logging.warning(
            "pca_power=%s gives %s-dim vectors but n_qubits=%s encodes %s "
            "amplitudes; vectors will be %s.",
            pca_power,
            2 ** pca_power,
            n_qubits,
            2 ** n_qubits,
            "truncated" if pca_power > n_qubits else "zero-padded",
        )

    logging.info("Loading Transformer model '%s' for embeddings...", hf_model_name)
    embedder = TransformerEmbedder(model_name=hf_model_name)
    # Word-level tokens are logged for inspection only.
    tokens = embedder.tokenize(text)
    logging.info("Tokens: %s", tokens)

    # Step 1: Get subword embeddings from the transformer
    embeddings = embedder.get_token_embeddings(text)  # shape: (seq_len, hidden_dim)
    logging.info("Raw embedding shape: %s", embeddings.shape)

    # Step 2: PCA to 2^pca_power
    reducer = DimensionalityReducer(target_power=pca_power)
    reduced_vectors = reducer.fit_transform(embeddings)
    logging.info("Reduced embedding shape: %s", reduced_vectors.shape)

    # Step 3: Hierarchical Quantum Encoding
    base_encoder = QuantumEncoder(n_qubits=n_qubits)
    hierarchical_encoder = HierarchicalQuantumEncoder(
        base_encoder=base_encoder,
        group_size=group_size,
        multi_level=multi_level
    )

    positions = list(range(len(reduced_vectors)))  # simple position index
    final_probs = hierarchical_encoder.encode_multilevel(reduced_vectors, positions)
    return final_probs


# Example usage
if __name__ == "__main__":
    # Settings shared by every run: 4-dim PCA vectors amplitude-encoded on
    # 2 qubits, grouped in pairs, with the second hierarchy level enabled.
    demo_options = {
        "hf_model_name": "distilbert-base-uncased",
        "pca_power": 2,
        "n_qubits": 2,
        "group_size": 2,
        "multi_level": True,
    }
    banner = "======================================================="

    sample_texts = [
        "Quantum computing in modern machine learning research",
        "Artificial intelligence uses deep neural networks",
        "Data science and big data analytics revolution"
    ]

    # Encode each sample sentence and show the resulting distribution.
    for txt in sample_texts:
        print(banner)
        print(f"Encoding text: {txt}")
        distribution = demo_transformer_based_encoding(text=txt, **demo_options)
        print(f"Final distribution (size={len(distribution)}):\n{distribution}\n")


INFO:root:Loading Transformer model 'distilbert-base-uncased' for embeddings...


Encoding text: Quantum computing in modern machine learning research


ImportError: 
AutoModel requires the PyTorch library but it was not found in your environment. Checkout the instructions on the
installation page: https://pytorch.org/get-started/locally/ and follow the ones that match your environment.
Please note that you may need to restart your runtime after installation.
