In [11]:
import numpy as np
import pandas as pd

from pgmpy.models import BayesianNetwork
from pgmpy.estimators import MaximumLikelihoodEstimator
from pgmpy.inference import VariableElimination
from pgmpy.sampling import BayesianModelSampling
from pgmpy.sampling import GibbsSampling

In [17]:
def generate_synthetic_data(n_samples=1000):
    """
    一連の攻撃シナリオを想定した疑似データをランダム生成。
    下記のような構造を想定:
        Attack1 -> Attack2
        Attack1 -> Attack3
        Attack2 -> Attack4
        Attack4 -> Attack5
        Attack3 -> Attack5

    各ノードは 0(失敗) or 1(成功) の二値。
    (実運用では実際のログや検知データを用いる想定)
    """

    # Attack1 (単体) の成功確率
    p_attack1 = 0.3

    # Attack2 は Attack1 に依存
    p_attack2_if_a1 = 0.7   # Attack1=1 のとき Attack2=1 になりやすい
    p_attack2_if_not_a1 = 0.2

    # Attack3 も Attack1 に依存
    p_attack3_if_a1 = 0.6
    p_attack3_if_not_a1 = 0.1

    # Attack4 は Attack2 に依存
    p_attack4_if_a2 = 0.8
    p_attack4_if_not_a2 = 0.2

    # Attack5 は Attack3, Attack4 の両方に依存
    # ここでは4通りの確率を仮定 (それぞれ [a3, a4] = [1 or 0, 1 or 0])
    p_attack5_if_a3_a4_11 = 0.9  # Attack3=1, Attack4=1
    p_attack5_if_a3_a4_10 = 0.7  # Attack3=1, Attack4=0
    p_attack5_if_a3_a4_01 = 0.8  # Attack3=0, Attack4=1
    p_attack5_if_a3_a4_00 = 0.1  # Attack3=0, Attack4=0

    data = []
    for _ in range(n_samples):
        # Attack1
        a1 = int(np.random.rand() < p_attack1)

        # Attack2
        if a1 == 1:
            a2 = int(np.random.rand() < p_attack2_if_a1)
        else:
            a2 = int(np.random.rand() < p_attack2_if_not_a1)

        # Attack3
        if a1 == 1:
            a3 = int(np.random.rand() < p_attack3_if_a1)
        else:
            a3 = int(np.random.rand() < p_attack3_if_not_a1)

        # Attack4
        if a2 == 1:
            a4 = int(np.random.rand() < p_attack4_if_a2)
        else:
            a4 = int(np.random.rand() < p_attack4_if_not_a2)

        # Attack5 (Attack3, Attack4の両方に依存)
        if a3 == 1 and a4 == 1:
            a5 = int(np.random.rand() < p_attack5_if_a3_a4_11)
        elif a3 == 1 and a4 == 0:
            a5 = int(np.random.rand() < p_attack5_if_a3_a4_10)
        elif a3 == 0 and a4 == 1:
            a5 = int(np.random.rand() < p_attack5_if_a3_a4_01)
        else:
            a5 = int(np.random.rand() < p_attack5_if_a3_a4_00)

        data.append([a1, a2, a3, a4, a5])
    
    # pandasのDataFrameに変換
    df = pd.DataFrame(data, columns=["Attack1", "Attack2", "Attack3", "Attack4", "Attack5"])
    return df

def main():
    # ===========================
    # 0. サンプルデータの生成
    # ===========================
    df_data = generate_synthetic_data(n_samples=2000)

    # ===========================
    # 1. ベイジアンネットワークの構築 & パラメータ学習
    # ===========================
    # 構造は固定: 
    #   Attack1 -> Attack2
    #   Attack1 -> Attack3
    #   Attack2 -> Attack4
    #   Attack4 -> Attack5
    #   Attack3 -> Attack5
    model = BayesianNetwork([
        ("Attack1", "Attack2"),
        ("Attack1", "Attack3"),
        ("Attack2", "Attack4"),
        ("Attack4", "Attack5"),
        ("Attack3", "Attack5")
    ])

    # データから最大尤度推定でパラメータ(CPD)を学習
    model.fit(df_data, estimator=MaximumLikelihoodEstimator)

    print("学習が完了しました。学習された条件付き確率表 (CPD):\n")
    for cpd in model.get_cpds():
        print(cpd)

    # ===========================
    # 2. 推論フェーズ (Inference)
    # ===========================
    inference = VariableElimination(model)

    # 例1: Attack1=1 (成功) とわかったとき、Attack2, Attack3, Attack4, Attack5 はどうなるか
    print("\n=== [推論例1] Attack1=1 が観測された場合の事後分布 ===")
    observed_evidence = {"Attack1": 1}
    query_vars = ["Attack2", "Attack3", "Attack4", "Attack5"]
    for var in query_vars:
        q = inference.query(variables=[var], evidence=observed_evidence)
        # print(f"P({var}=1 | Attack1=1) = {q[var].values[1]:.4f} ( 0→{q[var].values[0]:.4f}, 1→{q[var].values[1]:.4f} )")

    
    # 例2: Attack3=1, Attack4=0 が同時に観測されたときに Attack5 はどうなるか
    print("\n=== [推論例2] Attack3=1, Attack4=0 が観測された場合の事後分布 ===")
    observed_evidence2 = {"Attack3": 1}
    query_var2 = ["Attack5"]
    for var in query_var2:
        q2 = inference.query(variables=[var], evidence=observed_evidence2)
        # print(f"P({var}=1 | Attack3=1, Attack4=0) = {q2[var].values[1]:.4f}")

    # ===========================
    # 3. サンプリング
    # ===========================
    print("\n=== [サンプリング例] 学習済みモデルから攻撃パターンを生成 ===")
    sampler = GibbsSampling(model)
    samples = sampler.sample(size=5)
    print(samples)



In [18]:
main()

学習が完了しました。学習された条件付き確率表 (CPD):

+------------+--------+
| Attack1(0) | 0.7135 |
+------------+--------+
| Attack1(1) | 0.2865 |
+------------+--------+
+------------+---------------------+--------------------+
| Attack1    | Attack1(0)          | Attack1(1)         |
+------------+---------------------+--------------------+
| Attack2(0) | 0.8009810791871058  | 0.2949389179755672 |
+------------+---------------------+--------------------+
| Attack2(1) | 0.19901892081289418 | 0.7050610820244329 |
+------------+---------------------+--------------------+
+------------+---------------------+---------------------+
| Attack1    | Attack1(0)          | Attack1(1)          |
+------------+---------------------+---------------------+
| Attack3(0) | 0.8990889978976875  | 0.41535776614310643 |
+------------+---------------------+---------------------+
| Attack3(1) | 0.10091100210231255 | 0.5846422338568935  |
+------------+---------------------+---------------------+
+------------+------------+---



100%|██████████| 4/4 [00:00<00:00, 2049.00it/s]

   Attack1  Attack2  Attack3  Attack4  Attack5
0        0        1        0        0        0
1        0        0        0        0        0
2        0        1        0        0        0
3        0        0        0        0        0
4        0        0        0        0        0



