<a href="https://colab.research.google.com/github/cerdwin/AIC-IBL/blob/main/Project1_student_ipynb_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Initialization
Install and import the required libraries.

In [3]:
!pip install sympy
from sympy import Symbol, symbols
from sympy.logic import And, Or, Not, Implies, Equivalent
from sympy.logic.inference import satisfiable
import random



The following class defines the environment (a.k.a. "oracle") with which the agent interacts.

In [4]:
class SimpleEnvironment:
  def __init__(self, target_concept, variables):
    self.variables = variables
    self._target_concept = target_concept
    self._last_interpretation = {variable: False for variable in variables} # we start with all zeros
    self._last_observation = And() # empty conjunction a.k.a. True
    assert target_concept.free_symbols <= variables

  @staticmethod
  def interpretation_to_conjunction(interpretation):
    """
    Converts a propositional dict-represented interpretation (i.e. the truth-value
    assignment to propositional variables) to the conjunction for which it is the
    unique model (i.e. a satisfying assignment).
    """
    literals = (
      variable if positive else Not(variable)
      for variable, positive
      in interpretation.items()
      )
    return And(*literals)

  def _random_variable(self):
    return random.choice(tuple(self.variables))

  def _next_observation(self):
    """
    Generates another interpretation by randomly flipping one bit in the
    previous one. Note that this is certainly NOT i.i.d. sampling.
    """
    variable = self._random_variable()          # Which one to flip?
    self._last_interpretation[variable] ^= True # Only used internally

    observation = self._last_interpretation.copy()
    if random.random() < 0.5:  # Incompleteness of information: hide one variable
      del observation[self._random_variable()]
    self._last_observation = self.interpretation_to_conjunction(observation)
    return self._last_observation

  def action(self, a):
    """
    Accepts a prediction of the class of the previous sample.
    Returns the corresponding reward (-1 or 0) and another sample.
    """
    # Check if the presented observation is a model of the concept
    # this code works for arbitrarily complicated concepts.
    correct_answer = not satisfiable(
        Not(self._target_concept) & self._last_observation
    )

    reward = 0 if a == correct_answer else -1
    observation = self._next_observation()

    return reward, observation
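
The labelling rule in `action` can be illustrated without SymPy. The sketch below is a stand-in, not the environment's own code: the concept is written as a plain Python function (`demo_concept`, a hypothetical name), and the hypothetical helper `entails` checks by brute force whether every completion of a partial observation satisfies the concept. That is the same question the unsatisfiability test above answers.

```python
from itertools import product

def entails(observation, concept, names):
    """True iff every completion of the partial assignment satisfies concept."""
    missing = [n for n in names if n not in observation]
    for values in product((False, True), repeat=len(missing)):
        full = {**observation, **dict(zip(missing, values))}
        if not concept(full):
            return False
    return True

var_names = ("p_1", "p_2", "p_3")
demo_concept = lambda w: w["p_1"] and w["p_3"]   # stand-in for p_1 & p_3

complete_obs = {"p_1": True, "p_2": False, "p_3": True}
partial_obs = {"p_1": True, "p_2": False}        # p_3 was hidden

label_complete = entails(complete_obs, demo_concept, var_names)
label_partial = entails(partial_obs, demo_concept, var_names)
```

Under these assumptions the complete observation is labelled positive, while the partial one is labelled negative because the completion with `p_3` false violates the concept.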

The following class defines the generalizing agent for learning propositional conjunctions. It assumes the agent-environment interaction framework where
the environment is provided by the class above.

In [162]:
from sympy.core.operations import AssocOp
from sympy.logic.boolalg import BooleanAtom
def get_literals(formula):
  """Safely extracts all literals from a conjunction or disjunction"""
  if isinstance(formula, AssocOp): # Clauses, terms
    return set(formula.args)
  elif isinstance(formula, (Not, Symbol)): # literals
    return {formula}
  elif isinstance(formula, (bool, BooleanAtom)): # Includes True, False
    return set()
  else:
    raise TypeError("invalid input to get_literals")


class GeneralizingAgent:
  def __init__(self, environment):
    self.environment = environment
    self.mistakes = 0
    self.initLearning()

  def initLearning(self):
    # Initialize to the "most specific" hypothesis containing all literals.
    # Using positive literals only would restrict the concept space to
    # monotone conjunctions. Including the negative literals as well, as done
    # here, generalizes it to non-monotone conjunctions.
    variables = self.environment.variables
    self.hypothesis = And(*variables, *map(Not, variables))

    self.last_observation = self.hypothesis # Just a dummy value for first iteration
    self.observation = None

  @staticmethod
  def lgg(c1, c2):
    """
    Computes the least general generalization of two propositional conjunctions.
    """
    intersection = get_literals(c1) & get_literals(c2)
    return And(*intersection)

  def perform_action(self, a):
    """
    Sends one action to environment. Adjusts hypothesis on negative reward.
    """
    reward, observation = self.environment.action(a)
    if reward == -1: # Misclassified
      self.hypothesis = self.lgg(self.last_observation, self.hypothesis)
      self.mistakes += 1
    return observation

  def classify(self, observation):
    """
    Make a prediction given the observation and current hypothesis by
    determining the subsumption relation.
    """
    return get_literals(self.hypothesis) <= get_literals(observation)
    # Note: This is the same as checking whether the observation models the hypothesis
    # The following would be logically equivalent, but computationally an overkill:
    # return not satisfiable(Not(self.hypothesis) & observation)

  def run_experiment(self, k):
    """ Iterate the agent-environment interaction k times """
    prediction = False # The first prediction is initialized to False
    for _ in range(k):
      # Send the prediction for the previous observation and receive the next one
      observation = self.perform_action(prediction)
      prediction = self.classify(observation)
      self.last_observation = observation

  def get_hypothesis(self):
    """ Just a simple getter here, more interesting implementation later on. """
    return self.hypothesis
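
The generalization step in `lgg` amounts to a set intersection. The sketch below replays it on plain Python sets of literal names instead of SymPy formulas; `lgg_literals`, `hypothesis_lits` and the two positive observations are hypothetical names and data chosen only for illustration.

```python
def lgg_literals(c1, c2):
    """lgg of two conjunctions given as sets of literals: keep the shared ones."""
    return c1 & c2

# Most specific hypothesis over three variables: every literal and its negation.
hypothesis_lits = {"p_1", "~p_1", "p_2", "~p_2", "p_3", "~p_3"}

# Two positive observations of the hidden concept p_1 & p_3 (made-up data).
positive_observations = [
    {"p_1", "~p_2", "p_3"},
    {"p_1", "p_2", "p_3"},
]

mistake_count = 0
for obs in positive_observations:
    if not hypothesis_lits <= obs:   # the hypothesis rejects a positive example
        hypothesis_lits = lgg_literals(obs, hypothesis_lits)
        mistake_count += 1
```

After the first mistake the hypothesis shrinks to the three observed literals, and the second mistake removes `~p_2`, leaving only `p_1` and `p_3`.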

A simple example where the agent learns a monotone conjunction. The concept hidden in the environment is $p_1 \land p_3$ and the observations are NOT i.i.d. In this case, 20 observations are usually more than enough for the agent to learn the concept exactly.

In [163]:
p_1, p_2, p_3 = symbols(["p_1", "p_2", "p_3"])
variables = {p_1, p_2, p_3}
target_concept = p_1 & p_3
environment = SimpleEnvironment(target_concept, variables)
agent = GeneralizingAgent(environment)
agent.run_experiment(100)

agent.get_hypothesis()

p_1 & p_3

The number of mistakes the agent made:

In [164]:
agent.mistakes

2

Looks good! But what if the concept is not equivalent to a monotone conjunction? In the following example the correct concept is:

In [199]:
target_concept = (p_2 << p_1) & (p_2 << p_3)
target_concept

(Implies(p_1, p_2)) & (Implies(p_3, p_2))

In [200]:
environment = SimpleEnvironment(target_concept, variables)
agent = GeneralizingAgent(environment)
agent.run_experiment(100)

agent.get_hypothesis()
print("hypothesis type is:", type(agent.get_hypothesis()))
print("hypothesis itself is:", agent.get_hypothesis())

hypothesis type is: <class 'sympy.logic.boolalg.BooleanTrue'>
hypothesis itself is: True


In [201]:
agent.mistakes

51

This agent did not successfully learn the concept and most likely also made far too many classification mistakes. The reason is that the concept space it searches (conjunctions of literals) is not expressive enough to contain the target concept.
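
The expressiveness claim can be checked by brute force. The sketch below (plain Python, all names hypothetical) compares the truth table of $(p_1 \rightarrow p_2) \land (p_3 \rightarrow p_2)$ with the truth table of every consistent conjunction of literals over the three variables. Contradictory conjunctions are not enumerated because they have no models at all.

```python
from itertools import product

names = ("p_1", "p_2", "p_3")
worlds = [dict(zip(names, bits)) for bits in product((False, True), repeat=3)]

def implication_concept(w):
    # (p_1 -> p_2) & (p_3 -> p_2)
    return (not w["p_1"] or w["p_2"]) and (not w["p_3"] or w["p_2"])

def conjunction(signs):
    # signs[i] is True (positive literal), False (negated) or None (absent)
    def holds(w):
        return all(w[n] == s for n, s in zip(names, signs) if s is not None)
    return holds

target_table = [implication_concept(w) for w in worlds]
matching = [
    signs
    for signs in product((True, False, None), repeat=3)
    if [conjunction(signs)(w) for w in worlds] == target_table
]
```

The list `matching` stays empty: the target has 5 models, whereas a consistent conjunction of literals over three variables has 1, 2, 4 or 8.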

# YOUR TASK:

Adapt the agent defined above so that it is capable of learning any s-CNF concept. It will be tested on a set of concepts we do not provide to you and possibly with a different environment (with the same API). The rules are:


*   Do not attempt to access the concept stored in the oracle directly. Rely only on the observations provided by the API.
*   The agent should not make more classification mistakes than necessary (i.e. it should learn the concept as well as possible given the environment properties).
*   Only store the last observation in memory, the same as in the implementation above.
*   Your implementation should not exceed the necessary algorithmic complexity by too much.
*   You MAY implement any auxiliary methods you want. You MAY import anything from the Python standard library or SymPy, but not from other libraries.
*   However, do not modify/override the `__init__` and `run_experiment` methods below.
*   Your implementation of `perform_action` is responsible for correctly keeping track of the `mistakes` attribute as above.
*   Do not spam the standard output with debug messages.
*   This is not a team assignment! Work it out on your own! (Teamwork occurred in the past and we found out.)
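
For orientation, an s-CNF is a conjunction of clauses with at most s literals each. The sketch below enumerates all candidate clauses for a small case, using literal names as strings; `clauses_up_to` and the `demo_*` names are hypothetical and are not part of the assignment API.

```python
from itertools import combinations
from math import comb

def clauses_up_to(literals, s):
    """All clauses (as frozensets of literals) with between 1 and s literals."""
    return [
        frozenset(combo)
        for size in range(1, s + 1)
        for combo in combinations(literals, size)
    ]

demo_literals = ["p_1", "~p_1", "p_2", "~p_2", "p_3", "~p_3"]
demo_clauses = clauses_up_to(demo_literals, 2)

# 6 unit clauses plus comb(6, 2) two-literal clauses
clause_count = len(demo_clauses)
```

With n variables there are 2n literals, so the number of candidate clauses is the sum of `comb(2 * n, size)` for sizes 1 to s, which is polynomial in n for a fixed s. The enumeration also contains tautological clauses such as `p_1 | ~p_1`.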

In [168]:
from itertools import combinations, chain # Some functions that may prove handy

## Work on your solution here
Fill in the missing pieces of the code below. (Copying useful pieces of code from the `GeneralizingAgent` is allowed.)

In [256]:
class GeneralizingAgent_sCNF(GeneralizingAgent):
  def __init__(self, environment, s=2):
    self.environment = environment
    self.mistakes = 0
    self.s = s
    self.initLearning()

  def initLearning(self):
    """ Initialize the hypothesis and other necessary values. """
    # Start from the most specific hypothesis: the conjunction of every clause
    # with at most s literals over the variables and their negations.
    variables = self.environment.variables
    self.hypothesis = And(*variables, *map(Not, variables))
    self.all_vars = list(get_literals(self.hypothesis)) # all 2n literals

    clauses = list(self.all_vars) # clauses with a single literal
    for size in range(2, self.s + 1): # clauses with 2..s literals
      clauses.extend(Or(*combo) for combo in combinations(self.all_vars, size))
    self.hypothesis = And(*clauses)

    self.last_observation = self.hypothesis # Just a dummy value for first iteration
    self.observation = None

  @staticmethod
  def rectify(observation, hypothesis):
    """
    Counterpart of lgg for s-CNF: keeps only those clauses of the hypothesis
    that share at least one literal with the observation.
    """
    observed = get_literals(observation)
    kept = [
      clause for clause in get_literals(hypothesis)
      if get_literals(clause) & observed
    ]
    return And(*kept) # only the clauses that passed the check
    
  def perform_action(self, a):
    """
    Sends one action to environment. Adjusts hypothesis on negative reward.
    """

    reward, observation = self.environment.action(a)
    if reward == -1:
      # Misclassified: cut out every clause that causes the wrong prediction.
      # Each remaining clause must contain a literal from the observation,
      # otherwise it would not be true for that observation.
      self.hypothesis = self.rectify(self.last_observation, self.hypothesis)
      self.mistakes +=1
    return observation

  def classify(self, observation):
    """
    Make a prediction given the observation and current hypothesis: predict
    True iff every clause of the CNF hypothesis contains an observed literal.
    """
    # Note: until the hypothesis has been generalized at least once,
    # this always returns False.
    observed = get_literals(observation)
    for clause in get_literals(self.hypothesis):
      if not get_literals(clause) & observed:
        return False
    return True


  def get_hypothesis(self):
    """ Return the hypothesis in the s-CNF representation using the original variables """
    # Hint: The sympy formulas have a `subs` method which may be helpful here,
    # depending on your data representation.
    # The hypothesis is stored directly as an s-CNF over the original
    # variables, so no substitution is needed in this implementation.
    return self.hypothesis

  def run_experiment(self, k):
    """ Iterate the agent-environment interaction k times """
    prediction = False
    for _ in range(k):
      observation = self.perform_action(prediction)
      prediction = self.classify(observation)
      self.last_observation = observation
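
The clause-elimination idea behind `rectify` and `classify` can be traced on plain Python sets. This is a simplified sketch under assumed names (`predicts_positive`, `eliminate`, `demo_cnf`), with clauses and observations represented as frozensets of literal names rather than SymPy formulas.

```python
def predicts_positive(clauses, observed):
    """Every clause must contain at least one observed literal."""
    return all(clause & observed for clause in clauses)

def eliminate(clauses, observed):
    """Drop the clauses that a positive observation does not satisfy."""
    return {clause for clause in clauses if clause & observed}

# Hypothetical hypothesis: a few 2-literal clauses over p_1, p_2, p_3.
demo_cnf = {
    frozenset({"p_2", "~p_1"}),
    frozenset({"p_2", "~p_3"}),
    frozenset({"p_1", "p_3"}),
}
# A positive example of (p_1 -> p_2) & (p_3 -> p_2)
positive_obs = frozenset({"~p_1", "p_2", "~p_3"})

before = predicts_positive(demo_cnf, positive_obs)   # rejected: a mistake
demo_cnf = eliminate(demo_cnf, positive_obs)
after = predicts_positive(demo_cnf, positive_obs)    # accepted after the update
```

Only the clause `p_1 | p_3` is removed here, because it is the only one that the positive observation fails to satisfy.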


The following code is provided here to help you test your solution:

In [262]:
environment = SimpleEnvironment(target_concept, variables)
agent = GeneralizingAgent_sCNF(environment, s=2)
agent.run_experiment(100)

agent.get_hypothesis()

(p_2 | ~p_1) & (p_2 | ~p_3)

In [263]:
agent.mistakes

4

Check whether the learned concept is tautologically equivalent to the target concept. It does not always have to be: the environment may simply not provide the necessary information.

In [264]:
from itertools import product

def test_tautological_equivalence(c1, c2):
  """
  Brute-force equivalence test over all 2^n interpretations, so it is
  intractable for concepts with many variables.
  """
  all_symbols = list(c1.free_symbols | c2.free_symbols) # avoids shadowing the built-in `vars`
  interpretations = (dict(zip(all_symbols, x)) for x in product((False, True), repeat=len(all_symbols)))
  for interpretation in interpretations:
    if c1.subs(interpretation) != c2.subs(interpretation):
      return False
  return True


n_vars = len(environment.variables)
equivalence_of_formulas = Equivalent(agent.get_hypothesis(), target_concept)
equivalent_interpretations = list(satisfiable(equivalence_of_formulas, all_models=True))

len(equivalent_interpretations) == 2**n_vars # The equivalence holds in all 2^n interpretations, so it is a tautology

test_tautological_equivalence(target_concept, agent.get_hypothesis())

True
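
An alternative check, sketched here as one possible approach rather than as part of the provided test code, asks the SAT solver a single question: two formulas are equivalent exactly when the negation of their equivalence has no model. The helper name `is_equivalent` and the symbols `q_1`, `q_2`, `q_3` are hypothetical and chosen so that they do not clash with the notebook's variables.

```python
from sympy import symbols
from sympy.logic.boolalg import Equivalent, Implies, Not
from sympy.logic.inference import satisfiable

def is_equivalent(f, g):
    """f and g are equivalent iff Not(Equivalent(f, g)) is unsatisfiable."""
    return not satisfiable(Not(Equivalent(f, g)))

q_1, q_2, q_3 = symbols("q_1 q_2 q_3")
as_implications = Implies(q_1, q_2) & Implies(q_3, q_2)
as_clauses = (q_2 | ~q_1) & (q_2 | ~q_3)

same = is_equivalent(as_implications, as_clauses)
different = is_equivalent(as_implications, q_2)
```

This avoids materializing all models, although satisfiability checking is still exponential in the worst case.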