In [2]:
from imp import reload
import numpy as np
from utils import *

In [10]:
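# lmo (presumably "linear minimization oracle" -- confirm): SVD / power-iteration helpers over the per-record 'grad' entries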
def add(x, y):
    """Return ``x + y``.

    Exists as a named binary function so it can be handed to ``reduce``
    calls elsewhere in this notebook.
    """
    total = x + y
    return total

def loground(t, c = 1):
    """Return ``1 + int(c * log10(0.99 + t))`` as a Python int.

    Parameters
    ----------
    t : scalar
        Value fed to the logarithm after adding 0.99; ``0.99 + t`` must be
        positive, otherwise the integer conversion of nan/-inf raises.
    c : scalar, optional
        Multiplier applied to the logarithm before truncation (default 1).

    Returns
    -------
    int
        One plus the scaled logarithm truncated toward zero.
    """
    # np.int was only an alias for the builtin int; it was deprecated in
    # NumPy 1.20 and removed in 1.24, so use the builtin directly.
    return 1 + int(c * np.log10(.99 + t))

def centralize(rdd, svd = viaSVD, **kwargs):
    """Sum the 'grad' entry of every record in ``rdd`` and decompose the total.

    Parameters
    ----------
    rdd : RDD-like
        Collection whose records support ``record['grad']``; the extracted
        values are combined pairwise with ``add``.
    svd : callable, optional
        Applied to the summed gradient; its result is returned unchanged.
    **kwargs
        Accepted and ignored.

    Returns
    -------
    Whatever ``svd`` returns for the summed gradient.
    """
    gradients = rdd.map(lambda record: record['grad'])
    summed_grad = gradients.reduce(add)
    return svd(summed_grad)

def warmstart(rdd, svd = viaSVD, **kwargs):
    """Apply ``svd`` to each record's 'grad' entry independently.

    Parameters
    ----------
    rdd : RDD-like
        Collection whose records support ``record['grad']``.
    svd : callable, optional
        Applied to every record's gradient.
    **kwargs
        Accepted and ignored.

    Returns
    -------
    The mapped collection (one ``svd`` result per record); nothing is reduced.
    """
    def _decompose(record):
        return svd(record['grad'])

    return rdd.map(_decompose)

def avgmix(rdd, **kwargs):
    """Sum the first two components of the per-record ``warmstart`` results.

    ``warmstart`` is called with its default ``svd``; ``**kwargs`` is accepted
    and ignored.

    Returns
    -------
    tuple
        ``(sum of component 0, sum of component 1)`` over all records.

    NOTE(review): despite the name, the sums are not divided by the number
    of records -- confirm whether an average was intended.
    """
    def _sum_pairs(left, right):
        first = left[0] + right[0]
        second = left[1] + right[1]
        return (first, second)

    return warmstart(rdd).reduce(_sum_pairs)

def poweriter(rdd, max_iter, v = None, t = 0, **kwargs):
    """Alternating power iteration on the sum of the records' 'grad' matrices.

    Each full iteration computes ``u = normalize(sum_i grad_i . v)`` followed
    by ``v = normalize(sum_i u . grad_i)``.

    Parameters
    ----------
    rdd : RDD-like
        Collection whose records support ``record['grad']``.
    max_iter : callable
        Called as ``max_iter(t)``; returns the iteration count ``k``. The
        integer part is the number of full iterations; a fractional part adds
        one extra refresh of ``u`` from the final ``v``.
    v : None, "random", or array-like, optional
        Starting right vector. ``None`` sums the second component of the
        ``warmstart`` results; ``"random"`` draws a standard-normal vector;
        anything else is used as given.
    t : scalar, optional
        Forwarded to ``max_iter`` (default 0).
    **kwargs
        ``m`` (int) gives the length of the random start vector. If omitted,
        it is taken from the last axis of the first record's 'grad'.

    Returns
    -------
    tuple
        ``(u, v)``. ``u`` is unit-norm; ``v`` is unit-norm once at least one
        full iteration has run, otherwise it is the starting vector.
    """
    def _left(right):
        # Left vector: summed (grad . v), scaled to unit norm.
        raw = rdd.map(lambda z: np.dot(z['grad'], right)).reduce(lambda p, q: p + q)
        return raw / np.linalg.norm(raw)

    def _right(left):
        # Right vector: summed (u . grad), scaled to unit norm.
        raw = rdd.map(lambda z: np.dot(left, z['grad'])).reduce(lambda p, q: p + q)
        return raw / np.linalg.norm(raw)

    if v is None:
        v = warmstart(rdd).map(lambda z: z[1]).reduce(lambda p, q: p + q)
    elif isinstance(v, str) and v == "random":
        # The isinstance guard keeps ndarray inputs away from the string
        # comparison, which is elementwise (and ambiguous in a boolean
        # context) on recent NumPy versions.
        m = kwargs.get('m')
        if m is None:
            # np.dot(grad, v) needs len(v) to match grad's last axis.
            m = np.shape(rdd.first()['grad'])[-1]
        v = np.random.randn(m)

    k = max_iter(t)
    u = None
    for _ in range(int(k)):
        u = _left(v)
        v = _right(u)
    # Refresh u for a fractional k, and also when no full iteration ran
    # (k < 1), so that u is always defined on return.
    if u is None or k > int(k):
        u = _left(v)
    return u, v

def regularize(u, v, nn):
    """Normalize ``u`` and ``v`` to unit Euclidean norm and scale ``u`` by ``-nn``.

    Parameters
    ----------
    u, v : array-like
        Vectors to normalize. They are not modified; new arrays are returned.
    nn : scalar
        Magnitude applied (with a sign flip) to the normalized ``u``.

    Returns
    -------
    tuple
        ``(-nn * u / ||u||, v / ||v||)``.
    """
    u = np.asarray(u)
    v = np.asarray(v)
    # Out-of-place arithmetic leaves the caller's arrays untouched and also
    # works for integer-dtype inputs, where in-place true division raises.
    u_scaled = (u / np.linalg.norm(u)) * -nn
    v_unit = v / np.linalg.norm(v)
    return u_scaled, v_unit

In [8]:
# ls: step-size rules (fixed schedule and line search)
def naivestep(*args, t, **kwargs):
    """Return the step size ``2 / (t + 2)``.

    ``t`` is keyword-only; every other positional or keyword argument is
    accepted and ignored.
    """
    denominator = t + 2
    return 2. / denominator

def linearsearch(*args, rdd, u, v, ls, **kwargs):
    """Compute a step size capped at 1 from per-record ``ls`` results.

    For every record ``z`` in ``rdd``, ``ls`` is called with the record's
    entries as keyword arguments plus ``D = LRmatrix([1], [u], [v])``. The
    per-record results are combined with ``add``; the step is element 0 of
    the total divided by element 1, limited to at most 1.

    All parameters are keyword-only; extra positional and keyword arguments
    are accepted and ignored.
    """
    def _evaluate(record):
        direction = LRmatrix([1], [u], [v])
        return ls(**record, D = direction)

    totals = rdd.map(_evaluate).reduce(add)
    ratio = totals[0] / totals[1]
    return min(ratio, 1)

In [9]:
# update
def update(rdd, u, v, a, f):
    """Map ``f`` over ``rdd`` with the direction built from ``u``, ``v`` and step ``a``.

    Each record ``z`` becomes ``f(z, LRmatrix([1], [u], [v]), a)``.

    Returns
    -------
    The mapped collection; no action is triggered here.
    """
    def _apply(record):
        direction = LRmatrix([1], [u], [v])
        return f(record, direction, a)

    return rdd.map(_apply)

In [11]:
import mls as md
# Smoke test for the lmo / ls / update helpers defined in the cells above.
# Dimensions of the generated problem:
# X: n*p  W: p*m  Y: n*m
# =======================
# NOTE(review): the recorded output of this cell is an AssertionError; the
# output does not show which of the asserts below failed.

# default parameters
# (passed as keyword arguments to md.generate; 'nn' and 'seed' semantics are
# defined by md.generate -- not visible here)
param = {'n':16, 'm':5, 'p':4, 'r':3, 'nn':1, 'seed':0}

# generate data
data, W = md.generate(**param)

# prepare data
# mat2point / point2mat are not defined in this notebook -- presumably they
# come from the star import of utils; `sc` is presumably an existing
# SparkContext -- confirm.
points = mat2point(*data)
dataRDD = sc.parallelize(points).mapPartitions(point2mat)
# count() results are discarded; presumably called only to force evaluation.
dataRDD.count()
statRDD = dataRDD.map(md.stats).cache()
statRDD.count()

# lmo
# The numeric constants in the asserts are reference values, presumably
# recorded from an earlier run with these parameters -- TODO confirm.
#assert np.isclose(prod(centralize(statRDD)), 7.186559722e-06)
#assert np.isclose(prod(avgmix(statRDD)), 275.7011134)
assert np.isclose(prod(poweriter(statRDD, lambda t: 3)), 2.863723520e-06)
u, v = regularize(*centralize(statRDD), 1)
assert np.isclose(prod((u, v)), 2.852219061e-06)

# ls
assert np.isclose(naivestep(t = 2), 0.5)
a = linearsearch(rdd = statRDD, u = u, v = v, ls = md.linesearch)
assert np.isclose(a, 0.3897265)

# update
# statRDD is rebound to the updated RDD here, so later cells that read
# statRDD see the post-update state if this line was reached.
statRDD = update(statRDD, u, v, a, md.update)
#assert np.isclose(prod(centralize(statRDD)), -0.00195662557)

AssertionError: 

In [9]:
# Display the value returned by centralize for the current statRDD.
# NOTE(review): statRDD may have been rebound by the update step in the
# previous cell, and the execution counts in this notebook are non-sequential,
# so this output depends on the order in which cells were run.
(centralize(statRDD))

(array([ 0.84906909,  0.38234866,  0.35159612,  0.09628789]),
 array([-0.23446779,  0.13106294, -0.1648418 , -0.05406997, -0.94749722]))