## MIX MNIST:

In [None]:
import numpy as np
all_languages = ["Arabic","ARDIS","Bangla","BanglaLekha","Devanagari","EMNIST",
 "Farsi","ISI_Bangla","Kannada","MADBase","Telugu","Tibetan","Urdu"]
_main_ = ["MNIST_MIX"]
# we use:
used_languages = ["EMNIST","Farsi","Urdu","Kannada"]
result_string = ""

In [2]:
import numpy as np
import torch
from typing import Dict, Tuple, Iterable
import torchvision.transforms as transforms
from numpy.typing import NDArray
import torchvision.transforms as transforms
from torch.utils.data import Dataset

class MIXMNIST_BASE:
    def __init__(self, root:str='./data', prefix:str=""):
        self.root = root
        self.prefix = prefix
        self.transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
        )
        self.datasets = self._get_dataset()

    def _get_dataset(self) -> Dict[str,Tuple[NDArray,NDArray]]:
        # NpzFile '{...}_train_test.npz' with keys: X_train, X_test, y_train, y_test
        loaded:Dict[str,NDArray] = np.load(f'{self.root}/MNIST-MIX-all/{self.prefix}_train_test.npz')
        raw_dataset = {
            'train': (loaded['X_train'], loaded['y_train']),
            'test': (loaded['X_test'], loaded['y_test'])
        }
        formatted_dataset = {}
        for split_name, (imgs, targets) in raw_dataset.items():
            formatted_items = []
            for i, img in enumerate(imgs):

                if self.transform:
                    img = self.transform(img)
                formatted_items.append((img, int(targets[i])))
            formatted_dataset[split_name] = formatted_items
        return formatted_dataset

class MIXMNIST(Dataset):
    def __init__(self, root: str = './data', prefix: str = "", subset: str = "train"):
        self.base = MIXMNIST_BASE(root=root, prefix=prefix)
        self.subset = subset
        print(f"Dataset Created for MIX MNIST: {prefix}-{subset}")

    def __getitem__(self, item: int) -> torch.Tensor:
        return self.base.datasets[self.subset][int(item[0])][0]
    
    def __len__(self) -> int:
        length = len(self.base.datasets[self.subset])
        return length

# datasets = MIXMNIST_BASE(prefix="Bangla").datasets

In [98]:
import random
from typing import Callable, List, Iterable, Tuple

from problog.logic import Term, list2term, Constant
from torch.utils.data import Dataset as TensorDataset
from deepproblog.dataset import Dataset
from deepproblog.query import Query

class MNISTOperator(Dataset, TensorDataset):
    def __getitem__(self, index: int) -> Tuple[list, list, int]:
        l1, l2 = self.data[index]
        print("L1L2",l1,l2)
        label = self._get_label(index)
        l1 = [self.dataset[x][0] for x in l1]
        l2 = [self.dataset[x][0] for x in l2]
        return l1, l2, label

    def __init__(
        self,
        dataset_name: str,
        function_name: str,
        operator: Callable[[List[int]], int],
        size=1,
        arity=2,
        seed=None,
        prefix="None"
    ):
        """Generic dataset for operator(img, img) style datasets.

        :param dataset_name: Dataset to use (train, val, test)
        :param function_name: Name of Problog function to query.
        :param operator: Operator to generate correct examples
        :param size: Size of numbers (number of digits)
        :param arity: Number of arguments for the operator
        :param seed: Seed for RNG
        """
        super(MNISTOperator, self).__init__()
        assert size >= 1
        assert arity >= 1
        self.dataset_name = dataset_name
        self.dataset = (MIXMNIST_BASE(prefix=prefix).datasets)[self.dataset_name]
        self.function_name = function_name
        self.operator = operator
        self.size = size
        self.arity = arity
        self.seed = seed
        mnist_indices = list(range(len(self.dataset)))
        if seed is not None:
            rng = random.Random(seed)
            rng.shuffle(mnist_indices)
        dataset_iter = iter(mnist_indices)
        # Build list of examples (mnist indices): [[2,3],[5,7],...] a list of arities of each query
        self.data = []
        try:
            while True:
                self.data.append(
                    [
                        [next(dataset_iter) for _ in range(self.size)]
                        for _ in range(self.arity)
                    ]
                )
        except StopIteration:
            pass

    def _dig2num(self, digits:List[int]) -> int:
        number = 0
        for d in digits:
            number *= 10
            number += d
        return number

    def to_query(self, i: int) -> Query:
        """Generate queries"""
        mnist_indices = self.data[i]
        expected_result = self._get_label(i)

        # Build substitution dictionary for the arguments
        subs = dict()
        var_names = []
        for i in range(self.arity):
            inner_vars = []
            for j in range(self.size):
                t = Term(f"p{i}_{j}")
                subs[t] = Term(
                    "tensor",
                    Term(
                        self.dataset_name,
                        Constant(mnist_indices[i][j]),
                    ),
                )
                inner_vars.append(t)
            var_names.append(inner_vars)

        # Build query
        if self.size == 1:
            args = [e[0] for e in var_names]
        else:
            args = [list2term(e) for e in var_names]

        return Query(
            Term(
                self.function_name,
                *args,
                Constant(expected_result),
            ),
            subs,
        )

    def _get_label(self, i: int):
        mnist_indices = self.data[i]
        # Figure out what the ground truth is, first map each parameter to the value:
        ground_truth = []
        for idx_tuple in mnist_indices:
            digits = [self.dataset[j][1] for j in idx_tuple]
            number = self._dig2num(digits)
            ground_truth.append(number)

        # Then compute the expected value:
        expected_result = self.operator(ground_truth)
        return expected_result

    def __len__(self):
        print("call __len__ Operator:",len(self.data))
        return len(self.data)
    
def operation(n: int, dataset: str, prefix:str, func:callable, seed=None):
    """Returns a dataset for binary addition"""
    return MNISTOperator(
        dataset_name=dataset,
        function_name="operation",
        operator=func,
        size=n,
        arity=2,
        seed=seed,
        prefix=prefix
    )

### ADD TEST:

In [100]:
from json import dumps

import torch
from typing import Dict, Tuple, Iterable

from deepproblog.dataset import DataLoader
from deepproblog.engines import ApproximateEngine, ExactEngine
from deepproblog.evaluate import get_confusion_matrix
from deepproblog.examples.MNIST.network import MNIST_Net
from deepproblog.model import Model
from deepproblog.network import Network
from deepproblog.train import train_model
from core import langda_solve

# =============== define some (prompt, function) pairs
def my_sum(input:Iterable[int | bool]):
    return sum(input)

def my_sum_wracked(input:Iterable[int | bool]):
    for i in input:
        if i == 8 or i == 7:
            return 0
        else:
            return sum(input)

# *** ================= *** Pararmeters *** ================= *** 
language = "Bangla"
wracked = False
train = True
load_pretrained = True
N = 1
# *** ================= *** =========== *** ================= *** 

name = "sum_{}_{}_{}".format(language, wracked, N)
if wracked:
    prompt ="Please calculate the sum of X and Y based on digit of network, but if one of X or Y is 7 or 8, then the result is 0"
    train_set = operation(N, "train", language, my_sum_wracked, seed=42)
    test_set = operation(N, "test", language, my_sum_wracked, seed=42)
else:
    prompt ="Please calculate the sum of X and Y based on digit of network"
    train_set = operation(N, "train", language, my_sum, seed=42)
    test_set = operation(N, "test", language, my_sum, seed=42)

MIXMNIST_train = MIXMNIST(prefix=language, subset="train")
MIXMNIST_test = MIXMNIST(prefix=language, subset="test")

network1 = MNIST_Net()

problog_string = """
nn(arabic_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: arabic_digit(X,Y).
operation(X,Y,Z) :- 
    langda(LLM:"/* PROMPT */").
"""
query_ext = """
arabic_digit(img0,0).
arabic_digit(img1,1).
arabic_digit(img2,2).
arabic_digit(img3,3).
arabic_digit(img7,7).
arabic_digit(img8,8).
query(operation(img7,img8,0)).
query(operation(img2,img3,5)).
query(operation(img3,img8,0)).
query(operation(img1,img2,3))."""
# result_string = langda_solve("double_dc", problog_string,
#                         additional_input={
#                             "langda_ext": {"PROMPT":prompt},
#                             "query_ext": query_ext,
#                         })
print(result_string)
net1 = Network(network1, "arabic_net", batching=True)
net1.optimizer = torch.optim.Adam(network1.parameters(), lr=1e-3)

model = Model(result_string, [net1], load=False)
model.set_engine(ExactEngine(model), cache=True)

model.add_tensor_source("train", MIXMNIST_train)
model.add_tensor_source("test", MIXMNIST_test)
if load_pretrained:
    try:
        model.load_state("snapshot/{}.pth".format(name))
    except:
        print(f"No local model:{name} found, train it first.")
        pass

if train:
    loader = DataLoader(train_set, 2, False)
    train = train_model(model, loader, 2, log_iter=100, profile=0)
    model.save_state("snapshot/{}.pth".format(name))
    train.logger.comment(dumps(model.get_hyperparameters()))
    print(f"\nFinish Training!")
    train.logger.comment(
        "Accuracy {}".format(get_confusion_matrix(model, test_set, verbose=1).accuracy())
    )
    train.logger.write_to_file("log/" + name)
else:

    model.load_state("snapshot/{}.pth".format(name))
    model.eval()

    confusion_matrix = get_confusion_matrix(model, test_set, verbose=1)
    accuracy = confusion_matrix.accuracy()

    print(f"\nFinish Testing!")
    print(f"Accuracy: {accuracy:.4f}")


Dataset Created for MIX MNIST: Bangla-train
Dataset Created for MIX MNIST: Bangla-test
nn(arabic_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: arabic_digit(X,Y).
operation(X,Y,Z) :- 
 
 
    arabic_digit(X, DigitX),
    arabic_digit(Y, DigitY),
    Z is DigitX + DigitY.
Caching ACs
call __len__ Operator: 2400
Training  for 2 epoch(s)
Epoch 1
Iteration:  100 	s:1.7632 	Average Loss:  2.7511736166477205
Iteration:  200 	s:1.3362 	Average Loss:  2.7373784339427947
Iteration:  300 	s:1.3385 	Average Loss:  2.4498725825548173
Iteration:  400 	s:1.3071 	Average Loss:  2.5551454022526743
Iteration:  500 	s:1.3231 	Average Loss:  2.2925562052428723
Iteration:  600 	s:1.4207 	Average Loss:  1.9989371252804995
Iteration:  700 	s:1.4644 	Average Loss:  1.5049611875228583
Iteration:  800 	s:1.3581 	Average Loss:  1.0495929516179603
Iteration:  900 	s:1.3028 	Average Loss:  0.8888541033479851
Iteration:  1000 	s:1.2672 	Average Loss:  0.7472198379278416
Iteration:  1100 	s:1.2883 	Average Loss:  0.8524208682

In [9]:

result_string2 = result_string
# Finish Testing!
# Accuracy: 0.7267

In [7]:
print(result_string)

nn(arabic_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: arabic_digit(X,Y).
operation(X,Y,Z) :- 
 
 
    arabic_digit(X, X_digit),
    arabic_digit(Y, Y_digit),
    ( (X_digit = 7 ; X_digit = 8 ; Y_digit = 7 ; Y_digit = 8),
      Z = 0
    ; Z is X_digit + Y_digit
    ).


In [None]:
wewewe = """
nn(arabic_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: arabic_digit(X,Y).
operation(X,Y,Z) :- 
 
 
    arabic_digit(X, XD),
    arabic_digit(Y, YD),
    ( (XD = 7 ; XD = 8 ; YD = 7 ; YD = 8), Z = 0 ) ;
    ( \+ (XD = 7 ; XD = 8 ; YD = 7 ; YD = 8), Z is XD + YD ).
"""

## MORE:

In [95]:
from json import dumps

import torch
from typing import Dict, Tuple, Iterable

from deepproblog.dataset import DataLoader
from deepproblog.engines import ApproximateEngine, ExactEngine
from deepproblog.evaluate import get_confusion_matrix
from deepproblog.model import Model
from deepproblog.network import Network
from deepproblog.train import train_model
from deepproblog.examples.MNIST.network import MNIST_Net

from core import langda_solve

from typing import Iterable

def my_sum(input: Iterable[int | bool]) -> int:
    """求和: [1,2,3] => 6"""
    return sum(input)

def my_swap(input: Iterable[int | bool]) -> int:
    """交换位置并拼接: [4,8] => 84"""
    return input[1] * 10 + input[0]

def my_product(input: Iterable[int | bool]) -> int:
    """求积: [2,3,4] => 24"""
    result = 1
    for x in input:
        result *= x
    return result

def my_max_min_diff(input: Iterable[int | bool]) -> int:
    """最大值减最小值: [1,5,3] => 4"""
    return max(input) - min(input)

def my_reverse_concat(input: Iterable[int | bool]) -> int:
    """逆序拼接: [1,2,3] => 321"""
    return int(''.join(str(x) for x in reversed(input)))

def my_alternating_sum(input: Iterable[int | bool]) -> int:
    """交替求和: [1,2,3,4] => 1-2+3-4 = -2"""
    result = 0
    for i, x in enumerate(input):
        result += x if i % 2 == 0 else -x
    return result

def my_count_even(input: Iterable[int | bool]) -> int:
    """计算偶数个数: [1,2,3,4] => 2"""
    return sum(1 for x in input if x % 2 == 0)

def my_weighted_sum(input: Iterable[int | bool]) -> int:
    """按位置加权求和: [1,2,3] => 1*1 + 2*2 + 3*3 = 14"""
    return sum(x * (i + 1) for i, x in enumerate(input))

def my_binary_to_decimal(input: Iterable[int | bool]) -> int:
    """二进制转十进制: [1,0,1] => 5"""
    result = 0
    for bit in input:
        result = result * 2 + bit
    return result

def my_sum_of_squares(input: Iterable[int | bool]) -> int:
    """平方和: [1,2,3] => 1+4+9 = 14"""
    return sum(x * x for x in input)

def my_ascending_check(input: Iterable[int | bool]) -> int:
    """检查是否递增(是返回1，否返回0): [1,2,3] => 1"""
    lst = list(input)
    return 1 if all(lst[i] <= lst[i+1] for i in range(len(lst)-1)) else 0

# FUNCMAP = {
#     "Arabic":("This is for arabic net. Please calculate the sum of X and Y elements",my_sum),
#     "Telugu":("This is for telugu net. Please calculate the volume of the cone with telugu_digit. X is the base radius, Y is the height, π is approximated to 3.14, keep only the integer part",my_cone_volume),
#     "Urdu":("This is for urdu net.  Please calculate with the digit predicate. Please swap the positions of X and Y and combine them into a new number. Example: input [4,8] → 8*10 + 4 = 84",my_swap),
# }
FUNCMAP = {
    "Arabic":("This is for arabic net. Please calculate the sum of X and Y elements",my_sum),
    "Telugu":("This is for Telugu net. Please calculate the sum of X and Y elements",my_sum),
    "Urdu":("This is for Urdu net. Please calculate the sum of X and Y elements",my_sum),
}
rule_string = """
nn(arabic_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: arabic_digit(X,Y).
nn(telugu_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: telugu_digit(X,Y).
nn(urdu_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: urdu_digit(X,Y).

operation(X,Y,Z) :- 
    langda(LLM:"/* PROMPT */").
"""
def operation(n: int, dataset: str, prefix:str, func:callable, seed=None):
    """Returns a dataset for binary addition"""
    return MNISTOperator(
        dataset_name=dataset,
        function_name="operation",
        operator=func,
        size=n,
        arity=2,
        seed=seed,
        prefix=prefix
    )

# language = "Arabic"
method = "exact"
N = 1


# for language in ["Arabic","Telugu","Urdu"]:
for language in ["Telugu"]:
    name = "{}_{}_{}".format(language,method, N)
    train_set = operation(N, "train", language, FUNCMAP[language][1], seed=42)
    test_set = operation(N, "test", language, FUNCMAP[language][1], seed=42)
    
    MIXMNIST_train = MIXMNIST(prefix=language, subset="train")
    MIXMNIST_test = MIXMNIST(prefix=language, subset="test")

    network1 = MNIST_Net()
    network2 = MNIST_Net()
    network3 = MNIST_Net()

    # pretrain = 0
    # if pretrain is not None and pretrain > 0:
    #     network.load_state_dict(torch.load("models/pretrained/all_{}.pth".format(pretrain)))

    net1 = Network(network1, "arabic_net", batching=True)
    net1.optimizer = torch.optim.Adam(network1.parameters(), lr=1e-3)
    net2 = Network(network2, "telugu_net", batching=True)
    net2.optimizer = torch.optim.Adam(network2.parameters(), lr=1e-3)
    net3 = Network(network3, "urdu_net", batching=True)
    net3.optimizer = torch.optim.Adam(network3.parameters(), lr=1e-3)

    result_string = langda_solve("double_dc", rule_string,
                            additional_input={
                                "langda_ext": {"PROMPT":FUNCMAP[language][0]}})

    print(result_string)
    model = Model(result_string, [net1,net2,net3],load=False)
    if method == "exact":
        model.set_engine(ExactEngine(model), cache=True)

    model.add_tensor_source("train", MIXMNIST_train)
    model.add_tensor_source("test", MIXMNIST_test)

    loader = DataLoader(train_set, 2, False)
    train = train_model(model, loader, 2, log_iter=100, profile=0)
    model.save_state("snapshot/" + name + ".pth")
    train.logger.comment(dumps(model.get_hyperparameters()))
    train.logger.comment(
        "Accuracy {}".format(get_confusion_matrix(model, test_set, verbose=1).accuracy())
    )
    train.logger.write_to_file("log/" + name)


Dataset Created for MIX MNIST: Telugu-train
Dataset Created for MIX MNIST: Telugu-test
{'6FBBDE1D': 'operation(X,Y,Z) :- \n    telugu_digit(X, X1),\n    telugu_digit(Y, Y1),\n    Z is X1 + Y1.'}
ext_match PROMPT
raw_langda_dict[LLM] This is for Telugu net. Please calculate the sum of X and Y elements
langda_ext_dict[ext_match] This is for Telugu net. Please calculate the sum of X and Y elements
[{'6FBBDE1D': None}]
processing _decide_next_init ...
Executing first chain: Code generation with tools...


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThe provided code snippet is incomplete, and the `<Langda>` block specifies that the task is to calculate the sum of `X` and `Y` elements for the `telugu_net`. Here's the completed code based on the requirements:

```problog
nn(arabic_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: arabic_digit(X,Y).
nn(telugu_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: telugu_digit(X,Y).
nn(urdu_net,[X],Y,[0,1,2,3,4,5,6,7,8,9]) :: urdu_digit(X,Y).

operation(X, Y, Z

In [97]:
language_model = "Telugu"
language_call = "Arabic"

network = MNIST_Net()

net1 = Network(network1, "arabic_net", batching=True)
net1.optimizer = torch.optim.Adam(network1.parameters(), lr=1e-3)
net2 = Network(network2, "telugu_net", batching=True)
net2.optimizer = torch.optim.Adam(network2.parameters(), lr=1e-3)
net3 = Network(network3, "urdu_net", batching=True)
net3.optimizer = torch.optim.Adam(network3.parameters(), lr=1e-3)

MIXMNIST_test = MIXMNIST(prefix=language_call, subset="test")

model = Model(result_string, [net1,net2,net3],load=False)
model.set_engine(ExactEngine(model), cache=True)
model.add_tensor_source("test", MIXMNIST_test)

model.load_state("snapshot/{}_{}_{}.pth".format(language_model,method, N))
model.eval()

test_set = operation(N, "test", language_call, FUNCMAP[language_call][1], seed=42).subset(0,100)

confusion_matrix = get_confusion_matrix(model, test_set, verbose=1)
accuracy = confusion_matrix.accuracy()

print(f"\n✓ 测试完成!")
print(f"准确率: {accuracy:.4f}")

Dataset Created for MIX MNIST: Arabic-test
Caching ACs
call __len__ Operator: 300
         	  	 	  	 	  	 	 	  	 	Actual	 	  	  	 	 	 	  	  
         	  	4	11	1	10	2	6	13	8	    15	9	14	12	3	7	5	18	16
         	 4	0	 0	0	 0	0	0	 0	0	     0	0	 0	 0	0	0	0	 0	 0
         	11	2	 0	0	 0	0	1	 1	0	     0	0	 0	 0	0	1	1	 0	 0
         	 1	0	 1	0	 0	0	0	 0	0	     0	0	 0	 0	0	0	0	 0	 0
         	10	2	 0	2	 1	1	1	 0	2	     1	3	 1	 1	7	1	1	 0	 1
         	 2	0	 0	0	 1	0	0	 0	0	     0	0	 0	 0	0	0	0	 0	 0
         	 6	0	 0	0	 0	0	0	 1	0	     0	0	 1	 0	0	0	1	 0	 1
         	13	0	 4	0	 2	0	0	 0	0	     0	3	 0	 1	0	0	1	 0	 0
Predicted	 8	0	 1	0	 0	0	0	 2	1	     1	0	 0	 0	1	1	0	 0	 0
         	15	0	 0	0	 0	0	1	 0	1	     0	0	 0	 0	0	1	0	 0	 0
         	 9	0	 2	0	 1	1	0	 0	1	     2	3	 0	 0	0	1	0	 0	 1
         	14	3	 0	0	 3	0	0	 0	1	     0	0	 0	 0	0	0	3	 0	 0
         	12	0	 0	0	 0	0	3	 1	1	     1	0	 0	 0	0	1	0	 1	 0
         	 3	0	 0	0	 0	0	0	 0	0	     0	0	 0	 0	0	0	0	 0	 0
         	 7	0	 0	0	 0	0	0	 2	0	 

### Anomaly Detection: