In [1]:
from typing import Sequence, Tuple, Union, Literal, Optional

import clingo

In [2]:
def solve(programs: Sequence[str],
          parts_in_order: Sequence[Sequence[Tuple[str, Sequence[clingo.Symbol]]]] = ((("base", ()),),),
          grounding_context=None,
          sep: str = ' '):
    """Add, ground and solve the given logic programs, printing every answer.

    Args:
        programs: Program texts; each one is added to the control object
            under the "base" part with no parameters.
        parts_in_order: Groups of ``(part_name, arguments)`` pairs. Each
            group is handed to a separate ``ground`` call, in the order
            given. Defaults to a single group containing only "base".
        grounding_context: Object forwarded to every ``ground`` call as its
            context argument.
        sep: Separator printed between the shown symbols of an answer (and
            directly inside the surrounding braces).

    Returns:
        A list with one entry per model: the shown symbols of that model,
        in the order the models were produced.
    """
    control = clingo.Control(("--models", "0"))
    for source in programs:
        control.add("base", (), source)

    for part_group in parts_in_order:
        control.ground(part_group, grounding_context)

    collected = []

    with control.solve(yield_=True) as handle:
        for number, answer in enumerate(handle, start=1):
            assert isinstance(answer, clingo.Model)
            shown = answer.symbols(shown=True)
            # Symbols are sorted for printing only; the returned list keeps
            # them exactly as the model reported them.
            rendered = sep.join(str(symbol) for symbol in sorted(shown))
            print("Answer {}: {}{}{}{}{}".format(number, "{", sep, rendered, sep, "}"))
            collected.append(shown)

        # Summarise the search once all models have been consumed.
        result = handle.get()
        if result.satisfiable:
            status = "SAT"
        elif result.unsatisfiable:
            status = "UNSAT"
        else:
            status = "UNKNOWN"
        # A trailing "+" marks a search that was not exhausted.
        suffix = "" if result.exhausted else "+"
        print(status, "{}{}".format(len(collected), suffix))

    return collected

In [3]:
class AccountingContext:
    """Ledger-backed object whose methods can be called from a logic program.

    The notebook passes an instance as the grounding context, so that the
    ``@account``, ``@transaction``, ``@debit_balance_at`` and
    ``@credit_balance_at`` terms resolve to the methods of the same name.

    ``ledger`` maps an account key (see ``get_pyrepr``) to a dict with

    * ``'journal'``:   ``{time: {'debit': int, 'credit': int}}``
    * ``'balance'``:   ``{time: {'debits': int, 'credits': int}}`` (an inner
      dict only contains the sides that have been queried so far)
    * ``'commodity'``: the commodity key the account was registered with
    """

    def __init__(self, ledger=None):
        """Use ``ledger`` as backing store, or a fresh empty dict if ``None``."""
        self.ledger = {} if ledger is None else ledger

    def __repr__(self):
        return repr(self.ledger)

    def __str__(self):
        return str(self.ledger)

    def __ensure_account(self, acc:clingo.Symbol, commodity:Optional[clingo.Symbol] = None):
        """Check that ``acc`` is registered and, if given, uses ``commodity``.

        Raises:
            KeyError: ``acc`` is not in the ledger.
            ValueError: ``commodity`` differs from the account's commodity.
        """
        acc_pyrepr = AccountingContext.get_pyrepr(acc)
        if acc_pyrepr not in self.ledger:
            raise KeyError("{} is not in the ledger to ensure this add the fact: register(@account({},{}))".format(acc_pyrepr, acc, commodity))
        if commodity is not None:
            commodity_pyrepr = AccountingContext.get_pyrepr(commodity)
            assert 'commodity' in self.ledger[acc_pyrepr], "{} should have an field 'commodity'".format(acc_pyrepr)
            registered_commodity = self.ledger[acc_pyrepr]['commodity']
            if registered_commodity != commodity_pyrepr:
                # The message has to be formatted explicitly; passing the
                # values as extra ValueError arguments leaves the template
                # uninterpolated.
                raise ValueError("{} has the commodity {} not {}".format(acc_pyrepr, registered_commodity, commodity_pyrepr))

    def account(self, name: clingo.Symbol, commodity: clingo.Symbol):
        """Register (or reset) an account and return ``account(name, commodity)``.

        Any existing ledger entry under the same key is replaced by one with
        an empty journal and empty balance.
        """
        name_pyrepr = AccountingContext.get_pyrepr(name)
        commodity_pyrepr = AccountingContext.get_pyrepr(commodity)
        self.ledger[name_pyrepr] = dict(
            journal={},
            balance={},
            commodity=commodity_pyrepr,
        )
        return clingo.Function("account", (name, commodity))

    def transaction(self,
                    debitAcc: clingo.Symbol,
                    creditAcc: clingo.Symbol,
                    amount: clingo.Symbol,
                    commodity: clingo.Symbol,
                    time: clingo.Symbol):
        """Book ``amount`` as debit on ``debitAcc`` and credit on ``creditAcc``.

        Amounts booked for the same account and time accumulate in one
        journal entry. All arguments are validated before the ledger is
        touched.

        Returns:
            The symbol ``transaction(debitAcc, creditAcc, amount, commodity, time)``.

        Raises:
            KeyError: one of the accounts is not registered.
            ValueError: an account uses a different commodity, ``time`` or
                ``amount`` is not a number symbol, or ``amount`` is negative.
        """
        debitAcc_pyrepr = AccountingContext.get_pyrepr(debitAcc)
        self.__ensure_account(debitAcc, commodity)
        creditAcc_pyrepr = AccountingContext.get_pyrepr(creditAcc)
        self.__ensure_account(creditAcc, commodity)

        if time.type != clingo.SymbolType.Number:
            raise ValueError("{} is not of type clingo.{}".format(time, clingo.SymbolType.Number))
        time_pyrepr:int = time.number
        assert isinstance(time_pyrepr, int), "{} should be an int but is {}".format(time_pyrepr, type(time_pyrepr).__name__)
        if amount.type != clingo.SymbolType.Number:
            raise ValueError("{} is not of type clingo.{}".format(amount, clingo.SymbolType.Number))
        amount_pyrepr: int = amount.number
        if amount_pyrepr < 0:
            raise ValueError("{} as amount is not allowed, only use positive numbers".format(amount_pyrepr))

        # Create the per-time journal entries on first use, then accumulate.
        debit_entry = self.ledger[debitAcc_pyrepr]['journal'].setdefault(time_pyrepr, dict(debit=0, credit=0))
        credit_entry = self.ledger[creditAcc_pyrepr]['journal'].setdefault(time_pyrepr, dict(debit=0, credit=0))
        debit_entry['debit'] += amount_pyrepr
        credit_entry['credit'] += amount_pyrepr

        return clingo.Function("transaction", (debitAcc, creditAcc, amount, commodity, time))

    def __balance_at(self, acc_pyrepr:Union[str, int], time_pyrepr:int, acc_entry:Literal['debit', 'credit']):
        """Store the sum of ``acc_entry`` amounts journaled strictly before ``time_pyrepr``.

        The result is written to
        ``ledger[acc]['balance'][time][acc_entry + 's']``.
        """
        acc_entries = acc_entry + 's'
        account_record = self.ledger[acc_pyrepr]
        assert 'balance' in account_record
        assert 'journal' in account_record
        # Recomputed on every call: the journal may have received further
        # transactions since the last query, so a stored value could be stale.
        aggregate = sum(entry.get(acc_entry, 0)
                        for t, entry in account_record['journal'].items()
                        if t < time_pyrepr)
        account_record['balance'].setdefault(time_pyrepr, {})[acc_entries] = aggregate

    def __balance_symbol(self, acc:clingo.Symbol, time:clingo.Symbol, acc_entry:Literal['debit', 'credit']):
        """Shared implementation of ``debit_balance_at`` / ``credit_balance_at``."""
        if time.type != clingo.SymbolType.Number:
            raise ValueError("{} is not a clingo.{} but a clingo.{}".format(time, clingo.SymbolType.Number, time.type))
        time_pyrepr:int = time.number
        assert isinstance(time_pyrepr, int)
        acc_pyrepr = AccountingContext.get_pyrepr(acc)
        self.__ensure_account(acc)
        self.__balance_at(acc_pyrepr, time_pyrepr, acc_entry)
        return clingo.Number(self.ledger[acc_pyrepr]['balance'][time_pyrepr][acc_entry + 's'])

    def debit_balance_at(self, acc:clingo.Symbol, time:clingo.Symbol):
        """Return, as a number symbol, all debits of ``acc`` booked before ``time``.

        Raises:
            ValueError: ``time`` is not a number symbol.
            KeyError: ``acc`` is not registered.
        """
        return self.__balance_symbol(acc, time, 'debit')

    def credit_balance_at(self, acc:clingo.Symbol, time:clingo.Symbol):
        """Return, as a number symbol, all credits of ``acc`` booked before ``time``.

        Raises:
            ValueError: ``time`` is not a number symbol.
            KeyError: ``acc`` is not registered.
        """
        return self.__balance_symbol(acc, time, 'credit')

    @staticmethod
    def get_pyrepr(sym: clingo.Symbol) -> Union[int, str]:
        """Convert a symbol to the Python value used as a ledger key.

        String symbols yield their string, number symbols their integer, and
        every other symbol its ``str()`` rendering.
        """
        if sym.type == clingo.SymbolType.String:
            return sym.string
        elif sym.type == clingo.SymbolType.Number:
            return sym.number
        return str(sym)


In [4]:
# Problem instance as an ASP program: the time steps, followed by two named
# program parts ("accounts" and "transactions") whose facts are built through
# the @account / @transaction external terms.
instance = """

time(0..4).

#program accounts.
register(@account("Alice:Kassa", "EUR")).
register(@account("Alice:Lager", "Good")).
register(@account("Alice:Eigenkapital:EUR", "EUR")).

register(@account("Bob:Kassa", "EUR")).
register(@account("Bob:Lager", "Good")).
register(@account("Bob:Eigenkapital:Good", "Good")).

#program transactions.
register(@transaction("Alice:Kassa", "Alice:Eigenkapital:EUR", 100, "EUR", 0)).
register(@transaction("Bob:Lager", "Bob:Eigenkapital:Good", 1, "Good", 0)).
register(@transaction("Bob:Kassa", "Alice:Kassa", 50, "EUR", 1)).
register(@transaction("Alice:Lager", "Bob:Lager", 1, "Good", 2)).
register(@transaction("Bob:Kassa", "Alice:Kassa", 20, "EUR", 3)).
register(@transaction("Bob:Kassa", "Alice:Kassa", 30, "EUR", 3)).


"""
# solve() is called with its defaults here: only the "base" part is grounded
# and no grounding context is supplied.
solve([instance]);

Answer 1: { time(0) time(1) time(2) time(3) time(4) }
SAT 1


In [5]:
# Rule that derives account/2 from register(account(...)) atoms.
accounts = """

account(Account, Commodity) :- register(account(Account, Commodity)).

"""
# Solved on its own, this program contains no register/1 atoms.
solve([accounts]);

Answer 1: {  }
SAT 1


<block>:3:32-69: info: atom does not occur in any rule head:
  register(account(Account,Commodity))



In [6]:
# Rule that turns a registered transaction into an occ_at/2 atom, moving the
# time argument T out of the transaction term.
transaction = """

occ_at(transaction(DebitAcc, CreditAcc, Amount, Commodity), T) :- register(transaction(DebitAcc, CreditAcc, Amount, Commodity, T)).

"""
# Solved on its own, this program contains no register/1 atoms.
solve([transaction]);

Answer 1: {  }
SAT 1


<block>:3:67-131: info: atom does not occur in any rule head:
  register(transaction(DebitAcc,CreditAcc,Amount,Commodity,T))



In [7]:
# Rule that produces one obs_at(balance(...), T) atom per time step and
# account; the debit and credit totals come from the @debit_balance_at and
# @credit_balance_at external terms.
balance = """

obs_at(balance(Account, Debits, Credits, Commodity), T) :-
  time(T),
  account(Account, Commodity),
  Debits = @debit_balance_at(Account, T),
  Credits = @credit_balance_at(Account, T).

"""
# Solved on its own, this program defines neither time/1 nor account/2.
solve([balance]);

Answer 1: {  }
SAT 1


<block>:4:3-10: info: atom does not occur in any rule head:
  time(T)

<block>:5:3-30: info: atom does not occur in any rule head:
  account(Account,Commodity)



In [8]:
# Fresh context with an empty ledger; the bare name displays its repr.
ac = AccountingContext()
ac

{}

In [9]:
# Solve all programs together with `ac` as grounding context. The parts are
# grounded in three separate steps - accounts, then transactions, then base -
# so accounts are registered before transactions are booked, and both happen
# before the balance rule in the base part is grounded.
solve([instance, accounts, transaction, balance, "#show obs_at/2. #show occ_at/2."],
      (
          (("accounts", ()),),
          (("transactions", ()),),
          (("base", ()),),
      ), grounding_context=ac, sep='\n');

Answer 1: {
obs_at(balance("Alice:Eigenkapital:EUR",0,0,"EUR"),0)
obs_at(balance("Alice:Eigenkapital:EUR",0,100,"EUR"),1)
obs_at(balance("Alice:Eigenkapital:EUR",0,100,"EUR"),2)
obs_at(balance("Alice:Eigenkapital:EUR",0,100,"EUR"),3)
obs_at(balance("Alice:Eigenkapital:EUR",0,100,"EUR"),4)
obs_at(balance("Alice:Kassa",0,0,"EUR"),0)
obs_at(balance("Alice:Kassa",100,0,"EUR"),1)
obs_at(balance("Alice:Kassa",100,50,"EUR"),2)
obs_at(balance("Alice:Kassa",100,50,"EUR"),3)
obs_at(balance("Alice:Kassa",100,100,"EUR"),4)
obs_at(balance("Alice:Lager",0,0,"Good"),0)
obs_at(balance("Alice:Lager",0,0,"Good"),1)
obs_at(balance("Alice:Lager",0,0,"Good"),2)
obs_at(balance("Alice:Lager",1,0,"Good"),3)
obs_at(balance("Alice:Lager",1,0,"Good"),4)
obs_at(balance("Bob:Eigenkapital:Good",0,0,"Good"),0)
obs_at(balance("Bob:Eigenkapital:Good",0,1,"Good"),1)
obs_at(balance("Bob:Eigenkapital:Good",0,1,"Good"),2)
obs_at(balance("Bob:Eigenkapital:Good",0,1,"Good"),3)
obs_at(balance("Bob:Eigenkapital:Good",0,1,"Goo

<block>:1:1-16: info: no atoms over signature occur in program:
  obs_at/2

<block>:1:17-32: info: no atoms over signature occur in program:
  occ_at/2

<block>:1:1-16: info: no atoms over signature occur in program:
  obs_at/2

<block>:1:17-32: info: no atoms over signature occur in program:
  occ_at/2



In [10]:
# Display the ledger that the grounding steps above filled in.
ac

{'Bob:Eigenkapital:Good': {'journal': {0: {'debit': 0, 'credit': 1}}, 'balance': {0: {'credits': 0, 'debits': 0}, 1: {'credits': 1, 'debits': 0}, 2: {'credits': 1, 'debits': 0}, 3: {'credits': 1, 'debits': 0}, 4: {'credits': 1, 'debits': 0}}, 'commodity': 'Good'}, 'Bob:Lager': {'journal': {2: {'debit': 0, 'credit': 1}, 0: {'debit': 1, 'credit': 0}}, 'balance': {0: {'credits': 0, 'debits': 0}, 1: {'credits': 0, 'debits': 1}, 2: {'credits': 0, 'debits': 1}, 3: {'credits': 1, 'debits': 1}, 4: {'credits': 1, 'debits': 1}}, 'commodity': 'Good'}, 'Bob:Kassa': {'journal': {3: {'debit': 50, 'credit': 0}, 1: {'debit': 50, 'credit': 0}}, 'balance': {0: {'credits': 0, 'debits': 0}, 1: {'credits': 0, 'debits': 0}, 2: {'credits': 0, 'debits': 50}, 3: {'credits': 0, 'debits': 50}, 4: {'credits': 0, 'debits': 100}}, 'commodity': 'EUR'}, 'Alice:Eigenkapital:EUR': {'journal': {0: {'debit': 0, 'credit': 100}}, 'balance': {0: {'credits': 0, 'debits': 0}, 1: {'credits': 100, 'debits': 0}, 2: {'credits': 1