In [1]:
import itertools
import numpy as np
import networkx as nx
import hashlib
import sys
from tqdm import tqdm  # Используется для отображения прогресса
from typing import List, Dict, Tuple, Any

class Variable:
    def __init__(self, name: str, r: int):
        self.name: str = name
        self.r: int = r
        self.values = list(range(1, r + 1))
        self.prompt_user()

    def prompt_user(self):
        try:
            user_values = input(f"Введите значения для переменной {self.name} через запятую (по умолчанию {self.values}): ")
            if user_values:
                values = list(map(int, user_values.split(',')))
                if len(values) == 1:
                    self.values = list(range(1, values[0] + 1))
                else:
                    self.values = values
        except ValueError:
            print(f"Используем стандартные значения для {self.name}: {self.values}")

    def __hash__(self):
        return hash(self.name)

class Factor:
    # TODO def __init__(self, vars: List[Variable], table: Dict[Tuple[Variable, int], float]):    
    def __init__(self, vars: List[Variable], table: Dict[Tuple[str, int], float]):
        self.vars = vars
        self.table = table

    def __hash__(self):
        return hash(tuple(self.vars))

    def marginalization(self, var_name: str) -> 'Factor':
        new_table = {}
        for assignment, prob in self.table.items():
            new_assignment = tuple((var, val) for var, val in assignment if var != var_name)
            if new_assignment in new_table:
                new_table[new_assignment] += prob
            else:
                new_table[new_assignment] = prob
        new_vars = [var for var in self.vars if var.name != var_name]
        return Factor(new_vars, new_table)
    
    # ToDo Unittest Example 3.2

    def product(self, other: 'Factor') -> 'Factor':
        new_vars = list(set(self.vars) | set(other.vars))
        new_table = {}
        for assignment1, prob1 in self.table.items():
            for assignment2, prob2 in other.table.items():
                merged_assignment = {**dict(assignment1), **dict(assignment2)}
                new_table[tuple(merged_assignment.items())] = prob1 * prob2
        return Factor(new_vars, new_table)
    
    # ToDo UnitTest Example 3.1
    
    # ToDo def Conditioning Algorithm 3.3 and UnitTest Example 3.3
    
    # ToDo def normalize():
    

class BayesianNetwork:
    def __iself, vars: List[Variable], factors: List[Factor], edges: List[Tuple[int, int]]):
        self.vars = vars
        self.factors = factors
        self.graph = nx.DiGraph(edges)

    def probability(self, assignment: Dict[str, int]) -> float:
        prob = 1.0
        # Используем tqdm для отслеживания прогресса по факторам
        for factor in tqdm(self.factors, desc="Calculating joint probability"):
            # Преобразуем подзадание в кортеж кортежей, чтобы соответствовать структуре ключей в таблице
            sub_assignment = tuple([(var.name, assignment[var.name]) for var in factor.vars])
            # Ищем вероятность в таблице фактора
            prob *= factor.table.get(sub_assignment, 0.0)
        return prob

    # ToDo Develop testcase with assert Example 2.5 to check the probability func at class BayesianNetwork

def hash_parameters_and_result(params: str, result: float) -> str:
    hash_input = f"Params: {params}, Result: {result}"
    hash_object = hashlib.sha256(hash_input.encode())
    return hash_object.hexdigest()


def calculate_memory_usage(variables: List[Variable], factors: List[Factor]) -> int:
    """
    Подсчет используемой памяти для переменных и факторов.
    """
    memory_usage = 0
    for var in variables:
        memory_usage += sys.getsizeof(var)
    for factor in factors:
        memory_usage += sys.getsizeof(factor)
        memory_usage += sys.getsizeof(factor.table)
    return memory_usage


# Пример использования
X = Variable('x', 2)
Y = Variable('y', 2)
Z = Variable('z', 2)

ϕ = Factor([X, Y, Z], {
    (('x', 1), ('y', 1), ('z', 1)): 0.08, (('x', 1), ('y', 1), ('z', 2)): 0.31,
    (('x', 1), ('y', 2), ('z', 1)): 0.09, (('x', 1), ('y', 2), ('z', 2)): 0.37,
    (('x', 2), ('y', 1), ('z', 1)): 0.01, (('x', 2), ('y', 1), ('z', 2)): 0.05,
    (('x', 2), ('y', 2), ('z', 1)): 0.02, (('x', 2), ('y', 2), ('z', 2)): 0.07
})

vars = [X, Y, Z]
factors = [ϕ]
edges = [(1, 2), (2, 3)]
bn = BayesianNetwork(vars, factors, edges)

assignment = {
    'x': X.values[0],
    'y': Y.values[0],
    'z': Z.values[0]
}

# Подсчет вероятности и вывод прогресса
probability_result = bn.probability(assignment)

# Подсчет используемой памяти
memory_usage = calculate_memory_usage(vars, factors)

# Формирование строки параметров
params_str = f"x={assignment['x']}, y={assignment['y']}, z={assignment['z']}"

# Вывод хэша параметров и результата
hash_value = hash_parameters_and_result(params_str, probability_result)

print(f"Присвоенные значения: {assignment}")
print(f"Совместная вероятность: {probability_result}")
print(f"Использованная память (в байтах): {memory_usage}")
# print(f"Хэш параметров и результата: {hash_value}")

Введите значения для переменной x через запятую (по умолчанию [1, 2]): 
Введите значения для переменной y через запятую (по умолчанию [1, 2]): 
Введите значения для переменной z через запятую (по умолчанию [1, 2]): 


Calculating joint probability: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]

Присвоенные значения: {'x': 1, 'y': 1, 'z': 1}
Совместная вероятность: 0.08
Использованная память (в байтах): 552





In [2]:
import psutil  # Библиотека для получения системной информации

def track_memory_usage():
    """
    Возвращает текущее использование памяти в байтах.
    """
    process = psutil.Process()
    memory_info = process.memory_info()
    return memory_info.rss  # Возвращает использование памяти в байтах

def display_memory_usage(before: int, after: int):
    """
    Отображает изменение использования памяти.
    :param before: Использование памяти до выполнения задачи (в байтах).
    :param after: Использование памяти после выполнения задачи (в байтах).
    """
    used_memory = after - before
    print(f"Использование памяти до: {before / (1024 ** 2):.2f} MB")
    print(f"Использование памяти после: {after / (1024 ** 2):.2f} MB")
    print(f"Разница в использовании памяти: {used_memory / (1024 ** 2):.2f} MB")

In [3]:
# Отслеживаем использование памяти до выполнения вероятностных расчетов
snapshot_before = track_memory_usage()

# Генерируем все возможные комбинации значений для переменных
combinations = list(itertools.product(X.values, Y.values, Z.values))

# Переменные для отслеживания наилучшей вероятности
best_assignment = None
best_probability = 0.0

# Рассчитываем вероятность для каждой комбинации значений
for combination in combinations:
    assignment = {'x': combination[0], 'y': combination[1], 'z': combination[2]}
    probability_result = bn.probability(assignment)
    
    if probability_result > best_probability:  # Сравниваем с наилучшей вероятностью
        best_probability = probability_result
        best_assignment = assignment

# Отслеживаем использование памяти после выполнения вероятностных расчетов
snapshot_after = track_memory_usage()

# Отображение изменений в использовании памяти
display_memory_usage(snapshot_before, snapshot_after)

# Формирование строки параметров
params_str = f"x={best_assignment['x']}, y={best_assignment['y']}, z={best_assignment['z']}"

# Вывод хэша параметров и результата
hash_value = hash_parameters_and_result(params_str, best_probability)

print(f"Наилучшие присвоенные значения: {best_assignment}")
print(f"Наилучшая совместная вероятность: {best_probability}")
# print(f"Хэш параметров и результата: {hash_value}")

Calculating joint probability: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
Calculating joint probability: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
Calculating joint probability: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
Calculating joint probability: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
Calculating joint probability: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
Calculating joint probability: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
Calculating joint probability: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
Calculating joint probability: 100%|███████████████████████████████████████████████████| 1/1 [00:00<00:00, 1018.03it/s]

Использование памяти до: 128.27 MB
Использование памяти после: 128.32 MB
Разница в использовании памяти: 0.05 MB
Наилучшие присвоенные значения: {'x': 1, 'y': 2, 'z': 2}
Наилучшая совместная вероятность: 0.37





In [4]:
from typing import List, Dict, Tuple, Any
import numpy as np
from scipy.stats import multivariate_normal

class ExactInference:
    """Класс для выполнения точного вывода."""
    
    def infer(self, bn: Dict[str, Any], query: List[str], evidence: Dict[str, Any]) -> Factor:
        """
        Выполняет точный вывод для Байесовской сети.
        
        Аргументы:
            bn (Dict[str, Any]): Байесовская сеть с факторами и переменными.
            query (List[str]): Переменные запроса.
            evidence (Dict[str, Any]): Доказательства (условные переменные).
        
        Возвращает:
            Factor: Нормализованный фактор для запроса.
        """
        phi = prod(bn['factors'])
        phi = condition(phi, **evidence)
        
        for var in set(phi.vars) - set(query):
            phi = marginalize(phi, var)
        
        return normalize(phi)

def infer_exact(inference: ExactInference, bn: Dict[str, Any], query: List[str], evidence: Dict[str, Any]) -> Factor:
    """
    Выполняет точный вывод с использованием метода `ExactInference`.
    
    Аргументы:
        inference (ExactInference): Метод точного вывода.
        bn (Dict[str, Any]): Байесовская сеть с факторами и переменными.
        query (List[str]): Переменные запроса.
        evidence (Dict[str, Any]): Доказательства (условные переменные).
    
    Возвращает:
        Factor: Нормализованный фактор для запроса.
    """
    return inference.infer(bn, query, evidence)

class VariableElimination:
    """Класс для выполнения вывода методом устранения переменных."""
    
    def __init__(self, ordering: List[int]):
        """
        Инициализация метода устранения переменных.
        
        Аргументы:
            ordering (List[int]): Порядок устранения переменных.
        """
        self.ordering = ordering

def infer_variable_elimination(inference: VariableElimination, bn: Dict[str, Any], query: List[str], evidence: Dict[str, Any]) -> Factor:
    """
    Выполняет вывод методом устранения переменных.
    
    Аргументы:
        inference (VariableElimination): Метод устранения переменных.
        bn (Dict[str, Any]): Байесовская сеть.
        query (List[str]): Переменные запроса.
        evidence (Dict[str, Any]): Доказательства.
    
    Возвращает:
        Factor: Нормализованный результат вывода.
    """
    phi_list = [condition(phi, **evidence) for phi in bn['factors']]
    
    for i in inference.ordering:
        name = bn['vars'][i]
        if name not in query:
            inds = [j for j, phi in enumerate(phi_list) if name in phi.vars]
            if inds:
                phi = prod([phi_list[j] for j in inds])
                phi_list = [phi_list[j] for j in range(len(phi_list)) if j not in inds]
                phi = marginalize(phi, name)
                phi_list.append(phi)
    
    return normalize(prod(phi_list))

class DirectSampling:
    """Класс для выполнения прямого семплирования."""
    
    def __init__(self, m: int):
        """
        Инициализация метода прямого семплирования.
        
        Аргументы:
            m (int): Количество семплов.
        """
        self.m = m

def infer_direct_sampling(inference: DirectSampling, bn: Dict[str, Any], query: List[str], evidence: Dict[str, Any]) -> Factor:
    """
    Выполняет вывод методом прямого семплирования.
    
    Аргументы:
        inference (DirectSampling): Метод прямого семплирования.
        bn (Dict[str, Any]): Байесовская сеть.
        query (List[str]): Переменные запроса.
        evidence (Dict[str, Any]): Доказательства.
    
    Возвращает:
        Factor: Нормализованный результат вывода.
    """
    table = {}
    
    for _ in range(inference.m):
        a = sample_bn(bn)
        if all(a[k] == v for k, v in evidence.items()):
            b = {k: a[k] for k in query}
            table[b] = table.get(b, 0) + 1
    
    return normalize(Factor(list(query), table))

class LikelihoodWeightedSampling:
    """Класс для выполнения семплирования с взвешиванием по правдоподобию."""
    
    def __init__(self, m: int):
        """
        Инициализация метода семплирования с взвешиванием по правдоподобию.
        
        Аргументы:
            m (int): Количество семплов.
        """
        self.m = m

def infer_likelihood_weighted_sampling(inference: LikelihoodWeightedSampling, bn: Dict[str, Any], query: List[str], evidence: Dict[str, Any]) -> Factor:
    """
    Выполняет вывод методом семплирования с взвешиванием по правдоподобию.
    
    Аргументы:
        inference (LikelihoodWeightedSampling): Метод семплирования с взвешиванием по правдоподобию.
        bn (Dict[str, Any]): Байесовская сеть.
        query (List[str]): Переменные запроса.
        evidence (Dict[str, Any]): Доказательства.
    
    Возвращает:
        Factor: Нормализованный результат вывода.
    """
    table = {}
    
    for _ in range(inference.m):
        a, w = sample_weighted_bn(bn, evidence)
        b = {k: a[k] for k in query}
        table[b] = table.get(b, 0) + w
    
    return normalize(Factor(list(query), table))

# Gibbs sampling
class GibbsSampling:
    def __init__(self, m_samples: int, m_burnin: int, m_skip: int, ordering: List[str]):
        """
        Инициализация метода Гиббсова семплирования.

        Аргументы:
            m_samples (int): Количество семплов.
            m_burnin (int): Количество начальных "сожжённых" семплов (burn-in period).
            m_skip (int): Интервал между семплами.
            ordering (List[str]): Порядок переменных для семплирования.
        """
        self.m_samples = m_samples
        self.m_burnin = m_burnin
        self.m_skip = m_skip
        self.ordering = ordering

def gibbs_sample(a: Dict[str, Any], bn: Dict[str, Any], evidence: Dict[str, Any], ordering: List[str], steps: int) -> None:
    """
    Выполняет несколько шагов Гиббсова семплирования для одной выборки.

    Аргументы:
        a (Dict[str, Any]): Текущая выборка переменных.
        bn (Dict[str, Any]): Байесовская сеть.
        evidence (Dict[str, Any]): Доказательства (условные переменные).
        ordering (List[str]): Порядок переменных для обновления.
        steps (int): Количество шагов для обновления.
    """
    for _ in range(steps):
        for var in ordering:
            if var not in evidence:
                # Обновить значение переменной `var`, используя условное распределение на основе других переменных
                pass  # Здесь должна быть логика обновления переменной var на основе байесовской сети

def infer_gibbs_sampling(inference: GibbsSampling, bn: Dict[str, Any], query: List[str], evidence: Dict[str, Any]) -> Factor:
    """
    Выполняет вывод методом Гиббсова семплирования.

    Аргументы:
        inference (GibbsSampling): Метод Гиббсова семплирования.
        bn (Dict[str, Any]): Байесовская сеть.
        query (List[str]): Переменные запроса.
        evidence (Dict[str, Any]): Доказательства.
    
    Возвращает:
        Factor: Нормализованный результат вывода.
    """
    table = {}
    a = {**sample_bn(bn), **evidence}
    
    # Сожжение начальных семплов (burn-in)
    gibbs_sample(a, bn, evidence, inference.ordering, inference.m_burnin)
    
    # Основной цикл выборки
    for _ in range(inference.m_samples):
        gibbs_sample(a, bn, evidence, inference.ordering, inference.m_skip)
        b = {k: a[k] for k in query}
        table[b] = table.get(b, 0) + 1
    
    return normalize(Factor(list(query), table))

# Multivariate normal distribution
def conditional_multivariate_normal(mean: np.ndarray, cov: np.ndarray, query_indices: List[int], evidence_indices: List[int], evidence: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Вычисляет условное распределение для многомерного нормального распределения.

    Аргументы:
        mean (np.ndarray): Среднее векторное значение для многомерного нормального распределения.
        cov (np.ndarray): Ковариационная матрица.
        query_indices (List[int]): Индексы переменных запроса.
        evidence_indices (List[int]): Индексы переменных доказательств.
        evidence (np.ndarray): Значения для переменных доказательств.
    
    Возвращает:
        Tuple[np.ndarray, np.ndarray]: Условное среднее и ковариация для переменных запроса.
    """
    b = evidence
    mu_a = mean[query_indices]
    mu_b = mean[evidence_indices]
    A = cov[np.ix_(query_indices, query_indices)]
    B = cov[np.ix_(evidence_indices, evidence_indices)]
    C = cov[np.ix_(query_indices, evidence_indices)]
    
    # Условное среднее и ковариация
    mu = mu_a + C @ np.linalg.inv(B) @ (b - mu_b)
    Sigma = A - C @ np.linalg.inv(B) @ C.T
    return mu, Sigma

# Example usage
if __name__ == "__main__":
    # Пример условного многомерного нормального распределения
    mean = np.array([0, 0])
    cov = np.array([[1, 0.5], [0.5, 1]])
    query_indices = [0]
    evidence_indices = [1]
    evidence = np.array([1])
    
    mu_conditional, Sigma_conditional = conditional_multivariate_normal(mean, cov, query_indices, evidence_indices, evidence)
    print("Условное среднее:", mu_conditional)
    print("Условная ковариация:", Sigma_conditional)

Условное среднее: [0.5]
Условная ковариация: [[0.75]]


In [None]:
# def multiply_factors(phi: Factor, psi: Factor) -> Factor:
#     """
#     Перемножает два фактора (phi и psi) и возвращает новый фактор.
    
#     Аргументы:
#         phi (Factor): Первый фактор.
#         psi (Factor): Второй фактор.
    
#     Возвращает:
#         Factor: Результат перемножения факторов phi и psi.
#     """
#     phi_names = phi.vars
#     psi_names = psi.vars
#     psi_only = set(psi_names) - set(phi_names)
    
#     table = {}
#     for phi_assignment, phi_prob in phi.table.items():
#         for assignment in generate_assignments(psi_only):
#             a = {**phi_assignment, **assignment}
#             psi_assignment = {k: a[k] for k in psi_names}
#             table[tuple(a.items())] = phi_prob * psi.table.get(psi_assignment, 0.0)
    
#     vars = list(phi.vars) + list(psi_only)
#     return Factor(vars, table)

# def marginalize(phi: Factor, name: str) -> Factor:
#     """
#     Выполняет маргинализацию по переменной `name`, исключая её из фактора.
    
#     Аргументы:
#         phi (Factor): Исходный фактор.
#         name (str): Имя переменной, которую нужно исключить.
    
#     Возвращает:
#         Factor: Новый фактор с исключённой переменной.
#     """
#     table = {}
#     for assignment, prob in phi.table.items():
#         new_assignment = {k: v for k, v in assignment.items() if k != name}
#         table[tuple(new_assignment.items())] = table.get(tuple(new_assignment.items()), 0.0) + prob
    
#     vars = [v for v in phi.vars if v != name]
#     return Factor(vars, table)

# def condition(phi: Factor, name: str, value: Any) -> Factor:
#     """
#     Условная вероятность для переменной `name` с заданным значением `value`.
    
#     Аргументы:
#         phi (Factor): Исходный фактор.
#         name (str): Имя переменной.
#         value (Any): Значение, которое принимает переменная.
    
#     Возвращает:
#         Factor: Новый фактор, условный на переменной `name` с заданным значением.
#     """
#     if name not in phi.vars:
#         return phi
    
#     table = {}
#     for assignment, prob in phi.table.items():
#         if assignment[name] == value:
#             new_assignment = {k: v for k, v in assignment.items() if k != name}
#             table[tuple(new_assignment.items())] = prob
    
#     vars = [v for v in phi.vars if k != name]
#     return Factor(vars, table)

# def prod(factors: List[Factor]) -> Factor:
#     """
#     Перемножает список факторов и возвращает результирующий фактор.
    
#     Аргументы:
#         factors (List[Factor]): Список факторов для перемножения.
    
#     Возвращает:
#         Factor: Результат перемножения всех факторов.
#     """
#     result = factors[0]
#     for factor in factors[1:]:
#         result = multiply_factors(result, factor)
#     return result

# def normalize(phi: Factor) -> Factor:
#     """
#     Нормализует фактор, чтобы сумма всех вероятностей была равна 1.
    
#     Аргументы:
#         phi (Factor): Исходный фактор.
    
#     Возвращает:
#         Factor: Нормализованный фактор.
#     """
#     total = sum(phi.table.values())
#     for key in phi.table:
#         phi.table[key] /= total
#     return phi