In [1]:
import numpy as np 
import pandas as pd 
import pytensor

In [2]:
"""
Define all model classes following the definition of Abstract_Model.
Updated to use PyTensor instead of Theano.
"""
import pytensor
import pytensor.tensor as pt
import numpy as np
from pytensor.compile import SharedVariable

data_type = 'float32'
pytensor.config.floatX = data_type

# Helper function to replace randn
def randn(*shape):
    return np.random.randn(*shape).astype(data_type)

def L2_proj(x):
    """L2 normalization"""
    return x / np.linalg.norm(x, axis=1, keepdims=True)


class Abstract_Model(object):

    def __init__(self):
        self.name = self.__class__.__name__
        
        self.pred_func = None
        self.pred_func_compiled = None
        self.loss_func = None
        self.regul_func = None
        self.loss_to_opt = None
        
        # Symbolic variables for training values
        self.ys = pt.vector('ys')
        self.rows = pt.lvector('rows')
        self.cols = pt.lvector('cols')
        self.tubes = pt.lvector('tubes')

        # Dimensions
        self.n = 0  # Number of subject entities
        self.m = 0  # Number of relations
        self.l = 0  # Number of object entities
        self.k = 0  # Rank
        self.nb_params = 0

    def allocate_params(self):
        nb_params = 0
        params = self.get_init_params()
        for name, val in params.items():
            setattr(self, name, pytensor.shared(val, name=name))
            nb_params += val.size
        self.nb_params = nb_params

    def get_pred_symb_vars(self):
        return [self.rows, self.cols, self.tubes]

    def get_pred_args(self, test_idxs):
        return [test_idxs[:, 0], test_idxs[:, 1], test_idxs[:, 2]]

    def compile_prediction(self):
        """Compile the prediction function"""
        self.pred_func_compiled = pytensor.function(
            self.get_pred_symb_vars(), 
            self.pred_func
        )

    def predict(self, test_idxs):
        """Predict on test indices"""
        return self.pred_func_compiled(*self.get_pred_args(test_idxs))

    def get_init_params(self):
        """Abstract method - must be implemented by child classes"""
        raise NotImplementedError

    def define_loss(self):
        """Abstract method - must be implemented by child classes"""
        raise NotImplementedError


class DistMult_Model(Abstract_Model):
    """DistMult model"""

    def __init__(self):
        super(DistMult_Model, self).__init__()
        self.name = self.__class__.__name__
        self.e = None
        self.r = None

    def get_init_params(self):
        params = {
            'e': randn(max(self.n, self.l), self.k),
            'r': randn(self.m, self.k)
        }
        return params

    def define_loss(self):
        self.pred_func = pt.sum(
            self.e[self.rows, :] * self.r[self.cols, :] * self.e[self.tubes, :], 
            1
        )
        self.loss = pt.sqr(self.ys - self.pred_func).mean()
        self.regul_func = (
            pt.sqr(self.e[self.rows, :]).mean() +
            pt.sqr(self.r[self.cols, :]).mean() +
            pt.sqr(self.e[self.tubes, :]).mean()
        )


class Complex_Model(Abstract_Model):
    """Factorization in complex numbers"""

    def __init__(self):
        super(Complex_Model, self).__init__()
        self.name = self.__class__.__name__
        self.e1 = None
        self.e2 = None
        self.r1 = None
        self.r2 = None

    def get_init_params(self):
        params = {
            'e1': randn(max(self.n, self.l), self.k),
            'e2': randn(max(self.n, self.l), self.k),
            'r1': randn(self.m, self.k),
            'r2': randn(self.m, self.k)
        }
        return params

    def define_loss(self):
        self.pred_func = (
            pt.sum(self.e1[self.rows, :] * self.r1[self.cols, :] * self.e1[self.tubes, :], 1) +
            pt.sum(self.e2[self.rows, :] * self.r1[self.cols, :] * self.e2[self.tubes, :], 1) +
            pt.sum(self.e1[self.rows, :] * self.r2[self.cols, :] * self.e2[self.tubes, :], 1) -
            pt.sum(self.e2[self.rows, :] * self.r2[self.cols, :] * self.e1[self.tubes, :], 1)
        )
        
        self.loss = pt.sqr(self.ys - self.pred_func).mean()
        
        self.regul_func = (
            pt.sqr(self.e1[self.rows, :]).mean() +
            pt.sqr(self.e2[self.rows, :]).mean() +
            pt.sqr(self.e1[self.tubes, :]).mean() +
            pt.sqr(self.e2[self.tubes, :]).mean() +
            pt.sqr(self.r1[self.cols, :]).mean() +
            pt.sqr(self.r2[self.cols, :]).mean()
        )

In [4]:
import numpy as np
import sys
sys.path.append('..')
from efe.models_pytensor import DistMult_Model, Complex_Model


ModuleNotFoundError: No module named 'efe.models_pytensor'