In [1]:
# Simulation for SAT solving shardware
# reference: Shim, Chaeyun, Jooyoung Bae, and Bongjin Kim. "30.3 VIP-Sat: A Boolean Satisfiability Solver Featuring 5× 12 Variable In-Memory Processing Elements with 98% Solvability for 50-Variables 218-Clauses 3-SAT Problems." 2024 IEEE International Solid-State Circuits Conference (ISSCC). Vol. 67. IEEE, 2024.

In [2]:
# 데이터셋 위치  
# /data2/sat/uf50-218  
# /data2/sat/uf20-91 
# 출처: https://www.cs.ubc.ca/~hoos/SATLIB/benchm.html

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import copy 
import os

In [4]:
def count_files_in_directory(directory):
    try:
        # 폴더 내의 모든 파일 목록 가져오기
        files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
        return files
    except FileNotFoundError:
        print(f"Error: The directory '{directory}' does not exist.")
        return None


In [5]:
def read_cnf_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    clauses = []

    clause_save_start = False
    clause_count = 0
    for line in lines:
        if clause_save_start == True:
            clause = list(map(int, line.strip().split()))
            assert clause[-1] == 0
            clause = clause[:-1]  # Remove the trailing 0
            clauses.append(clause)
            clause_count += 1
            if clause_count == num_clauses:
                break

        if line.startswith(('p')) and clause_save_start == False:
            clause_save_start = True
            _, _, num_vars, num_clauses = line.split()
            num_vars = int(num_vars)
            num_clauses = int(num_clauses)
    assert clause_save_start == True

    clauses = np.array(clauses)
    return clauses, num_vars, num_clauses

In [6]:
class VPE:
    def __init__(self, MY_VAR):
        """
        VPE 초기화 메서드
        :param MY_VAR: 이 VPE가 담당할 변수 (예: v1, v2 등)
        """
        self._MY_VAR = MY_VAR  # 이 VPE가 처리할 변수
        self.MY_STATE = 1       # 변수의 상태 

        self.clauses = []         # 변수가 속한 절들의 리스트
        self.neighbors = set()    # 상관관계 있는 이웃 변수들

    @property # MY_VAR 속성은 읽기 전용
    def MY_VAR(self):
        return self._MY_VAR
    
    def add_clause(self, all_clauses):
        """
        변수가 속한 절을 추가하는 메서드
        :param all_clauses: 전체 절들의 리스트
        """
        clauses_clone = copy.deepcopy(all_clauses)
        for clause in clauses_clone:
            clause_abs = [abs(x) for x in clause]

            if self.MY_VAR in clause_abs:
                self_index = clause_abs.index(self.MY_VAR)
                temp = clause[self_index]
                clause = np.concatenate((clause[:self_index], clause[self_index + 1:]))  # self_index에서 요소 제거
                clause_abs = clause_abs[:self_index] + clause_abs[self_index+1:]
                temp2 = np.insert(clause, 0, temp)
                self.clauses.append(temp2)

                # 절 내에서 다른 변수들을 이웃으로 추가
                for var_abs in clause_abs:
                    self.neighbors.add(var_abs)
        
    def get_neighbors(self):
        """
        상관관계 있는 이웃 변수들을 반환하는 메서드
        """
        return list(self.neighbors)

    def process(self, var_state_array):
        """
        변수를 처리하는 메서드 (예: 값 업데이트 등)
        이 부분은 문제 해결 알고리즘에 따라 달라질 수 있음.
        """

        # Adder Tree
        tree_sum = 0

        # Clause Column
        for clause in self.clauses:
            SI = 0 if clause[0] < 0 else 1 # 음수면 0, 양수면 1 # sign bit 같은 거임.
            SL = 0 if clause[1] < 0 else 1
            SR = 0 if clause[2] < 0 else 1

            VI = var_state_array[abs(clause[0])-1] # 현재 V값 가져오기
            VL = var_state_array[abs(clause[1])-1]
            VR = var_state_array[abs(clause[2])-1]

            assert VI == self.MY_STATE, f'{VI}, {self.MY_STATE}'

            # S와 V의 XOR값(같으면 0 다르면 1)이 3개가 나옴. (SIVI, SLVL, SRVR) 한개라도 XOR값이 0이어야 Clause satisfied.
            
            # Clause Logic
            if (SL ^ VL) and (SR ^ VR) == 1: 
                C0 = 1 # MY_VAR가 중요해졌다는 flag.
                C1 = 1 if SI == 0 else 0 # SI 토글
            else:
                C0 = 0
                C1 = 0

            # Adder Tree
            if C0 == 1:
                if C1 == 1:
                    tree_sum += -1
                else: # C1 == 0
                    tree_sum += 1
            else: # C0 == 0
                tree_sum += 0

        # Variable Update Logic
        if tree_sum < 0:
            self.MY_STATE = 0
        elif tree_sum == 0:
            self.MY_STATE = 1 if self.MY_STATE == 0 else 0
        elif tree_sum > 0:
            self.MY_STATE = 1
        else: 
            assert False
        return self.MY_STATE
    

In [7]:
class VPE_ARRAY:
    def __init__(self, my_seed=42,  num_rows=5, num_cols=12, clauses=[[1,2,-3],[2,-1,57]], num_vars=50, num_clauses=218):
        """
        VPE_ARRAY 초기화 메서드
        :param num_rows: VPE 배열의 행 개수 (기본값: 5)
        :param num_cols: VPE 배열의 열 개수 (기본값: 12)
        :param clauses: 3-SAT 문제의 절(clause) 리스트
        :param num_vars: 변수의 총 개수
        :param num_clauses: 절의 총 개수
        """
        self.my_seed = my_seed
        self.num_rows = num_rows  # 행 개수
        self.clauses = clauses
        self.num_cols = num_cols  # 열 개수
        self.num_vars = num_vars  # 변수의 총 개수

        # 랜덤 시드 초기화
        random.seed(self.my_seed)

        # 상관 관계 행렬 
        self.correlation_matrix = self.create_correlation_matrix()

        # VPE 배열 생성
        self.VPES, self.VPE_index, self.assigned_abs_vars = self.assign_variables_to_vpe()

        # uncorrelated columns check
        assert self.check_uncorrelated_columns(self.VPE_index, self.correlation_matrix)

        # VPE 배열에 모든 절 삽입
        self.add_clause_to_vpe()
        
        # variable state initialize
        self.var_state_array = self.variable_state_initialize()
        
        # run
        # 외부에서 실행하세요.    


    def create_correlation_matrix(self):
        """
        상관 관계 행렬을 생성하는 메서드

        Args:
            clauses (list): 변수의 절(clause) 리스트
        """
        # 각 절(clause)에서 변수를 두 개씩 비교하며 상관 관계를 행렬에 기록
        correlation_matrix = np.zeros((self.num_vars, self.num_vars), dtype=int)
        for clause in self.clauses:
            for i, var1 in enumerate(clause):
                for var2 in clause[i + 1:]:
                    abs_var1, abs_var2 = abs(var1) - 1, abs(var2) - 1  # 변수 인덱스는 0부터 시작하므로 -1 처리
                    correlation_matrix[abs_var1, abs_var2] = 1  # 상관된 변수 간에는 1을 표시
                    correlation_matrix[abs_var2, abs_var1] = 1  # 상관 행렬은 대칭이므로 반대 방향도 동일하게 처리
        
        return correlation_matrix
    
    def assign_variables_to_vpe(self):
        assigned_abs_vars = set()
        VPES = [[None for col in range(self.num_cols)] for row in range(self.num_rows)]

        deploy_search_iter = 0
        while True:
            deploy_search_iter += 1

            # 사용한 변수를 추적하기 위한 리스트
            assigned_abs_vars = set()

            # 2D 어레이(행렬)로 VPE 배열 생성 (각 column에는 상관 관계가 없는 변수들 할당)
            VPES = [[None for col in range(self.num_cols)] for row in range(self.num_rows)]
            
            # Stage 1 : numvars//numcols 만큼의 row까지만 할당
            # 변수 할당을 위한 작업
            current_row_at_column = [0 for _ in range(self.num_cols)]
            for col in range(self.num_cols):
                # 상관 관계가 없는 변수들을 찾음
                shuffled_abs_vars_range = list(range(1, self.num_vars + 1))
                random.shuffle(shuffled_abs_vars_range)
                for abs_var in shuffled_abs_vars_range:
                    if abs_var in assigned_abs_vars:
                        continue  # 이미 할당된 변수는 건너뜀

                    # 상관되지 않은 변수인지 확인
                    can_assign = True
                    for prev_row in range(current_row_at_column[col]):
                        prev_abs_var = VPES[prev_row][col].MY_VAR  # 이전에 같은 column에 배치된 변수
                        if self.correlation_matrix[abs_var - 1][prev_abs_var - 1] == 1:  # 상관된 변수면 할당 불가
                            can_assign = False
                            break

                    if can_assign == True:
                        assert VPES[current_row_at_column[col]][col] == None
                        VPES[current_row_at_column[col]][col] = VPE(abs_var)  # 변수 할당
                        assigned_abs_vars.add(abs_var)  # 할당된 변수로 등록
                        current_row_at_column[col] += 1

                    if current_row_at_column[col] >= self.num_vars//self.num_cols:  # 이 column에 모두 할당했으면 다음 column으로
                        break

            # Stage 2: 나머지 row에 할당
            shuffled_col_list = list(range(self.num_cols))
            random.shuffle(shuffled_col_list)
            for col in shuffled_col_list:
                # 상관 관계가 없는 변수들을 찾음
                shuffled_abs_vars_range = list(range(1, self.num_vars + 1))
                random.shuffle(shuffled_abs_vars_range)
                for abs_var in shuffled_abs_vars_range:
                    if abs_var in assigned_abs_vars:
                        continue  # 이미 할당된 변수는 건너뜀

                    # 상관되지 않은 변수인지 확인
                    can_assign = True
                    for prev_row in range(current_row_at_column[col]):
                        prev_abs_var = VPES[prev_row][col].MY_VAR  # 이전에 같은 column에 배치된 변수
                        if self.correlation_matrix[abs_var - 1][prev_abs_var - 1] == 1:  # 상관된 변수면 할당 불가
                            can_assign = False
                            break

                    if can_assign == True:
                        VPES[current_row_at_column[col]][col] = VPE(abs_var)  # 변수 할당
                        assigned_abs_vars.add(abs_var)  # 할당된 변수로 등록
                        current_row_at_column[col] += 1

                    if current_row_at_column[col] >= self.num_rows:  # 이 column에 모두 할당했으면 다음 column으로
                        break
            
            if len(assigned_abs_vars) == self.num_vars:
                # print('deploy_search_iter', deploy_search_iter)
                VPES_index = np.array([[vpe.MY_VAR if vpe else None for vpe in row] for row in VPES])
                break
        return VPES, VPES_index, assigned_abs_vars

    def print_vpe_index(self):
        """
        VPE 배열을 2D 행렬로 출력하는 메서드
        """
        print('VPE Array:')
        for row_index, row in enumerate(self.VPE_index):
            formatted_row = '| '.join(f'v{row_index * len(row) + col_index:<2} {str(elem) if elem is not None else "No":>2}' 
                                        for col_index, elem in enumerate(row))
            print(formatted_row)
        print('\nnum of assigned_abs_vars:', len(self.assigned_abs_vars))
        

    def check_uncorrelated_columns(self, VPE_index, correlation_matrix):
        # VPE_index: VPE 배열의 인덱스를 나타냄. 각 값은 변수 번호를 의미함.
        # correlation_matrix: 변수들 간의 상관관계를 나타내는 행렬. (num_vars x num_vars 크기의 행렬)

        num_rows = len(VPE_index)
        # VPE_index의 행(row) 수를 얻음. 즉, VPE 배열에서 몇 개의 행이 있는지 확인.

        num_cols = len(VPE_index[0])
        # VPE_index의 열(column) 수를 얻음. 즉, 각 열에 몇 개의 변수가 배치되었는지 확인.

        for col in range(num_cols):
            # 각 열(column)을 순회함. 이때 col은 현재 처리 중인 열의 인덱스를 나타냄.

            col_values = [VPE_index[row][col] for row in range(num_rows) if VPE_index[row][col] is not None]
            # 현재 열에서 변수가 할당된 값을 모두 수집. None이 아닌 값만 선택하여 리스트로 만듦.
            # 즉, 해당 열에 배치된 변수 번호들을 수집하는 과정.

            for i in range(len(col_values)):
                # 수집한 변수 번호 리스트에서 첫 번째 변수부터 시작하여 다른 변수들과 비교하기 위해 반복문 시작.
                # i는 현재 비교하고자 하는 첫 번째 변수의 인덱스를 나타냄.

                for j in range(i + 1, len(col_values)):
                    # i 이후에 있는 변수들과 비교하기 위한 반복문.
                    # j는 두 번째로 비교할 변수의 인덱스를 나타냄.

                    var1 = col_values[i]
                    # 첫 번째 변수의 값(var1)을 col_values 리스트에서 가져옴.

                    var2 = col_values[j]
                    # 두 번째 변수의 값(var2)을 col_values 리스트에서 가져옴.

                    # 상관된 경우를 찾음
                    if correlation_matrix[var1 - 1][var2 - 1] == 1:
                        # correlation_matrix에서 var1과 var2가 상관된 경우를 확인.
                        # 변수 번호는 1부터 시작하므로, 0-based 인덱스에 맞추기 위해 -1 처리.
                        # 상관된 경우, correlation_matrix에서 해당 값이 1임을 이용하여 상관된 변수를 확인함.

                        # print(f"Error: Column {col+1} contains correlated variables {var1} and {var2}.")
                        # 상관된 변수를 발견한 경우, 그 변수가 어느 column에 있는지와 함께 에러 메시지를 출력.

                        return False
                        # 상관된 변수가 발견되면 더 이상 검사를 진행하지 않고 False를 반환하여 함수 종료.

        # 모든 열에 대해 상관된 변수가 없을 경우, 오류가 없음을 나타냄.
        # print("All columns are internally uncorrelated.\n")
        return True
        # 모든 열에서 상관된 변수가 없는 경우, True를 반환함.

    def add_clause_to_vpe(self):
        for row in range(self.num_rows):
            for col in range(self.num_cols):
                if self.VPES[row][col] != None:
                    self.VPES[row][col].add_clause(self.clauses)

    
    def variable_state_initialize(self):
        var_state_array = [random.choice([0, 1]) for _ in range(self.num_vars)]
        for row in range(self.num_rows):
            for col in range(self.num_cols):
                if self.VPES[row][col] != None:
                    self.VPES[row][col].MY_STATE = var_state_array[self.VPES[row][col].MY_VAR - 1]
        return var_state_array
    


    def satisfy_check(self, var_state_array, clauses):
        """
        절을 만족하는지 확인하는 메서드
        :param var_state_array: 변수들의 상태를 나타내는 배열 (0 또는 1로 구성된 리스트)
        :param clauses: 각 절을 표현하는 리스트의 리스트 (변수는 양수 또는 음수로 표현됨)
        :return: num_unsat_clause, unsat_clauses_list (만족되지 않은 절의 개수와 그 절들의 리스트)
        """
        unsat_clauses_list = []

        # 각 절에 대해 만족 여부를 확인
        for clause in clauses:
            satisfied = False  # 해당 절이 만족되는지 여부를 나타내는 변수

            for var in clause:
                abs_var = abs(var) - 1  # 변수의 절대값으로 인덱스를 맞춤 (0부터 시작)
                var_state = var_state_array[abs_var]  # 해당 변수의 상태

                if (var > 0 and var_state == 1) or (var < 0 and var_state == 0):
                    satisfied = True  # 하나라도 만족하는 변수가 있으면 절이 만족됨
                    break

            if satisfied == False:
                unsat_clauses_list.append(clause)  # 만족되지 않은 절을 리스트에 추가

        num_unsat_clause = len(unsat_clauses_list)  # 만족되지 않은 절의 개수

        return num_unsat_clause, unsat_clauses_list
    
    def print_var_state_array(self):
        print('print self.var_state_array')
        for row in range(self.num_rows):
            for col in range(self.num_cols):
                if self.VPE_index[row][col] != None:
                    print(f'v{self.VPE_index[row][col]:<2}={self.var_state_array[self.VPE_index[row][col]-1]}', end=' | ')
                else: 
                    # print(f'vNo=None', end=' | ')
                    print(f'None ', end=' | ')
            print()
        print()

    def run(self, max_iter, iter_verbose):
        """
        VPE 배열을 실행하는 메서드
        :param max_iter: 최대 반복 횟수 (기본값: 100)
        """
        num_unsat_clause, unsat_clauses_list = self.satisfy_check(self.var_state_array, self.clauses)
        if iter_verbose == True:
            print(f'iteration {0:>4} | num_unsat_clause: {num_unsat_clause}')
        for i in range(max_iter):
            # self.print_var_state_array()

            # row-wise parellel processing
            for col in range(self.num_cols):
                state_temp_at_now_col = [None for _ in range(self.num_rows)]
                for row in range(self.num_rows):
                    if self.VPES[row][col] != None:
                        state_temp_at_now_col[row] = self.VPES[row][col].process(self.var_state_array)

                    # updated state update!!
                for row in range(self.num_rows):
                    # print(state_temp_at_now_col)
                    if state_temp_at_now_col[row] != None:
                        self.var_state_array[self.VPES[row][col].MY_VAR - 1] = state_temp_at_now_col[row]

            num_unsat_clause, unsat_clauses_list = self.satisfy_check(self.var_state_array, self.clauses)
            if iter_verbose == True:
                print(f'iteration {i+1:>4} | num_unsat_clause: {num_unsat_clause}')


            if num_unsat_clause == 0:
                if iter_verbose == True:
                    print(f'SATISFIED!!!! iter{i+1}')
                return i+1
            
            # # iteration 마다 실패하면 다시 리셋하기
            # self.var_state_array = self.variable_state_initialize()


        else:
            if iter_verbose == True:
                print(f"NOT SATISFIED! num_unsat_clause: {num_unsat_clause}, unsat_clauses_list: {unsat_clauses_list}, iteration: {i+1}")
            return -num_unsat_clause



In [8]:

def HYPER_RUN(HYPER_MAX_ITER=1, hyper_seed=42, num_rows=5, num_cols=18, file_path='/data2/sat/uf50-218/uf50-03.cnf', max_iter=100, iter_verbose=False):
    clauses, num_vars, num_clauses = read_cnf_file(file_path)

    SUCCESS_OR_NOT = 0
    for hyper_iter in range(HYPER_MAX_ITER):
        # print(f'hyper_iter: {hyper_iter+1}', end = '  ')
        my_seed = hyper_seed * (hyper_iter+1)
        vpe_array = VPE_ARRAY(my_seed=my_seed, num_rows=num_rows, num_cols=num_cols, clauses=clauses, num_vars=num_vars, num_clauses=num_clauses)  # 5x12 VPE 배열 생성
        success_iter = vpe_array.run(max_iter, iter_verbose)  # VPE processing 실행
        if success_iter > 0:
            print(f'SATISFIED!!!! success_iter: {success_iter}')
            SUCCESS_OR_NOT = 1
            break
        # print(f'not satisfied. num_unsat_clause: {-success_iter}')
        # print()
    else:
        print(f'HYPER_MAX_ITER: {HYPER_MAX_ITER}, NOT SATISFIED!  last num_unsat_clause: {-success_iter}')

    return SUCCESS_OR_NOT

    # vpe_array.print_vpe_index()  # VPE 배열 출력

In [9]:

HYPER_MAX_ITER = 1

# file_path = 'example_uf50-03.cnf'
file_path = '/data2/sat/uf50-218/uf50-02.cnf'
hyper_seed = 42
num_rows = 5
num_cols = 12
max_iter = 100
iter_verbose = False

# HYPER_RUN(HYPER_MAX_ITER=HYPER_MAX_ITER, hyper_seed=hyper_seed, num_rows=num_rows, num_cols=num_cols, file_path=file_path, max_iter=max_iter, iter_verbose=iter_verbose)

In [10]:


def FULL_FILE_RUN(HYPER_MAX_ITER=1, hyper_seed=42, num_rows=5, num_cols=12, BASE_PATH='/data2/sat/uf50-218/', max_iter=10, iter_verbose=False):
    
    files = count_files_in_directory(BASE_PATH)
    assert files != None, '파일 못 불러오고 있따.'

    total_file = len(files)
    num_success = 0
    current_file = 0
    for file in files:
        file_path = os.path.join(BASE_PATH, file)
        print(f'\nfile: {file}, {current_file+1}/{total_file}')
        SUCCESS_OR_NOT = HYPER_RUN(HYPER_MAX_ITER=HYPER_MAX_ITER, hyper_seed=hyper_seed, num_rows=num_rows, num_cols=num_cols, file_path=file_path, max_iter=max_iter, iter_verbose=iter_verbose)
        num_success += SUCCESS_OR_NOT
        current_file += 1
        print(f'Current Solvability: {100*num_success/current_file:.2f}%')
    assert current_file == total_file
    print(f'\n\nFinal Solvability: {100*num_success/total_file:.2f}%')

In [11]:
    
HYPER_MAX_ITER = 1

# BASE_PATH = '/data2/sat/uf50-218/uf50-02.cnf'
BASE_PATH = '/data2/sat/uf50-218/'
# BASE_PATH = '/data2/sat/uf20-91'
hyper_seed = 42
num_rows = 5
num_cols = 12
max_iter = 10
iter_verbose = False

FULL_FILE_RUN(HYPER_MAX_ITER=HYPER_MAX_ITER, 
                hyper_seed=hyper_seed, 
                num_rows=num_rows, 
                num_cols=num_cols, 
                BASE_PATH=BASE_PATH, 
                max_iter=max_iter, 
                iter_verbose=iter_verbose)


file: uf50-0643.cnf, 1/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 3
Current Solvability: 0.00%

file: uf50-065.cnf, 2/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 1
Current Solvability: 0.00%

file: uf50-0509.cnf, 3/1000


SATISFIED!!!! success_iter: 5
Current Solvability: 33.33%

file: uf50-0923.cnf, 4/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 3
Current Solvability: 25.00%

file: uf50-0448.cnf, 5/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 2
Current Solvability: 20.00%

file: uf50-04.cnf, 6/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 1
Current Solvability: 16.67%

file: uf50-0484.cnf, 7/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 1
Current Solvability: 14.29%

file: uf50-0616.cnf, 8/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 5
Current Solvability: 12.50%

file: uf50-0386.cnf, 9/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 1
Current Solvability: 11.11%

file: uf50-0357.cnf, 10/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 2
Current Solvability: 10.00%

file: uf50-0216.cnf, 11/1000
HYPER_MAX_ITER: 1, NOT SATISFIED!  last num_unsat_clause: 2
Current Solvability: 9.09%

file: 