In [1]:
import numpy as np
import pandas as pd
import torch
import torch.optim as optim
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

column_names = ['variance', 'skewness', 'curtosis', 'entropy', 'class']
data = pd.read_csv('./code/data/banknote.txt', names=column_names)

x = data.drop('class', axis=1)
y = data['class']

logistic_model = LogisticRegression() 

logistic_model.fit(x, y)

y_pred = logistic_model.predict(x)

accuracy = accuracy_score(y, y_pred)
print(f'Accuracy of the logistic regression model: {accuracy:.2f}')

print('Model coefficients (θ):', logistic_model.coef_) # Theta
print('Model intercept (θ0):', logistic_model.intercept_)


  from .autonotebook import tqdm as notebook_tqdm


Accuracy of the logistic regression model: 0.99
Model coefficients (θ): [[-3.36825847 -1.88751582 -2.30754002 -0.0880308 ]]
Model intercept (θ0): [3.73908395]


In [2]:
def strategy1(x,theta_lgt,k):
    eta_x = (1 + np.exp(-np.dot(theta_lgt.T, x)))**(-1/k)
    return eta_x

def strategy2(x,theta_lgt,k):
    eta_x = 1 - strategy1(x,theta_lgt,k)
    return eta_x


In [4]:
x_values = x.values
theta_lgt = logistic_model.coef_[0]
k = 10
pi = 0.3
data['eta'] = [strategy1(x_i, theta_lgt, k) for x_i in x_values]

data_with_pu_label = data.sort_values(by='eta', ascending=False)

P_num = sum(data_with_pu_label['class']==0)

# 选择前 (1-pi)*正例 的样本作为正例
data_with_pu_label['pu_label'] = 0  # 创建一个新列以存储 PU 标签
data_with_pu_label.iloc[:int((1-pi)*P_num), -1] = 1  # 前设置为观测到的正例 即 s = 1

data_group = data_with_pu_label.groupby('pu_label')
S_P = data_group.get_group(1)
S_U = data_group.get_group(0)

In [5]:
S_P = S_P.iloc[:, :4]
S_P['y1'] = 1
S_P['y0'] = 0
S_P['si'] = 1
S_P['yi'] = 1

S_U = S_U.iloc[:, :4]
S_U['y1'] = 0 # 这里S_U在进入EM算法前应该根据已初始化的theta值，计算出widetilde{P(y_i)}的值
S_U['y0'] = 0
S_U['si'] = 0
S_U['yi'] = 3 # 用3 表示 Unknown





xi_SU = torch.tensor(S_U.drop(['y1','y0','si','yi'], axis=1).values, dtype=torch.float32)
y1_SU = torch.tensor(S_U['y1'].values, dtype=torch.int64)
y0_SU = torch.tensor(S_U['y0'].values, dtype=torch.int64)
si_SU = torch.tensor(S_U['si'].values, dtype=torch.int64)
yi_SU = torch.tensor(S_U['yi'].values, dtype=torch.int64)



S_U = [[xi_SU[i], y1_SU[i],y0_SU[i],si_SU[i],yi_SU[i]] for i in range(len(S_U))]

xi_SP = torch.tensor(S_P.drop(['y1','y0','si','yi'], axis=1).values, dtype=torch.float32)
y1_SP = torch.tensor(S_P['y1'].values, dtype=torch.int64)
y0_SP = torch.tensor(S_P['y0'].values, dtype=torch.int64)
si_SP = torch.tensor(S_P['si'].values, dtype=torch.int64)
yi_SP = torch.tensor(S_P['yi'].values, dtype=torch.int64)



S_P = [[xi_SP[i], y1_SP[i],y0_SP[i],si_SP[i],yi_SP[i]] for i in range(len(S_P))]



In [6]:
# init theta1 theta2

x = data_with_pu_label.iloc[:, :4]
y = data_with_pu_label['pu_label']

logistic_model = LogisticRegression()
logistic_model.fit(x, y)

theta1 = torch.tensor(logistic_model.coef_[0],dtype=torch.float32)
theta2 = torch.zeros(4,dtype=torch.float32)




In [7]:
theta1,theta2

(tensor([-5.1456, -2.5409, -3.2537, -0.0525]), tensor([0., 0., 0., 0.]))

In [8]:
import torch
import torch.optim as optim

class PU_EM_Model:
    def __init__(self, theta1, theta2, S_U, S_P):
        """
        dataset = [x_i(4D),y_1,y_0,s_i]
        """

        self.S_U = S_U
        self.S_P = S_P
        self.dataset = S_U + S_P
        self.theta1 = theta1.clone().detach().requires_grad_(True)
        self.theta2 = theta2.clone().detach().requires_grad_(True)

    def _logistic_function(self, theta, x):
        # logistic function
        return 1.0 / (1.0 + torch.exp(-torch.matmul(theta.T, x)))

    def eta(self, x):
        # 公式(9)
        return self._logistic_function(self.theta2, x)

    def h(self, x):
        # 公式(8)
        return self._logistic_function(self.theta1, x)

    def grad_expressions_theta1(self):
        grad_theta1 = torch.zeros_like(self.theta1)
        for data in self.dataset:
            x = data[0]
            y_1 = data[1]
            y_0 = data[2]

            h_x = self.h(x)
            grad_theta1 += y_1 * (h_x - 1) * x + y_0 * h_x * x
        return grad_theta1

    def grad_expressions_theta2(self):
        grad_theta2 = torch.zeros_like(self.theta2)
        for data in self.dataset:
            x = data[0]
            y_1 = data[1]
            y_0 = data[2]
            s = data[3]

            if not s.item():
                # s 为 0的情况下
                grad_theta2 += y_1 * self.eta(x)
            else:
                # s 为 1的情况下
                grad_theta2 += y_1 * (self.eta(x) - 1)

        return grad_theta2

    def expectation_step(self):
        # 在E步中更新隐变量的期望
        for i in range(len(self.dataset)):
            # 为了可读性
            x = self.dataset[i][0]
            s = self.dataset[i][3]

            if not s.item():
                # s 为 0的情况下 ,通过公式(12)对\cap{P(y)}的值进行更新
                p_y1 = (1 - self.eta(x)) * self.h(x)
                p_y0 = 1 - self.h(x)
                # 更新数据集中的 y_1 和 y_0
                self.dataset[i][1] = p_y1
                self.dataset[i][2] = p_y0
            else:
                # s 为 1的情况下
                p_y1 = self.eta(x) * self.h(x)
                p_y0 = 0
                # 更新数据集中的 y_1 和 y_0
                self.dataset[i][1] = p_y1  # 1
                self.dataset[i][2] = p_y0  # 0

    def maximization_step(self, optimizer):
        # 在M步中更新参数 theta1 和 theta2
        optimizer.zero_grad()
        # 计算梯度表达式
        self.theta1.grad = self.grad_expressions_theta1()  # 计算 theta1 的梯度
        self.theta2.grad = self.grad_expressions_theta2()  # 计算 theta2 的梯度
        optimizer.step()


# 定义优化器参数
lr = 0.01  # 学习率
rho1 = 0.9  # beta1，用于一阶矩估计的衰减率
rho2 = 0.999  # beta2，用于二阶矩估计的衰减率
num_epochs = 100

model = PU_EM_Model(theta1, theta2, S_U, S_P)
optimizer = optim.Adam([model.theta1, model.theta2], lr=lr)

for epoch in range(num_epochs):
    # E步
    model.expectation_step()

    # M步
    model.maximization_step(optimizer)

    # 输出当前参数
    print(f'Epoch [{epoch + 1}/{num_epochs}], theta1: {model.theta1}, theta2: {model.theta2}')



  return 1.0 / (1.0 + torch.exp(-torch.matmul(theta.T, x)))


Epoch [1/100], theta1: tensor([-5.1556, -2.5509, -3.2437, -0.0625], requires_grad=True), theta2: tensor([0.0100, 0.0100, 0.0100, 0.0100], requires_grad=True)
Epoch [2/100], theta1: tensor([-5.1654, -2.5603, -3.2344, -0.0725], requires_grad=True), theta2: tensor([0.0200, 0.0200, 0.0200, 0.0200], requires_grad=True)
Epoch [3/100], theta1: tensor([-5.1748, -2.5681, -3.2270, -0.0824], requires_grad=True), theta2: tensor([0.0300, 0.0300, 0.0300, 0.0300], requires_grad=True)
Epoch [4/100], theta1: tensor([-5.1836, -2.5732, -3.2228, -0.0923], requires_grad=True), theta2: tensor([0.0400, 0.0400, 0.0400, 0.0400], requires_grad=True)
Epoch [5/100], theta1: tensor([-5.1916, -2.5754, -3.2219, -0.1022], requires_grad=True), theta2: tensor([0.0500, 0.0500, 0.0500, 0.0500], requires_grad=True)
Epoch [6/100], theta1: tensor([-5.1990, -2.5752, -3.2234, -0.1119], requires_grad=True), theta2: tensor([0.0600, 0.0600, 0.0600, 0.0600], requires_grad=True)
Epoch [7/100], theta1: tensor([-5.2059, -2.5734, -3.

In [10]:
# 测试
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score

column_names = ['variance', 'skewness', 'curtosis', 'entropy', 'class']
data = pd.read_csv('./code/data/banknote.txt', names=column_names)

x = data.drop('class', axis=1).values
y = data['class'].values



theta = np.array([-5.8385, -2.8133, -3.5887,  0.0601]) 
theta_0 = 0

z = 1/(np.exp(-np.dot(x, theta))+1)

y_pred = (z -0.5 > 0).astype(int)


accuracy = accuracy_score(y, y_pred)
print(f'Accuracy of the manually defined logistic regression model: {accuracy:.2f}')

# 显示手动设置的参数
print('Manually defined model coefficients (θ):', theta)


Accuracy of the manually defined logistic regression model: 0.94
Manually defined model coefficients (θ): [-5.8385 -2.8133 -3.5887  0.0601]
