In [1]:
import numpy as np
import pandas as pd
import skfuzzy

#from statistics import mean
from time import time
from skfuzzy import trimf, trapmf
from datetime import datetime

In [2]:
class FactBase:
    def __init__(self):
        self._fact_base = {}
    
    
    def __len__(self):
        return len(self._fact_base)
    
    
    def __str__(self):
        facts = ''
        for fact in sorted(self._fact_base):
            facts += fact
            facts += '\t - \t'
            facts += str(self._fact_base[fact])
            facts += '\n'
        return facts
    
    def __repr__(self):
        return self.__str__()
    
        
    def return_facts(self):
        return self._fact_base.copy()
    
    
    def fill_from_rule_base(self, rule_base):
        for rule in rule_base:
            fields = list(rule['if'].keys()) + list(rule['th'].keys())
            for field in fields:
                self._fact_base[field] = 0.0
        for fact in self._fact_base:
            if 'event_Start' in fact:
                self._fact_base[fact] = 0.0
            if 'sat_Start' in fact:
                self._fact_base[fact] = 1.0
            if 'sat_Start_k' in fact:
                continue
    
    
    def update(self, fact, value):
        try:
            self._fact_base[fact] = float(value)
        except ValueError:
            self._fact_base[fact] = value

In [3]:
class RuleBase:
    def __init__(self, epsilon=0.75):
        self._rule_base = []
        self.epsilon = epsilon
    
    
    def __len__(self):
        return len(self._rule_base)
    
    
    def __str__(self):
        rules = ''
        for rule in self._rule_base:
            rules += 'IF '
            for part in rule['if']:
                rules += part
                rules += ' is '
                rules += rule['if'][part]
                rules += ' and '
            rules = rules[:-5] + ', THEN '
            for part in rule['th']:
                rules += part
                rules += ' = '
                rules += str(rule['th'][part])
                rules += ', '
            rules = rules[:-2]
            rules += '\n'
        return rules
    
    
    def __repr__(self):
        return self.__str__()
    
    
    def add_one_rule(self, rule_txt):      
        rule = rule_txt
        while rule[-1] == ';' or rule[-1] == ' ':
            rule = rule[:-1]
        if_field = rule.split(';>;')[0]
        th_field = rule.split(';>;')[1]
        rule = {}
        rule['if'] = {}
        rule['th'] = {}
        for part in if_field.split(';'):
            fact  = part[:part.rfind(' is ')]
            value = part[part.rfind(' is ') + 4:]
            rule['if'][fact] = value
        for part in th_field.split(';'):
            fact  = part[:part.rfind(' = ')]
            value = part[part.rfind(' = ') + 3:]
            try:
                rule['th'][fact] = float(value)
            except ValueError:
                rule['th'][fact] = value
        self._rule_base.append(rule)
    
    
    def add_rules(self, rules_txt):
        for rule_txt in rules_txt:
            self.add_one_rule(rule_txt.strip())
    
    def add_rules_from_file(self, file_path):
        with open(file_path) as rules:
            self.add_rules(rules)
    
    def return_rules(self):
        return self._rule_base.copy()    
    
    
    def check(self, fact_base):
        for rule in self._rule_base:
            facts = fact_base.return_facts()
            fuzzy_values = []
            for fact in rule['if']:
                fuzzy_val = self.fuzzification(rule['if'][fact], facts[fact])
                fuzzy_values.append(fuzzy_val)
            output = min(fuzzy_values)
            if abs(output) < self.epsilon:
                continue
            for fact in rule['th']:
                if rule['th'][fact] == 'RUN':
                    fact_base.update(fact, 1.0)
                    continue
                if 'sat_Start_k' in fact:
                    for base_fact in facts:
                        if 'sat_Start' in base_fact:
                            fact_base.update(base_fact, rule['th'][fact])
                fact_base.update(fact, float(rule['th'][fact]) * output)
                    
    @staticmethod
    def fuzzification(term, value):
        value = np.array([value])
        hig_term_param = np.array([0.25, 0.75, 1, 1])
        mid_term_param = np.array([-0.5, 0, 0.5])
        low_term_param = np.array([-1, -1, -0.75, -0.25])
        if term == 'high' and trapmf(value, hig_term_param)[0] > 0:
            return trapmf(value, hig_term_param)[0]
        if term == 'middle' and trimf(value, mid_term_param)[0]:
            return trapmf(value, mid_term_param)[0]
        if term == 'low' and trapmf(value, low_term_param)[0]:
            return trapmf(value, low_term_param)[0]
        return False

In [4]:
class Stack:
    def __init__(self):
        pass
    
    
    def pop(self):
        pass
    
    
    def push(self):
        pass

In [5]:
def csv_to_excel(cvs_path):
    data = pd.read_csv(cvs_path, sep=';', header=None)
    header = data.loc[:0, ::2]
    droped = data.drop(labels=list(data.columns)[::2], axis=1)
    new_data = pd.DataFrame(np.concatenate((header.values, droped.values), axis=0))
    new_data.to_csv('LOG_facts.csv', header=False, sep=';')

In [6]:
class Engine:
    def __init__(self, facts, rules, stack, epsilon=0.75):
        self.epsilon = epsilon
        
        self.facts = facts
        self.rules = rules
        self.stack = stack
        
        self.rules.add_rules_from_file('rules_type4.csv')
        self.rules.add_rules_from_file('rules_type3.csv')
        self.rules.add_rules_from_file('rules_type2_1.csv')
        self.rules.add_rules_from_file('rules_type2_2.csv')
        self.rules.add_rules_from_file('rules_type1_1.csv')
        self.rules.add_rules_from_file('rules_type1_2.csv')
        self.rules.add_rules_from_file('rules_type0.csv')
        
        self.facts.fill_from_rule_base(self.rules.return_rules())
    
    
    def simulation(self, environmental_change_path):
        path = environmental_change_path
        self.fact_memory_logging()
        with open(path) as environmental_change:
            #Емуляція змін в оточенні
            for chg in environmental_change:
                #Якщо в рядку, що має містити зміни в оточенні міститься рядок 'next',
                #то одночасні змінви в оточенні закінчились і виконується перехід до
                #перевірки правил
                chg = chg.strip()
                if chg[:-1] == 'next':
                    
                    self.rules.check(self.facts)
                    #Логування стану пам'яті фактів
                    self.fact_memory_logging()
                    #Опускання фактів подій (event), та фактів, що базуються на ознаках
                    #після перевірки усіх правил,
                    for fact in self.facts.return_facts():
                        if 'sat_' not in fact:
                            self.facts.update(fact, 0.0)
                    continue
                #Парсинг рядка подій і відповідна зміна бази фактів
                chg = chg.split(';')
                self.facts.update(chg[0], chg[1])
        

    def fact_memory_logging(self):
        '''Логування стану бази фактів '''
        with open('LOG_facts.csv', 'a') as log_facts:
            log_facts.write(str(self.facts.return_facts()).replace(', ', ';').replace(': ', ';').replace("'", '').replace('{', '').replace('}', '').replace('.', ','))
            log_facts.write('\n')

In [7]:
facts = FactBase()
rules = RuleBase()
stack = Stack()
engine = Engine(facts, rules, stack)
engine.simulation(environmental_change_path='environmental_change.csv')

In [8]:
facts

U_Backward_compass	 - 	0.0
U_Backward_mark	 - 	0.0
U_Backward_wall	 - 	0.0
U_Forward_compass	 - 	0.0
U_Forward_mark	 - 	0.0
U_Forward_wall	 - 	0.0
U_Left_compass	 - 	0.0
U_Left_mark	 - 	0.0
U_Left_wall	 - 	0.0
U_Right_compass	 - 	0.0
U_Right_mark	 - 	0.0
U_Right_wall	 - 	0.0
U_compass	 - 	0.0
U_mark	 - 	0.0
U_wall	 - 	0.0
event_Start_1	 - 	0.0
event_Start_2	 - 	0.0
event_Start_3	 - 	0.0
event_Start_4	 - 	0.0
event_f_Line	 - 	0.0
event_f_Pr_l10	 - 	0.0
event_f_Pr_l11	 - 	0.0
event_f_Pr_l12	 - 	0.0
event_f_Pr_l13	 - 	0.0
event_f_Pr_l14	 - 	0.0
event_f_Pr_l16	 - 	0.0
event_f_Pr_l17	 - 	0.0
event_f_Pr_l2	 - 	0.0
event_f_Pr_l3	 - 	0.0
event_f_Pr_l5	 - 	0.0
event_f_Pr_l6	 - 	0.0
event_f_Pr_l9	 - 	0.0
f_Pr_l1	 - 	0.0
f_Pr_l10	 - 	0.0
f_Pr_l11	 - 	0.0
f_Pr_l12	 - 	0.0
f_Pr_l13	 - 	0.0
f_Pr_l14	 - 	0.0
f_Pr_l15	 - 	0.0
f_Pr_l16	 - 	0.0
f_Pr_l17	 - 	0.0
f_Pr_l2	 - 	0.0
f_Pr_l3	 - 	0.0
f_Pr_l4	 - 	0.0
f_Pr_l5	 - 	0.0
f_Pr_l6	 - 	0.0
f_Pr_l7	 - 	0.0
f_Pr_l8	 - 	0.0
f_Pr_l9	 - 	0.0
f_Road_E	 - 	0.0

In [9]:
csv_to_excel('LOG_facts.csv')

ParserError: Error tokenizing data. C error: Expected 19 fields in line 122, saw 240


In [10]:
def fuzzification(term, value):
        value = np.array([value])
        hig_term_param = np.array([0.25, 0.75, 1, 1])
        mid_term_param = np.array([-0.5, 0, 0.5])
        low_term_param = np.array([-1, -1, -0.75, -0.25])
        if term == 'high' and trapmf(value, hig_term_param)[0] > 0:
            return trapmf(value, hig_term_param)[0]
        if term == 'middle' and trimf(value, mid_term_param)[0]:
            return trapmf(value, mid_term_param)[0]
        if term == 'low' and trapmf(value, low_term_param)[0]:
            return trapmf(value, low_term_param)[0]
        return False

In [11]:
fuzzification('high', 1)

1.0

In [17]:
for i in range(9, 0, -2):
    print(i/10, ': ', fuzzification('high', i/10))

0.9 :  1.0
0.7 :  0.8999999999999999
0.5 :  0.5
0.3 :  0.09999999999999998
0.1 :  False


In [20]:
fuzzification('high', 0.75)

1.0