In [10]:
import numpy as np
import scipy.optimize



In [11]:
from numba import njit

@njit(fastmath=True)
# from wikipedia
def fwht(a) -> None:
    """In-place Fast Walsh–Hadamard Transform of array a."""
    h = 1
    while h < len(a):
        for i in range(0, len(a), h * 2):
            for j in range(i, i + h):
                x = a[j]
                y = a[j + h]
                a[j] = x + y
                a[j + h] = x - y
        h *= 2

In [12]:
from itertools import combinations


def generate_ops(num_bits, order_ops):
    positions = list(range(num_bits))
    combinations_with_ones = list(combinations(positions, order_ops))

    result = []
    for combo in combinations_with_ones:
        binary_string = ['0'] * num_bits
        for position in combo:
            binary_string[position] = '1'
        result.append(int(''.join(binary_string), 2))

    return result


In [27]:
res = sorted(generate_ops(3,1) + generate_ops(3,2))

print(len(res),2**10,res)

6 1024 [1, 2, 3, 4, 5, 6]


In [30]:
states = [0]
for i in range(3):
    for state in states[:]:
        if state.bit_count() <2:
            states.append(state + 2**i)
np.array(states[1:])

array([1, 2, 3, 4, 5, 6])

In [37]:
res = [i for i in range(2**3) if bin(i).count('1') <= 2 and i > 0]
np.tile(np.array(res).reshape(-1,1),3)

array([[1, 1, 1],
       [2, 2, 2],
       [3, 3, 3],
       [4, 4, 4],
       [5, 5, 5],
       [6, 6, 6]])

In [8]:
# FROM ARONDC60
class ising_model:

    def __init__(self, file):
        """
        Initialize an Ising model object.

        Parameters
        ----------
        file : str
            path to the file containing the data
        """
        # Load in the data
        data = np.loadtxt(file, dtype=str)
        # Convert all observations to integer representation (bitwise xor with all 1s bitstring to have the map (0,1) -> (1, -1))
        self.data = np.array([int(i[::-1], 2) ^ int('1'*len(i), 2) for i in data])
        # Determine the number of variables in the system
        self.n_var = len(data[0])

        # List containing the model distribution
        self.model_distr = np.zeros(2**self.n_var)
        # Generate all pairwise operators in integer representation
        self.spin_op = generate_ops(self.n_var,2)
        # List that contains all modelparameters
        self.param = np.zeros(len(self.spin_op))
        self.set_param(np.random.rand(len(self.spin_op)))

        # List containing empirical distribution
        self.emp_distr = self.calc_emp_distr()
        # Calculate the empirical average for the first and second moment
        self.exp_s_data = self.calc_exp_data()
    
    def set_param(self, param):
        """
        Set the values for the model paramaters.

        Parameters
        ----------
        param : list 
            new values for the model parameters
        """
        # Check if the length of the input is correct
        if len(param) != len(self.spin_op):
            raise ValueError("Number of model parameters does not match the given input.")
        self.param = param
        # Recalculate the model distribution with new parameters
        self.calc_model_distr()
    
    def calc_emp_distr(self):
        """
        Calculate the empirical distribution
        """
        emp_distr = np.empty(2**self.n_var)
        for state in range(2**self.n_var):
            emp_distr[state] = np.count_nonzero(self.data == state)
        return emp_distr / len(self.data)

    def calc_exp_data(self):
        """
        Calculate the empirical averages for all the spinoperators.

        Returns
        -------
        <phi_mu> : array
            array with the empirical average for every spinoperator
        """
        exp_data = fwht(self.emp_distr)
        return exp_data[self.spin_op]

    def calc_model_distr(self):
        """Calculate the model distribution for the current model parameters."""
        g = np.zeros(2**self.n_var)
        g[self.spin_op] = self.param

        energy = fwht(g)
        model_distr = np.exp(energy)
        self.model_distr = model_distr / np.sum(model_distr)
    
    def calc_exp_model(self):
        """
        Calculate the expected value for the spin operators given the current model parameters.

        Returns
        -------
        <phi_mu> : array
            expected value for every spinoperator
        """
        exp_model = utils.fwht(self.model_distr)
        return exp_model[self.spin_op]

    def calc_KL_div(self):
        """
        Calculate the Kullback-Leibler divergence between the empirical distribution and the model distribution.

        Returns
        ------
        kl_div : float
            KL divergence
        """
        div = self.emp_distr / self.model_distr
        log = np.log(div, out=np.zeros_like(div), where=div!=0)

        kl_div = self.emp_distr @ log

        return kl_div

    def calc_jacobian(self):
        """
        Calculate the jacobian (derivative of the negative loglikelihood with respect to the model parameters.)

        Returns
        -------
        jacobian : array
            Jacobian given the current model parameters
        """
        jacobian = fwht(self.model_distr - self.emp_distr)
        return jacobian[self.spin_op]
    
    def f_x(self, param):
        """
        Function to minimize ((KL divergence) in the steepest descent algorithm.

        Parameters
        ----------
        param : array
            new model parameters
        
        Returns
        -------
        kl_div : float
            KL divergence
        """
        # Set new parameters (+ recalculate model distribution)
        self.set_param(param)
        # Calculate new KL divergence with the empirical distribution
        return self.calc_KL_div()
    
    def grad_f(self, param):
        """
        Gradient of the function to minimize in the steepest descent algorithm.

        Parameters
        ----------
        param : array
            new model parameters
        
        Returns
        -------
        jacobian : array
            Jacobian for the given model parameters
        """
        # Set new parameters (+ recalculate model distribution)
        self.set_param(param)
        # Calculate the new gradient
        return self.calc_jacobian()

    def fit_param(self, n_iter = 500):
        """
        Find model parameters using the steepest descent algorithm.

        Parameters
        ----------
        n_iter : int
            Maximum number of iterations in the algorithm
        """
        for _ in range(n_iter):

            param = self.param
            s = - self.calc_jacobian()
            # perform a linesearch
            result = scipy.optimize.line_search(self.f_x, self.grad_f, param, s)
            alpha = result[0]

            # Update the model parameters
            self.set_param(param + alpha * s)

            # Check convergence
            if np.allclose(self.param, param):
                break

In [9]:
class Pairwise_Classifier:
    def __init__(self, n_categories: int, n_variables: int, mcm_filename_format: str, data_filename_format: str, data_path: str, comms_path: str) -> None:
        """
        The Ising Spin Glass Classifier, limited to pairwise interactions.

        Args:
            - n_categories (int): The number of categories in the dataset
            - n_variables (int): The number of variables in the dataset
            - mcm_filename_format (str): The format of the MCM filenames
            - data_filename_format (str): The format of the data filenames
        """
        self.n_categories = n_categories
        self.n_variables = n_variables
        self.__mcm_filename_format = mcm_filename_format
        self.__data_filename_format = data_filename_format

        # Construct probability distributions and MCMs for each category
        self.__P, self.__MCM = ([], [])
        self.predicted_classes = None
        self.probs = None
        self.stats = None
        self.data_path = os.path.join(data_path, "")
        self.comms_path = os.path.join(comms_path, "")










SyntaxError: incomplete input (752188526.py, line 2)