In [1]:
class Augmented_ALS(object):
    """Block-partitioned, feature-augmented ALS for implicit feedback on Spark RDDs.

    The M x N customer/product matrix is tiled into an ``m x n`` grid of blocks
    keyed ``(i, j)``.  Observed entries are stored sparsely per block as
    ``(local_row, local_col, weight)`` triples.  The propensity of a
    (customer, product) pair is modelled (see ``compute_propensities``) as

        (A.G + X) . (B.D + Y)^T  +  A.g  +  B.d  +  bias_user  +  bias_item

    where ``A``/``B`` are customer/product feature blocks, ``G, g, D, d`` are
    the shared embedding parameters and the last column of each ``Xblock`` /
    ``Yblock`` row is the bias term.

    The class relies on a module-level SparkContext named ``sc`` and on the
    module-level imports ``np``, ``inv``, ``sparse``, ``math`` and ``datetime``.

    Functions that are shipped to executors deliberately capture only plain
    values (ints, floats, broadcast handles, plain functions) and never the
    model instance or the class itself.
    """

    def optimal_partioning(self):
        """Return the block grid ``(m, n)``; currently a fixed 8 x 8 grid."""
        # Intended as the partitioning that minimises wall-clock time per iteration.
        return (8, 8)

    @staticmethod
    def _block_len(total, parts):
        """Rows (or columns) per block, i.e. ``ceil(total / parts)``.

        Driver-side helper only: do not reference it from inside a closure
        that is sent to executors.
        """
        return total // parts + 1*(total % parts > 0)

    @staticmethod
    def get_size(model):
        """Build a function mapping a block key ``(i, j)`` to ``(key, (rows, cols))``.

        Every block has the full block length except the last one along each
        axis, which receives whatever remains of ``M`` / ``N``.
        """
        M, N, m, n = model.M, model.N, model.m, model.n
        m1 = Augmented_ALS._block_len(M, m)
        n1 = Augmented_ALS._block_len(N, n)

        def _get_size(key):
            x1 = m1 if key[0] < (m - 1) else M - m1*(m - 1)
            x2 = n1 if key[1] < (n - 1) else N - n1*(n - 1)
            return (key, (x1, x2))
        return _get_size

    @staticmethod
    def create_edges(model):
        """Build a function mapping a raw ``(customer, product, value)`` triple to
        ``(block_key, [(local_row, local_col, weight)])``.

        The confidence weight is ``1 + alpha * log10(1 + value)``.  A value of
        ``-inf`` marks an ineligible edge and is passed through unchanged:
        feeding it to ``log10`` would raise a math domain error, and the
        update routines drop such cells with their ``Filter > -1`` mask.
        """
        m1 = Augmented_ALS._block_len(model.M, model.m)
        n1 = Augmented_ALS._block_len(model.N, model.n)
        alpha = model.alpha.value

        def _create_rawentries(x):
            raw = float(x[2])
            if raw == float('-inf'):
                weight = raw  # ineligibility sentinel
            else:
                weight = 1 + alpha*math.log10(1 + raw)
            return ((x[0] // m1, x[1] // n1), [(x[0] % m1, x[1] % n1, weight)])
        return _create_rawentries

    @staticmethod
    def partitioning(model):
        """Build the partition function sending block ``(i, j)`` to ``i*n + j``."""
        n = model.n

        def _partitioning(key):
            return key[0]*n + key[1]
        return _partitioning

    def create_entries(self):
        """Partition the ratings into blocks and split off a held-out sample.

        Returns:
            ``(train, test)``.  Each is an RDD of
            ``(block_key, ((rows, cols), entries))`` where ``entries`` is the
            list of ``(local_row, local_col, weight)`` triples of that block,
            or ``None`` when the block has no entries (left outer join).
            ``test`` is ``None`` when ``self.fraction`` is ``None``.
        """
        frames = sc.parallelize([(x, y) for x in range(self.m) for y in range(self.n)])
        frames = frames.map(Augmented_ALS.get_size(self)).partitionBy(
            self.numpartitions, Augmented_ALS.partitioning(self)).persist()

        if self.fraction is not None:
            dev = self.ratings.sample(False, self.fraction).persist()
            train = self.ratings.subtract(dev)
        else:
            train = self.ratings

        if self.ineligible is not None:
            # -inf weight indicates ineligibility.
            # Fixed: the original read the undefined attribute `ineligible_ratings`.
            ineligible = self.ineligible.map(lambda x: (x[0], x[1], float('-inf')))
            train = train.union(ineligible)

        rw_entries = train.map(Augmented_ALS.create_edges(self)).reduceByKey(lambda x, y: x + y)
        train = frames.leftOuterJoin(rw_entries).cache()

        if self.fraction is not None:
            rw_dev = dev.map(Augmented_ALS.create_edges(self)).reduceByKey(lambda x, y: x + y)
            test = frames.leftOuterJoin(rw_dev).cache()
        else:
            test = None
        frames.unpersist()
        return train, test

    @staticmethod
    def flatten_customerfeatures(model):
        """Build a function replicating a customer feature row ``[id, f1, ...]``
        to every block of that customer's block row.

        Emits ``((block_row, j), (local_row, features))`` for ``j`` in ``range(n)``.
        """
        m1 = Augmented_ALS._block_len(model.M, model.m)
        n = model.n

        def _flatten(x):
            return [((int(x[0]) // m1, i), (int(x[0]) % m1, x[1:])) for i in range(n)]
        return _flatten

    @staticmethod
    def flatten_productfeatures(model):
        """Build a function replicating a product feature row ``[id, f1, ...]``
        to every block of that product's block column.

        Emits ``((i, block_col), (local_col, features))`` for ``i`` in ``range(m)``.
        """
        n1 = Augmented_ALS._block_len(model.N, model.n)
        m = model.m

        def _flatten(x):
            return [((i, int(x[0]) // n1), (int(x[0]) % n1, x[1:])) for i in range(m)]
        return _flatten

    @staticmethod
    def create_vecs(values):
        """Stack ``(local_index, vector)`` pairs into an array ordered by index."""
        sorted_values = sorted(values, key=(lambda x: x[0]))
        return np.array([t[1] for t in sorted_values])

    @staticmethod
    def generate_factors(model):
        """Build a function mapping a feature row to ``(id, random vector)``.

        The vector has ``f + 1`` standard-normal components: ``f`` latent
        factors followed by one bias term.
        """
        f = model.f

        def _generate(key):
            return (int(key[0]), np.random.randn(f + 1))
        return _generate

    @staticmethod
    def propagate_customerfactors(model):
        """Build a function replicating ``(customer_id, payload)`` to every block
        of that customer's block row as ``((block_row, j), (local_row, payload))``.
        """
        m1 = Augmented_ALS._block_len(model.M, model.m)
        n = model.n

        def _prop(value):
            return [((value[0] // m1, i), (value[0] % m1, value[1])) for i in range(n)]
        return _prop

    @staticmethod
    def propagate_productfactors(model):
        """Build a function replicating ``(product_id, payload)`` to every block
        of that product's block column as ``((i, block_col), (local_col, payload))``.
        """
        n1 = Augmented_ALS._block_len(model.N, model.n)
        m = model.m

        def _prop(value):
            return [((i, value[0] // n1), (value[0] % n1, value[1])) for i in range(m)]
        return _prop

    def load_customerfeatures(self):
        """Return the cached RDD ``block_key -> A`` of stacked customer features."""
        c_features = self.c_features_rdd.flatMap(
            Augmented_ALS.flatten_customerfeatures(self)).groupByKey().mapValues(Augmented_ALS.create_vecs)
        A = self.entries.join(c_features).mapValues(lambda x: x[1]).cache()
        return A

    def load_productfeatures(self):
        """Return the cached RDD ``block_key -> B`` of stacked product features."""
        p_features = self.p_features_rdd.flatMap(
            Augmented_ALS.flatten_productfeatures(self)).groupByKey().mapValues(Augmented_ALS.create_vecs)
        B = self.entries.join(p_features).mapValues(lambda x: x[1]).cache()
        return B

    def initialize_customer_factors(self):
        """Return the cached RDD ``block_key -> X`` of random customer factors
        (one row of ``f + 1`` values per customer in the block)."""
        c_factors = self.c_features_rdd.map(Augmented_ALS.generate_factors(self))
        c_factors = c_factors.flatMap(
            Augmented_ALS.propagate_customerfactors(self)).groupByKey().mapValues(Augmented_ALS.create_vecs)
        X = self.entries.join(c_factors).mapValues(lambda x: x[1]).cache()
        return X

    def initialize_product_factors(self):
        """Return the cached RDD ``block_key -> Y`` of random product factors
        (one row of ``f + 1`` values per product in the block)."""
        p_factors = self.p_features_rdd.map(Augmented_ALS.generate_factors(self))
        p_factors = p_factors.flatMap(
            Augmented_ALS.propagate_productfactors(self)).groupByKey().mapValues(Augmented_ALS.create_vecs)
        Y = self.entries.join(p_factors).mapValues(lambda x: x[1]).cache()
        return Y

    def intitialize_embeddings(self):
        """Draw random embedding parameters and replicate them to every block.

        Returns:
            ``(G_rdd, g_rdd, D_rdd, d_rdd)`` where ``G`` is ``k x f``, ``g`` has
            length ``k``, ``D`` is ``l x f`` and ``d`` has length ``l``; every
            block key maps to the same array.
        """
        G = np.random.randn(self.k, self.f)
        g = np.random.randn(self.k)
        D = np.random.randn(self.l, self.f)
        d = np.random.randn(self.l)
        G_rdd = self.entries.mapValues(lambda x: G).cache()
        g_rdd = self.entries.mapValues(lambda x: g).cache()
        D_rdd = self.entries.mapValues(lambda x: D).cache()
        d_rdd = self.entries.mapValues(lambda x: d).cache()
        return G_rdd, g_rdd, D_rdd, d_rdd

    @staticmethod
    def index_customer_msg(model):
        """Build a function that prefixes each row of a block's message matrix
        with its global customer id (``block_row * block_len + local_row``)."""
        m1 = Augmented_ALS._block_len(model.M, model.m)

        def _index(msg):
            start = msg[0][0]*m1
            ids = np.arange(start, start + msg[1].shape[0]).reshape(-1, 1)
            return np.hstack((ids, msg[1]))
        return _index

    def customer_msg_to_rdd(self, msgs):
        """Flatten per-block message matrices into ``(customer_id, message)`` pairs."""
        indexed = msgs.map(Augmented_ALS.index_customer_msg(self))
        return indexed.flatMap(lambda x: list(x)).map(lambda x: (int(x[0]), x[1:]))

    @staticmethod
    def index_product_msg(model):
        """Build a function that prefixes each row of a block's message matrix
        with its global product id (``block_col * block_len + local_col``)."""
        n1 = Augmented_ALS._block_len(model.N, model.n)

        def _index(msg):
            start = msg[0][1]*n1
            ids = np.arange(start, start + msg[1].shape[0]).reshape(-1, 1)
            return np.hstack((ids, msg[1]))
        return _index

    def product_msg_to_rdd(self, msgs):
        """Flatten per-block message matrices into ``(product_id, message)`` pairs."""
        indexed = msgs.map(Augmented_ALS.index_product_msg(self))
        return indexed.flatMap(lambda x: list(x)).map(lambda x: (int(x[0]), x[1:]))

    def update_embedding_factors(self, vec, Lambda, k):
        """Solve the ridge system for one embedding pair.

        Args:
            vec: flattened ``[gram matrix (q*q values), right-hand side (q values)]``
                with ``q = k*self.f + k``.
            Lambda: ridge penalty added to the diagonal of the gram matrix.
            k: number of features of the side being updated.

        Returns:
            ``(W, w)`` where ``W`` is the first ``k*f`` solution components
            reshaped to ``(k, f)`` and ``w`` is the last ``k`` components.
        """
        f_ = self.f*k + k
        gram = vec[:-f_].reshape(f_, -1)
        rhs = np.expand_dims(vec[-f_:], axis=1)
        updates = np.squeeze(np.matmul(inv(gram + Lambda*np.eye(f_)), rhs))
        G = np.reshape(updates[:k*self.f], (k, self.f))
        g = updates[-k:]
        return G, g

    @staticmethod
    def update_customer_factors(model):
        """Build the per-customer solver using ``Lambda_x`` and ``f + 1`` unknowns."""
        Lambda = model.Lambda_x
        f_ = model.f + 1
        update = model.vec_update

        def _update(vec):
            return update(vec, Lambda, f_)
        return _update

    @staticmethod
    def vec_update(vec, Lambda, f_):
        """Solve ``(M + Lambda*I) w = b`` where ``vec`` is the flattened
        ``f_ x f_`` matrix ``M`` followed by the ``f_`` values of ``b``."""
        gram = vec[:-f_].reshape(f_, -1)
        rhs = np.expand_dims(vec[-f_:], axis=1)
        return np.squeeze(np.matmul(inv(gram + Lambda*np.eye(f_)), rhs))

    @staticmethod
    def update_product_factors(model):
        """Build the per-product solver using ``Lambda_y`` and ``f + 1`` unknowns."""
        Lambda = model.Lambda_y
        f_ = model.f + 1
        update = model.vec_update

        def _update(vec):
            return update(vec, Lambda, f_)
        return _update

    def update_Xblock(self):
        """Recompute all customer factors (and biases) with everything else fixed.

        Per-block normal-equation messages are summed per customer, solved,
        and the new factors are redistributed to the blocks.  The new
        ``Xblock`` is cached and materialised before returning.
        """
        self.Xblock.unpersist()
        D_d = self.D.join(self.d)
        G_g = self.G.join(self.g)
        embeddings = D_d.join(G_g)
        features = self.A.join(self.B)
        msgs = self.entries.join(embeddings).join(features).join(self.Yblock).mapValues(
            Augmented_ALS.customerfactorupdate(self))
        msgs2 = self.customer_msg_to_rdd(msgs).coalesce(self.numpartitions)
        reduced = msgs2.reduceByKey(lambda x, y: x + y)
        print (reduced.getNumPartitions())

        c1_factors = reduced.mapValues(Augmented_ALS.update_customer_factors(self)).flatMap(
            Augmented_ALS.propagate_customerfactors(self))
        print (c1_factors.getNumPartitions())
        c1 = c1_factors.groupByKey().mapValues(Augmented_ALS.create_vecs)

        self.Xblock = self.entries.join(c1).mapValues(lambda x: x[1]).cache()
        self.Xblock.count()

    def update_Gg_block(self):
        """Recompute the customer-side embedding ``(G, g)`` with everything else fixed."""
        self.G.unpersist()
        self.g.unpersist()
        blocks = self.Xblock.join(self.Yblock)
        features = self.A.join(self.B)
        frame = self.entries.join(blocks).join(features).join(self.D.join(self.d)).mapValues(
            Augmented_ALS.userembedding_update)
        updated = frame.map(lambda x: x[1]).reduce(lambda x, y: x + y)
        G, g = self.update_embedding_factors(updated, self.Lambda_G, self.k)
        self.G = self.entries.mapValues(lambda x: G).cache()
        self.g = self.entries.mapValues(lambda x: g).cache()
        self.G.count()
        self.g.count()

    def update_Dd_block(self):
        """Recompute the product-side embedding ``(D, d)`` with everything else fixed."""
        self.D.unpersist()
        self.d.unpersist()
        blocks = self.Xblock.join(self.Yblock)
        features = self.A.join(self.B)
        frame = self.entries.join(blocks).join(features).join(self.G.join(self.g)).mapValues(
            Augmented_ALS.productembedding_update)
        updated = frame.map(lambda x: x[1]).reduce(lambda x, y: x + y)
        D, d = self.update_embedding_factors(updated, self.Lambda_D, self.l)
        self.D = self.entries.mapValues(lambda x: D).cache()
        self.d = self.entries.mapValues(lambda x: d).cache()
        # Materialise the caches here, as update_Gg_block does, so the work is
        # attributed to this step rather than to whichever action runs next.
        self.D.count()
        self.d.count()

    def update_Yblock(self):
        """Recompute all product factors (and biases) with everything else fixed.

        Mirror image of ``update_Xblock``; the new ``Yblock`` is cached and
        materialised before returning.
        """
        self.Yblock.unpersist()
        D_d = self.D.join(self.d)
        G_g = self.G.join(self.g)
        embeddings = D_d.join(G_g)
        features = self.A.join(self.B)
        msgs = self.entries.join(embeddings).join(features).join(self.Xblock).mapValues(
            Augmented_ALS.productfactorupdate(self))
        msgs2 = self.product_msg_to_rdd(msgs).coalesce(self.numpartitions)
        reduced = msgs2.reduceByKey(lambda x, y: x + y)
        print (reduced.getNumPartitions())

        updated = reduced.mapValues(Augmented_ALS.update_product_factors(self))
        p1_factors = updated.flatMap(Augmented_ALS.propagate_productfactors(self))
        print (p1_factors.getNumPartitions())

        p1 = p1_factors.groupByKey().mapValues(Augmented_ALS.create_vecs)
        self.Yblock = self.entries.join(p1).mapValues(lambda x: x[1]).cache()
        self.Yblock.count()

    def train(self, iterations = 10):
        """Run ``iterations`` rounds of X, Y, (G, g), (D, d) updates, printing
        the elapsed time of every step and of the whole round."""
        steps = (
            ('Xblock', self.update_Xblock),
            ('Yblock', self.update_Yblock),
            ('Ggblock', self.update_Gg_block),
            ('Ddblock', self.update_Dd_block),
        )
        for itr in range(0, iterations):
            print ('%s %s' % ('Starting iteration #: ', itr + 1))
            start0 = datetime.datetime.now()
            for label, step in steps:
                start = datetime.datetime.now()
                step()
                el = datetime.datetime.now() - start
                print ('%s %s' % ('Finshed ' + label + ' with time #: ', el))
            el = datetime.datetime.now() - start0
            print ('%s %s' % ('Total time for iteration #: ', el))

    @staticmethod
    def local_props(model):
        """Build a function extracting, per customer of a block, the predicted
        propensities of that customer's held-out (dev) products in the block.

        The returned function maps ``(block_key, (dev_block, propensities))``
        to a list of ``(global_customer_id, [propensity, ...])``.
        """
        m1 = model.M // model.m + 1*(model.M % model.m > 0)

        def _index(msg):
            key, _ = msg[0]
            dev = msg[1][0][1]
            m, n = msg[1][0][0]
            propensities = msg[1][1]
            start = m1*key

            def row_props(row):
                if dev is None:
                    return (start + row, [])
                cols = np.array([item[1] for item in dev if item[0] == row], dtype=int)
                return (start + row, list(propensities[row, :][cols]))
            # A real list (not a lazy map object) so the result is the same on
            # Python 2 and Python 3.
            return [row_props(row) for row in range(m)]
        return _index

    def test_props(self):
        """Create the block-partitioned RDD of held-out propensities per customer.

        Also computes and persists ``self.propensities``.  Each block receives
        a list of ``(local_row, [propensities of that customer's dev items
        across all blocks])``.
        """
        self.propensities = self.generate_propensities().persist()
        local_props = self.dev.join(self.propensities).flatMap(Augmented_ALS.local_props(self))
        global_props = local_props.reduceByKey(lambda x, y: x + y).flatMap(
            Augmented_ALS.propagate_customerfactors(self)).groupByKey()
        return self.entries.join(global_props).mapValues(lambda x: list(x[1]))

    def get_AUC2(self):
        """Return the mean per-customer AUC over customers with at least one
        (held-out, unobserved) comparison pair."""
        global_props = self.test_props()
        scored = self.entries.join(self.dev).join(self.propensities).join(global_props).mapValues(
            Augmented_ALS.AUC2)
        # AUC2 reports block-local row indices.  Key by (block_row, local_row)
        # so a customer's partial counts are summed across its column blocks
        # without being mixed with customers from other block rows.
        per_user = scored.flatMap(
            lambda kv: [((kv[0][0], row), stats) for row, stats in kv[1]])
        AUC = per_user.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).filter(
            lambda x: x[1][1] > 0).persist()
        count = AUC.count()
        result = AUC.map(lambda x: x[1][0]/x[1][1]).reduce(lambda a, b: a + b)
        self.propensities.unpersist()
        AUC.unpersist()
        return result/count

    @staticmethod
    def AUC2(values):
        """Count correctly ordered (held-out, unobserved) pairs per customer of a block.

        Args:
            values: ``(((entries_block, dev_block), propensities), global_props)``
                as produced by the joins in ``get_AUC2``.

        Returns:
            A list with one ``(local_row, (wins, pairs))`` per customer, where
            ``pairs`` is (#unobserved products in this block) x (#held-out
            propensities of the customer) and ``wins`` counts the pairs in
            which the held-out propensity is strictly larger.
        """
        entries = values[0][0][0][1]
        dev = values[0][0][1][1]
        propensities = values[0][1]
        global_props = values[1]
        m, n = values[0][0][0][0]
        assert n == propensities.shape[1], "incompatible dimensions"
        assert m == propensities.shape[0], "incompatible dimensions"

        def AUC_axis(row):
            props = propensities[row, :]
            # Products to leave out of the "unobserved" set: positively
            # weighted training entries and dev entries of this row.
            seen = set()
            if entries is not None:
                seen.update(item[1] for item in entries if item[0] == row and item[2] > 0)
            if dev is not None:
                seen.update(item[1] for item in dev if item[0] == row)

            candidate_props = [p for item in global_props if item[0] == row for p in item[1]]

            # Distinct loop variable: under Python 2 scoping the original
            # comprehension variable overwrote the row index returned below.
            negatives = np.array([col for col in range(n) if col not in seen], dtype=int)
            target_props = props[negatives]

            result = 0.
            for prop in candidate_props:
                result += np.sum(target_props < prop)
            count = len(target_props)*len(candidate_props)
            return (row, (result, count))
        return [AUC_axis(row) for row in range(m)]

    def generate_propensities(self):
        """Return the RDD ``block_key -> dense propensity matrix`` of the block."""
        D_d = self.D.join(self.d)
        G_g = self.G.join(self.g)
        embeddings = D_d.join(G_g)
        features = self.A.join(self.B)
        factors = self.Xblock.join(self.Yblock)
        return self.entries.join(embeddings).join(features).join(factors).mapValues(
            Augmented_ALS.compute_propensities)

    @staticmethod
    def compute_propensities(value):
        """Compute the dense propensity matrix of one block.

        Args:
            value: ``(((entries_block, ((D, d), (G, g))), (A, B)), (Xblock, Yblock))``.

        Returns:
            ``(A.G + X)(B.D + Y)^T + A.g + B.d + bias_user + bias_item`` with
            the vectors broadcast along the appropriate axis.
        """
        # The last column of each factor block is the bias term.
        X = value[1][0][:, :-1]
        Y = value[1][1][:, :-1]
        Bias_user = value[1][0][:, -1]
        Bias_item = value[1][1][:, -1]
        A = value[0][1][0]
        B = value[0][1][1]
        D = value[0][0][1][0][0]
        d = value[0][0][1][0][1]
        G = value[0][0][1][1][0]
        g = value[0][0][1][1][1]

        Bu = np.expand_dims(Bias_user, axis=1)
        Bi = np.expand_dims(Bias_item, axis=0)

        user_factors = np.matmul(A, G) + X
        product_factors = np.matmul(B, D) + Y
        temp1 = np.matmul(user_factors, product_factors.T)
        temp2 = np.expand_dims(np.matmul(A, g), axis=1)
        temp3 = np.expand_dims(np.matmul(B, d), axis=0)
        return temp1 + temp2 + temp3 + Bu + Bi

    def __init__(self, ratings_rdd, c_features, p_features, ineligible_edges = None, alpha = 41, f = 3, Lambda_x = 2, Lambda_y = 2, Lambda_G = 2, Lambda_D = 2, fraction = 0.2):
        """Partition the data and initialise every model parameter.

        Args:
            ratings_rdd: RDD of ``(customer, product, value)`` with integer ids.
            c_features: RDD of customer feature rows ``[id, f1, f2, ...]``.
            p_features: RDD of product feature rows ``[id, f1, f2, ...]``.
            ineligible_edges: optional RDD of ``(customer, product, ...)`` pairs
                to be excluded from the fit (stored with weight ``-inf``).
            alpha: confidence scale used in ``1 + alpha*log10(1 + value)``.
            f: number of latent factors.
            Lambda_x, Lambda_y, Lambda_G, Lambda_D: ridge penalties for the
                customer factors, product factors, ``(G, g)`` and ``(D, d)``.
            fraction: sampling fraction of ratings held out as the dev set;
                ``None`` disables the hold-out.
        """
        self.numpartitions = 64   # placeholder; will later obtain from spark context
        self.ratings = ratings_rdd
        self.ineligible = ineligible_edges
        self.N = ratings_rdd.map(lambda x: x[1]).top(1)[0] + 1
        self.M = ratings_rdd.map(lambda x: x[0]).top(1)[0] + 1
        m, n = self.optimal_partioning()
        self.m = m
        self.n = n
        self.alpha = sc.broadcast(alpha)
        self.fraction = fraction
        self.entries, self.dev = self.create_entries()  # partition purchases, weights
        self.entries.count()
        self.c_features_rdd = c_features
        self.A = self.load_customerfeatures()  # partition customer features
        self.k = self.A.lookup((0, 0))[0].shape[1]
        self.p_features_rdd = p_features
        self.B = self.load_productfeatures()  # partition product features
        self.l = self.B.lookup((0, 0))[0].shape[1]
        self.f = f
        self.Xblock = self.initialize_customer_factors()
        self.Xblock.count()
        self.Yblock = self.initialize_product_factors()
        self.Yblock.count()
        self.G, self.g, self.D, self.d = self.intitialize_embeddings()
        self.Lambda_x = Lambda_x
        self.Lambda_y = Lambda_y
        self.Lambda_G = Lambda_G
        self.Lambda_D = Lambda_D

    @staticmethod
    def userembedding_update(value):
        """Normal-equation contribution of one block to the ``(G, g)`` update.

        Args:
            value: ``(((entries_block, (Xblock, Yblock)), (A, B)), (D, d))``.

        Returns:
            Flattened ``[Z^T C Z, Z^T C p]`` where each row of ``Z`` is the
            design vector of one (customer, product) cell, ``C`` holds the
            confidence weights and ``p`` the residual targets.  Cells with
            weight ``-inf`` (ineligible) are excluded.
        """
        X = value[0][0][1][0][:, :-1]
        Y = value[0][0][1][1][:, :-1]
        Bias_item = value[0][0][1][1][:, -1]
        Bias_user = value[0][0][1][0][:, -1]
        A = value[0][1][0]
        B = value[0][1][1]
        D = value[1][0]
        d = value[1][1]

        # Build the preference matrix R and the confidence matrix C.
        entries = value[0][0][0][1]
        if entries is None:
            # leftOuterJoin yields None for a block without entries.
            entries = []
        m, n = value[0][0][0][0]
        assert ((n == B.shape[0]) and (m == A.shape[0])), "dimension incompatibility"
        assert ((n == Y.shape[0]) and (m == X.shape[0])), "dimension incompatibility"

        I = np.array([x[0] for x in entries], dtype=int)
        J = np.array([x[1] for x in entries], dtype=int)
        V = np.array([x[2] for x in entries], dtype=float)
        R = np.zeros((m, n))
        C = sparse.coo_matrix((V, (I, J)), shape=(m, n)).toarray()
        R[C > 0] = 1
        C[C == 0] = 1   # unobserved cells get unit confidence

        k = A.shape[1]

        B_ = np.matmul(B, D) + Y
        # Outer product of customer features with effective product factors:
        # one k*f design vector per (customer, product) cell.
        Y_ = np.reshape(np.swapaxes(np.tensordot(A, B_, axes=0), 1, 2), (m, n, -1))

        X_ = np.matmul(X, B_.T)
        Bu = np.expand_dims(Bias_user, axis=1)
        Bi = np.expand_dims(Bias_item, axis=0)

        R_adj = R - Bi - np.expand_dims(np.matmul(B, d), axis=0) - Bu - X_

        A_ = np.zeros((m, n, k)) + np.expand_dims(A, axis=1)
        Y_ = np.concatenate((Y_, A_), axis=2)

        Y_ = np.reshape(Y_, (m*n, -1))
        C_ = np.reshape(C, (m*n,))
        R_adj = np.reshape(R_adj, (m*n, -1))

        keep = C_ > -1   # drops ineligible (-inf) cells
        C_adj = C_[keep]
        p = R_adj[keep]
        Y_ = Y_[keep, :]

        # Row scaling by the confidence weights (same as multiplying by diag(C_adj)).
        term1 = C_adj[:, np.newaxis]*Y_
        term2 = C_adj[:, np.newaxis]*p
        msg1 = np.matmul(Y_.T, term1).ravel()
        msg2 = np.squeeze(np.matmul(Y_.T, term2).T)
        return np.hstack((msg1, msg2))  # flattened matrices

    @staticmethod
    def productembedding_update(value):
        """Normal-equation contribution of one block to the ``(D, d)`` update.

        Mirror image of ``userembedding_update`` with the roles of customers
        and products exchanged (the block is processed transposed).

        Args:
            value: ``(((entries_block, (Xblock, Yblock)), (A, B)), (G, g))``.

        Returns:
            Flattened ``[Z^T C Z, Z^T C p]`` for the product-side embedding.
        """
        X = value[0][0][1][0][:, :-1]
        Y = value[0][0][1][1][:, :-1]
        Bias_item = value[0][0][1][1][:, -1]
        Bias_user = value[0][0][1][0][:, -1]
        A = value[0][1][0]
        B = value[0][1][1]
        G = value[1][0]
        g = value[1][1]

        # Build the (transposed) preference matrix R and confidence matrix C.
        entries = value[0][0][0][1]
        if entries is None:
            # leftOuterJoin yields None for a block without entries.
            entries = []
        m, n = value[0][0][0][0]
        assert ((n == B.shape[0]) and (m == A.shape[0])), "dimension incompatibility"
        assert ((n == Y.shape[0]) and (m == X.shape[0])), "dimension incompatibility"

        I = np.array([x[0] for x in entries], dtype=int)
        J = np.array([x[1] for x in entries], dtype=int)
        V = np.array([x[2] for x in entries], dtype=float)
        R = np.zeros((n, m))
        C = sparse.coo_matrix((V, (J, I)), shape=(n, m)).toarray()
        R[C > 0] = 1
        C[C == 0] = 1   # unobserved cells get unit confidence

        l = B.shape[1]

        A_ = np.matmul(A, G) + X
        X_ = np.reshape(np.swapaxes(np.tensordot(B, A_, axes=0), 1, 2), (n, m, -1))

        Y_ = np.matmul(Y, A_.T)
        Bi = np.expand_dims(Bias_item, axis=1)
        Bu = np.expand_dims(Bias_user, axis=0)

        R_adj = R - Bu - np.expand_dims(np.matmul(A, g), axis=0) - Bi - Y_

        B_ = np.zeros((n, m, l)) + np.expand_dims(B, axis=1)
        X_ = np.concatenate((X_, B_), axis=2)

        X_ = np.reshape(X_, (n*m, -1))
        C_ = np.reshape(C, (n*m,))
        R_adj = np.reshape(R_adj, (n*m, -1))

        keep = C_ > -1   # drops ineligible (-inf) cells
        C_adj = C_[keep]
        p = R_adj[keep]
        X_ = X_[keep, :]

        # Row scaling by the confidence weights (same as multiplying by diag(C_adj)).
        term1 = C_adj[:, np.newaxis]*X_
        term2 = C_adj[:, np.newaxis]*p
        msg1 = np.matmul(X_.T, term1).ravel()
        msg2 = np.squeeze(np.matmul(X_.T, term2).T)
        return np.hstack((msg1, msg2))  # flattened matrices

    @staticmethod
    def update_msg_axis(index, flag, entries, A, Y_, Bias_item, B, D, d, G, g, n, alpha):
        """Normal-equation message of one row (``flag`` true) or column of a block.

        The parameter names describe the customer update; for the product
        update the caller passes the transposed roles (``B`` as ``A``,
        ``X_`` as ``Y_``, and so on).

        Args:
            index: local row (customer) or column (product) being updated.
            flag: true when updating a customer, false when updating a product.
            entries: ``(local_row, local_col, weight)`` triples of the block,
                or ``None`` for a block without entries.
            A: features of the side being updated; row ``index`` is used.
            Y_: effective factors of the opposite side (features . embedding
                + free factors), one row per opposite item.
            Bias_item: biases of the opposite side.
            B, d: features and embedding bias vector of the opposite side.
            G, g: embedding matrix and bias vector of the side being updated.
            n: number of opposite-side items in the block.
            D, alpha: accepted for call compatibility; not used.

        Returns:
            1-D array ``[Z^T C Z (flattened), Z^T C p]`` of length
            ``(f+1)*(f+1) + (f+1)``, where ``Z`` is ``Y_`` with a trailing
            column of ones for the bias.
        """
        dim = n
        if entries is None:
            # leftOuterJoin yields None for a block without entries.
            entries = []

        if flag:   # updating for users
            ents = [(x[1], x[2]) for x in entries if x[0] == index]
        else:      # updating for items
            ents = [(x[0], x[2]) for x in entries if x[1] == index]

        # Preference vector: 1 where a positively weighted entry exists.
        p_u = np.zeros(dim)
        p_u[np.array([x[0] for x in ents if x[1] > 0], dtype=int)] = 1

        # Confidence per opposite item: 1 by default, the stored weight otherwise.
        Filter = np.ones(dim)
        Filter[np.array([x[0] for x in ents], dtype=int)] = np.array(
            [x[1] for x in ents], dtype=float)

        # One mask for every term: ineligible items carry weight -inf.
        eligible = Filter > -1

        alpha_u = A[index, :]
        term1 = np.matmul(Y_, np.matmul(alpha_u, G))[eligible]
        term2 = np.matmul(B, d)[eligible]
        term3 = np.matmul(g, alpha_u)
        term4 = Bias_item[eligible]
        p_u = p_u[eligible] - term1 - term2 - term3 - term4

        weights = Filter[eligible]
        Yadj = np.concatenate((Y_[eligible, :], np.ones((weights.size, 1))), axis=1)

        # Row scaling is equivalent to multiplying by diag(weights) without
        # allocating a dense n x n matrix for every row.
        msg1 = np.matmul(Yadj.T, weights[:, np.newaxis]*Yadj).ravel()
        msg2 = np.matmul(Yadj.T, weights*p_u)
        return np.hstack((msg1, msg2))

    @staticmethod
    def customerfactorupdate(model):
        """Build the per-block function producing one normal-equation message
        per customer of the block (a 2-D array, one row per customer).

        The function expects
        ``(((entries_block, ((D, d), (G, g))), (A, B)), Yblock)``.
        """
        udateX = model.update_msg_axis
        alpha = model.alpha

        def _update(value):
            Y = value[1][:, :-1]
            Bias_item = value[1][:, -1]
            A = value[0][1][0]
            B = value[0][1][1]
            D = value[0][0][1][0][0]
            d = value[0][0][1][0][1]
            G = value[0][0][1][1][0]
            g = value[0][0][1][1][1]
            Y_ = Y + np.matmul(B, D)
            ratings = value[0][0][0][1]   # entries of weighted ratings
            m, n = value[0][0][0][0]
            assert ((n == B.shape[0]) and (m == A.shape[0])), "dimension incompatibility"
            # List comprehension rather than map(): np.array(map(...)) is a
            # 0-d object array on Python 3.
            return np.array([udateX(index, True, ratings, A, Y_, Bias_item, B, D, d, G, g, n, alpha.value)
                             for index in range(m)])
        return _update

    @staticmethod
    def productfactorupdate(model):
        """Build the per-block function producing one normal-equation message
        per product of the block (a 2-D array, one row per product).

        The function expects
        ``(((entries_block, ((D, d), (G, g))), (A, B)), Xblock)``.
        """
        udateY = model.update_msg_axis
        alpha = model.alpha

        def _update(value):
            X = value[1][:, :-1]
            Bias_user = value[1][:, -1]
            A = value[0][1][0]
            B = value[0][1][1]
            D = value[0][0][1][0][0]
            d = value[0][0][1][0][1]
            G = value[0][0][1][1][0]
            g = value[0][0][1][1][1]
            X_ = X + np.matmul(A, G)
            ratings = value[0][0][0][1]   # entries of weighted ratings
            m, n = value[0][0][0][0]
            assert ((n == B.shape[0]) and (m == A.shape[0])), "dimension incompatibility"
            # Roles are swapped relative to the customer update.
            return np.array([udateY(index, False, ratings, B, X_, Bias_user, A, G, g, D, d, m, alpha.value)
                             for index in range(n)])
        return _update

In [2]:
import numpy as np
from numpy.linalg import inv
from scipy import sparse
import datetime
import random as rd
import math
#sc.setCheckpointDir("/FileStore7")

In [3]:
# Load purchases, customer features and product features from DBFS.
# Each file gets its own header variable: the filter lambdas below close over
# the name and Spark evaluates them lazily, so reusing a single global `header`
# would make the result depend on when each RDD happens to be serialised.
data = sc.textFile("/FileStore/tables/wkmurkxo1506499092834/Purchases.csv")
purchases_header = data.first() #extract header
ratings = data.filter(lambda row : row != purchases_header).map(lambda l: l.split(",")).map(lambda rating: (int(rating[0]) , int(rating[1]),  )).distinct() #remove duplicate entries

# NOTE(review): the weights are unseeded random draws, so every fresh run trains on different weights.
ratings = ratings.map(lambda x: (x[0], x[1], rd.lognormvariate(3, 1))).persist() #add weights to ratings

# NOTE(review): unlike the other two paths this one has no leading "/" - confirm it resolves to the intended file.
customer_features = sc.textFile("FileStore/tables/elwqnkv61506837540758/Customer_Features.csv")
customer_header = customer_features.first() #extract header
customer_features = customer_features.filter(lambda row : row != customer_header)
#customer_features = customer_features.map(lambda l: l.split(",")).map(lambda x: map(float, x)).filter(lambda x: (x[0] < 10)).persist()
# A list comprehension (not map) so each row is a concrete, sliceable, picklable list on Python 2 and 3.
customer_features = customer_features.map(lambda l: l.split(",")).map(lambda x: [float(v) for v in x]).persist()
print (customer_features.count())

product_features = sc.textFile("/FileStore/tables/elwqnkv61506837540758/Product_Features.csv")
product_header = product_features.first() #extract header
product_features = product_features.filter(lambda row : row != product_header)
#product_features  = product_features .map(lambda l: l.split(",")).map(lambda x: map(float, x)).filter(lambda x: (x[0] < 8)).persist()
product_features = product_features.map(lambda l: l.split(",")).map(lambda x: [float(v) for v in x]).persist()
print (product_features.count())

ratings.take(3)

In [4]:
# Build the model with the constructor defaults; the constructor partitions the data and runs several Spark actions.
r3 = Augmented_ALS(ratings, customer_features, product_features)

In [5]:
# Run 10 alternating update rounds; per-step timings are printed.
r3.train(10)

In [6]:
# Evaluate on the held-out (dev) ratings; the returned value is displayed as the cell output.
r3.get_AUC2()