# Сравнение двух произвольных структур в формате JSON

In [None]:
#Вспомогательная функция. Экранирует символы, которые можно 
#неоднозначно интерпретировать при разборе логов компаратора.
def escape_string(string):
    if type(string) != str:
        raise "ERROR: Input must be a string."

    string = str(string.encode("unicode_escape"))
    string = string[2:len(string) - 1]
    return string.replace(" ", "\\\\ ").replace("\'", "\\\\\'").replace("\"", "\\\\\"").replace(".", "\\\\.").replace("[", "\\\\[").replace("]", "\\\\]")
     

#Вспомогательная функция. Соединяет два пути.
#Если первый путь не пустой, то добавляет между путями точку.
def join_paths(path1, path2):
    if len(path1) == 0:
        return escape_string(path2)
    return path1 + "." + escape_string(path2)

#Вспомогательная функция.
#Проверка типа ключа словаря.
def is_supported_key_type(key_type):
    if (key_type == bool or key_type == int or key_type == float or 
        key_type == str or key_type == type(None)):
        return True
    return False

#Вспомогательная функция.
#Проверка типа значения элемента списка или ключа словаря.
def is_supported_value_type(value_type):
    if (value_type == bool or value_type == int or value_type == float or 
        value_type == str or value_type == list or value_type == dict or
        value_type == type(None)):
        return True
    return False

def compare_lists(list1, list2, path, result):
    len1 = len(list1)
    len2 = len(list2)

    if (len1 != len2):
        result += path + " size_mismatch " + str(len1) + " " + str(len2) + "\n"
    
    min_len = min(len1, len2)

    for i in range(min_len):
        t1 = type(list1[i])
        if not is_supported_value_type(t1):
            result += path + "[" + str(i) + "] value_type_error l " + str(t1) + "\n"
            continue

        t2 = type(list2[i])
        if not is_supported_value_type(t2):
            result += path + "[" + str(i) + "] value_type_error r " + str(t2) + "\n"
            continue

        if t1 == t2:
            if (t1 == bool or t1 == float or t1 == int or 
                t1 == str or t1 == type(None)):
                if (list1[i] != list2[i]):
                    result +=  path + "[" + str(i) + "] value_mismatch " + str(list1[i]) + " " + str(list2[i]) + "\n"
            elif t1 == list:
                result = compare_lists(list1[i], list2[i], path + "[" + str(i) + "]", result)
            else:
                result = compare_dicts(list1[i], list2[i], path + "[" + str(i) + "]", result)
        else:
            result += path + "[" + str(i) + "] value_type_mismatch " + str(t1) + " " + str(t2) + "\n"

    max_len = max(len1, len2)
    longest_list = list1 if min_len == len2 else list2
    struct_num = "l" if min_len == len2 else "r"

    for i in range(max_len - min_len):
        t = type(longest_list[i + min_len])
        if not is_supported_value_type(t):
            result += path + "[" + str(i + min_len) + "] value_type_error " + struct_num + " " + str(t) + "\n"

    return result

def compare_dicts(dict1, dict2, path, result):
    common_keys = set(dict1.keys()).intersection(set(dict2.keys()))
    dict1_specific_keys = set(dict1.keys()).difference(common_keys)
    dict2_specific_keys = set(dict2.keys()).difference(common_keys)

    for i in dict1_specific_keys:
        if not is_supported_key_type(type(i)):
            result += join_paths(path, str(i)) + " key_type_error l " + str(type(i)) + "\n"
        if not is_supported_value_type(type(dict1[i])):
            result += join_paths(path, str(i)) + " value_type_error l " + str(type(dict1[i])) + "\n"
        result += join_paths(path, str(i)) + " key_missing_in r\n"

    for i in dict2_specific_keys:
        if not is_supported_key_type(type(i)):
            result += join_paths(path, str(i)) + " key_type_error r " + str(type(i)) + "\n"
        if not is_supported_value_type(type(dict2[i])):
            result += join_paths(path, str(i)) + " value_type_error r " + str(type(dict2[i])) + "\n"
        result += join_paths(path, str(i)) + " key_missing_in l\n"

    for i in common_keys:
        if not is_supported_key_type(type(i)):
            result += join_paths(path, str(i)) + " key_type_error b " + str(type(i)) + "\n"

        t1 = type(dict1[i])
        if not is_supported_value_type(t1):
            result += join_paths(path, i) + " value_type_error l " + str(t1) + "\n"
            continue

        t2 = type(dict2[i])
        if not is_supported_value_type(t2):
            result += join_paths(path, i) + " value_type_error r " + str(t2) + "\n"
            continue

        if t1 == t2:
            if (t1 == bool or t1 == float or t1 == int or 
                t1 == str or dict1[i] == None):
                if (dict1[i] != dict2[i]):
                    result += join_paths(path, str(i)) + " value_mismatch " + str(dict1[i]) + " " + str(dict2[i]) + "\n"
            elif t1 == list:
                result = compare_lists(dict1[i], dict2[i], join_paths(path, str(i)), result)
            else:
                result = compare_dicts(dict1[i], dict2[i], join_paths(path, str(i)), result)
        else:
            result += join_paths(path, i) + " value_type_mismatch " + str(t1) + " " + str(t2) + "\n"

    return result

#Данная функция выполняет сравнение произвольной структуры json1 с произвольной структурой json2 в формате JSON.
#Присутствует поддержка "None" в качестве ключа словаря или значения. При встрече синтаксической ошибки  
#элемент пропускается, после чего к выводу добавляется соответствующее сообщение.
#Возвращаемое значение: (<Наличие разницы (логическое значение)>, <Разница (строка)>)
def compare_json(json1, json2):
    if ((type(json1) != list and type(json1) != dict) or
        (type(json2) != list and type(json2) != dict)):
        return (False, "input_type_error") 
    
    if type(json1) != type(json2):
        return (False, "full_json_mismatch")
    
    result = ""

    if type(json1) == list:
        result = compare_lists(json1, json2, "", result)
    else:
        result = compare_dicts(json1, json2, "", result)
    
    return (len(result) == 0, result)

In [None]:
#Загрузка сериализованного JSON из файлов.

import pickle

f = open("gemma_extracted_info.bin", "rb")
extracted_info = pickle.loads(f.read())
f.close()
print(extracted_info)

In [10]:
#Пример сравнения JSON-структур.
result = compare_json({"key":1,"key2":2,"key3":3,"common_key\\\n":[{"t":3}, 1, 2, 3, 4, 5, 6]}, {"common_key.":[{"t":1}, 1, 2, 3], None:None})

print("Совпадение: " + str(result[0]))
if not result[0]:
    print()
    print(result[1])

Совпадение: False

common_key\\\\\\n key_missing_in r
key key_missing_in r
key3 key_missing_in r
key2 key_missing_in r
common_key\\. key_missing_in l
None key_missing_in l



# Объяснение результата сравнения в терминах онтологии базы сварочных газов

In [None]:
import re

#Пояснитель.
class Explainer:
    #Формат словаря:
    #    Идентификатор сообщения: [Количество параметров / Шаблон пояснения / Тип параметров в пояснении].
    #
    #Тип параметров в пояснении - строка, содержащая информацию о том, какие параметры нужно 
    #подставить в шаблон для генерации корректного пояснения. Во время генерации пояснения 
    #выполняется посимвольное чтение строки типов параметров пояснения.
    #Каждый символ представляет собой команду для Пояснителя:
    # j - взять название JSON-структуры из параметров сообщения, преобразовать в строку,
    #     поместить результат в буфер параметров пояснения и перейти к следующему параметру.
    #     Допустимые значения названия JSON-структуры и результат преобразования в строку: 
    #     * "l" - "первая";
    #     * "r" - "вторая";
    #     * "b" - "обеих".
    #     Недопустимые значения вызовут ошибку.
    # k - сгенерировать словесное пояснение (в родительном падеже) к JSON-ключу и поместить 
    #     в буфер параметров пояснения. 
    # x - пропустить один параметр полученного на входе сообщения и перейти к следующему.
    # p - взять параметр сообщения, преобразовать его в строку и поместить в буфер параметров пояснения.
    #Другие символы вызовут ошибку.
    __KNOWN_MESSAGES = {
        "input_type_error":    [0, "Входные данные имеют некорректный формат.\n", ""],
        "full_json_mismatch":  [0, "Структура сравниваемых баз данных полностью не совпадает.\n", ""],
        "size_mismatch":       [2, "Не совпадает количество {:s} в первой и второй базе: {:s} и {:s}.\n", "kpp"],
        "value_type_error":    [2],
        "value_mismatch":      [2, "Не совпадают значения {:s} в первой и второй базе.", "k"],
        "value_type_mismatch": [2],
        "key_type_error":      [2],
        "key_missing_in":      [1]
    }
    
    #Проверка корректности строки из входных данных.
    @staticmethod
    def __validate_string(string: str):
        if len(string) == 0:
            return "ОШИБКА: Некорректный формат входных данных: получена пустая строка."
        regex = "(?<!\\\\)\\\\(?!\\\\)"
        if not re.search(regex, string) == None:
            return "ОШИБКА: Некорректный формат входных данных: встречен одинарный \"\\\"."
        if not re.search("\\\\$", string) == None:
            return "ОШИБКА: Некорректный формат входных данных: отсутствует экранируемый символ."
        if len(re.findall("\\\\", string)) % 2 == 1:
            return "ОШИБКА: Некорректный формат входных данных: встречен одинарный \"\\\"."
        return ""

    @staticmethod
    def __generate_key_description(key: str):
        
        return ""

    #Генерация пояснения к сообщению от компаратора с помощью данных из
    #словаря __KNOWN_MESSAGES.
    @staticmethod
    def __generate_message_explanation(key: str, message_id: str, params: list):
        #Выполнение команд, необходимых для генерации корректного пояснения.
        explanation_params = []
        current_param = 0

        for symbol in Explainer.__KNOWN_MESSAGES[message_id][2]:
            if symbol == 'j':
                explanation_params.append("TEST JSON DATABASE NAME")
                current_param += 1
            elif symbol == 'k':
                explanation_params.append("TEST KEY DESCRIPTION")
            elif symbol == 'x':
                current_param += 1
            elif symbol == 'p':
                explanation_params.append("TEST PARAM")
            else:
                return "ОШИБКА: Внутренняя ошибка при генерации пояснения: встречен неизвестный управляющий символ \"" + symbol + "\"."
        
        if len(explanation_params) == 0:
            return Explainer.__KNOWN_MESSAGES[message_id][1]
        elif len(explanation_params) == 1:
            return Explainer.__KNOWN_MESSAGES[message_id][1].format(explanation_params[0])
        elif len(explanation_params) == 2:
            return Explainer.__KNOWN_MESSAGES[message_id][1].format(explanation_params[0], 
                                                                    explanation_params[1])
        elif len(explanation_params) == 3:
            return Explainer.__KNOWN_MESSAGES[message_id][1].format(explanation_params[0], 
                                                                    explanation_params[1], 
                                                                    explanation_params[2])
        else:
            return ("ОШИБКА: Внутренняя ошибка при генерации пояснения: слишком большое количество параметров шаблона пояснения (" + 
                    len(explanation_params) + ").")

    #Данный метод генерирует пояснение к логам компаратора в терминах онтологии.
    #Метод ожидает, что сравнивались данные в упрощённом представлении (см. "gemma_json_generator.ipynb").
    @staticmethod
    def explain_log(log: str):
        if type(log) != str:
            return "ОШИБКА: Входные данные не являются строкой."
    
        explanation = ""

        lines = log.split("\n")
        for i in lines:
            i = i.strip()
            
            validation_result = Explainer.__validate_string(i)
            if len(validation_result) > 0:
                explanation += validation_result + "\n"
                continue
            
            substrings = re.split("(?<!\\\\\\\\) ", i)

            message_id = substrings[0] if len(substrings) == 1 else substrings[1]
            key = None if len(substrings) == 1 else substrings[0]
            params = substrings[2:len(substrings)] 
            param_count = len(params)           

            if message_id in Explainer.__KNOWN_MESSAGES.keys():
                if param_count == Explainer.__KNOWN_MESSAGES[message_id][0]:
                    explanation += Explainer.__generate_message_explanation(key, message_id, params) + "\n"
                else:
                    explanation += ("ОШИБКА: Некорректное количество параметров сообщения \"" + 
                                    message_id + "\": " + str(param_count) + " вместо " + 
                                    str(Explainer.__KNOWN_MESSAGES[message_id][0]) + ".")
            else:
                explanation += "ОШИБКА: Неизвестное сообщение \"" + i + "\".\n"

        return explanation

print(Explainer.explain_log("[1].mark value_mismatch 2 2"))