In [21]:
# -*- coding: utf-8 -*-

import numpy as np
from random import normalvariate #正态分布
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
from scipy.special import expit

$y(x)=w_0+\sum_{i=1}^{d}w_ix_i+\sum_{i=1}^{d}\sum_{j=i+1}^{d}<v_i,v_j>x_ix_j$

In [22]:
class FM(object):
    def __init__(self):
        self.data = None
        self.label = None
        self.data_test = None
        self.label_test = None

        self.alpha = 0.01
        self.iter = 30
        self.k= 10
        self.lamb=0.01
        self._w = None
        self._w_0 = None
        self.v = None

    # 数据归一化，统一标尺
    # 将标签换成-1 1
    def preprocessing(self,DataSet,test_data=False):
        DataSet=np.array(DataSet)
        min_max_scaler = MinMaxScaler()
        data=DataSet[:,:len(DataSet[0])-1]
        label=(DataSet[:,len(DataSet[0])-1:]).T[0]
        for l in range(len(label)):
            if label[l]==0:
                label[l]=-1
        data_minMax = min_max_scaler.fit_transform(data)
        if test_data:
            self.data_test=data_minMax
            self.label_test=label
        else:
            self.data=data_minMax
            self.label=label



    def sigmoid(self,x):  # 定义sigmoid函数
        return 1.0 / (1.0 + expit(-x))

    def kernal(self,v1,v2):
        return sum(v1[i]*v2[i] for i in range(len(v1)))

    # 预测一条数据x
    def getPrediction(self,x,thold,L2=False):
        m, n = np.shape(self.data)
        result = []
        temp = 0
        for i in range(n):
            for j in range(i + 1, n):
                temp += self.kernal(self.v[i], self.v[j]) * x[i] * x[j]
        term1 = self._w_0
        term2 = self.kernal(x, self._w)
        # 该sample的预测值
        if L2:
            pre = self.sigmoid(term1 + term2 + temp+0.5*self.lamb*(pow(np.linalg.norm(self._w_0),2)+pow(np.linalg.norm(self._w),2)+pow(np.linalg.norm(self.v),2)))
        else:
            pre = self.sigmoid(term1 + term2 + temp)
        # print(pre)
        if pre > thold:
            pre = 1
        else:
            pre = -1
        return pre


    # 计算准确率
    def calaccuracy(self,pre_y,act_y):
        cost=[]
        for sampleId in range(len(act_y)):
            if pre_y[sampleId]==act_y[sampleId]:
                cost.append(1)
            else:
                cost.append(0)
        return np.sum(cost)/len(cost)

    def sgd_fm(self):
        # 数据矩阵data是m行n列
        m, n = np.shape(self.data)
        # 初始化w0,wi,V,Y_hat
        w0 = 0
        wi = np.zeros(n)
        V = normalvariate(0, 0.2) * np.ones([n, self.k])
        for it in range(self.iter):

            loss=0
            # 随机梯度下降法，每次使用一个sample更新参数
            for sampleId in range(m):
                # 计算交叉项
                temp=0
                for i in range(n):
                    for j in range(i+1,n):
                        temp+=self.kernal(V[i],V[j])*self.data[sampleId][i]*self.data[sampleId][j]
                term1=w0
                term2=self.kernal(self.data[sampleId],wi)
                # 该sample的预测值
                y_hat=term1+term2+temp
                # 计算损失
                yp=self.sigmoid(y_hat*self.label[sampleId])
                loss=yp-1
                part_df_loss=(yp-1)*self.label[sampleId]
                #  更新w0,wi
                w0-=self.alpha*1*part_df_loss
                for i in range(n):
                    if self.data[sampleId][i]!=0:
                        wi[i]-=self.alpha*self.data[sampleId][i]*part_df_loss
                        for f in range(self.k):
                            V[i][f]-=self.alpha*part_df_loss*self.data[sampleId][i]*sum(V[j][f]*self.data[sampleId][j]-
                                                                                        V[i][f]*self.data[sampleId][i]*self.data[sampleId][i] for j in range(n))

            # print('第%s次训练的误差为：%f' % (it, loss))
        self._w = wi
        self._w_0 = w0
        self.v = V
        
    def sgd_fm_L2(self):
        # 数据矩阵data是m行n列
        m, n = np.shape(self.data)
        # 初始化w0,wi,V,Y_hat
        w0 = 0
        wi = np.zeros(n)
        V = normalvariate(0, 0.2) * np.ones([n, self.k])
        for it in range(self.iter):

            loss=0
            # 随机梯度下降法，每次使用一个sample更新参数
            for sampleId in range(m):
                # 计算交叉项
                temp=0
                for i in range(n):
                    for j in range(i+1,n):
                        temp+=self.kernal(V[i],V[j])*self.data[sampleId][i]*self.data[sampleId][j]
                term1=w0
                term2=self.kernal(self.data[sampleId],wi)
                # 该sample的预测值
                y_hat=term1+term2+temp+0.5*self.lamb*(pow(np.linalg.norm(w0),2)+pow(np.linalg.norm(wi),2)+pow(np.linalg.norm(V),2))
                # 计算损失
                yp=self.sigmoid(y_hat*self.label[sampleId])
                loss=yp-1
                part_df_loss=(yp-1)*self.label[sampleId]
                #  更新w0,wi
                w0-=self.alpha*part_df_loss*self.lamb*(1+2*w0)
                for i in range(n):
                    if self.data[sampleId][i]!=0:
                        wi[i]-=self.alpha*part_df_loss*self.lamb*(self.data[sampleId][i]+2*wi[i])
                        for f in range(self.k):
                            V[i][f]-=self.alpha*part_df_loss*self.lamb*(self.data[sampleId][i]*sum(V[j][f]*self.data[sampleId][j]-
                                                                                        V[i][f]*self.data[sampleId][i]*self.data[sampleId][i] for j in range(n))+2*V[i][f])

            # print('第%s次训练的误差为：%f' % (it, loss))
        self._w = wi
        self._w_0 = w0
        self.v = V
   
        

没有正则化的FM为什么准确率更高

In [23]:
def main():
    os=FM()
    data_train = pd.read_csv('diabetes_train.txt', header=None)
    data_test = pd.read_csv('diabetes_test.txt', header=None)

    os.preprocessing(data_train)
    os.preprocessing(data_test,True)
    # 训练模型
    os.sgd_fm()

    # 计算准确率
    maxacu=0
    best_thold=0
    for t in np.arange(0.4,0.7,0.01):
        pre_y=[]
        for x in os.data_test:
            pre_y.append(os.getPrediction(x,t))
        acu=os.calaccuracy(pre_y,os.label_test)
        if acu>maxacu:
            maxacu=acu
            best_thold=t


    print('准确率：',maxacu*100,'%','最佳阈值:',best_thold)

if __name__ == "__main__":
    main()


准确率： 72.76119402985076 % 最佳阈值: 0.6000000000000002


In [24]:
def main():
    os=FM()
    data_train = pd.read_csv('diabetes_train.txt', header=None)
    data_test = pd.read_csv('diabetes_test.txt', header=None)

    os.preprocessing(data_train)
    os.preprocessing(data_test,True)
    # 训练模型
    os.sgd_fm_L2()

    # 计算准确率
    maxacu=0
    best_thold=0
    for t in np.arange(0.4,0.7,0.01):
        pre_y=[]
        for x in os.data_test:
            pre_y.append(os.getPrediction(x,t,True))
        acu=os.calaccuracy(pre_y,os.label_test)
        if acu>maxacu:
            maxacu=acu
            best_thold=t


    print('准确率：',maxacu*100,'%','最佳阈值:',best_thold)

if __name__ == "__main__":
    main()



准确率： 56.343283582089555 % 最佳阈值: 0.6900000000000003
