<a href="https://colab.research.google.com/github/Taqvis/doctorapp/blob/main/SymbolicAI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import math
import random
from collections import namedtuple
from queue import Queue


# A rule template: `input` is the precondition window, `output` the predicted continuation.
Pattern = namedtuple('Pattern', ['input', 'output'])
# Placeholder symbols that may stand in for concrete values inside a pattern.
variables = list('ABCDEFGHIJ')

class Rule():
    """A pattern-matching rule evaluated over a sliding window of observations.

    ``pattern.input`` is the precondition and ``pattern.output`` the
    prediction. Each entry of ``pattern.input`` is either a literal value,
    the wildcard ``'*'`` (matches anything), or a name from the module-level
    ``variables`` list (binds to the observed value at that position and
    must bind to the same value everywhere it occurs).
    """

    def __init__(self, pattern):
        """Create a rule for ``pattern`` (an object with ``input``/``output`` sequences)."""
        self.pattern = pattern
        # the rule never needs more history than its precondition is long
        self.max_window = len(pattern.input)
        self.memory = []
        self.pred_memory = []
        self.reward = 0
        # variable name -> value bound during the most recent match attempt
        self.decodedVars = dict()
        self.min_output = None

    def __eq__(self, obj):
        """Two rules are equal when their patterns are equal."""
        if not isinstance(obj, Rule):
            # let Python fall back to its default handling instead of
            # raising AttributeError on objects without a `.pattern`
            return NotImplemented
        return self.pattern == obj.pattern

    def __str__(self):
        return "input: {}, output: {}, reward: {}".format(self.pattern.input, self.pattern.output, self.reward)

    def find(self, lst, search):
        """Return every index at which ``search`` occurs in ``lst``."""
        return [idx for idx, item in enumerate(lst) if item == search]

    def replace(self, lst, search, replace):
        """Replace every occurrence of ``search`` in ``lst`` with ``replace``.

        The list is modified in place and also returned.
        """
        for idx, item in enumerate(lst):
            if item == search:
                lst[idx] = replace
        return lst

    def decodeVariable(self, input, v):
        """Try to bind variable ``v`` against the observed window ``input``.

        Returns True when ``v`` does not occur in the pattern, or when every
        position holding ``v`` sees the same observed value (that value is
        then recorded in ``self.decodedVars``). Returns False when the
        positions disagree.
        """
        var_idxs = self.find(self.pattern.input, v)
        if not var_idxs:
            return True
        bound = input[var_idxs[0]]
        if any(input[idx] != bound for idx in var_idxs):
            return False
        self.decodedVars[v] = bound
        return True

    def checkVariables(self, input):
        """Return True if every known variable can be bound consistently to ``input``."""
        # all() stops at the first variable that fails to bind
        return all(self.decodeVariable(input, v) for v in variables)

    def checkPattern(self, input):
        """Return True if ``input`` satisfies this rule's precondition."""
        if not self.checkVariables(input):
            return False

        for expected, observed in zip(self.pattern.input, input):
            # wildcards match anything; variables were validated above
            if expected == '*' or expected in variables:
                continue
            if expected != observed:
                return False
        return True

    def getOutput(self):
        """Return a copy of the output pattern with bound variables substituted.

        Substitution uses the bindings from the latest match attempt so the
        result can be compared with the next observation.
        """
        output = list(self.pattern.output)
        for name, value in self.decodedVars.items():
            self.replace(output, name, value)
        return output

    def checkPrediction(self, data_point, pred=False):
        """Feed ``data_point`` to the rule and return its prediction, if any.

        With ``pred`` False the rule's own memory window is advanced; with
        ``pred`` True the match is tried on a temporary copy so the memory is
        left untouched. Returns the decoded output list when the window
        matches the precondition, otherwise the empty string ``''``.
        """
        self.decodedVars = dict()
        # non-prediction calls update the real memory; prediction calls work on a copy
        window = self.memory.copy() if pred else self.memory
        if len(window) >= self.max_window:
            window.pop(0)
        window.append(data_point)

        if len(window) >= len(self.pattern.input) and self.checkPattern(window):
            return self.getOutput()
        return ''

# Tunable parameters for rule generation, matching and pruning.

# number of sampling rounds used to build the initial rule set
NUM_INIT = 10_000
# number of rules to generate per sampled window
NUM_NEW = 10
# max window length of a rule (only appears in a commented-out expression in the code visible here)
MAX_WINDOW = 3
# probability of replacing a value with the wildcard '*'
WILD_PROB = 0.3
# probability of replacing a value with a variable
VAR_PROB = 0.5
# min reward a rule needs in order to be kept
MIN_THRESH = 0
# during predictions, how many future possibilities to consider
# (not referenced by the code visible here)
TREE_WIDTH = 1

class Agent():
    """Maintains a population of rules and scores them against a data stream.

    Attributes:
        memory: every data point processed so far.
        rules: the current list of Rule objects.
        fired_rules: (rule, output) pairs that matched on the latest step.
        pred_rules: (rule, output) pairs that matched during prediction.
        count: total number of rule matches seen by ``fitRules``.
    """

    def __init__(self, data):
        """Build the initial rule set by sampling windows from ``data``."""
        self.memory = []
        self.rules = self.generateRules(data, NUM_INIT)
        self.fired_rules = []
        self.pred_rules = []
        self.pred_memory = []
        self.count = 0

    def step(self, data_point):
        """Score the rules that fired last step, then process ``data_point``."""
        self.updateRules(data_point)
        self.processDataPoint(data_point)

    def replace(self, lst, search, replace):
        """Replace every occurrence of ``search`` in ``lst`` (in place) and return ``lst``."""
        for idx, item in enumerate(lst):
            if item == search:
                lst[idx] = replace
        return lst

    def sampleES(self, data, answer, NUM_NEW):
        """Randomly generalise one observed window into up to ``NUM_NEW`` patterns.

        Each sample starts from ``data`` as the precondition and ``[answer]``
        as the prediction, then independently turns each concrete value into
        a variable (probability ``VAR_PROB``) or a wildcard (probability
        ``WILD_PROB``). Duplicate patterns are dropped. ``data`` itself is
        not modified.
        """
        patterns = []
        for _ in range(NUM_NEW):
            var_index = 0
            # work on fresh copies: mutating `data` directly would leak one
            # sample's substitutions into every other sample
            window = list(data)
            output = [answer]
            for i in range(len(window)):
                # skip positions already generalised (e.g. a repeated value
                # that was replaced by a variable earlier in this sample).
                # The original guard tested the index `i` and an undefined
                # name `gl`; the element check is what the inner code implies.
                if window[i] in variables or window[i] == '*':
                    continue
                r = random.random()
                if r < VAR_PROB:
                    # leave the literal in place if we run out of variable names
                    if var_index < len(variables):
                        char = window[i]
                        # bind the same variable to every occurrence, in both
                        # the precondition and the prediction
                        self.replace(window, char, variables[var_index])
                        self.replace(output, char, variables[var_index])
                        var_index += 1
                elif r < WILD_PROB + VAR_PROB:
                    window[i] = '*'
            pattern = Pattern(window, output)
            if pattern not in patterns:
                patterns.append(pattern)
        return patterns

    def generateRules(self, data_points, num):
        """Sample ``num`` random windows from ``data_points`` and derive rules.

        Each window consists of ``window_length`` inputs followed by the value
        to predict. Returns a list of Rule objects with distinct patterns.
        Raises ValueError (from ``random.randint``) when ``data_points`` has
        fewer than ``window_length + 1`` entries.
        """
        new_rules = []
        # patterns already turned into rules; a set keeps de-duplication
        # linear (assumes the data points are hashable, e.g. floats)
        seen = set()
        for _ in range(num):
            window_length = 3  # random.randint(2, MAX_WINDOW)
            start_point = random.randint(0, len(data_points) - window_length - 1)
            data = data_points[start_point:start_point + window_length + 1]
            answer = data[-1]
            data = data[:-1]
            for p in self.sampleES(data, answer, NUM_NEW):
                key = (tuple(p.input), tuple(p.output))
                if key not in seen:
                    seen.add(key)
                    new_rules.append(Rule(p))
        return new_rules

    def processDataPoint(self, data_point):
        """Match ``data_point`` against all rules and record it in memory."""
        self.fitRules(data_point)
        self.memory.append(data_point)
        # NOTE: generating fresh rules when nothing fired is currently
        # disabled: self.generateRules(self.memory, NUM_NEW)

    def fitRules(self, data_point, pred=False):
        """Offer ``data_point`` to every rule and collect the ones that fire.

        Matches are appended to ``pred_rules`` when ``pred`` is True and to
        ``fired_rules`` otherwise. (A rete-style matcher is still a todo.)
        """
        for rule in self.rules:
            output = rule.checkPrediction(data_point, pred)
            # checkPrediction signals "no match" with the empty string;
            # compare by value, not identity
            if output != '':
                self.count += 1
                if pred:
                    self.pred_rules.append((rule, output))
                else:
                    self.fired_rules.append((rule, output))

    def updateRules(self, data_point):
        """Reward or penalise the rules that fired on the previous step.

        A rule gains one reward point when the first element of its
        prediction equals ``data_point`` and loses one otherwise.
        """
        for rule, pred in self.fired_rules:
            if pred[0] == data_point:
                rule.reward += 1
            else:
                rule.reward -= 1
        self.fired_rules = []  # clear previous fired rules

    def compressOutput(self):
        """Return the highest-reward (rule, output) pair from ``pred_rules``.

        ``pred_rules`` is cleared afterwards. Returns an empty list when no
        rule matched.
        """
        best = []
        if self.pred_rules:
            # max() returns the first pair with the top reward, like a
            # stable descending sort followed by [0]
            best = max(self.pred_rules, key=lambda pred: pred[0].reward)
        self.pred_rules = []
        return best

    def generatePredictions(self, data_point, length):
        """Roll the rules forward ``length`` steps starting from ``data_point``.

        NOTE(review): ``compressOutput`` returns a ``(rule, output)`` tuple
        (or ``[]``), and that value is fed back in as the next data point and
        flattened into the result. This looks unintended, but is kept as-is
        because changing it would alter the return shape - confirm with
        callers before fixing.
        """
        output = []
        self.pred_memory = self.memory.copy()
        for _ in range(length):
            self.fitRules(data_point, pred=True)
            data_point = self.compressOutput()
            self.pred_memory.append(data_point)
            output.extend(data_point)
        return output

    def printPredictions(self, prediction):
        """Flatten a (rule, output) pair into ``rule.pattern.input + rule.pattern.output``."""
        rule = prediction[0]
        return [*rule.pattern.input, *rule.pattern.output]

    def removeBadRules(self):
        """Drop every rule whose reward is below ``MIN_THRESH``."""
        self.rules = [r for r in self.rules if r.reward >= MIN_THRESH]

    def printRules(self):
        """Print each rule on its own line."""
        for r in self.rules:
            print(r)

    def getFired(self):
        """Return the flattened pattern of the best rule that fired this step.

        Returns ``''`` when nothing fired or when the best fired rule has a
        reward below 1.
        """
        if not self.fired_rules:
            return ''
        self.fired_rules.sort(key=lambda x: x[0].reward, reverse=True)
        best = self.fired_rules[0]
        if best[0].reward < 1:
            return ''
        return self.printPredictions(best)


# Load both input files up front; `with` closes each handle as soon as its
# lines have been read instead of leaving it open for the rest of the session.
with open('ECGHealthy.txt', 'r') as healthyECGFile:
    healthyECG = healthyECGFile.readlines()

with open('ECGHeartFailure.txt', 'r') as heartFailureECGFile:
    heartFailureECG = heartFailureECGFile.readlines()

def parseTrainFile(lines, init_lines=20):
    """Stream bracketed, comma-separated readings through a fresh Agent.

    Each entry of ``lines`` is expected to look like ``"[v1,v2,...]"``; the
    first and last characters are dropped and the remainder is split on
    commas. Values are rounded to 3 decimals.

    Args:
        lines: iterable of text lines.
        init_lines: number of leading lines whose values are only collected
            (not stepped through) and then used to build the Agent's initial
            rules. Defaults to 20.

    Returns:
        A list with one entry per value seen after the warm-up lines: the
        result of ``agent.getFired()`` for that step. Empty when ``lines``
        never gets past the warm-up.
    """
    agent = None
    outputs = []
    data = []
    for i, line in enumerate(lines):
        # strip surrounding whitespace, then the enclosing brackets
        body = line.strip()[1:-1]
        for token in body.split(','):
            token = token.strip()
            if not token:
                # blank lines, "[]" and trailing commas produce empty tokens;
                # skip them instead of crashing in float('')
                continue
            val = round(float(token), 3)
            if i < init_lines:
                # initial data, that the system uses in an unsupervised way
                # to generate hypothesis about patterns
                data.append(val)
                continue
            if agent is None:
                # first value after the warm-up: build the agent from the
                # data collected so far
                agent = Agent(data)
            agent.step(val)
            outputs.append(agent.getFired())
    return outputs


# Run an agent over each recording and collect what fired at every step.
goodPatterns = parseTrainFile(healthyECG)
badPatterns = parseTrainFile(heartFailureECG)

# The tree's label encoder is fitted on every variable name plus the wildcard.
all_vars = variables + ['*']
decisionTree = DecisionTree(all_vars)

# remove all nA predictions
goodPatterns = [fired for fired in goodPatterns if fired != '']
badPatterns = [fired for fired in badPatterns if fired != '']

# Healthy patterns are labelled 1, heart-failure patterns 0.
X_train = goodPatterns + badPatterns
y_train = [1] * len(goodPatterns) + [0] * len(badPatterns)

decisionTree.fit_train(X_train, y_train)



In [68]:
from sklearn.tree import DecisionTreeClassifier
from sklearn import preprocessing

class DecisionTree:
    """Pairs a scikit-learn decision tree with a label encoder for pattern symbols."""

    def __init__(self, vars):
        """Fit the label encoder on ``vars`` and create an untrained classifier."""
        self.le = preprocessing.LabelEncoder()
        self.le.fit(vars)
        self.model = DecisionTreeClassifier(
            criterion="gini",
            random_state=42,
            max_depth=3,
            min_samples_leaf=5,
        )

    def fit_train(self, raw_patterns, is_good):
        """Encode each pattern with the label encoder and train the classifier.

        ``raw_patterns`` is a sequence of symbol sequences; ``is_good`` holds
        the matching class labels.
        """
        encoded = [self.le.transform(p) for p in raw_patterns]
        self.model.fit(encoded, is_good)

    def predict_heart_attack(self, pattern):
        """Return the classifier's prediction for ``pattern``.

        NOTE(review): unlike ``fit_train``, the input is passed to the model
        without going through the label encoder - presumably callers must
        encode it themselves; confirm.
        """
        return self.model.predict(pattern)

