In [3]:
# standard libraries
from __future__ import print_function
import os
import re
import sys
from pathlib import Path
from enum import Enum
import json

# pysat library
from pysat.solvers import Solver, SolverNames
from pysat.formula import CNF, WCNF
from pysat.examples.fm import FM
from pysat.examples.musx import MUSX

# or-tools library
from ortools.linear_solver import pywraplp

# numpy
#import numpy as np

# configs
import pprint

# utilities
ppprint = pprint.PrettyPrinter(indent=4).pprint
def debug(info, verbose=True):
    if verbose:
        print(info)

def debug_ppprint(info, verbose=False):
    if verbose:
        print(json.dumps(info, indent=4))

# Pysat pre-processing

In [4]:
folderPaths={
    'easyInstances':'/home/crunchmonster/Documents/VUB/02_Research/02_Notes/02_OMUS/B_data/easy_instances/',
    'instance':'/home/crunchmonster/Documents/VUB/02_Research/02_Notes/02_OMUS/B_data/instance/',
    'aaaiInstances':'/home/crunchmonster/Documents/VUB/02_Research/02_Notes/02_OMUS/B_data/hard_instances/aaai_instances',
    'isingModel':'/home/crunchmonster/Documents/VUB/02_Research/02_Notes/02_OMUS/B_data/hard_instances/Generalized_Ising_model',
    'maxSat':'/home/crunchmonster/Documents/VUB/02_Research/02_Notes/02_OMUS/B_data/hard_instances/maxsat_staffscheduling_instances',
    'circuitDebugging':'/home/crunchmonster/Documents/VUB/02_Research/02_Notes/02_OMUS/B_data/hard_instances/ms_industrial/circuit-debugging-problems',
    'safarpour':'/home/crunchmonster/Documents/VUB/02_Research/02_Notes/02_OMUS/B_data/hard_instances/ms_industrial/sean-safarpour'
}

class Difficulty(Enum):
    EASY = 1
    MEDIUM = 2
    HARD = 3
    ALL = 0

def instanceDiff(fileSize):
    
    mb = 10* 1024 ** 1
    mediumUb = 10 * mb
    if fileSize < mb:
        return Difficulty.EASY
    elif fileSize < mediumUb:
        return Difficulty.MEDIUM
    else:
        return Difficulty.HARD

    
def allInstances(difficulty, cnfExtensions=['.cnf', '.wcnf']):
    instances = []
    for folder in folderPaths:
        instanceFolder = Path(folderPaths[folder])
        instances += [x for x in instanceFolder.iterdir() if x.is_file() and x.suffix in cnfExtensions]
    
    if difficulty is Difficulty.ALL:
        return instances

    sizeFilteredInstances = list(filter(lambda x: instanceDiff(x.stat().st_size) is difficulty, instances))

    return sizeFilteredInstances
    
def getDataPaths(cnfExtensions=['.cnf', '.wcnf'], difficulty= Difficulty.EASY):
    
    if difficulty not in Difficulty:
        ppprint('Difficulty must be in ' +str(difficulties) + ' defaulting to easy.')
        difficulty = Difficulty.EASY
    
    instances = allInstances(difficulty, cnfExtensions)

    return instances

def cnfInstances(difficulty=Difficulty.EASY):
    instances = [instance for instance in getDataPaths(cnfExtensions=['.cnf'], difficulty= difficulty)] 
    return instances

def wcnfInstances(difficulty=Difficulty.EASY):
    instances = [instance for instance in getDataPaths(cnfExtensions=['.wcnf'], difficulty= difficulty)] 
    return instances

def CNFisUnsat(instance, verbose=True):
    with Solver() as s:
        cnf = CNF(from_file=instance)
        added = s.append_formula(cnf.clauses, no_return=False)
        solved = s.solve()
    return solved

def WCNFisUnsat(instance, verbose=True):
    with Solver(name = SolverNames.minisat22[0]) as s:
        wcnf = WCNF(from_file=instance)
        added = s.append_formula(wcnf.clauses, no_return=False)
        solved = s.solve()
    return solved

def cnfUnsatInstances():
    return [instance  for instance in cnfInstances() if CNFisUnsat(instance)]

# Test extensions on simple cases

In [8]:
def test_flatten_set():
    print("Testing flatten set")
    assert sorted(flatten_set([[1, 2, 3],[4, 1 ,3]])) == [1,2,3, 4], "Should be 6"

def test_extension1():
    input_cnf_clauses = [
        [1],
        [2, 3, 5],
        [3, 6, 7],
        [-4, -1],
        [-4],
        [-4, -8]
    ]
    input_F_prime = {0, 4}
    
    output_F_prime = {0, 3, 4, 5}
    
    ext1_F_prime = extension1(input_cnf_clauses, input_F_prime)
    
    assert sorted(output_F_prime) == sorted(ext1_F_prime), "Should be equal"

def test_extension2():
    input_cnf_clauses = [
        [1],
        [2, 3, 5],
        [3, 6, 7, -8],
        [-4, -1],
        [-4],
        [-4, -8]
    ]
    input_F_prime = {0, 4}
    
    expected_output = {0, 2, 3, 4, 5}
    
    ext1_F_prime = extension2(input_cnf_clauses, input_F_prime)
    
    assert sorted(expected_output) == sorted(ext1_F_prime), f"{ext1_F_prime} == {expected_output}"
    
def run_tests():
    test_flatten_set()
    test_extension1()
    test_extension2()
    print("Everything passed")

# MIP model

In [5]:
def print_omus(H, h, F_prime, C, weights, clauses, write_file=True):
    d = {
        'H':H,
        'h':h,
        'F_prime':F_prime,
        'C':C, 
        #'weights': weights,
        #'clauses': clauses
    }
    
    ppprint(d)

def complement(F, F_prime):
    return set([i for i in range(len(F))]) - set(F_prime)

# def unique_clauses_hs(H):
#    return flatten_set(H)

def flatten_set(H):
    return set([val for sublist in H for val in sublist])

def f(x):
    # weighted based on the number of literals inside the clause
    # return 1
    return len(x)

def cnf_weights(cnf, weight = f):
    return [weight(clause) for clause in cnf.clauses]

def clauses_weights(clauses, weight = f):
    return [weight(clause) for clause in clauses]

   
def create_data_model(H, weights):
    """Stores the data for the problem."""
    
    # indices of clauses used
    indices_H = sorted(flatten_set(H))
    
    n_vars_H = len(indices_H)
    n_constraints = len(H)
    
    data = {}
    data['indices'] = indices_H
    data['num_constraints'] = n_constraints
    data['bounds'] = [1] * n_constraints
    data['num_vars'] = n_vars_H 
    data['obj_coeffs'] = {index:weights[index] for index in indices_H}

    # constraint coefficients hij = 1 if variable x_j is in set h_i
    constraint_coeffs = [[0 for _ in range(n_vars_H)] for _ in range(n_constraints)] 

    for j in range(n_constraints):
        hj = H[j]
        for i in range(n_vars_H):
            if indices_H[i] in hj:
                constraint_coeffs[j][i] = 1

    data['constraint_coefficients'] = constraint_coeffs
    
    # matching clause indices with position in list of clause indices 
    # ex: {3 : 0, 7: 1, ....} clause 3 position 0, clause 7 position 1, ...
    data['matching_table'] = {idx : i for i, idx in enumerate(indices_H) }
    return data

def checkSatClauses(clauses, F_prime):
    c = [clauses[i] for i in F_prime]
    with Solver() as s:
        added = s.append_formula(c, no_return=False)
        solved = s.solve()
    return solved    

In [6]:
def smus_CNF():
    l = 1
    m = 2
    n = 3
    p = 4
    s = 5
    t = 6
    cnf = CNF()
    cnf.append([-s])    # c1: ¬s
    cnf.append([s, -p]) # c2: s or ¬p
    cnf.append([p])     # c3: p
    cnf.append([-p, m]) # c4: ¬p or m
    cnf.append([-m, n]) # c5: ¬m or n
    cnf.append([-n])    # c6: ¬n
    cnf.append([-m, l]) # c7 ¬m or l
    cnf.append([-l])    # c8 ¬l
    return cnf

# Optimal Hitting set MIP implementation

In [7]:
def optimalHittingSet(H, weights):
    data = create_data_model(H, weights)
    # [START solver]
    # Create the mip solver with the CBC backend.
    solver = pywraplp.Solver('OptimalHittingSet', 
                             pywraplp.Solver.BOP_INTEGER_PROGRAMMING)
    # [END solver]
    
    # [START variables]
    #infinity = solver.infinity()
    x = {}
    for j in data['indices']:
        x[j] = solver.BoolVar('x[%i]' % j)
    # [END variables]
    
    # [START constraints]
    for i in range(data['num_constraints']):
        # for all i in {1.. |H|}: sum x[j] * hij >= 1
        constraint_expr = [data['constraint_coefficients'][i][j] * x[idx] for j, idx in enumerate(data['indices'])]
        solver.Add(sum(constraint_expr) >= data['bounds'][i])
    # [END constraints]
    
    # [START objective]
    # Maximize sum w[i] * x[i]
    objective = solver.Objective()
    for idx in data['indices']:
        objective.SetCoefficient(x[idx], data['obj_coeffs'][idx])
    objective.SetMinimization()
    # [END objective]
    
    # Solve the problem and print the solution.
    # [START print_solution]
    status = solver.Solve()

    if status == pywraplp.Solver.OPTIMAL:
        #print('Objective value =', solver.Objective().Value())
        return [x[j].solution_value() for j in data['indices']]
    else:
        print('The problem does not have an optimal solution.', status)
        return []
    # [END print_solution]

In [39]:
def checkConflict(literals):
    for l in literals:
        assert -l not in literals, f"conflicting literals are present {l}, {-l}"

def default_extension(cnf_clauses, F_prime):
    return F_prime

def extension1(cnf_clauses, F_prime):
    """
    
        Add all clauses that are true based on the assignment by the model of Fprime
        :param cnf_clauses: a collection of clauses (list of literals).
        :param F_prime: hitting set : a list of clauses.
        
        :type cnf_clauses: iterable(iterable(int))
        :type F_prime: iterable(int)    
        
    """
    # new set of validated literals
    new_F_prime = set(F_prime)
    
    # all literals (clauses with 1 element) validated in current hitting set
    validated_literals = {cnf_clauses[index][0] for index in new_F_prime if len(cnf_clauses[index]) == 1}
    
    # remaining clauses to check for validation
    remaining_clauses = {i for i in range(len(cnf_clauses))} - F_prime
    
    for c in remaining_clauses:
        clause = cnf_clauses[c]
        for literal in validated_literals:
            if literal in clause:
                new_F_prime.add(c)

    #validated_variables = flatten_set(validated_clauses)
    return new_F_prime

def find_best_literal(clauses, remaining_clauses, conflictual_literals):
    literal_validatable_clauses = {l : 0 for l in conflictual_literals}
    
    validatable_clauses = {}
    for c in remaining_clauses:
        clause = clauses[c]
        
        for literal in clause:
            if literal in conflictual_literals:
                validatable_clauses.add(c)
    
    for c in validatable_clauses:
        clause = clauses[c]
        for literal in clause:
            if literal in conflictual_literals:
                literal_validatable_clauses[literal] += 1
    
    # https://stackoverflow.com/questions/268272/getting-key-with-maximum-value-in-dictionary
    best_literal = max(literal_validatable_clauses.iterkeys(), key=(lambda key: literal_validatable_clauses[key]))
    return best_literal
    
def extension2(cnf_clauses, F_prime, random_assignment = True):
    """
    
            Step 1 : Compute clauses with unique literals
            
            Step 2 :
                2.1 Compute validated clauses
                2.2 Compute remaining clauses
            
            Step 3:
                3.1 Compute all literal in validated clauses
                3.2 remove already validated literals from unique literal clauses
                3.3 remove negation of already validated literals from unique clauses
            
            Step 4:
                4.1 Seperate all remaining literals in validated clauses in 2 parts:
                    4.1.1 literals without negation of literal present
                    4.1.2. literals with negation present
            
            Step 5: Add all remaining clauses validated by assignement literals w/o negation
            
            Step 6:
                6.1a choose first literal from all literals with negation
                                or
                6.1b choose literal with best clause propagation
                6.2 Add all remaining clauses validated by assignement of literal                 
                
    """
    # TODO: 
    # - add while loop to propagate as long as possible as long as possible
    #   whenever len(other_literals ) > 0 
    # 
    # - exploit validatable clauses
    
    new_F_prime = set(F_prime)

    # Step 1 : clauses with unique literals
    # clauses with 1 literal 
    unique_literal_validated_clauses = {index for index in new_F_prime if len(cnf_clauses[index]) == 1}
    
    # literals validated in clauses with 1 literal
    validated_literals = {cnf_clauses[index][0] for index in unique_literal_validated_clauses}
    
    # non-unique clauses
    remaining_clauses = {i for i in range(len(cnf_clauses))} - unique_literal_validated_clauses
    
    # Step 2 : clauses with unique literals
    # all clauses satisfied by current single literal assignments of Fprime
    satisfied_clauses = set()
    
    # for every literal in validated literal check for any clause satisfied 
    for literal in validated_literals:
        for c in remaining_clauses:
            clause = cnf_clauses[c]
            if literal in clause:
                satisfied_clauses.add(c)
                new_F_prime.add(c)
    
    # remove unique literal clauses already validated
    satisfied_clauses -= unique_literal_validated_clauses

    remaining_clauses -= satisfied_clauses
    
    # remaining validated clauses in F prime
    assert all([True if -i not in validated_literals else False for i in validated_literals]), "literal conflict"
        
    # remaining literals to assign
    other_literals = flatten_set([cnf_clauses[index] for index in satisfied_clauses])

    # remove already validated literals
    other_literals -= validated_literals
    
    # remove negated already validated literals
    other_literals -= {-i for i in validated_literals}
    
    # filtered literals with negated literal also present 
    conflict_free_literals = {i for i in other_literals if -i not in other_literals}
    
    # remove all clauses validated by conflict free literals
    for literal in conflict_free_literals:
        clauses_to_remove = set()
        for c in remaining_clauses:
            clause = cnf_clauses[c]
            if literal in clause:
                clauses_to_remove.add(c)
                new_F_prime.add(c)
        remaining_clauses -= clauses_to_remove

    validated_literals |= conflict_free_literals

    other_literals -= conflict_free_literals
    # remaining conflictual literals to validate
    conflictual_literals = set(other_literals)    
   
    # check if only conflictual literals are present in conflictual literals
    assert all([True if -i in conflictual_literals else False for i in conflictual_literals]), "conflictual literals error"
    assert len(conflictual_literals) % 2 == 0, "check remaining literals are present in pairs"
    
    # for every literal, remove its negation and 
    while len(conflictual_literals) > 0:
        # randomly assigns truthness value
        if random_assignment:
            literal = conflictual_literals[0]
        else: 
            literal = find_best_literal(clauses, remaining_clauses, conflictual_literals)
            
        # SANITY CHECK : add to validated literals
        assert literal not in validated_literals, "literal already present"
        assert -literal not in validated_literals, "negation of literal already present, conflict !!!"
        validated_literals.add(literal)

        # remove literal and its negation
        conflictual_literals.remove(literal)
        conflictual_literals.remove(-literal)

        # remove validated clauses
        clauses_to_remove = set()
        
        for c in remaining_clauses:
            clause = cnf_clauses[c]
            if literal in clause:
                clauses_to_remove.add(c)
                new_F_prime.add(c)
        remaining_clauses -= clauses_to_remove
    print("validated literals:", validated_literals)   
    return new_F_prime

def extension2silly(cnf_clauses, F_prime, random_assignment = True):
    """
    
            Repeatedly apply extension 2 until there is no more progress
            Propagate as much as possible
                
    """
    # TODO: 
    # - add while loop to propagate as long as possible as long as possible
    #   whenever len(other_literals ) > 0 
    # 
    # - exploit validatable clauses
    
    clauses_added = True
    new_F_prime = set(F_prime)

    while(clauses_added):
        ext2_F_prime = extension2(cnf_clauses, new_F_prime, random_assignment)
        ppprint({"new_F_prime" : new_F_prime,
                 "ext2_F_prime":ext2_F_prime,
                 "cnf_clauses": cnf_clauses
                  })
        clauses_added = ext2_F_prime > new_F_prime
        new_F_prime = set(ext2_F_prime)
        
    return new_F_prime

def extension3(cnf_clauses, F_prime, random_assignment = True):

    """
    
        Repeatedly apply extension 2 until there is no more progress
        Propagate as much as possible
        
    """

    new_F_prime = set(F_prime)
    
    # Step 1 : clauses with unique literals
    # clauses with 1 literal 
    unique_literal_validated_clauses = {index for index in new_F_prime if len(cnf_clauses[index]) == 1}
    unique_literals = {cnf_clauses[index][0] for index in unique_literal_validated_clauses}
    
    # all cl
    all_validated_clauses = set()
    
    for literal in unique_literals:
        
    
    
    remaining_validated_clauses = new_F_prime  - 
    
    return new_F_prime


input_cnf_clauses = [
    [1],
    [2, 3, 5],
    [3, 6, 7, -8],
    [3, 6],
    [5, 9],
    [2, -7],
    [-4, -1],
    [-4],
    [-4, -8]
]
input_F_prime = {0, 7}

expected_output = {0, 2, 3, 4, 5}

ext1_F_prime = extension3(input_cnf_clauses, input_F_prime)

def extension4(cnf_clauses, F_prime):
    return F_prime
    
def grow(clauses, F_prime, extensions = None):
    """
    
        Procedure to efficiently grow the list clauses in ``F_prime``. The complement of F_prime is a correction
        set. 
    
        :param cnf_clauses: a collection of clauses (list of literals).
        :param F_prime: hitting set : a list of clauses.
        :param extensions: list of extensions activated
        
        The ``extensions`` argument is a list of extensions on the default grow procedure to optimize
        the building of the minimum correction set. 


        :type cnf_clauses: iterable(iterable(int))
        :type F_prime: iterable(int)
        :type extensions: iterable(int)

        Extension 1 : 
            
            add all clauses that are true based on the assignment by the model of Fprime

        Extension 2 : 
        
            for all variables not in variables assigned by model of Fprime
            give random values => manually check wether any clause is satisfied by this and
            add it to Fprime.

        Extension 3: 
            
            greedy

        Extension 4: 
            
            Maxsat
    """
    if not extensions: 
        return complement(clauses, F_prime)
    
    exts = {
        0 : default_extension,
        1 : extension1,
        2 : extension2,
        3 : extension2optimized,
        4 : extension3,
        5 : extension4
    }
    
    assert all([True if ext in exts else False for ext in extensions]), "extension doest not exist"
    
    for ext in extensions:
        F_prime = exts[ext](clauses, F_prime)
        
    return complement(clauses, F_prime)


validated literals: {-8, 1, -4}
{   'cnf_clauses': [   [1],
                       [2, 3, 5],
                       [3, 6, 7, -8],
                       [3, 6],
                       [5, 9],
                       [2, -7],
                       [-4, -1],
                       [-4],
                       [-4, -8]],
    'ext2_F_prime': {0, 2, 6, 7, 8},
    'new_F_prime': {0, 7}}
validated literals: {-8, 1, -4}
{   'cnf_clauses': [   [1],
                       [2, 3, 5],
                       [3, 6, 7, -8],
                       [3, 6],
                       [5, 9],
                       [2, -7],
                       [-4, -1],
                       [-4],
                       [-4, -8]],
    'ext2_F_prime': {0, 2, 6, 7, 8},
    'new_F_prime': {0, 2, 6, 7, 8}}


In [37]:
run_tests()

Testing flatten set
Everything passed


# OMUS algorithm

In [23]:
def omus(cnf: CNF):    
    clauses = cnf.clauses
    weights = clauses_weights(clauses)
    H = [] # the complement of a max-sat call

    while(True):
        # compute optimal hitting set
        h = optimalHittingSet(H, weights)
        
        # set with all unique clauses from hitting set
        F_prime = {i for i, hi in enumerate(h) if hi > 0}

        # check satisfiability of clauses
        if not checkSatClauses(clauses, F_prime):
            return F_prime
        
        # add all clauses ny building complement
        C = grow(clauses, F_prime, extensions=[3])
        
        if C in H:
            raise "MCS is already in H'"
        
        H.append(C)
        # printing        

In [24]:
# Toy example
smus_cnf = smus_CNF()
omus(smus_cnf)

{0, 1, 2}

In [13]:
# CNf Files 
# cnfs = sorted(cnfUnsatInstances(), key=lambda item: item.stat().st_size)
# c = cnfs[1]
# cnf = CNF(from_file=c)
# print(cnf.clauses)
# omus(cnf)
#for c in cnfs:
#    cnf = CNF(from_file=c)
#    o = omus(cnf)
#    print(c, c.stat().st_size)
#    print(cnf.clauses)

# useless to call mus on cnf files, only on WCNF
#for cnf_name, cnf_dict in cnfs.items():
#    wcnf = CNF(from_file = cnf_dict['path']).weighted()
#    with MUSX(wcnf, verbosity=1) as musx:
#        print(musx.compute())
#wncf = WCNF(from_file = cnf1['path'])