In [1]:
import numpy as np
import scipy.io as sio
import matplotlib.pyplot as plt
import random

In [2]:
def loadLDPC(filename):
    A = sio.loadmat(filename)
    G = A['G']
    H = A['H']
    return G, H

In [3]:
def apply_channel_noise(y, epsilon):
    y_tilde = y.copy()
    for i in range(len(y_tilde)):
        rand = random.uniform(0, 1)
        if rand >= epsilon:
            if y_tilde[i] == 0:
                y_tilde[i] = 1
            elif y_tilde[i] == 1:
                y_tilde[i] = 0
    return y_tilde

In [4]:
def encode_message(x, G):
    new_message = np.dot(G, x) % 2
    return new_message

---

In [5]:
# unary factor
class FactorTypeOne():
    def __init__(self, y_til, epsilon):
        self.y_til = y_til
        self.epsilon = epsilon
    
    def calculate_value(self, y):
        return self.epsilon if y != self.y_til else 1-self.epsilon

In [6]:
class FactorTypeTwo():
    def __init__(self, scope):
        self.scope = np.array(scope)
        
    def calculate_value(self, y_test):
        s = 0
        for i in self.scope:
            s += y_test[i]
        return 1 if s % 2 == 0 else 0

In [7]:
class FactorGraph():
    
    def __init__(self, H, epsilon, y_tilde):
        self.factors_type1 = [] # list of FactorTypeOne
        self.factors_type2 = [] # list of FactorTypeTwo
        self.var_to_factor = {} # map --> (var, [factors related to this var])
        self.factor_to_var = {} # map --> (factor, [vars related to this factor])
        self.messagesVarToFactor = {}
        self.messagesFactorToVar = {}

        for i,b in enumerate(y_tilde):
            self.factors_type1.append(FactorTypeOne(y_tilde[i], epsilon))
            self.var_to_factor[i] = [(1, len(self.factors_type1) - 1), ] # 1 means that the factor is from the first type
            self.factor_to_var[(1, len(self.factors_type1) - 1)] = [i, ] # 1 means that the factor is from the first type
        
        for row in H:
            scope = [var for var in range(len(y_tilde)) if row[var] == 1]
            self.factors_type2.append(FactorTypeTwo(scope))
            
            for i in scope:
                self.var_to_factor[i].append((2, len(self.factors_type2) - 1)) # 2 means that the factor is from the 2nd type
                
            self.factor_to_var[(2, len(self.factors_type2) - 1)] = scope       # 2 means that the factor is from the 2nd type
        
        
    ############################################################################################################       
        
        
    def assignment_probability(self, assignment):
        prob = 1
        
        # For unary Factors:
        for i, b in enumerate(assignment):
            prob_this_bit = self.factors_type1[i].calculate_value(b)
            prob *= prob_this_bit
        
        # Second Type
        for f2 in self.factors_type2:
            prob *= f2.calculate_value(assignment)
            
        return prob
    

    ############################################################################################################       
    
    
    def LoopyBP(self, n_iteration):
        
        for ite in range(n_iteration):

            prevMessagesVarToFactor = {}
            prevMessagesFactorToVar = {}
            
            for i, fcts in enumerate(self.var_to_factor):
                factors = self.var_to_factor[fcts]
                for s in factors:
                    if (i,s) not in self.messagesVarToFactor:
                        self.messagesVarToFactor[(i, s)] = np.array([0.5, 0.5])
                    prevMessagesVarToFactor[(i, s)] = self.messagesVarToFactor[(i, s)]
                    
            for s, vrbs in enumerate(self.factor_to_var):
                variables = self.factor_to_var[vrbs]
                for i in variables:
                    if (vrbs, i) not in self.messagesFactorToVar:
                        self.messagesFactorToVar[(vrbs, i)] = np.array([0.5, 0.5])
                    prevMessagesFactorToVar[(vrbs, i)] = self.messagesFactorToVar[(vrbs, i)]
                    
            # Update the message var -> factor
            for j, fcts in enumerate(self.var_to_factor):
                factors = self.var_to_factor[fcts]
                for I in factors:
                    s0 = 1
                    s1 = 1
                    for J in factors:
                        if J != I:
                            s0 *= prevMessagesFactorToVar[(J, fcts)][0]
                            s1 *= prevMessagesFactorToVar[(J, fcts)][1]
                    self.messagesVarToFactor[(fcts, I)] = np.array([s0, s1])
                            

            # Update the message factor -> var
            for s, I in enumerate(self.factor_to_var):
                variable = self.factor_to_var[I]
                typee = I[0]
                for i in variable:
                    s0 = 0
                    s1 = 0
                    for mask in range(2**(len(variable)-1)):
                        t = 1
                        temp0 = 1
                        temp1 = 1
                        y = [0 for i in range(len(H)*2)]
                        if typee == 1:
                            for cnt, var in enumerate(variable):
                                if var != i:
                                    y[var] = 1 if (mask & (2**cnt)) == 0 else 0
                                    temp0 *= self.factors_type1[var].calculate_value(y[var])
                                    temp1 *= self.factors_type1[var].calculate_value(y[var])
                                else:
                                    temp0 *= self.factors_type1[var].calculate_value(1)
                                    temp1 *= self.factors_type1[var].calculate_value(0)
                        if typee == 2:
                            y[i] = 1
                            temp0 *= self.factors_type2[I[1]].calculate_value(y)
                            y[i] = 0
                            temp1 *= self.factors_type2[I[1]].calculate_value(y)
                        
                        for j in variable:
                            if j != i:
                                t *= prevMessagesVarToFactor[(j, I)][y[j]]
                        s0 += temp0*t
                        s1 += temp1*t
                    self.messagesFactorToVar[(I, i)] = np.array([s0, s1])
                
            # Warning: Don't forget to normalize the message at each time.
            
            
            if ite % 10 == 0 and ite > 0:
                print("Finished Loopy Iteration %s" % ite)
    
    
    ############################################################################################################       
    
    
    def estimateMarginalProbability(self, var):
        '''
        This method assumes LoopyBP has been run
        '''
        res = [(0.0, 0.0)]
        for factor in self.var_to_factor[var]:
            res += np.array(self.messagesFactorToVar[(factor, var)])
        return sum(res)

    ############################################################################################################       
    
    
    def getMarginalMAP(self):
        output = np.zeros(256)
        for i, var in enumerate(range(256)):
            output[i] = np.argmax(self.estimateMarginalProbability(i))
        return output
    
        
    

---

In [8]:
y_tilde = np.array([[1, 1, 1, 1, 1, 1]]).reshape(6, 1)
H = np.array([
        [0, 1, 0, 1, 1, 0],
        [1, 0, 1, 0, 1, 1],
        [0, 1, 1, 0, 1, 0]])
epsilon = 0.05

Graph = FactorGraph(H, epsilon, y_tilde)
ytest1 = np.array([0, 1, 1, 0, 1, 0])
ytest2 = np.array([1, 0, 1, 1, 1, 1])
ytest3 = np.array([1, 0, 1, 1, 0, 1])

print(Graph.assignment_probability(ytest1))
print(Graph.assignment_probability(ytest2))
print(Graph.assignment_probability(ytest3))


#تنها تست دوم غیر صفر شده است که منطقی است چون بیشترین تعداد 1 را دارد
#و تنها در یک خانه با پیام نویزی تفاوت دارد.

0.0
0.038689046874999994
0.0


---

In [9]:
G, H = loadLDPC('GH.mat')

epsilon = 0.05
N = G.shape[1]
x = np.ones((N, 1), dtype='int32')
y = encode_message(x, G)
yTilde = apply_channel_noise(y, epsilon)
G = FactorGraph(H, epsilon, yTilde)
G.LoopyBP(50)
best_estimation = G.getMarginalMAP()
best = best_estimation.astype(int)
count = 0
for i in range(len(best)):
    if (best[i] == 0) and (y[i] == 0):
        count += 1
    if (best[i] == 1) and (y[i] == 1):
        count += 1
print(str(count) + " out of 256 bits are predicted correctly.") 
# خیر تمام بیت های  این پیام با پیام اصلی یکسان نیستند اما حدود 93 درصد بیت ها را به درستی تشخیص داده است.

Finished Loopy Iteration 10
Finished Loopy Iteration 20
Finished Loopy Iteration 30
Finished Loopy Iteration 40
240 out of 256 bits are predicted correctly.
