# **Học phần mở rộng: Bayesian Network (BN)**

* **Tên học phần:** Machine Learning  
* **Mã học phần:** CO3117  
* **Học kỳ:** 251  
* **Năm học:** 2025  
* **Giảng viên hướng dẫn:** TS. Lê Thành Sách  
* **Nhóm thực hiện:** CSML25  

---
| Sinh viên | ID | Email | Đóng góp |
|--------------------|----------|----------------|---------------|
| Nguyen Dang Khanh | 2311512 | khanh.nguyennttt040905@hcmut.edu.vn | 100% |
| Bui Ngoc Phuc | 2312665 ​​| phuc.buif2175@hcmut.edu.vn | 100% |
| Dinh Hoang Chung | 2310359 | chung.dinhhoang@hcmut.edu.vn | 100% |

---
## **1. Nội dung chủ đề Bayesian Network (BN)**

  - Hiện thực mô hình Bayesian Network, cài đặt các thuật toán suy luận (in
ference) gồm: suy luận chính xác và suy luận xấp xỉ cơ bản.
  - Ứng dụng: giải một bài toán tiêu biểu, ví dụ phân loại hoặc dự đoán dựa
 trên quan hệ nhân quả trong dữ liệu.
  - Giảng viên sẽ cung cấp sẵn thư viện Python hỗ trợ thao tác trên đồ thị
 DAG và thực hiện topological sort, nhằm giúp sinh viên tập trung vào phần
 hiện thực thuật toán suy luận.
---

## **2. Nhiệm vụ**
- Xây dựng mô hình, cài đặt hoặc sử dụng thư viện thích hợp.
 - Đánh giá mô hình bằng thí nghiệm cụ thể, phân tích ưu điểm và hạn chế.
 - Trình bày ứng dụng tiêu biểu của kỹ thuật trong báo cáo và minh họa bằng ví
 dụ chạy thử

---

## **3. Phân tích đề bài**

- Giảng viên sẽ cung cấp sẵn thư viện Python hỗ trợ thao tác trên đồ thị
 DAG và thực hiện topological sort, nhằm giúp sinh viên tập trung vào phần
 hiện thực thuật toán suy luận. Tuy nhiên nhóm chưa nhận được thông tin về thư viện hỗ trợ nên nhóm rút ra là:
  - Không yêu cầu phải “học ra” DAG từ dữ liệu.
  - Không cần viết code xác định cấu trúc mạng (quan hệ cha – con).
  - Nhóm sẽ chỉ tự xây dựng hoặc nhập sẵn DAG hợp lý, rồi cài đặt các thuật toán inference (exact & approximate) để tính xác suất.
  - Toposort nhóm sẽ tự code vào trong class.
  - Do đó nhóm tập trung vào phần inference — không phải structure learning. Vì vậy input được nhập bằng cách nhập qua hàm add_node() và CPT.
---


# CODE

In [None]:
import random
import time

#  ======================================================
# |               LỚP BAYESIAN NETWORK                   |
#  ======================================================
class BayesianNetwork:

    def __init__(self):
      """
      Khởi tạo đối tượng mạng Bayes rỗng
      + self.nodes: lưu tên các node trong mạng (dict[str, None])
      + self.parents: danh sách cha của các node (dict[str, list[str]])
      + self.CPT: Bảng xác suất có điều kiện cho từng node (dict[str, float | dict[tuple[bool], float]])
      """
      self.nodes = {}
      self.parents = {}
      self.CPT = {}

    # --------------------------------------------------
    # Thêm node vào mạng
    # --------------------------------------------------
    def add_node(self, name, parents, cpt):
        """
        Thêm một biến mới vào mạng.
        cpt:
            - nếu node độc lập -> float (P(X=True))
            - nếu node phụ thuộc -> dict {tuple(parent_values): P(X=True)},
            trong đó parent_values là tuple Bool theo cùng thứ tự như parents
        Trả về: None
        """
        self.nodes[name] = None
        self.parents[name] = parents
        self.CPT[name] = cpt

    # --------------------------------------------------
    # Trả về xác suất P(X=True|parents)
    # --------------------------------------------------
    def get_prob(self, var, e):
        """
        Trả về xác suất P(var | parents) dựa trên bằng chứng e.
            - var (str): tên biến cần tra.
            - e (dict): e hiện tại (ví dụ {'A': True, 'B': False, ...}).
        logic:
            - Nếu var là root (prob[var] là float): p = P(var=True).
            - Nếu var có cha (prob[var] là dict):
              tạo key = tuple(e[p] for p in parents[var]), tra p = P(var=True | parents) từ CPT.
            - Trả p nếu e[var] là True, ngược lại trả 1-p.
        """
        if isinstance(self.CPT[var], float):
            p = self.CPT[var]
        else:
            key = tuple(e[p] for p in self.parents[var])
            p = self.CPT[var][key]
        return p if e[var] else 1 - p

    # --------------------------------------------------
    # Sắp xếp topo tự động (DFS)
    # --------------------------------------------------
    def toposort(self):
        """
        Tự động tạo thứ tự topo (Topological Order) cho DAG (đồ thị có hướng),
        đảm bảo các cha luôn xuất hiện trước con trong danh sách xử lý.
        logic:
            - DFS trên đồ thị cha–con
            - Loại trùng theo thứ tự xuất hiện đầu tiên
        yêu cầu input : Nếu DAG có chu trình
        DFS sẽ không phát hiện, do đó phần DAG yêu cầu đúng (tin tưởng thư viện xử lý DAG)
        """
        visited = set()
        order = []

        def dfs(node):
            if node not in visited:
                visited.add(node)
                for p in self.parents[node]:
                    dfs(p)
                order.append(node)

        for n in self.nodes:
            dfs(n)

        # Loại bỏ trùng, giữ thứ tự xuất hiện đầu tiên
        final_order = []
        for x in order:
            if x not in final_order:
                final_order.append(x)
        return final_order

    # --------------------------------------------------
    # Suy luận chính xác bằng Enumeration
    # --------------------------------------------------
    def enumeration_ask(self, X, e):
        """
        Tính phân phối hậu nghiệm P(X | e) chính xác bằng Enumeration
        (Suy diễn bằng cách liệt kê).
        điểm yếu: phức tạp cao, điểm mạnh: dễ code hơn bỏ biến sớm
        logic:
            - Lấy thứ tự topo: vars_order = self.toposort().
            - Với mỗi giá trị x ∈ {True, False}:
              + Gán tạm e[X] = x.
              + Gọi enumerate_all(vars_order, e.copy()) để tính tổng xác suất đầy đủ cho cấu hình đó
            - Chuẩn hóa hai giá trị để tổng bằng 1.
        Fix lại sau debug: Nhớ e.copy() khi gọi xuống enumerate_all để tránh ghi đè
        trạng thái bằng chứng giữa hai nhánh True/False.
        """
        vars_order = self.toposort()

        dist = {}
        for x in [True, False]:
            e[X] = x
            dist[x] = self.enumerate_all(vars_order, e.copy())
        total = sum(dist.values())
        for x in dist:
            dist[x] /= total
        return dist

    def enumerate_all(self, vars_order, e):
        """
        Tính tổng xác suất đầy đủ theo công thức với điều kiện luôn tính một biến khi tất cả cha của nó đã có trong e.
        logic:

            - Nếu hết biến: trả 1.0
            - Chọn biến đầu tiên Y trong vars_order sao cho toàn bộ cha của Y đã có trong e.
            - Tạo rest = vars_order bỏ Y.
            - Nếu Y đã có trong e:
                - Trả get_prob(Y, e) * enumerate_all(rest, e).
            - Nếu Y chưa có trong e:
                - Liệt kê y ∈ {True, False}:
                    - Lập e2 = e.copy(); e2[Y] = y
                    - Cộng get_prob(Y, e2) * enumerate_all(rest, e2) vào tổng.
                -Trả tổng.
        """
        if not vars_order:
            return 1.0

        for Y in vars_order:
            if all(p in e for p in self.parents[Y]):
                break
        rest = [v for v in vars_order if v != Y]

        if Y in e:
            return self.get_prob(Y, e) * self.enumerate_all(rest, e)
        else:
            total = 0
            for y in [True, False]:
                e2 = e.copy()
                e2[Y] = y
                total += self.get_prob(Y, e2) * self.enumerate_all(rest, e2)
            return total

    # --------------------------------------------------
    # Suy luận xấp xỉ bằng Likelihood Weighting
    # --------------------------------------------------
    """
    Mã giả:
        function LW-Sample(B, E=e):
          w = 1
          for each variable Xi theo topo:
            if Xi ∉ evidence:
                Xi = sample from P(Xi | Parents)
            else:
                Xi = e[Xi]
                w *= P(Xi = e[Xi] | Parents)
          return (sample, w)
    """
    def weighted_sample(self, vars_order, e):
    # lấy mẫu theo phân phối có điều kiện + tính trọng số của mẫu dựa trên evidence.
        w = 1.0
        x = e.copy()
        for var in vars_order:
            if isinstance(self.CPT[var], float):
                p = self.CPT[var]
            else:
                p = self.CPT[var][tuple(x[p] for p in self.parents[var])]
            if var in e:
                w *= p if e[var] else (1 - p)
            else:
                x[var] = random.random() < p
        return x, w

    def likelihood_weighting(self, X, e, N=100000):
        """
        Ước lượng P(X | e) bằng Likelihood Weighting
        LOGIC:
            - Lấy topo vars_order.
            - Lặp N lần:
                - sample, w = weighted_sample(vars_order, e)
                - Cộng w vào thùng W[sample[X]].
            - Chuẩn hóa W[True] và W[False].
        """
        W = {True: 0.0, False: 0.0}
        vars_order = self.toposort()

        for _ in range(N):
            sample, weight = self.weighted_sample(vars_order, e)
            W[sample[X]] += weight

        total = W[True] + W[False]
        return {k: v / total for k, v in W.items()}






# TESTCASE

In [None]:
import time

def run_scenario_1(bn, scenario_name, X, evidence, N=100000):
    print("="*60)
    print(f"{scenario_name}")
    print("="*60)
    print("Evidence:")
    for k,v in evidence.items():
        print(f"  {k} = {v}")
    print()

    # ---- Exact inference ----
    t1 = time.time()
    exact = bn.enumeration_ask(X, evidence.copy())
    t2 = time.time()

    print("Exact Inference:")
    print(f"  P({X}=True)  = {exact[True]:.6f}")
    exact_time = t2 - t1
    print(f"  Time: {exact_time:.6f} s\n")

    # ---- Approx inference ----
    t3 = time.time()
    approx = bn.likelihood_weighting(X, evidence.copy(), N=N)
    t4 = time.time()

    print(f"Approximate Inference (Likelihood Weighting, N={N}):")
    print(f"  P({X}=True)  ≈ {approx[True]:.6f}")
    approx_time = t4 - t3
    print(f"  Time: {approx_time:.6f} s\n")

    # ---- Error ----
    err = abs(exact[True] - approx[True])
    print(f"Độ lệch = {err:.6f}")
    print("\n\n")
import time

def run_scenario_2(bn, scenario_name, X, evidence, N=100000):
    print("="*60)
    print(f"{scenario_name}")
    print("="*60)
    print("Evidence:")
    for k,v in evidence.items():
        print(f"  {k} = {v}")
    print()

    # ---- Exact inference ----
    t1 = time.time()
    exact = bn.enumeration_ask(X, evidence.copy())
    t2 = time.time()

    print("Exact Inference:")
    print(f"  P({X}=False) = {exact[False]:.6f}")
    exact_time = t2 - t1
    print(f"  Time: {exact_time:.6f} s\n")

    # ---- Approx inference ----
    t3 = time.time()
    approx = bn.likelihood_weighting(X, evidence.copy(), N=N)
    t4 = time.time()

    print(f"Approximate Inference (Likelihood Weighting, N={N}):")
    print(f"  P({X}=False) ≈ {approx[False]:.6f}")
    approx_time = t4 - t3
    print(f"  Time: {approx_time:.6f} s\n")

    # ---- Error ----
    err = abs(exact[True] - approx[True])
    print(f"Độ lệch = {err:.6f}")
    print("\n\n")

TEST – Dự đoán triệu chứng bệnh

Biến:

- GeneticRisk: bệnh nhân có yếu tố di truyền

- VirusExposure: mức độ phơi nhiễm virus

- ImmuneStrength: sức đề kháng

- Infection: khả năng nhiễm bệnh

- Symptom: xuất hiện triệu chứng

DAG:
```text
...diagram...


                    +------------------+
                    |   GeneticRisk    |
                    +---------+--------+
                              |
                +-------------+-------------+
                |                           |
        +---------------+           +---------------+
        | VirusExposure |           | ImmuneStrength|
        +-------+-------+           +-------+-------+
                |                           |
                +-------------+-------------+
                              |
                        +-----------+
                        | Infection |
                        +-----+-----+
                              |
                        +-----------+
                        |  Symptom  |
                        +-----------+

CPT:

...diagram...

1) P(GeneticRisk)
GeneticRisk | Probability
------------+------------
   True     |    0.20
   False    |    0.80


2) P(VirusExposure | GeneticRisk)
GeneticRisk | VirusExposure=True
------------+--------------------
   True     |       0.70
   False    |       0.30


3) P(ImmuneStrength | GeneticRisk)
GeneticRisk | ImmuneStrength=True
------------+----------------------
   True     |        0.40
   False    |        0.80


4) P(Infection | VirusExposure, ImmuneStrength)
VirusExp | ImmStr | Infection=True
---------+--------+----------------
  True   |  True  |      0.40
  True   |  False |      0.80
  False  |  True  |      0.10
  False  |  False |      0.30


5) P(Symptom | Infection)
Infection | Symptom=True
----------+--------------
   True   |     0.90
   False  |     0.20


Kịch bản:

- Xác suất bệnh nhân xuất hiện triệu chứng khi có yếu tố di truyền
- Xác suất nhiễm bệnh khi biết cả 3 yếu tố: có rủi ro di truyền + tiếp xúc virus nhiều + miễn dịch yếu
- Bệnh nhân đã xuất hiện triệu chứng, xác suất họ thật sự bị nhiễm bệnh là bao nhiêu?
- Xác suất một người không có rủi ro di truyền nhưng vẫn có triệu chứng
- Xác suất VirusExposure nếu đã biết bệnh nhân đang có triệu chứng?
- Xác suất sức đề kháng yếu khi không có triệu chứng
- Xác suất Infection nếu biết miễn dịch khỏe nhưng có yếu tố di truyền

In [None]:
def build_medical_bn():
    bn = BayesianNetwork()

    #  Tầng 1
    bn.add_node("GeneticRisk", [], 0.2)

    #  Tầng 2
    bn.add_node("VirusExposure", ["GeneticRisk"], {
        (True,): 0.7,
        (False,): 0.3
    })
    bn.add_node("ImmuneStrength", ["GeneticRisk"], {
        (True,): 0.4,
        (False,): 0.8
    })

    #  Tầng 3
    bn.add_node("Infection", ["VirusExposure", "ImmuneStrength"], {
        (True, True): 0.4,
        (True, False): 0.8,
        (False, True): 0.1,
        (False, False): 0.3
    })

    #  Tầng 4
    bn.add_node("Symptom", ["Infection"], {
        (True,): 0.9,
        (False,): 0.2
    })

    return bn

# CHẠY KỊCH BẢN

In [None]:
bn = build_medical_bn()


In [None]:
run_scenario_1(
    bn,
    "Kịch bản 1 – Triệu chứng khi có yếu tố di truyền",
    "Symptom",
    {"GeneticRisk": True}
)

Kịch bản 1 – Triệu chứng khi có yếu tố di truyền
Evidence:
  GeneticRisk = True

Exact Inference:
  P(Symptom=True)  = 0.559800
  Time: 0.000162 s

Approximate Inference (Likelihood Weighting, N=100000):
  P(Symptom=True)  ≈ 0.560100
  Time: 0.451653 s

Độ lệch = 0.000300





In [None]:
run_scenario_1(
    bn,
    "Kịch bản 2 – Nhiễm bệnh khi Risk=True, Exposure=True, Immune=False",
    "Infection",
    {"GeneticRisk": True, "VirusExposure": True, "ImmuneStrength": False}
)

Kịch bản 2 – Nhiễm bệnh khi Risk=True, Exposure=True, Immune=False
Evidence:
  GeneticRisk = True
  VirusExposure = True
  ImmuneStrength = False

Exact Inference:
  P(Infection=True)  = 0.800000
  Time: 0.000101 s

Approximate Inference (Likelihood Weighting, N=100000):
  P(Infection=True)  ≈ 0.800040
  Time: 0.468188 s

Độ lệch = 0.000040





In [None]:
run_scenario_1(
    bn,
    "Kịch bản 3 – Xác suất thật sự nhiễm bệnh khi đã có triệu chứng",
    "Infection",
    {"Symptom": True}
)

Kịch bản 3 – Xác suất thật sự nhiễm bệnh khi đã có triệu chứng
Evidence:
  Symptom = True

Exact Inference:
  P(Infection=True)  = 0.654658
  Time: 0.000252 s

Approximate Inference (Likelihood Weighting, N=100000):
  P(Infection=True)  ≈ 0.654126
  Time: 0.446678 s

Độ lệch = 0.000532





In [None]:
run_scenario_1(
    bn,
    "Kịch bản 4 – Triệu chứng khi không có rủi ro di truyền",
    "Symptom",
    {"GeneticRisk": False}
)

Kịch bản 4 – Triệu chứng khi không có rủi ro di truyền
Evidence:
  GeneticRisk = False

Exact Inference:
  P(Symptom=True)  = 0.369400
  Time: 0.000202 s

Approximate Inference (Likelihood Weighting, N=100000):
  P(Symptom=True)  ≈ 0.369420
  Time: 0.775341 s

Độ lệch = 0.000020





In [None]:
run_scenario_1(
    bn,
    "Kịch bản 5 – Bị VirusExposure khi biết bệnh nhân có triệu chứng",
    "VirusExposure",
    {"Symptom": True}
)

Kịch bản 5 – Bị VirusExposure khi biết bệnh nhân có triệu chứng
Evidence:
  Symptom = True

Exact Inference:
  P(VirusExposure=True)  = 0.538333
  Time: 0.000204 s

Approximate Inference (Likelihood Weighting, N=100000):
  P(VirusExposure=True)  ≈ 0.537021
  Time: 0.886541 s

Độ lệch = 0.001312





In [None]:
run_scenario_2(
    bn,
    "Kịch bản 6 – ImmuneStrength yếu khi không có triệu chứng",
    "ImmuneStrength",
    {"Symptom": False}
)

Kịch bản 6 – ImmuneStrength yếu khi không có triệu chứng
Evidence:
  Symptom = False

Exact Inference:
  P(ImmuneStrength=False) = 0.200837
  Time: 0.000228 s

Approximate Inference (Likelihood Weighting, N=100000):
  P(ImmuneStrength=False) ≈ 0.199264
  Time: 0.839718 s

Độ lệch = 0.001573





In [None]:
run_scenario_1(
    bn,
    "Kịch bản 7 – Nhiễm bệnh khi Immune=True và GeneticRisk=True",
    "Infection",
    {"ImmuneStrength": True, "GeneticRisk": True}
)

Kịch bản 7 – Nhiễm bệnh khi Immune=True và GeneticRisk=True
Evidence:
  ImmuneStrength = True
  GeneticRisk = True

Exact Inference:
  P(Infection=True)  = 0.310000
  Time: 0.000108 s

Approximate Inference (Likelihood Weighting, N=100000):
  P(Infection=True)  ≈ 0.308250
  Time: 0.796380 s

Độ lệch = 0.001750



