Environment Setup

In [1]:
# 1. Import Libraries
import numpy as np
from random import randint

Build and Train

In [2]:
# 2. Define RBM Class
class RBM:
  """Restricted Boltzmann Machine with binary units and full-batch training.

  `weights` has shape (num_visible + 1, num_hidden + 1). Row 0 and column 0
  hold the bias weights and start at zero; the remaining entries are the
  visible-to-hidden weights.
  """

  def __init__(self, num_visible, num_hidden):
    """Create the network and initialise its weights from a fixed seed.

    Args:
      num_visible: Number of visible units (columns of the training data).
      num_hidden: Number of hidden units.
    """
    self.num_hidden = num_hidden
    self.num_visible = num_visible
    self.debug_print = True  # Not read inside this class; kept for callers.

    # Draw the initial weights uniformly from a small interval that is
    # symmetric around zero. The two bounds must differ: with low == high,
    # uniform() returns the same constant for every weight, so all hidden
    # units would start out identical.
    np_rng = np.random.RandomState(1234)
    bound = 0.1 * np.sqrt(6. / (num_hidden + num_visible))
    self.weights = np.asarray(np_rng.uniform(
        low = -bound,
        high = bound,
        size = (num_visible, num_hidden)
    ))

    # Prepend a zero row and a zero column for the bias units.
    self.weights = np.insert(self.weights, 0, 0, axis = 0)
    self.weights = np.insert(self.weights, 0, 0, axis = 1)

  def train(self, data, max_epochs, learning_rate):
    """Train on `data` and return the reconstruction from the last epoch.

    Each epoch performs one positive phase (data -> hidden), one negative
    phase (sampled hidden states -> visible -> hidden) and one weight update
    proportional to the difference of the two association matrices.

    Args:
      data: 2-D array with one example per row and `num_visible` columns.
      max_epochs: Number of full-batch updates to run; must be at least 1.
      learning_rate: Factor applied to (pos_associations - neg_associations).

    Returns:
      Array of shape (num_examples, num_visible) holding the visible-unit
      probabilities computed in the final epoch (bias column removed).

    Raises:
      ValueError: If `max_epochs` is smaller than 1, since no reconstruction
        would exist to return.
    """
    if max_epochs < 1:
      raise ValueError("max_epochs must be at least 1")

    num_examples = data.shape[0]
    # Column 0 is a constant 1 so that the bias row of `weights` is applied.
    data = np.insert(data, 0, 1, axis = 1)

    for epoch in range(max_epochs):
      # Positive phase: hidden probabilities driven by the training data.
      pos_hidden_activations = np.dot(data, self.weights)
      pos_hidden_probs = self._logistic(pos_hidden_activations)
      pos_hidden_probs[:, 0] = 1  # Keep the hidden bias unit switched on.
      pos_hidden_states = pos_hidden_probs > np.random.rand(num_examples, self.num_hidden + 1)
      pos_associations = np.dot(data.T, pos_hidden_probs)

      # Negative phase: reconstruct the visible layer from the sampled hidden
      # states, then recompute the hidden probabilities from it.
      neg_visible_activations = np.dot(pos_hidden_states, self.weights.T)
      neg_visible_probs = self._logistic(neg_visible_activations)
      neg_visible_probs[:, 0] = 1  # Keep the visible bias unit switched on.
      neg_hidden_activations = np.dot(neg_visible_probs, self.weights)
      neg_hidden_probs = self._logistic(neg_hidden_activations)
      neg_associations = np.dot(neg_visible_probs.T, neg_hidden_probs)

      self.weights += learning_rate * (pos_associations - neg_associations)

    # PE08 Alteration: return the negative visible probabilities without the
    # bias column.
    return neg_visible_probs[:, 1:]

  def _logistic(self, x):
    """Return the element-wise logistic sigmoid 1 / (1 + exp(-x))."""
    return 1.0 / (1 + np.exp(-x))

Main Function to Run

In [3]:
from re import L
# 3. Define main function
def main():
  """Simulate one viewer, train an RBM on their liked movies and log the result.

  Six movie feature vectors are selected at random and used as the training
  set for an RBM with 6 visible and 2 hidden units. The reconstruction from
  the final epoch is printed. One row of offset weights is appended to
  "features.tsv" and one row with a random viewer name and the two
  highest-ranked features is appended to "labels.tsv" (both in the current
  working directory).
  """
  pt = 0  # Set to 1 to print progress details.
  titles = [
      "24H in Kamba", "Lost", "Cube Adventures", " A Holiday",
      "Jonathan Brooks", "The Melbourne File", "WNC Detectives",
      "Stars", "Space L", "Zone 77"
  ]

  # One row per movie, one column per feature in F (defined further down).
  # Only rows 0-9 can be selected because the random index is randint(0, 9).
  movies_feature_map = np.array([
      [1, 1, 0, 0, 1, 1], [1, 1, 0, 1, 1, 1], [1, 0, 0, 0, 0, 1],
      [1, 1, 0, 1, 1, 1], [1, 0, 0, 0, 1, 1], [1, 1, 0, 1, 1, 0],
      [1, 0, 0, 0, 0, 0], [1, 1, 0, 1, 1, 0], [1, 1, 0, 0, 0, 1],
      [1, 0, 0, 1, 1, 1], [1, 1, 0, 0, 0, 0], [1, 0, 0, 1, 1, 1],
      [1, 1, 0, 0, 1, 0], [1, 1, 0, 1, 1, 1], [1, 1, 0, 0, 1, 1],
  ])

  dialog_output = np.zeros((6, 6), dtype = int)
  mc = 0  # Number of liked movies recorded so far.

  if pt == 1:
    print("Movie likes:")

  # Draw a movie index and a coin flip per iteration; the movie counts as
  # "liked" only when the coin flip is 1. Repeat until six likes are stored.
  while mc < 6:
    m = randint(0, 9)
    b = randint(0, 1)

    if b == 1:
      if pt == 1:
        print("title likes:", titles[m])
      dialog_output[mc] = movies_feature_map[m]
      mc += 1

  if pt == 1:
    print("dialog output", dialog_output)

  training_data = dialog_output
  r = RBM(num_visible = 6, num_hidden = 2)
  max_epochs = 5000
  learning_rate = 0.001

  # PE08 Alteration train and print reconstructed data
  reconstructed = r.train(training_data, max_epochs, learning_rate)
  print(f"Reconstucted: \n{np.round(reconstructed, 3)}")
  print(f"\nReconstructed (Rounded): \n{np.round(reconstructed)}")

  F = ["love", "happiness", "family", "horizons", "action", "violence"]
  pos = 10  # Offset added to every weight before it is ranked or written.

  show_control = 0  # Set to 1 to print the control statistics below.
  control = 0       # Replaced by per-feature like counts when enabled.

  if show_control == 1:
    # Count how often each feature occurs among the six liked movies.
    control = [0, 0, 0, 0, 0, 0]
    for j in range(0, 6):
      for i in range(0, 6):
        control[i] += dialog_output[j][i]

    # Indices of the most and second most frequent features (ties resolve to
    # the later index). They are computed here but not used further below.
    cf1 = 0
    for b in range(0, 6):
      if control[b] >= control[cf1]:
        cf1 = b

    first = 0
    for b in range(0, 6):
      if b != cf1:
        if first == 0:
          cf2 = b
          first = 1
        if control[b] >= control[cf2]:
          cf2 = b

    totw = 0
    for w in range(1, 7):
      print(F[w - 1], ":", r.weights[w, 0])
      totw += r.weights[w, 0] + pos

    if totw <= 0:
      totw = 1  # Guard the division below.

    for w in range(1, 7):
      per = r.weights[w, 0] + pos
      print("CONTROL", F[w - 1], ":", round(r.weights[w, 0], 3),
            totw, round(per / totw, 3))

  # Rank the features by r.weights[w, 0] + pos, i.e. column 0 of the weight
  # matrix (the bias column inserted by RBM.__init__) for visible rows 1..6,
  # and append the offset values as one tab-separated row.
  f1 = 0
  best1 = -1000  # Must start below any possible value so the maximum wins.

  with open("features.tsv", "a") as f:
    for w in range(1, 7):
      if pt == 1:
        print(F[w - 1], ":", r.weights[w, 0])

      tw = r.weights[w, 0] + pos

      if tw > best1:
        f1 = w - 1
        best1 = tw

      f.write(str(tw) + "\t")

    f.write("\n")

  # Second-ranked feature: the largest value among all features except f1.
  f2 = 0
  best2 = -1000

  for w in range(1, 7):
    tw = r.weights[w, 0] + pos
    if tw > best2 and w - 1 != f1:
      f2 = w - 1
      best2 = tw

  u = randint(1, 10000)
  vname = "viewer_" + str(u)

  if pt == 1:
    print("Control", control)
    print("Principal Features: ", vname, f1, f2, "control")

  with open("labels.tsv", "a") as f:
    f.write(vname + "\t" + F[f1] + "\t" + F[f2] + "\t")
    f.write("\n")

Run RBM

In [4]:
# Run the demo only when this code executes as the main program; main()
# trains the RBM, prints the reconstruction and appends to the .tsv files.
if __name__ == "__main__":
  main()

Reconstucted: 
[[0.992 0.841 0.01  0.67  0.992 0.498]
 [0.992 0.841 0.01  0.67  0.992 0.498]
 [0.967 0.769 0.042 0.634 0.967 0.521]
 [0.992 0.841 0.01  0.67  0.992 0.498]
 [0.967 0.769 0.042 0.634 0.967 0.521]
 [0.992 0.841 0.01  0.67  0.992 0.498]]

Reconstructed (Rounded): 
[[1. 1. 0. 1. 1. 0.]
 [1. 1. 0. 1. 1. 0.]
 [1. 1. 0. 1. 1. 1.]
 [1. 1. 0. 1. 1. 0.]
 [1. 1. 0. 1. 1. 1.]
 [1. 1. 0. 1. 1. 0.]]
