In [1]:
import numpy as np
import math
import os
from PIL import Image
import numpy as np
import random
import pickle
random.seed(42)

In [2]:
def similarity(vec1 : list, vec2 : list):
    """Return the fraction of positions where ``vec1`` and ``vec2`` hold equal values.

    Positions are enumerated over ``vec1``; ``vec2`` is indexed at the same
    positions, so any extra trailing elements of ``vec2`` are ignored.

    Args:
        vec1: Reference sequence; its length is the denominator.
        vec2: Sequence compared element-wise against ``vec1``.

    Returns:
        Number of matching positions divided by ``len(vec1)``.
    """
    total = len(vec1)
    matches = sum(1 for idx in range(total) if vec1[idx] == vec2[idx])
    return matches / total

In [3]:
def read_pbm(img_path : str) -> list:
    """Read a text PBM file into a flat list of bipolar pixel values.

    The parser assumes a fixed three-line header: the line at index 2 holds
    the two image dimensions, and every following line holds
    whitespace-separated pixel tokens. A "1" token becomes 1 and a "0"
    token becomes -1. Any other token leaves its slot at the initial 0 but
    still consumes one pixel position.

    Args:
        img_path: Path of the PBM file to read.

    Returns:
        A list of ``dim1 * dim2`` integers in file order.

    Raises:
        ValueError: If the file ends before the dimension line is reached.
    """
    pbm_vec = None
    c = 0  # index of the next pixel slot to fill
    # The context manager guarantees the handle is closed even on errors.
    with open(img_path) as f:
        for i, raw_line in enumerate(f):
            # split() with no argument collapses runs of whitespace, so
            # doubled spaces or blank lines never yield empty tokens that
            # would shift the pixel cursor.
            tokens = raw_line.split()
            if i == 2:
                dim1 = int(tokens[0])
                dim2 = int(tokens[1])
                pbm_vec = [0] * (dim1 * dim2)
            elif i > 2:
                for token in tokens:
                    if token == "1":
                        pbm_vec[c] = 1
                    elif token == "0":
                        pbm_vec[c] = -1
                    c += 1
    if pbm_vec is None:
        raise ValueError(f"{img_path} has no dimension line (expected on line 3)")
    return pbm_vec

In [4]:
def write_pbm_file(pbm_vec : list, name : str):
    """Write a bipolar pixel vector to ``./output_images/{name}.pbm`` as text PBM.

    The image is treated as square with side ``int(sqrt(len(pbm_vec)))``.
    A value of 1 is written as "1" and -1 as "0", each right-aligned in a
    two-character field. Any other value is skipped but still counts
    towards the row position.

    Args:
        pbm_vec: Flat list of pixel values (1 or -1).
        name: Base name of the output file, also echoed in the comment line.
    """
    # Create the target directory up front so a fresh checkout does not fail.
    os.makedirs("./output_images", exist_ok=True)
    s = int(math.sqrt(len(pbm_vec)))
    with open(f"./output_images/{name}.pbm", "w") as f:
        f.write('P1\n')
        f.write(f'# {s}x{s} {name}\n')
        f.write(f'{s} {s}\n')

        for i, pixel in enumerate(pbm_vec):
            if pixel == 1:
                f.write(f'{1:2}')
            elif pixel == -1:
                f.write(f'{0:2}')
            # Break rows at the side length advertised in the header rather
            # than a fixed 16, so non-16x16 images are laid out correctly.
            # s >= 1 whenever this loop body runs, so the modulo is safe.
            if (i + 1) % s == 0:
                f.write('\n')

In [5]:
def viz_pbm_file(img_path : str):
    """Print a text rendering of a PBM file between two 32-character rules.

    The first three lines of the file are skipped as header. On every later
    line a "1" token is drawn as a two-character filled block and a "0"
    token as two spaces; other tokens are ignored.

    Args:
        img_path: Path of the PBM file to display.
    """
    border = "━" * 32
    glyphs = {"1": "██", "0": "  "}

    # The context manager closes the handle, which the previous bare open()
    # never did.
    with open(img_path) as f:
        print(border)
        for i, raw_line in enumerate(f):
            if i > 2:
                tokens = raw_line.strip().split(" ")
                print("".join(glyphs.get(token, "") for token in tokens))
        print(border)

def viz_pbm_vec(pbm_vec : list):
    """Print a text rendering of a flat pixel vector between two 32-character rules.

    A value of 1 is drawn as a two-character filled block and -1 as two
    spaces; other values draw nothing. A line break is emitted after every
    16th element.

    Args:
        pbm_vec: Flat sequence of pixel values.
    """
    border = "━" * 32
    print(border)

    row_chunks = []
    for position, pixel in enumerate(pbm_vec, start=1):
        if pixel == 1:
            row_chunks.append("██")
        elif pixel == -1:
            row_chunks.append("  ")
        if position % 16 == 0:
            print("".join(row_chunks))
            row_chunks = []

    # An incomplete trailing row is emitted without a newline, so the closing
    # rule follows it on the same line.
    print("".join(row_chunks), end='')
    print(border)



def vec_to_png(vec, fname, w=16):
    """Save a flat pixel vector as an 8-bit greyscale image.

    The vector is reshaped to ``(len(vec) // w, w)``. Elements equal to 1
    are stored as 0 and every other element as 255.

    Args:
        vec: Flat sequence of pixel values.
        fname: Destination path handed to ``Image.save``.
        w: Number of columns per row (default 16).
    """
    n_rows = len(vec) // w
    grid = np.array(vec, dtype=np.int8).reshape((n_rows, w))
    pixels = np.where(grid == 1, 0, 255).astype(np.uint8)
    Image.fromarray(pixels, mode='L').save(fname)


In [6]:
def hebbian_train(input_imgs : list, dims : list):
    """Build a weight matrix from the given patterns by averaging pairwise products.

    For ``i != j`` the weight is the mean over all patterns of
    ``pattern[i] * pattern[j]``; diagonal entries are 0.

    Args:
        input_imgs: List of flat patterns, each of length ``dims[0] * dims[1]``.
        dims: Two-element sequence whose product is the number of nodes.

    Returns:
        A ``num_nodes x num_nodes`` nested list of weights.

    Raises:
        ValueError: If any pattern's length differs from the node count.
    """
    num_nodes = dims[0] * dims[1]

    if any(len(pattern) != num_nodes for pattern in input_imgs):
        raise ValueError("The input img is not of the network's dimension")

    num_patterns = len(input_imgs)

    def _weight(a, b):
        # No self-connections: the diagonal stays at integer zero.
        if a == b:
            return 0
        return sum(pattern[a] * pattern[b] for pattern in input_imgs) / num_patterns

    return [[_weight(a, b) for b in range(num_nodes)] for a in range(num_nodes)]


In [7]:
def hopfield_inference_sync(hopfield_wts : list, input_img : list, name : str):
    """Run synchronous recall and save a PNG of the state after every sweep.

    In each sweep every node is recomputed from the state at the start of
    that sweep: it becomes 1 when its weighted input sum is >= 0 and -1
    otherwise. Sweeps repeat until the state stops changing, or until a
    previously visited state recurs (a limit cycle).

    Images go to ``./q2_images/{name}/sync`` as ``input.png`` and
    ``iter_{k}.png``.

    Args:
        hopfield_wts: Square nested list of weights.
        input_img: Initial state; it is copied, not modified.
        name: Sub-directory name for the saved images.

    Returns:
        The state after the last sweep.
    """
    prev_state = []
    current_state = input_img.copy()

    fpath = f"./q2_images/{name}/sync"
    # exist_ok lets the notebook be re-run without FileExistsError.
    os.makedirs(fpath, exist_ok=True)
    vec_to_png(current_state, f"{fpath}/input.png")

    # The update is deterministic, so revisiting any earlier state means the
    # dynamics are trapped in a cycle and would otherwise loop forever.
    seen_states = {tuple(current_state)}

    c = 1
    while not np.array_equal(prev_state, current_state):

        prev_state = current_state.copy()

        for i in range(len(hopfield_wts)):
            wted_sum = 0
            for j in range(len(hopfield_wts[i])):
                # Read from prev_state so all nodes update simultaneously.
                wted_sum += hopfield_wts[i][j] * prev_state[j]

            current_state[i] = 1 if wted_sum >= 0 else -1

        vec_to_png(current_state, f"{fpath}/iter_{c}.png")
        c += 1

        state_key = tuple(current_state)
        if state_key in seen_states and not np.array_equal(prev_state, current_state):
            # Oscillation detected: stop and return the current state.
            break
        seen_states.add(state_key)

    return current_state
        
        
def hopfield_inference_async(hopfield_wts : list, input_img : list, name : str):
    """Run asynchronous recall, updating nodes one at a time in random order.

    Each sweep visits every node once in a freshly shuffled order. A node
    becomes 1 when its weighted input sum over the current state is >= 0
    and -1 otherwise, and the change is visible to the nodes updated after
    it. Sweeps repeat until a full sweep leaves the state unchanged.

    Images go to ``./q2_images/{name}/async``: ``input.png``, a snapshot
    ``iter_{sweep}_pixel_{z}.png`` after every 100 node updates, and
    ``final.png`` rewritten at the end of every sweep.

    Args:
        hopfield_wts: Square nested list of weights.
        input_img: Initial state; it is copied, not modified.
        name: Sub-directory name for the saved images.

    Returns:
        The state after the last sweep.
    """
    prev_state = []
    current_state = input_img.copy()

    fpath = f"./q2_images/{name}/async"
    # exist_ok lets the notebook be re-run without FileExistsError.
    os.makedirs(fpath, exist_ok=True)
    vec_to_png(current_state, f"{fpath}/input.png")

    c = 1
    while not np.array_equal(prev_state, current_state):

        prev_state = current_state.copy()

        indices = list(range(len(hopfield_wts)))
        random.shuffle(indices)
        z = 0  # number of node updates performed in this sweep
        for i in indices:
            wted_sum = 0
            for j in range(len(hopfield_wts[i])):
                # Read from current_state so earlier updates in this sweep count.
                wted_sum += hopfield_wts[i][j] * current_state[j]
            current_state[i] = 1 if wted_sum >= 0 else -1
            z += 1

            if z % 100 == 0:
                vec_to_png(current_state, f"{fpath}/iter_{c}_pixel_{z}.png")

        c += 1
        vec_to_png(current_state, f"{fpath}/final.png")

    return current_state

In [8]:
def corrupt_pbm_vec(pbm_vec : list, prob : float):
    """Return a copy of ``pbm_vec`` with each element flipped with probability ``prob``.

    One ``random.random()`` draw is made per element, in order. A selected
    element becomes -1 if it was 1, and 1 otherwise.

    Args:
        pbm_vec: Pixel vector to corrupt; it is not modified.
        prob: Per-element flip probability.

    Returns:
        The corrupted copy.
    """
    noisy = pbm_vec.copy()
    for idx, pixel in enumerate(pbm_vec):
        if random.random() < prob:
            noisy[idx] = -1 if pixel == 1 else 1
    return noisy

In [9]:
def crop_pbm_vec(pbm_vec : list, crop_side : int, tl : list, w : bool):
    """Return a copy of ``pbm_vec`` with everything outside a square window overwritten.

    The vector is walked as a grid whose row length is ``sqrt(len(pbm_vec))``.
    Elements inside the ``crop_side x crop_side`` window whose top-left
    corner is ``(tl[0], tl[1])`` (row, column) are kept; all others are set
    to -1 when ``w`` is truthy and to 1 otherwise.

    Args:
        pbm_vec: Flat pixel vector; it is not modified.
        crop_side: Side length of the window to keep.
        tl: ``[row, col]`` of the window's top-left corner.
        w: Selects the fill value for the outside region (-1 if truthy, else 1).

    Returns:
        The cropped copy.
    """
    fill_value = -1 if w else 1
    top, left = tl[0], tl[1]
    bottom, right = top + crop_side, left + crop_side
    row_length = math.sqrt(len(pbm_vec))

    result = pbm_vec.copy()
    r = 0
    c = 0
    for idx in range(len(pbm_vec)):
        inside_window = (top <= r < bottom) and (left <= c < right)
        if not inside_window:
            result[idx] = fill_value

        # Advance the grid cursor, wrapping to the next row at the row length.
        c += 1
        if c == row_length:
            c = 0
            r += 1

    return result

In [10]:
# Load every stored pattern from the image directory, display it, and train
# the network weights on the full set.
dirname = "./pbm_images/"

# Listing a bytes path makes os.listdir return bytes entries; later cells
# iterate over this same `dir` value.
dir = os.fsencode(dirname)
pbm_images = []
names = []

for pbm_img in os.listdir(dir):
    img_path = dirname + os.fsdecode(pbm_img)
    pbm_images.append(read_pbm(img_path))
    viz_pbm_file(img_path)
    # Keep the file names as text rather than raw bytes.
    names.append(pbm_img.decode())

hopfield_wts = hebbian_train(pbm_images, dims=[16,16])

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                                
  ████████████████████████████  
  ██                        ██  
  ██                        ██  
  ██      ████████████      ██  
  ██      ████████████      ██  
  ██      ████████████      ██  
  ██      ████████████      ██  
  ██      ████████████      ██  
  ██      ████████████      ██  
  ██      ████████████      ██  
  ██                        ██  
  ██                        ██  
  ██                        ██  
  ████████████████████████████  
                                
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
              ████              
            ████████            
          ████    ████          
        ████        ████        
      ████            ████      
      ████            ████      
    ████████████████████████    
  ████                  ████    
  ████                  ████    
████                      ████  
████                      ████  
████      

In [11]:
# For every stored pattern, run synchronous and asynchronous recall from a
# noise-corrupted copy (30% flip probability) and from a cropped copy
# (10x10 window at row 3, column 3, outside region set to -1).
for pbm_img in os.listdir(dir):
    # `dir` is a bytes path, so each entry is bytes. Decode it once;
    # formatting the raw bytes into the f-string would name the output
    # directories "corrupted_b'...'".
    img_name = os.fsdecode(pbm_img)
    # Read the file once: the corrupt/crop helpers and the inference
    # functions all work on copies, so the vector can be shared.
    original_vec = read_pbm(dirname + img_name)

    # A fresh corruption is drawn for each run, as before.
    hs = hopfield_inference_sync(hopfield_wts=hopfield_wts, input_img=corrupt_pbm_vec(original_vec, 0.3), name=f"corrupted_{img_name}")
    hs = hopfield_inference_async(hopfield_wts=hopfield_wts, input_img=corrupt_pbm_vec(original_vec, 0.3), name=f"corrupted_{img_name}")

    hs = hopfield_inference_sync(hopfield_wts=hopfield_wts, input_img=crop_pbm_vec(original_vec, 10, [3,3], True), name=f"cropped_{img_name}")
    hs = hopfield_inference_async(hopfield_wts=hopfield_wts, input_img=crop_pbm_vec(original_vec, 10, [3,3], True), name=f"cropped_{img_name}")