In [1]:
!git clone https://github.com/Jithin-Veeragandham/2025-Moodys
!python -m pip install -U giotto-tda
!pip install qiskit-ibm-runtime
!pip install qiskit
!pip install gudhi
!pip install qbraid


fatal: destination path '2025-Moodys' already exists and is not an empty directory.


In [2]:
import pandas as pd
import numpy as np

# Read the header-less S&P 500 table, keep column 1 for rows 150..299 and
# convert it to a flat NumPy vector of log values.
sp500_table = pd.read_csv(r"sp500_full.csv", header=None)
price_slice = sp500_table[1][150:300]
time_series = np.log(price_slice).to_numpy().squeeze()
time_series.shape

(150,)

In [3]:
import numpy as np

def takens_embedding(time_series, N, d):
    """Build delay-embedding vectors from a 1-D series.

    Row ``t`` of the result is
    ``[time_series[t], time_series[t + d], ..., time_series[t + (N - 1) * d]]``.

    Args:
        time_series: Indexable sequence of samples.
        N (int): Number of coordinates per embedded vector.
        d (int): Delay (in samples) between consecutive coordinates.

    Returns:
        np.ndarray: One row per start index ``t`` for which all ``N`` delayed
        samples exist; an empty array when the series is too short.
    """
    n_vectors = len(time_series) - (N - 1) * d
    offsets = [step * d for step in range(N)]
    embedded = [
        [time_series[start + offset] for offset in offsets]
        for start in range(n_vectors)
    ]
    return np.array(embedded)

def sliding_window(point_cloud, w):
    """Split a sequence of points into overlapping windows of length ``w``.

    Args:
        point_cloud: Sliceable sequence of points.
        w (int): Number of consecutive points in each window.

    Returns:
        list: ``len(point_cloud) - w + 1`` slices, each starting one position
        after the previous one; empty when ``w`` exceeds the sequence length.
    """
    n_windows = len(point_cloud) - w + 1
    return [point_cloud[start:start + w] for start in range(n_windows)]




def generate_boundary_operator(simplex_tree, k):
    """
    Generates the boundary operator matrix ∂_k for a simplicial complex represented by a simplex tree.

    Args:
        simplex_tree: A simplex tree data structure containing ordered simplices.
        k (int): The dimension of the simplices for which to compute the boundary operator.

    Returns:
        np.ndarray: A sparse matrix where entry [i][j] is the coefficient of the (k-1)-simplex i
                    in the boundary of the k-simplex j.
    """
    # Edge case: boundary of 0-simplices is trivial
    if k == 0:
        return np.zeros((0, len(simplex_tree[k])))  # Empty matrix

    # Extract ordered lists of k-simplices and (k-1)-simplices
    k_simplices = simplex_tree[k]    # Columns of the matrix
    km1_simplices = simplex_tree[k-1] # Rows of the matrix

    # Create a dictionary to map (k-1)-simplices to their row indices
    km1_simplex_to_index = {tuple(s): idx for idx, s in enumerate(km1_simplices)}

    # Initialize the boundary matrix with zeros
    boundary_matrix = np.zeros((len(km1_simplices), len(k_simplices)), dtype=int)

    # Iterate over each k-simplex (column j)
    for j, s_k in enumerate(k_simplices):
        # Remove each vertex at position t and compute the boundary
        for t in range(len(s_k)):
            # Generate the (k-1)-simplex by removing vertex t
            s_km1 = tuple(s_k[:t] + s_k[t+1:])

            # Get the row index for this (k-1)-simplex
            if s_km1 in km1_simplex_to_index:
                i = km1_simplex_to_index[s_km1]
                # Assign coefficient (-1)^t to matrix entry [i][j]
                boundary_matrix[i][j] = (-1) ** t

    return boundary_matrix

def generate_k_laplasian_matrix(grouped_simplices_per_filtration,k,i_simplice):
    """Compute the k-th combinatorial Laplacian of one simplicial complex.

    The result is ``∂_k^T ∂_k + ∂_{k+1} ∂_{k+1}^T``. The second term is left
    out when the complex contains no (k+1)-simplices.

    Args:
        grouped_simplices_per_filtration: Sequence of complexes; each complex
            is a list indexed by dimension holding the simplices of that
            dimension.
        k (int): Dimension of the Laplacian.
        i_simplice (int): Index of the complex to use.

    Returns:
        np.ndarray: Square matrix with one row/column per k-simplex.
    """
    simplices_by_dim = grouped_simplices_per_filtration[i_simplice]

    boundary_k = generate_boundary_operator(simplices_by_dim, k)
    down_laplacian = np.dot(boundary_k.T, boundary_k)

    # The complex holds dimensions 0 .. len - 1. The previous test
    # (`len == 2`) was only right for k == 1: it dropped the up-term for k == 0
    # on a vertices+edges complex and indexed past the end for the top
    # dimension.
    if len(simplices_by_dim) <= k + 1:
        return down_laplacian

    boundary_k_plus_1 = generate_boundary_operator(simplices_by_dim, k + 1)
    up_laplacian = np.dot(boundary_k_plus_1, boundary_k_plus_1.T)
    return down_laplacian + up_laplacian

def is_power_of_two(n):
    """Return True when n is an exact power of two (1, 2, 4, ...)."""
    if n == 0:
        return False
    # A power of two has a single set bit, which n - 1 clears.
    return ((n - 1) & n) == 0

def next_power_of_two(n):
    """Return the smallest power of two that is >= n (1 when n is 0)."""
    # (n - 1).bit_length() is the exponent of the first power of two >= n.
    return 1 if n == 0 else 1 << (n - 1).bit_length()

def pad_matrix_to_power_of_two(matrix):
    """Embed a square matrix in the top-left corner of a 2^k x 2^k matrix.

    The added rows/columns are zero except on the diagonal, where each new
    entry is set to ``k`` (the exponent of the padded size).

    Args:
        matrix: Square matrix given as a sequence of equal-length rows.

    Returns:
        list[list]: The padded matrix as nested lists.

    Raises:
        ValueError: If any row length differs from the number of rows.
    """
    size = len(matrix)
    if any(len(row) != size for row in matrix):
        raise ValueError("Matrix must be square.")

    padded_size = next_power_of_two(size)
    exponent = padded_size.bit_length() - 1  # padded_size == 2 ** exponent
    extra = padded_size - size

    # Original rows, extended with zeros on the right.
    padded_matrix = [
        [matrix[r][c] for c in range(size)] + [0] * extra
        for r in range(size)
    ]

    # New rows: zero everywhere except `exponent` on the diagonal.
    for r in range(size, padded_size):
        new_row = [0] * padded_size
        new_row[r] = exponent
        padded_matrix.append(new_row)

    return padded_matrix




In [4]:
#step 1
def generate_point_cloud(time_series, N, d, w):
    """Delay-embed a series and cut the embedding into sliding windows.

    Args:
        time_series: 1-D sequence of samples.
        N (int): Embedding dimension passed to ``takens_embedding``.
        d (int): Delay passed to ``takens_embedding``.
        w (int): Window length passed to ``sliding_window``.

    Returns:
        list: One window of ``w`` consecutive embedded points per start index.
    """
    embedded = takens_embedding(time_series, N, d)
    return sliding_window(embedded, w)





In [5]:
!pip install qiskit qiskit-ionq
from qbraid.runtime import QiskitRuntimeProvider
from qiskit import transpile
from qiskit_ibm_runtime import Sampler

# Read the IBM Quantum API token from the environment (or prompt for it)
# instead of embedding the secret in the notebook.
# NOTE(review): the token that used to be hard-coded here was saved with the
# notebook and should be revoked.
import os
from getpass import getpass

ibm_token = os.environ.get("QISKIT_IBM_TOKEN") or getpass("IBM Quantum API token: ")
provider = QiskitRuntimeProvider(ibm_token)

print(provider.get_devices())

device = provider.get_device('ibm_kyiv')

type(device)
# qbraid.runtime.ibm.device.QiskitBackend

device.metadata()


[<qbraid.runtime.ibm.device.QiskitBackend('ibm_brisbane')>, <qbraid.runtime.ibm.device.QiskitBackend('ibm_kyiv')>, <qbraid.runtime.ibm.device.QiskitBackend('ibm_sherbrooke')>]


{'device_id': 'ibm_kyiv',
 'simulator': False,
 'num_qubits': 127,
 'provider_name': 'IBM',
 'basis_gates': None,
 'local': False,
 'instance': 'ibm-q/open/main',
 'max_shots': 100000,
 'queue_depth': 2,
 'status': 'ONLINE',
 'paradigm': None,
 'runtime_config': {'target_ir': 'qiskit',
  'conversion_scheme': {'conversion_graph': None,
   'max_path_attempts': 3,
   'max_path_depth': None},
  'options': {'transpile': True,
   'transform': True,
   'validate': 2,
   'prepare': True,
   'pass_manager': None}}}

In [6]:
 #step 2
def bitstring_to_phase(bitstring):
    """Interpret a binary string as a fraction in [0, 1).

    The string is read as an unsigned integer and divided by
    ``2 ** len(bitstring)``; for example ``'100'`` gives 0.5.
    """
    numerator = int(bitstring, 2)
    denominator = 1 << len(bitstring)
    return numerator / denominator
def generate_simplex_groups(point_cloud,epsilon):
    """Build a Rips complex per point set and group its simplices by dimension.

    Args:
        point_cloud: Iterable of point sets; one Rips complex is built for each.
        epsilon: Maximum edge length handed to ``gudhi.RipsComplex``.

    Returns:
        list: One entry per point set. Each entry is a list of simplex lists
        ordered by increasing dimension (complexes are built up to dimension 2).
    """
    grouped_simplices_per_filtration = []

    for cloud in point_cloud:
        tree = gudhi.RipsComplex(
            points=cloud, max_edge_length=epsilon
        ).create_simplex_tree(max_dimension=2)

        # Bucket simplices by dimension (= number of vertices - 1), keeping
        # the order in which the filtration yields them.
        by_dimension = {}
        for simplex, _ in tree.get_filtration():
            by_dimension.setdefault(len(simplex) - 1, []).append(simplex)

        grouped_simplices_per_filtration.append(
            [by_dimension[dim] for dim in sorted(by_dimension)]
        )

    return grouped_simplices_per_filtration
import math
def generate_U(grouped_simplices_per_filtration):
    """Estimate a Betti number for every complex via phase estimation.

    For each complex the 1-Laplacian is padded to a power-of-two size,
    exponentiated to ``U = exp(iH)`` and passed to ``quantum_phase_estimation``
    on the circuit from ``mixed_state_preparation``. The circuit is run on the
    module-level ``device``.

    Args:
        grouped_simplices_per_filtration: Sequence of complexes, each a list of
            simplex lists indexed by dimension.

    Returns:
        list[float]: One estimate per complex, computed as
        ``2**n * p(0)`` where ``n`` is the number of precision qubits and
        ``p(0)`` is the fraction of shots whose precision register reads all
        zeros.
    """
    betty_value_per_cloud = []
    for i in range(len(grouped_simplices_per_filtration)):
        delta = generate_k_laplasian_matrix(grouped_simplices_per_filtration, 1, i)
        H = np.array(pad_matrix_to_power_of_two(delta))
        U = expm(1j * H)
        number_of_target_qubits = int(math.log2(U.shape[0]))
        print(U.shape)
        mixed_state_circuit = mixed_state_preparation(number_of_target_qubits)
        qpe_circuit = quantum_phase_estimation(mixed_state_circuit, number_of_target_qubits, U)
        job = device.run(qpe_circuit, shots=1024)

        result = job.result()
        counts = result.data.get_counts()
        print("Measurement counts:", counts)

        alpha = sum(counts.values())

        # quantum_phase_estimation measures its precision qubits into classical
        # bits 0 .. n-1, which Qiskit prints at the right-hand end of each
        # bitstring. The previous check looked at the first three characters,
        # i.e. the ancilla bits written by mixed_state_preparation.
        q = number_of_target_qubits  # size of the precision register
        zero_state_count = sum(
            shots
            for bits, shots in counts.items()
            if "1" not in bits[len(bits) - q:]
        )

        # Probability of reading phase zero.
        p_0 = zero_state_count / alpha
        print(f"p(0) = {p_0:.4f}")

        # The target register has 2**q basis states, so p(0) * 2**q counts the
        # zero-phase eigenvalues.
        beta_k_tilde = (2**q) * p_0
        print(f"Estimated Betti Number: {beta_k_tilde:.4f}")
        betty_value_per_cloud.append(beta_k_tilde)

    return betty_value_per_cloud

In [7]:
def generate_betti_numbers(grouped_simplices_per_filtration, q=5):
    """Estimate the first Betti number of every complex via phase estimation.

    For each complex the 1-Laplacian is padded to size ``2**n``, rescaled so
    its spectrum lies in ``[0, 2π)``, exponentiated to ``U = exp(iH)`` and
    passed to ``quantum_phase_estimation``. The circuit is run on the
    module-level ``device``.

    Args:
        grouped_simplices_per_filtration: Sequence of complexes, each a list of
            simplex lists indexed by dimension.
        q (int): Kept so existing calls keep working. ``quantum_phase_estimation``
            always uses as many precision qubits as target qubits, so the
            zero-phase test inspects that whole register rather than ``q`` bits.

    Returns:
        list[float]: One estimate per complex, ``p(0) * 2**n`` with ``p(0)`` the
        fraction of shots whose precision register reads all zeros.
    """
    betti_numbers = []
    for i in range(len(grouped_simplices_per_filtration)):
        # Laplacian for k=1 of the i-th complex, padded to a power-of-two size.
        laplacian = generate_k_laplasian_matrix(grouped_simplices_per_filtration, k=1, i_simplice=i)
        padded_laplacian = np.array(pad_matrix_to_power_of_two(laplacian), dtype=float)
        num_target_qubits = int(np.log2(padded_laplacian.shape[0]))

        # Scale so the largest eigenvalue lands on the largest phase an
        # n-bit register can represent. Mapping it to exactly 2π (as before)
        # makes exp(i·2π) = 1, indistinguishable from a zero eigenvalue.
        max_eigenvalue = np.linalg.norm(padded_laplacian, 2)  # Spectral norm
        largest_phase = 2 * np.pi * (1 - 2.0 ** -num_target_qubits)
        if max_eigenvalue > 0:
            H = padded_laplacian / max_eigenvalue * largest_phase
        else:
            # All-zero matrix: nothing to scale, and dividing would give NaN.
            H = padded_laplacian

        # Construct unitary U = e^(iH)
        U = expm(1j * H)

        # Maximally mixed input on the target register (via ancilla measurement).
        state_prep_circuit = mixed_state_preparation(num_target_qubits)

        # Quantum Phase Estimation
        qpe_circuit = quantum_phase_estimation(state_prep_circuit, num_target_qubits, U)
        job = device.run(qpe_circuit, shots=1024)

        result = job.result()
        counts = result.data.get_counts()
        print("Measurement counts:", counts)

        # The precision qubits are measured into classical bits 0 .. n-1, which
        # Qiskit prints at the right-hand end of each bitstring. The previous
        # prefix test looked at the ancilla bits instead.
        zero_phase_counts = sum(
            shots
            for bits, shots in counts.items()
            if "1" not in bits[len(bits) - num_target_qubits:]
        )
        p_0 = zero_phase_counts / sum(counts.values())
        betti = p_0 * (2 ** num_target_qubits)
        print(f"Estimated Betti Number: {betti:.4f}")
        betti_numbers.append(betti)

    return betti_numbers

In [8]:
from qiskit import QuantumCircuit
from qiskit.circuit.library import QFT
from qiskit.circuit.library import UnitaryGate
import numpy as np


def mixed_state_preparation(number_of_target_qubits):
    """Create a 3n-qubit circuit whose middle register is prepared via ancillas.

    With ``n = number_of_target_qubits`` the qubits are laid out as
    ``0 .. n-1`` (left untouched here), ``n .. 2n-1`` (target register) and
    ``2n .. 3n-1`` (ancillas). Each target qubit gets a Hadamard and is then
    CNOT-ed onto its own ancilla; the ancillas are measured into the classical
    bits with the same indices.

    Args:
        number_of_target_qubits (int): Size ``n`` of each of the three registers.

    Returns:
        QuantumCircuit: Circuit with ``3n`` qubits and ``3n`` classical bits.
    """
    num_qubits = number_of_target_qubits * 3
    qc = QuantumCircuit(num_qubits, num_qubits, name="Custom Circuit")

    target_qubits = range(number_of_target_qubits, number_of_target_qubits * 2)

    for i in target_qubits:
        qc.h(i)

    output = []
    for i in target_qubits:
        # Pair target i with ancilla i + n. The offset was hard-coded as 3,
        # which only matched the appended indices when n == 3.
        ancilla = i + number_of_target_qubits
        qc.cx(i, ancilla)
        output.append(ancilla)

    # Measure the ancilla register.
    qc.measure(output, output)

    return qc

# def quantum_phase_estimation(qc, U_i ,number_of_target_qubits):
#     U_gate = UnitaryGate(U_i, label="U")

#     # Apply Hadamard to precision qubits (0-2)
#     for i in range(number_of_target_qubits):
#         qc.h(i)

#     count_bits = []
#     unary = []

#     for i in range(number_of_target_qubits, number_of_target_qubits*2):
#       unary.append(i)

#     # Apply controlled-U^(2^j) operations
#     for j in range(number_of_target_qubits-1,-1,-1):
#         print(j)
#         power = 2 ** j
#         count_bits.append(j)
#         controlled_U = U_gate.power(power).control(1)

#         qc.append(controlled_U, [number_of_target_qubits-j-1]+unary)  # Control qubit j, targets 6-8

#     # Apply inverse QFT on precision qubits
#     qft_inv = QFT(number_of_target_qubits, do_swaps=False).inverse()
#     qc.append(qft_inv, count_bits)

#     # Measure precision qubits
#     qc.measure(count_bits, count_bits)

#     return qc




In [9]:
# !pip install qiskit_aer

In [10]:
# time_series = pd.read_csv(r"/content/2025-Moodys/sp500_full.csv",header=None)
# time_series=time_series[1][200:400]
# time_series=np.array(time_series)


In [11]:
# time_series = pd.read_csv(r"/content/2025-Moodys/sp500.csv")


In [12]:
from qiskit import QuantumCircuit
from qiskit.circuit.library import QFT
from qiskit.circuit.library import UnitaryGate
import numpy as np


def mixed_state_preparation(number_of_target_qubits):
    """Create a 3n-qubit circuit whose middle register is prepared via ancillas.

    With ``n = number_of_target_qubits`` the qubits are laid out as
    ``0 .. n-1`` (left untouched here), ``n .. 2n-1`` (target register) and
    ``2n .. 3n-1`` (ancillas). Each target qubit gets a Hadamard and is then
    CNOT-ed onto its own ancilla; the ancillas are measured into the classical
    bits with the same indices.

    Args:
        number_of_target_qubits (int): Size ``n`` of each of the three registers.

    Returns:
        QuantumCircuit: Circuit with ``3n`` qubits and ``3n`` classical bits.
    """
    n = number_of_target_qubits
    total = 3 * n
    qc = QuantumCircuit(total, total, name="Custom Circuit")

    targets = range(n, 2 * n)

    # Hadamards first, on every target qubit.
    for qubit in targets:
        qc.h(qubit)

    # Then one CNOT per target onto the ancilla n positions further up.
    ancillas = []
    for qubit in targets:
        partner = qubit + n
        qc.cx(qubit, partner)
        ancillas.append(partner)

    # Measure the ancilla register into the matching classical bits.
    qc.measure(ancillas, ancillas)

    return qc

def quantum_phase_estimation(qc, number_of_target_qubits,U):
    """Append a phase-estimation routine for ``U`` to an existing circuit.

    Qubits ``0 .. n-1`` are used as the precision register and qubits
    ``n .. 2n-1`` as the register ``U`` acts on, where
    ``n = number_of_target_qubits``. The precision qubits are measured into
    classical bits ``0 .. n-1``.

    Args:
        qc (QuantumCircuit): Circuit to extend in place; must already contain
            at least ``2n`` qubits and ``n`` classical bits.
        number_of_target_qubits (int): Size ``n`` of both registers.
        U: Unitary matrix of size ``2**n`` wrapped in a ``UnitaryGate``.

    Returns:
        QuantumCircuit: The same ``qc`` object with the routine appended.
    """
    U_gate = UnitaryGate(U, label="U")

    # Precision (counting) qubits and the qubits U acts on.
    count_bits = list(range(number_of_target_qubits))
    unary = list(range(number_of_target_qubits, number_of_target_qubits * 2))

    # Uniform superposition on the precision register.
    qc.h(count_bits)

    # Precision qubit j controls U^(2^j).
    for j in range(number_of_target_qubits - 1, -1, -1):
        power = 2 ** j
        controlled_U = U_gate.power(power).control(1)
        qc.append(controlled_U, [count_bits[j]] + unary)

    # With qubit j carrying the phase of U^(2^j), the register holds the full
    # QFT of the phase value, so the inverse must include the swap layer.
    # Omitting the swaps (do_swaps=False) without also reversing the register
    # order does not map non-zero phases back to basis states.
    qft_inv = QFT(number_of_target_qubits, do_swaps=True).inverse()
    qc.append(qft_inv, count_bits)

    # Measure precision qubits
    qc.measure(count_bits, count_bits)

    return qc

In [None]:
import pandas as pd
import numpy as np
import gudhi
from scipy.linalg import expm
import numpy as np

# Import Qiskit
from qiskit import QuantumCircuit, transpile
from qiskit_aer import AerSimulator
from qiskit.visualization import plot_histogram, plot_state_city
import qiskit.quantum_info as qi
# Load column 1 of the header-less S&P 500 table and take its logarithm.
time_series = pd.read_csv(r"sp500_full.csv",header=None)
time_series=time_series[1]
time_series = np.log(time_series).to_numpy().squeeze()
# time_series = np.log(pd.read_csv(r"/content/2025-Moodys/sp500.csv", header=None).to_numpy().squeeze())

# The windows of embedded points do not depend on epsilon, so build them once
# instead of once per loop iteration.
point_cloud = generate_point_cloud(time_series, N=4, d=15, w=15)

betty_value_per_epsilon =[]
for epsilon in np.arange(1,5,1):  # epsilon = 1, 2, 3, 4 (the upper bound 5 is excluded)
  print()
  grouped_simplices_per_filtration = generate_simplex_groups(point_cloud, epsilon)

  betty_values= generate_betti_numbers(grouped_simplices_per_filtration)
  print(betty_values)
  betty_value_per_epsilon.append(betty_values)






Measurement counts: {'000000000000001101011': 1, '000000100000000001001': 1, '000000100000000011100': 1, '000000100000000100111': 1, '000000100000000101000': 1, '000000100000000101011': 1, '000000100000000101100': 2, '000000100000000110110': 1, '000000100000000111010': 1, '000000100000000111100': 1, '000000100000001000100': 1, '000000100000001000111': 1, '000000100000001010100': 1, '000000100000001011001': 2, '000000100000001011010': 1, '000000100000001100000': 2, '000000100000001100010': 1, '000000100000001100100': 1, '000000100000001100101': 1, '000000100000001101010': 1, '000000100000001110001': 1, '000000100000001110010': 1, '000000100000001110011': 2, '000000100000001110101': 1, '000000100000001110110': 1, '000000100000001111000': 1, '000000100000001111101': 1, '000001000000001000111': 1, '000001100000000000001': 1, '000001100000000001010': 1, '000001100000000001111': 1, '000001100000000010011': 2, '000001100000000010110': 1, '000001100000000011010': 1, '000001100000000011111': 1

In [None]:
# Transpose: one row per window, one column per epsilon value.
U_range = [list(per_window) for per_window in zip(*betty_value_per_epsilon)]
print(len(U_range),len(U_range[0]))


In [None]:
# Display the transposed Betti estimates (rows: windows, columns: epsilon values).
U_range

In [None]:
# Round every Betti estimate to the nearest integer.
components = np.rint(np.array(U_range)).astype(int)

# Euclidean distance between each row and the row that follows it.
distances = [
    np.linalg.norm(current - following)
    for current, following in zip(components[:-1], components[1:])
]

# Number of consecutive-row distances.
len(distances)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Distances between consecutive rounded Betti vectors.
data = distances

# Min-max scaling to [0, 1]; a constant series is mapped to all zeros.
min_val = min(data)
max_val = max(data)
scaled_data = [(x - min_val) / (max_val - min_val) for x in data] if max_val != min_val else [0] * len(data)

# Plot the scaled line chart
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(range(1, len(scaled_data) + 1), scaled_data, marker='o', linestyle='-', color='b', label='Scaled Data Trend')

# Labels and title; the element count is taken from the data rather than
# hard-coded.
ax.set_xlabel("Index")
ax.set_ylabel("Scaled Value (0 to 1)")
ax.set_title(f"Scaled Line Chart for {len(scaled_data)} Elements")
ax.grid(True)
ax.legend()

plt.show()

