In [1]:
import numpy as np
import scipy.io
import os
import torch
from dnmf import DNMF
from robust import robust
from side import np_to_var, var_to_np, np_to_tensor

class Namespace:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

## GPU or CPU
GPU = True
if GPU:
    torch.backends.cudnn.enabled = True
    torch.backends.cudnn.benchmark = True
    os.environ['CUDA_VISIBLE_DEVICES'] = '0'
    print("num GPUs", torch.cuda.device_count())
    dtype = torch.cuda.FloatTensor
else:
    dtype = torch.FloatTensor
    print("CPU")

num GPUs 1


### Dataset & Parameters

In [4]:
V = torch.load('COCO/V.pt').type(dtype)/255
J = torch.load('COCO/mask1.pt').type(dtype)

args = Namespace()
args.type = dtype
args.seed = 42

# Attack parameter \lambda
l_r = 2

# Number of pre-training iterations
args.pre_iterations = 300

# Number of outer(attacker) iterations
args.outIterations = 10

# Number of inner(learner) iterations
args.iterations = 50

# Size of latents
args.layers = [400, 300, 200]

# Model cost. 'el' means Elastic loss and 'F' means Frobenius loss. 
args.cost = 'el'

# scale parameter \delta (just for Elastic loss)
args.para = 2

# For L21 loss, set cost to 'el' and set 'para' to '0'.

cost1 = np.zeros(args.outIterations + 1)
tol = 1e-10

### EADNMF model

In [None]:
# Create Model
model = DNMF(J, V, args)

# Pre-Training with shallow NMF
model.pre_training()

# Error of the pre=trained model
W = model.U_s[0]
for i in range(model.p-1):
    W = W @ model.U_s[i+1]
H = model.V_s[model.p-1]

cost1[0] = torch.norm(J * V - J * (W @ H))**2
print('Current iteration 0 and cost is %.3f\n' % (cost1[0]))

# Start Adversarial Training
iterator = (range(args.outIterations))
for k in iterator:
    #print("Outer : ", k+1)
    if k==0:
        V_pre = W @ H
        R = torch.zeros(J.shape[0], J.shape[1])
    else:
        V_pre = model.P @ model.V_s[model.p-1]


    # Update R
    M = J * V - (J * V_pre)
    C = robust(M, R, args.cost, 'F', l_r, args.para, args.type)
    R = torch.max(M @ C, -J * V)

    # Add attack matrix to input matrix
    new_V = J * (V + R)
    model.A = new_V

    # Learner
    rmse_train, rmse_test, tsp, rmse_train0 = model.training()

    W = model.P
    H = model.V_s[model.p - 1]
    cost1[k + 1] = torch.norm(J * V - J * (W @ H))**2

    # Check convergence
    V_cur = model.P @ model.V_s[model.p-1]
    stp1 = (V_pre - V_cur) / V_pre
    stp1[stp1 != stp1] = 0
    if torch.norm(stp1, 'fro') < tol:
        print('reach outer stopping criterion %d' % (k))
        iterator.close()
        break

# Evaluation
pred = (model.P @ model.V_s[-1])
Jb = (1 - J)
rmse = torch.norm((Jb * V) - (Jb * pred), 'fro') / np.sqrt(J[J < 1].shape[0])
print("RMSE = ", rmse)