In [37]:
import random
import multiprocessing as mp
import matplotlib.pyplot as plt
import numpy as np
from pysat.solvers import Solver

"""
This is class for finding a course graining from A to B with a super cell width of n

functions:
    run()
Performs the general DPLL recursion call and then does random testing afterwards

    run_bf()
Same as run but performs a brute force search for testing

    bt_run()
Same as run
Returns the root projection along with the depth at which it back tracked out of the rec call
Used for generating stats for finding good/bad heuristics

    LSH(X)
Does the left hand side calculation (A^n) on X
Note: P is applied later on in helper.

    RHS(X)
Does the right hand side calculation (BP) on X

    init_prog_check()
Checks to see if all the initial projections have been tried yet.
Makes sure that before returning false that everything has been tried

    prog_check()
Makes sure that every projection has been assigned a value before exiting
    
    rec(level)
level is used to track the depth of the recursion call
Calls rec within itself twice given it meets conditions of the helper and prog_check
Does a lot of stuff to make sure that nothing gets left unassigned and it procudes 
a somewhat good Projection

    helper(b_prog)
b_prog is branch projection
Where the switch is to check the false polynomial time.

Performs the checks to see if 
    cg_Proj[LHS] == RHS : True
    cg_Proj[LHS] is undefined : Assign cg_Proj[LHS] = LSH and return true
    cg_Proj[LHS] != LHS : False
    
    RHS_t(X)
Performs the right hand side calculation on X

    test()
Creates rec_test_amount number of valid inputs with combinations of 
projection rules that have been assigned

    random_testing(amount)
amount is the number of random tests you want
creates and test an amount worth of random tests to check the Projection

    brute_force_check()
Tests all the inputs against the created projection
"""

class cg:
    def __init__(self,A,B,n):
        A_rBits = '{0:08b}'.format(A)
        B_rBits = '{0:08b}'.format(B)
        self.A_rDict = {'{0:03b}'.format(n) : A_rBits[::-1][n] for n in range(8)}
        self.B_rDict = {'{0:03b}'.format(n) : B_rBits[::-1][n] for n in range(8)}
        self.cg_Proj = {'{1:0{0:0d}b}'.format(n,i) : "" for i in range(2**n)}
        self.init_visit = {'{1:0{0:0d}b}'.format(n,i) : False for i in range(2**n)}
        self.n = n
        self.level = 0
        self.back_tracking = True
        self.bt_count = 0
        self.first_projection = None
        self.rec_test_amount = 500 * self.n
        self.debug = False
        self.false_poly_check = False
    
    def run(self):
        result = None
        while (self.back_tracking and self.init_prog_check()):
            if self.debug: print("Entering")
            result = self.rec(0)
        if self.debug: print("Leaving")
        if result[0]:
            if (self.random_testing(10**4) != 0):
                print("Oh no. Random Testing found invalid projection")
        return result
    
    def run_bf(self):
        result = None
        while (self.back_tracking and self.init_prog_check()):
            if self.debug: print("Entering")
            result = self.rec(0)
        if self.debug: print("Leaving")
        if result[0]:
            if (self.brute_force_check() != 0):
                print("Oh no. Brute Force Testing found invalid projection")
        return result
    
    def bt_run(self):
        result = None
        c = 0
        bad_initial_prop = []
        while (self.back_tracking and self.init_prog_check()):
            if debug: print("Entering")
            result = self.rec(0)
            #Came out and was back tracking the entire time
            #if(self.back_tracking):
            bad_initial_prop.append((self.first_projection, self.bt_count))
            self.bt_count = 0
            
        if debug: print("Leaving") 
        if result[0]:
            if (self.random_testing(10**4) != 0):
                print("Oh no. Random Testing found invalid projection")
            else:
                return bad_initial_prop
        return []
        
        
    #PA^n
    def LHS(self,X):
        #Does Rule A n times on input X
        for i in range(self.n):
            X = ''.join(self.A_rDict[X[i-1:i+2]] for i in range(1,len(X)-1))
        return X    

    #BP
    def RHS(self,key):
        X = "{0}{0}{0}".format(self.cg_Proj[key])
        X = ''.join(self.B_rDict[X[i-1:i+2]] for i in range(1,len(X)-1))
        return X
    
    def init_prog_check(self):
        t_list = list(self.init_visit)
        t = next((key for key in t_list if self.init_visit[key] == False), None)
        if t != None:
            return True
        return False
            
    def prog_check(self):
        t_list = list(self.cg_Proj)
        t = next((key for key in t_list if self.cg_Proj[key] == ""), None)
        if t == None:
            if self.debug: print("Prog check -- on way out")
            return True
        return False
    
    def rec(self,level):
        #Go to first entry with negative one value, kinda ((random)) but not really.
        #The goal of level is to track where a backtracking occurs and see if it related to
        #the first choice in the propogation
        
        #Thought: If a coarse graining exists then is there a bad first selection? maybe count the number
        #of backtracks as they might be related to the orginal selection.
        t_list = list(self.cg_Proj)
        random.shuffle(t_list)
        b_prog = next((key for key in t_list if self.cg_Proj[key] == ""), None)
        
        
        self.back_tracking = False

        #If at the first level log the first projection p
        if level == 0:
            self.first_projection = b_prog
            self.init_visit[b_prog] = True
            #Add check here to check if its back tracked to first projection assumption
        #print(b_prog)
        
        if(b_prog != None):
            x = [1,0]
            random.shuffle(x)
            #x[0] Branch
            self.cg_Proj[b_prog] = str(x[0])
            lhs_key = self.LHS('{0}{0}{0}'.format(b_prog))
            lhs_temp = self.cg_Proj[lhs_key]
            if self.debug: print("left branch test ", level)
            if (self.helper(b_prog) and self.test()):
                if self.rec(level+1)[0] and self.prog_check():
                    if self.debug: print("left branch work ", level)
                    self.level = level
                    return True, level
            
            #Undo potenial work from x[0] branch && b_prog
            self.cg_Proj[lhs_key] = lhs_temp
            if self.debug: print("left branch fail ", level)
            
            
            #x[1] Branch
            self.cg_Proj[b_prog] = str(x[1])
            lhs_key = self.LHS('{0}{0}{0}'.format(b_prog))
            lhs_temp = self.cg_Proj[lhs_key]
            if self.debug: print("right branch test ", level)
            if (self.helper(b_prog) and self.test()):
                if self.rec(level+1)[0] and self.prog_check():
                    if self.debug: print("right branch work ", level)
                    self.level = level
                    return True,level
                
            #Undo potenial work from x[1] branch && b_prog
            self.cg_Proj[lhs_key] = lhs_temp
            self.cg_Proj[b_prog] = ''
            
            self.bt_count += 1
            self.back_tracking = True
            #Both Branches Failed
            if self.debug: print("!!!Back Tracking!!!  ", level)
            return False, level
        if self.debug: print("all the way outside  ", level)
        return True, level
    
    def helper(self,b_prog):
        rhs_pred = self.RHS(b_prog)
        lhs_calc = self.LHS('{0}{0}{0}'.format(b_prog))
        if self.debug: print("-Helper- RHS {} LHS {}".format(rhs_pred,lhs_calc))
        #Is the left hand side's projection already defined?
        if(self.cg_Proj[lhs_calc] == ""):
            #Predictive Branching for -1
            if self.debug: print("Unassigned Branch")
            #If I change it too I think it short circuit self.cg_Proj[b_prog]
            #But removes the possibility of odds self mapping
            if self.false_poly_check:
                self.cg_Proj[lhs_calc] = self.cg_Proj[b_prog]
            else:
                self.cg_Proj[lhs_calc] = rhs_pred
            return True
        elif(self.cg_Proj[lhs_calc] == rhs_pred):
            if self.debug: print("Good match")
            #Already Mapped
            return True
        else:
            #Projection mess up, Test other branch then go back up the tree
            if self.debug: print("Projection mess up")
            return False
        
    def RHS_t(self,X):
        X = ''.join(self.cg_Proj[x] for x in [X[:self.n],X[self.n:2*self.n],X[2*self.n:]])
        X = ''.join(self.B_rDict[X[i-1:i+2]] for i in range(1,len(X)-1))
        return X

    def test(self):
        #Similiar to cg_tester but only uses projections that have already been definded
        #This is so PB always works but PA^n doesnt have a value for it
        valid_projection = [key for key in self.cg_Proj if self.cg_Proj[key] != ""]

        for i in range(self.rec_test_amount):
            X = ''.join(random.choices(valid_projection, k = 3))
            #print("Testing : {}".format(X))
            rhs_pred = self.RHS_t(X)
            lhs_pred = self.cg_Proj[self.LHS(X)]

            #if LHS is undefined it will be ignored
            if(lhs_pred != "" and (rhs_pred != lhs_pred)):
                #print("Not a good prediction: {} {} {}".format(X,rhs_pred,lhs_pred))
                if self.debug: print("Test found bad match: {} {} {}".format(X,rhs_pred,lhs_pred))
                return False
        return True
    
    def random_testing(self,amount):
        #Amount is the number of samples you want to test against
        #3*n is the number of bits -> 2^(3*n)-1 is largest decimal number that can be represented
        ub = 2**(3*self.n) - 1
        c = 0
        for i in range(amount):
            test = '{1:0{0:0d}b}'.format(3*self.n,random.randint(0,ub))
            rhs_pred = self.RHS_t(test)
            lhs_pred = self.cg_Proj[self.LHS(test)]
            if(not(rhs_pred == lhs_pred)):
                print("Fail with: {}".format(test))
                print(rhs_pred,lhs_pred)
                c += 1
        #print("fail Rate: " ,c/amount)
        return c
    
    def brute_force_check(self):
        c = 0
        for i in range(2**(3*self.n)-1):
            test ='{1:0{0:0d}b}'.format(3*self.n,i)
            rhs_pred = self.RHS_t(test)
            lhs_pred = self.cg_Proj[self.LHS(test)]
            if(not(rhs_pred == lhs_pred)):
                print("Fail with: {}".format(test))
                print(rhs_pred,lhs_pred)
                c += 1
        #print("fail Rate: " ,c/amount)
        return c
    
"""
cg_sat
Coarse graining while using a sat solver

Incomplete cause I needed a XOR Sat solver
"""
class cg_sat(cg):
    def __init__(self,A,B,n):
        super(cg_sat,self).__init__(A,B,n)
        self.dic = {}
        self.assumptions = []
        self.s = Solver(name='g4', use_timer = True)
    
    def variables(self):
        c = 1
        #These will be assumptions used with the solver
        for b_val,i in zip(self.B_rDict.values(),list(range(8))):
            self.dic[f'B{i}'] = c
            if int(b_val) == 1:
                self.assumptions.append(self.dic[f'B{i}'])
            else:
                
                self.assumptions.append(self.dic[f'B{i}'] * -1)
            c += 1
            
        #Projection variables
        for i in range(2**(self.n) - 1):
            var = '{1:0{0:0d}b}'.format(self.n,i)
            self.dic[var] = c
            c += 1
    
    def clauses(self):
        for i in range(2**(3*self.n)-1):
            binary_input = '{1:0{0:0d}b}'.format(3*self.n,i)
            LHS = self.dic[self.LHS(binary_input)]
            RHS = [self.dic[x] for x 
                   in [X[:self.n],X[self.n:2*self.n],X[2*self.n:]]]
            
            
    
"""
For multiprocessing, used for heuristic stats
"""
def bt_mp(a):
    t = cg(a[0],a[1],3)
    t.rec_test_amount = 400
    return t.bt_run()
"""
A:str, B:str, n:int
Prints graph for heuristics
if n = 0 it pulls from generated data
else it calculates on the spot.
"""
def stats(A,B, n = 0):
    f = open("stats.txt", 'r')
    vals = []
    if(n ==0):
        for line in f.readlines():
            #Will have format A,B,Propagation Rule,Count
            l = line.split(',')
            if(l[0] == A and l[1] == B):
                vals.append([l[2],int(l[3][:-1])])
    else:
        for i in range(20):
            vals + cg(int(A),int(B),n).bt_run()
    if (vals != []):
        print(vals)
        fig,ax = plt.subplots(figsize = (10, 5))
        labels = np.array(vals)[:,0]
        vals = np.array(vals)[:,1].astype(int)
        ax.bar(labels, vals)
        plt.show()
    else:
        print("No Backtracking found")

"""
Shows my claim in the time complexity portion

A/B_start are here to allow you start the search at a different area
"""
def false_poly_check(A_start=0,B_start=0,n=3):
    odd_self = [15,85,51]
    for A in range(A_start,256):
        print(A)
        for B in range(B_start,256):
            t =cg(A,B,3)
            t.false_poly_check = True
            res = t.run()[0]
            t1 = cg(A,B,3)
            res1 = t1.run()[0]
            
            #Shouldn't go in these
            if(not(A in odd_self) and not(B in odd_self) and res1 != res):
                return A,B,res
            
            if( B <= 128 and ( ( B % 2 == 0 and not(res) ) or ( B % 2 == 1 and res )) 
               or (B > 128 and not(res))):
                return A,B,res
 

IndentationError: expected an indented block (<ipython-input-37-9a589d3dff3f>, line 325)

In [40]:
a = cg_sat(1,1,2)
a.variables()

b = str(5)
print(type(b))

<class 'str'>
