In [1]:
from MultiLog import MLR,np
from pyspark import SparkContext,SparkConf
from sklearn.base import copy

In [2]:
def fit_in(x,mlr,classes):
    """Feed every row of one partition to ``mlr.partial_fit``, one row at a time.

    Each row is split into its leading entries (features, reshaped to a
    single-row 2-D array) and its last entry (the label, cast to int and
    wrapped in a length-1 array).

    Parameters
    ----------
    x : iterable of numpy arrays
        Rows of the partition; the last entry of each row is the label.
    mlr : object exposing ``partial_fit(X, y, classes)``
        Model that is updated in place.
    classes : forwarded unchanged as the third positional argument of
        ``partial_fit``.

    Yields
    ------
    The same ``mlr`` object, exactly once, after all rows have been consumed
    (an empty partition yields the model untouched).
    """
    for row in x:
        n_features=row.shape[0]-1
        features=row[:-1].reshape(1,n_features)
        label=np.array([row[-1]]).astype('int')
        mlr.partial_fit(features,label,classes)
    yield mlr
def update_in(m1,m2):
    """Combine two models by summing their parameters into a fresh copy.

    Parameters
    ----------
    m1, m2 : objects with ``coefs`` and ``intercept`` attributes that
        support in-place addition (``+=``).

    Returns
    -------
    A deep copy of ``m1`` whose ``coefs`` and ``intercept`` have had the
    corresponding attributes of ``m2`` added to them. Neither ``m1`` nor
    ``m2`` is modified.
    """
    # Import the stdlib module here instead of relying on the name ``copy``
    # leaking out of ``sklearn.base``, which is an implementation detail of
    # scikit-learn and not part of its public API.
    import copy
    new_mod=copy.deepcopy(m1)
    new_mod.coefs+=m2.coefs
    new_mod.intercept+=m2.intercept
    return new_mod

def avg_coefs_in(mlr,numpart):
    """Divide the model's ``coefs`` and ``intercept`` by ``numpart`` in place.

    Intended to turn parameters that were summed over ``numpart`` models
    into their average.

    Parameters
    ----------
    mlr : object with ``coefs`` and ``intercept`` attributes supporting ``/=``.
    numpart : divisor applied to both attributes.

    Returns
    -------
    The same ``mlr`` object, after modification.
    """
    # ``obj.attr /= n`` is load attribute, in-place divide, store back;
    # the loop spells that out for both parameter attributes, in the same order.
    for attr_name in ("coefs","intercept"):
        value=getattr(mlr,attr_name)
        value/=numpart
        setattr(mlr,attr_name,value)
    return mlr

In [3]:
# Spark running locally (master 'local[4]') under the app name "Multilog-SGD".
# getOrCreate returns the already running context when there is one, so
# re-executing this cell in a live kernel no longer fails with
# "Cannot run multiple SparkContexts at once".
conf=SparkConf().setAppName("Multilog-SGD").setMaster('local[4]')
sc=SparkContext.getOrCreate(conf=conf)

In [4]:
# Seed the global NumPy generator so the synthetic data below, and the
# in-place np.random.shuffle calls made during training, are reproducible
# across "Restart Kernel -> Run All".
SEED=0
np.random.seed(SEED)

# Four 2-D clouds of 50 uniformly distributed points each, one per class.
# Each cloud occupies its own square on the diagonal:
#   class 0: [2, 3)   class 1: [4, 6)   class 2: [7, 10)   class 3: [11, 14)
X1=np.random.rand(50,2)+2
y1=np.zeros(X1.shape[0])
X2=2*np.random.rand(50,2)+4
y2=np.ones(X2.shape[0])
X3=3*np.random.rand(50,2)+7
y3=2*np.ones(X3.shape[0])
X4=3*np.random.rand(50,2)+11
y4=3*np.ones(X4.shape[0])

# Stack the clouds into one feature matrix and one integer label vector.
X=np.vstack((X1,X2,X3,X4))
y=np.hstack((y1,y2,y3,y4))
y=y.astype('int')

# Append the label as the last column so that each row carries its own
# label when the rows are distributed and shuffled.
X=np.hstack((X,y.reshape((y.shape[0],1))))

In [20]:
# Upper bound on the number of passes over the data.
max_iter=450
# Model that the training loop updates.
mlr=MLR()
# Number of distinct labels; the last column of X is the label, the rest are features.
n_classes=len(np.unique(y))
# Parameters of the previous pass, starting at zero; the training loop
# compares each new set of parameters against these.
prev_coef=np.zeros((n_classes,X.shape[1]-1))
prev_intercept=np.zeros(n_classes)
# Stop once the parameter change between two passes drops below this value.
tol=0.0001

In [21]:
# Parameter-averaging SGD: on every pass each partition runs fit_in starting
# from the current model, the per-partition parameters are summed with
# update_in and then divided by the number of partitions.
for i in range(max_iter):
    # X is shuffled in place at the end of each pass, so redistribute it.
    dat=sc.parallelize(X)
    mlr=(
        dat
        .mapPartitions(lambda x:fit_in(x,mlr,n_classes))
        .reduce(update_in)
    )
    mlr=avg_coefs_in(mlr,dat.getNumPartitions())
    # Euclidean norm of the change in (coefs, intercept) since the last pass.
    diff_coef,diff_intercept=mlr.coefs-prev_coef,mlr.intercept-prev_intercept
    coef_norm=np.linalg.norm(diff_coef,ord='fro')**2
    gradient_sum=np.sqrt(coef_norm+np.sum(diff_intercept**2))
    if gradient_sum<tol:
        # Parameters barely moved: treat as converged.
        break
    prev_coef,prev_intercept=mlr.coefs,mlr.intercept
    # New row order for the next pass (rows carry their label in the last column).
    np.random.shuffle(X)
print(mlr.coefs)
print(mlr.intercept)

[[-1.75529612 -1.88643488]
 [-0.42578834 -0.44250291]
 [ 0.53080765  0.42046248]
 [ 1.65027681  1.90847532]]
[ 20.36459757  10.38003819  -1.75425879 -28.99037697]


In [22]:
# Predict on the feature columns only; the last column of X holds the label.
# NOTE(review): this cell shows no output and the following cells read
# mlr.predictions, so predict presumably stores its result on the model
# instead of returning it — confirm against MultiLog.MLR.
mlr.predict(X[:,:-1])

In [23]:
# Display the predictions held on the model. They are in the current row
# order of X, which was shuffled in place during training.
mlr.predictions

array([1, 2, 0, 0, 0, 1, 1, 0, 1, 0, 3, 2, 1, 2, 0, 3, 0, 3, 2, 3, 2, 2,
       3, 1, 1, 2, 3, 0, 1, 0, 1, 2, 3, 1, 2, 1, 3, 1, 3, 3, 3, 3, 0, 3,
       1, 0, 2, 3, 1, 0, 0, 1, 1, 3, 3, 0, 1, 0, 2, 3, 2, 0, 3, 0, 0, 0,
       2, 3, 0, 3, 2, 0, 2, 1, 2, 3, 0, 3, 3, 1, 3, 1, 3, 0, 1, 3, 2, 1,
       0, 3, 3, 0, 2, 2, 2, 0, 0, 1, 2, 3, 0, 1, 3, 1, 3, 0, 2, 1, 1, 2,
       3, 0, 0, 2, 2, 2, 1, 3, 2, 2, 0, 3, 2, 1, 2, 1, 3, 0, 3, 3, 1, 2,
       2, 0, 0, 2, 1, 3, 1, 1, 0, 1, 0, 2, 0, 1, 2, 3, 2, 1, 2, 0, 2, 1,
       1, 0, 2, 2, 2, 3, 3, 2, 3, 0, 1, 3, 1, 1, 3, 0, 0, 1, 1, 3, 0, 0,
       2, 2, 1, 0, 1, 1, 3, 1, 3, 0, 2, 0, 0, 2, 3, 3, 1, 2, 2, 2, 1, 3,
       2, 0], dtype=int64)

In [24]:
# Number of rows whose predicted class equals the label stored in the last
# column of X. Labels are read from X rather than y because X was shuffled
# in place during training. np.count_nonzero counts the matches in one
# vectorised call instead of iterating the boolean array in Python.
int(np.count_nonzero(X[:,-1]==mlr.predictions))

200