In [2]:
#load data
import pandas as pd
def load_data(csv_file, sample_fraction=1.0):
    """Load question pairs and their duplicate labels from a CSV file.

    The file must provide the columns ``question1``, ``question2`` and
    ``is_duplicate``. Rows in which either question is missing are dropped,
    so every returned question is a ``str`` and the three lists stay aligned.

    Args:
        csv_file (str): Path to the CSV file.
        sample_fraction (float): Fraction of rows to keep. Values below 1.0
            draw a reproducible random sample (fixed seed 42); values of 1.0
            or more keep every row. Default is 1.0.

    Returns:
        tuple: ``(sentences1, sentences2, is_duplicate)`` -- three lists of
        equal length. On any error a message is printed and
        ``([], [], [])`` is returned.
    """
    try:
        df = pd.read_csv(csv_file, encoding='utf-8')

        if sample_fraction < 1.0:
            df = df.sample(frac=sample_fraction, random_state=42).reset_index(drop=True)

        # Empty cells are read as float NaN; left in place they would end up
        # in the "sentence" lists and break any later string processing.
        # Dropping whole rows keeps questions and labels aligned.
        df = df.dropna(subset=['question1', 'question2'])

        sentences1 = df['question1'].astype(str).tolist()
        sentences2 = df['question2'].astype(str).tolist()
        is_duplicate = df['is_duplicate'].tolist()

        print(f"Loaded {len(sentences1)} sentences.")
        return sentences1, sentences2, is_duplicate

    except FileNotFoundError:
        print("Wrong Path")
        return [], [], []

    except Exception as e:
        # Best-effort loader: report the problem (bad column, bad fraction,
        # parse error, ...) and hand back empty lists instead of raising.
        print(f"An {e} Error Occurred")
        return [], [], []

# Location of the question-pair CSV handed to load_data.
# NOTE(review): this is an absolute, machine-specific path that mixes "/" and "\"
# separators; because it is a raw string the backslashes are kept literally.
# Consider a configurable path relative to the project instead.
DATA_PATH = r'C:/Users/Jash\Documents/Research\Semantic Equivilance\SemanticEquivilance/question_pairs/questions.csv'
# Load a 0.1% sample of the rows. `value` receives the third list returned by
# load_data, i.e. the is_duplicate labels.
sentences1, sentences2, value = load_data(DATA_PATH, sample_fraction=0.001)


Loaded 404 sentences.


In [None]:
import pennylane as qml
import numpy as np
from lambeq import AtomicType, BobcatParser, IQPAnsatz, PennyLaneModel, Symbol
from discopy.quantum import Circuit

def get_circuit_state_vector(qml_circuit_func, num_wires, params=None):
    """Run a gate-applying callable on a fresh simulator and return its state.

    A new ``default.qubit`` device with ``num_wires`` wires is created on
    every call; the callable is wrapped in a QNode that ends with
    ``qml.state()`` and that QNode is executed once.

    Args:
        qml_circuit_func (callable): Function that queues the PennyLane
            operations. It is called with ``params`` as its only argument
            when ``params`` is not None, and with no arguments otherwise.
        num_wires (int): Number of wires of the simulator device.
        params (optional): Value forwarded to ``qml_circuit_func``.
            Defaults to None.

    Returns:
        The value produced by executing the QNode, i.e. the result of
        ``qml.state()`` for the queued operations.
    """
    # Forward `params` only when the caller actually supplied something.
    call_args = () if params is None else (params,)
    simulator = qml.device("default.qubit", wires=num_wires)

    @qml.qnode(simulator)
    def _run():
        qml_circuit_func(*call_args)
        return qml.state()

    return _run()


def swap_test(state1_vec, state2_vec, shots=1000):
    """Estimate the squared overlap of two states with a sampled Swap Test.

    Wire 0 is the ancilla; the two states are prepared on two equally sized
    registers after it. The ancilla is put in superposition, controls a SWAP
    between corresponding qubits of the registers, is rotated back with a
    Hadamard and then sampled.

    Args:
        state1_vec (np.ndarray): State vector of the first state.
        state2_vec (np.ndarray): State vector of the second state. Must have
            the same length as ``state1_vec``.
        shots (int): Number of measurement shots given to the device.
            Defaults to 1000.

    Returns:
        float: The estimate ``2 * P(0) - 1`` of ``|<psi|phi>|^2``, where
        ``P(0)`` is the observed fraction of ancilla samples equal to 0.
        Because it is computed from a finite number of samples, the estimate
        can fall slightly below 0 for nearly orthogonal states.

    Raises:
        ValueError: If the vectors differ in length, describe fewer than one
            qubit, have a length that is not a power of 2, or if ``shots``
            is not positive.
    """
    dim = len(state1_vec)

    # Validate up front so callers get a clear message instead of a failure
    # from inside the state-preparation operations.
    if len(state2_vec) != dim:
        raise ValueError("Both state vectors must have the same length.")
    if dim < 2:
        raise ValueError("State vectors must describe at least one qubit (length >= 2).")
    if dim & (dim - 1):
        raise ValueError("State vectors must have a length that is a power of 2.")
    if shots < 1:
        raise ValueError("shots must be a positive integer.")

    # dim is an exact power of two here, so this is log2(dim) without floats.
    num_qubits = dim.bit_length() - 1

    # Total qubits: 1 for ancilla + num_qubits for state1 + num_qubits for state2
    total_qubits = 1 + 2 * num_qubits

    dev = qml.device("default.qubit", wires=total_qubits, shots=shots)

    @qml.qnode(dev)
    def circuit():
        # Step 1: put the ancilla in superposition.
        qml.Hadamard(wires=0)

        # Step 2: load the two states on their registers
        # (wires 1..num_qubits and num_qubits+1..2*num_qubits).
        qml.StatePrep(state1_vec, wires=range(1, 1 + num_qubits))
        qml.StatePrep(state2_vec, wires=range(1 + num_qubits, 1 + 2 * num_qubits))

        # Step 3: ancilla-controlled SWAP between corresponding qubits.
        for i in range(num_qubits):
            qml.CSWAP(wires=[0, 1 + i, 1 + num_qubits + i])

        # Step 4: rotate the ancilla back.
        qml.Hadamard(wires=0)

        # Step 5: sample the ancilla.
        return qml.sample(wires=0)

    measurement_results = circuit()

    # Fraction of shots in which the ancilla was measured as 0.
    prob_0 = np.sum(measurement_results == 0) / len(measurement_results)

    # P(0) = 0.5 * (1 + |<psi|phi>|^2)  =>  |<psi|phi>|^2 = 2 * P(0) - 1
    squared_overlap = 2 * prob_0 - 1

    return squared_overlap

def lambeq_sentence_to_state_vector(sentence, ansatz, parser):
    """Turn a sentence into a circuit and return a normalised state vector.

    The sentence is parsed with ``parser.sentence2diagram`` and converted
    with ``ansatz``. Every free symbol of the resulting circuit is bound to
    0.0, so the state comes from an untrained circuit.

    Args:
        sentence (str): The input sentence.
        ansatz: Callable that maps the parsed diagram to a circuit.
        parser: Object providing ``sentence2diagram(sentence)``.

    Returns:
        tuple: ``(state_vec, num_qubits)``. ``num_qubits`` is
        ``len(circuit.dom)``; when that is 0 a warning is printed and the
        fallback ``(np.array([1, 0]), 1)`` is returned.
    """
    circuit = ansatz(parser.sentence2diagram(sentence))

    # Deterministic ordering of the circuit's symbolic parameters.
    symbols = sorted(circuit.free_symbols, key=str)

    # NOTE(review): the qubit count is read from the circuit's domain.
    # Verify that the domain (not the codomain) is the intended side for
    # sentence circuits; if it is empty, the fallback below is always taken.
    n_qubits = len(circuit.dom)
    if n_qubits == 0:
        print(f"Warning: {sentence} resulted in a 0-qubit circuit. No state vector can be generated.")
        return np.array([1, 0]), 1

    # Bind every parameter to 0.0; a circuit without parameters is used as is.
    if symbols:
        target = circuit.subs([(symbol, 0.0) for symbol in symbols])
    else:
        target = circuit

    def apply_circuit():
        # NOTE(review): confirm that the installed PennyLane actually
        # provides `from_discopy`; the call is kept exactly as written.
        qml.from_discopy(target)

    raw_state = get_circuit_state_vector(apply_circuit, n_qubits)

    # Rescale to unit norm before handing the state back.
    return raw_state / np.linalg.norm(raw_state), n_qubits

# --- Main execution ---
# Demo driver: builds state vectors for three fixed sentences and, when their
# qubit counts agree, compares them pairwise with swap_test.
# The names assigned below (parser, ansatz, sentence*/state*_vec, num_qubits*,
# overlap_*) are module-level, so they remain available to later cells once
# this cell has run.
if __name__ == "__main__":
    # Initialize lambeq components
    parser = BobcatParser()
    # IQP ansatz with one layer; the mapping assigns 1 qubit to the NOUN type
    # and 1 qubit to the SENTENCE type.
    ansatz = IQPAnsatz({AtomicType.NOUN: 1, AtomicType.SENTENCE: 1}, n_layers=1)

    print("--- Generating states from sentences ---")

    # Sentence 1
    sentence1 = "Alice loves Bob."
    state1_vec, num_qubits1 = lambeq_sentence_to_state_vector(sentence1, ansatz, parser)
    print(f"Sentence 1: '{sentence1}'")
    print(f"Number of qubits for state 1: {num_qubits1}")
    print(f"State vector 1 shape: {state1_vec.shape}")

    # Sentence 2: the same words as sentence 1 with subject and object swapped
    sentence2 = "Bob loves Alice."
    state2_vec, num_qubits2 = lambeq_sentence_to_state_vector(sentence2, ansatz, parser)
    print(f"\nSentence 2: '{sentence2}'")
    print(f"Number of qubits for state 2: {num_qubits2}")
    print(f"State vector 2 shape: {state2_vec.shape}")

    # Sentence 3: different words and structure
    sentence3 = "The cat sits."
    state3_vec, num_qubits3 = lambeq_sentence_to_state_vector(sentence3, ansatz, parser)
    print(f"\nSentence 3: '{sentence3}'")
    print(f"Number of qubits for state 3: {num_qubits3}")
    print(f"State vector 3 shape: {state3_vec.shape}")

    # The swap test prepares both states on registers of the same size, so it
    # is only attempted when all three qubit counts agree.
    if num_qubits1 != num_qubits2 or num_qubits1 != num_qubits3:
        # Mismatched counts: only explain the problem, no overlap is computed.
        print("\nWarning: Sentences resulted in circuits with different numbers of qubits.")
        print("Swap Test requires states to have the same number of qubits.")
        print("Please choose sentences that result in the same number of output qubits.")
        print("For instance, using very simple sentences or ensuring the 'n_single_qubit_params' and 'n_layers' are carefully chosen.")
        # Possible follow-ups (not implemented here): pad the smaller state
        # vector, or use an ansatz/parser that yields consistent qubit counts.
    else:
        print("\n--- Performing Swap Tests ---")

        # NOTE(review): lambeq_sentence_to_state_vector binds every circuit
        # parameter to 0.0, so the overlaps below come from untrained circuits
        # and should not be read as a learned similarity score.

        # Sentence 1 vs sentence 2
        print(f"Swap Test between '{sentence1}' and '{sentence2}':")
        overlap_sim = swap_test(state1_vec, state2_vec)
        print(f"Estimated squared overlap: {overlap_sim:.4f}")

        # Sentence 1 vs sentence 3
        print(f"\nSwap Test between '{sentence1}' and '{sentence3}':")
        overlap_dissim = swap_test(state1_vec, state3_vec)
        print(f"Estimated squared overlap: {overlap_dissim:.4f}")

        # Sentence 1 vs itself: identical states, so the estimate should be
        # close to 1 (up to sampling noise).
        print(f"\nSwap Test between '{sentence1}' and '{sentence1}':")
        overlap_identical = swap_test(state1_vec, state1_vec)
        print(f"Estimated squared overlap: {overlap_identical:.4f}")

--- Generating states from sentences ---
Sentence 1: 'Alice loves Bob.'
Number of qubits for state 1: 1
State vector 1 shape: (2,)

Sentence 2: 'Bob loves Alice.'
Number of qubits for state 2: 1
State vector 2 shape: (2,)

Sentence 3: 'The cat sits.'
Number of qubits for state 3: 1
State vector 3 shape: (2,)

--- Performing Swap Tests ---
Swap Test between 'Alice loves Bob.' and 'Bob loves Alice.':


AttributeError: module 'pennylane' has no attribute 'QubitStateVector'

In [24]:
from lambeq import BobcatParser, SpacyTokeniser, Rewriter, AtomicType, IQPAnsatz
from lambeq.backend.grammar import Diagram as grammatical_diagram
from lambeq.backend.quantum import Diagram as quantum_circuit
from typing import Optional
import os, time, multiprocessing
# Set before any worker is created so child processes see the same value.
# NOTE(review): presumably read by a tokenizer library used underneath the
# parser; confirm that "true" is the intended setting when combined with a
# multiprocessing pool.
os.environ["TOKENIZERS_PARALLELISM"] = "true"

# Size of the worker pool: one process per CPU reported by the OS.
# process_sentences passes this value as `processes=` to multiprocessing.Pool.
num_processes = multiprocessing.cpu_count()
print(f"Using {num_processes} processes.")

# Per-process pipeline components. They start as None and are filled in by
# _initializer; _process_data_for_pool reads them.
_tokenizer = None
_parser = None
_rewriter = None
_ansatz = None

def _initializer():
    global _tokenizer, _parser, _rewriter, _ansatz
    _tokenizer = SpacyTokeniser()  # Initialize tokenizer
    _parser = BobcatParser(verbose="suppress")  # Initialize parser 
    _rewriter = Rewriter(['prepositional_phrase', 'determiner'])  # Initialize rewriter
    _ansatz = IQPAnsatz({AtomicType.NOUN: 1, AtomicType.SENTENCE: 1}, n_layers=2, n_single_qubit_params=3)  # Initialize ansatz


Using 12 processes.


In [None]:
def process_data(sentence: str, tokeniser, parser, rewriter, ansatz) -> "Optional[quantum_circuit]":
    """Convert a single sentence into a circuit.

    Pipeline: strip and lower-case the text, tokenise it, parse the tokens
    into a diagram, apply ``rewriter``, take the normal form, apply the
    'curry' rewrite rule and finally apply ``ansatz``.

    Args:
        sentence (str): Sentence to convert.
        tokeniser: Object providing ``tokenise_sentence(sentence)``.
        parser: Object providing ``sentence2diagram(tokens, tokenised=True)``.
        rewriter: Callable applied to the parsed diagram.
        ansatz: Callable that turns the final diagram into a circuit.

    Returns:
        Optional[quantum_circuit]: The value returned by ``ansatz``, or None
        when the input is not a non-empty string, the parser yields no
        diagram, or any step raises (a message is printed in those cases).
    """
    # Non-string entries (for example a float NaN coming from an empty CSV
    # cell) cannot be processed; report them explicitly instead of relying
    # on an AttributeError from .strip().
    if not isinstance(sentence, str):
        print(f"Error processing sentence {sentence}: expected str, got {type(sentence).__name__}")
        return None

    sentence = sentence.strip().lower()
    if not sentence:
        # Nothing to parse; skip the tokeniser and parser entirely.
        print("Error processing sentence: input is empty")
        return None

    try:
        tokens = tokeniser.tokenise_sentence(sentence)
        diagram = parser.sentence2diagram(tokens, tokenised=True)
        if diagram is None:
            return None

        diagram = rewriter(diagram)
        normalised_diagram = diagram.normal_form()
        curried_diagram = Rewriter(['curry'])(normalised_diagram)
        return ansatz(curried_diagram)
    except Exception as e:
        # Best effort: one bad sentence must not abort a whole batch, but the
        # cause is reported so failures can be diagnosed.
        print(f"Error processing sentence {sentence}: {type(e).__name__}: {e}")
        return None
def _process_data_for_pool(sentence: str) -> "Optional[quantum_circuit]":
    """Single-argument wrapper around process_data for use with a worker pool.

    Forwards ``sentence`` together with the module-level ``_tokenizer``,
    ``_parser``, ``_rewriter`` and ``_ansatz`` objects, which _initializer
    assigns. Returns whatever process_data returns.
    """
    return process_data(sentence, _tokenizer, _parser, _rewriter, _ansatz)

def process_sentences(sentences: list[str], batch_size: int = 50) -> "list[Optional[quantum_circuit]]":
    """Process sentences in parallel with a multiprocessing pool.

    The sentences are sent to the pool in consecutive batches and the results
    of every batch are collected, so the returned list has one entry per
    input sentence, in input order.

    NOTE(review): worker functions defined inside a notebook may not be
    importable by child processes on platforms whose multiprocessing start
    method is "spawn" -- confirm this runs on the target platform.

    Args:
        sentences (list[str]): Sentences to process.
        batch_size (int): Number of sentences submitted per ``pool.map``
            call. Defaults to 50.

    Returns:
        list[Optional[quantum_circuit]]: One result per sentence; an entry is
        None when that sentence could not be processed. An empty input
        yields an empty list without starting a pool.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer.")
    if len(sentences) == 0:
        # Avoid paying the worker start-up cost when there is nothing to do.
        return []

    start_time = time.time()
    results = []
    with multiprocessing.Pool(processes=num_processes, initializer=_initializer) as pool:
        for start in range(0, len(sentences), batch_size):
            batch = sentences[start:start + batch_size]
            print(f"Processing batch {start // batch_size + 1} with {len(batch)} sentences.")
            # Accumulate every batch; pool.map preserves the order of `batch`.
            results.extend(pool.map(_process_data_for_pool, batch))
        end_time = time.time()
        print(f"Processed {len(sentences)} sentences in {end_time - start_time:.4f} seconds.")
    return results