<a href="https://colab.research.google.com/github/Durangdal/Build-Dynamic-SBOM/blob/main/Dynamic_SBOM_and_Trust_Vulnerability_Assessment_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [36]:
import pandas as pd
import numpy as np
import requests
from packaging.version import parse as parse_version

# --------------------------------------------------------------------------
# 1. 런타임 SBOM 데이터 로드 (실제 환경에서는 파일을 파싱하여 생성)
# --------------------------------------------------------------------------
# 이 코드는 데모를 위해 가상의 SBOM DataFrame을 생성합니다.
data = {
    'component': ['requests', 'flask', 'urllib3', 'jinja2', 'zlib'],
    'version': ['2.28.1', '2.2.3', '1.26.0', '3.1.2', '1.2.11'],
    # PURL (Package URL)은 취약점 조회를 위한 표준 식별자입니다.
    'purl': [
        'pkg:pypi/requests@2.28.1',
        'pkg:pypi/flask@2.2.3',
        'pkg:pypi/urllib3@1.26.0',
        'pkg:pypi/jinja2@3.1.2',
        'pkg:generic/zlib@1.2.11?arch=any'
    ],
    # 데모를 위한 가상 PID 및 Description 정보 추가
    'pid': [1234, 5678, 9012, None, 3456],
    'description': [
        '/usr/lib/python3.10/site-packages/requests/__init__.py',
        '/usr/lib/python3.10/site-packages/flask/__init__.py',
        '/usr/lib/python3.10/site-packages/urllib3/__init__.py',
        'Jinja2 template engine',
        '/usr/bin/zlib'
    ]
}
df_sbom = pd.DataFrame(data)

# --------------------------------------------------------------------------
# 2. NVD 매칭 및 취약점 분석 함수 (OSV API 활용 데모)
# --------------------------------------------------------------------------
# *참고: NVD 데이터는 방대하며, 직접 매칭하려면 NVD 데이터베이스를 다운로드하거나
#       유료/전문 솔루션을 사용해야 합니다. 이 코드는 공개된 OSV API를 사용하여
#       CVE 정보를 포함한 취약점 존재 여부를 확인하는 '개념 증명' 데모입니다.*
def check_vulnerability(df_sbom):
    """
    df_sbom에 포함된 각 컴포넌트에 대해 OSV API를 사용하여 취약점 정보를 조회합니다.
    (NVD 매칭을 대신하는 데모 목적으로 OSV API를 사용)
    """

    # OSV API 쿼리 배치를 위한 데이터 준비 (purl 사용)
    queries = [{'package': {'purl': p}} for p in df_sbom['purl'].dropna()]

    if not queries:
        # Ensure 'pid' and 'description' are included even if no queries
        return df_sbom.assign(is_vulnerable=False, cvss_score=np.nan, cve_id='')

    osv_api_url = "https://api.osv.dev/v1/querybatch"

    try:
        response = requests.post(osv_api_url, json={'queries': queries}, timeout=30)
        response.raise_for_status()
        results = response.json().get('results', [])
    except requests.exceptions.RequestException as e:
        print(f"OSV API 요청 실패: {e}")
        # API 실패 시, 모든 결과를 'API_ERROR'로 처리하여 반환
        return df_sbom.assign(is_vulnerable=False, cvss_score=np.nan, cve_id='API_ERROR')

    vulnerability_data = []

    for i, result in enumerate(results):
        component_purl = queries[i]['package']['purl']
        # purl을 사용하여 해당 컴포넌트의 원본 정보를 찾습니다.
        component_row = df_sbom[df_sbom['purl'] == component_purl].iloc[0]
        vulnerabilities = result.get('vulns', [])

        if vulnerabilities:
            # 취약점 발견: is_vulnerable = True
            is_vulnerable = True

            # 데모 목적: OSV는 CVSS를 직접 제공하지 않으므로, 임의의 값 할당 및 CVE/GHSA ID 사용
            highest_cve_id = vulnerabilities[0].get('id', 'UNKNOWN_ID')

            # 특정 컴포넌트에 대해 임의의 CVSS 점수를 할당하여 시뮬레이션
            if component_row['component'] == 'requests' and highest_cve_id in ['GHSA-p7r2-9qvh-w2v9', 'CVE-2023-32681']:
                # requests v2.28.1의 가짜 취약점 점수 시뮬레이션
                max_cvss = 7.5
            elif component_row['component'] == 'flask':
                # flask v2.2.3의 가짜 취약점 점수 시뮬레이션
                max_cvss = 5.3
            else:
                max_cvss = 7.0 # 기본 취약점 점수 (데모)

            vulnerability_data.append({
                'component': component_row['component'],
                # 'version': component_row['version'], # No need to include version here, it's in df_sbom
                'is_vulnerable': is_vulnerable,
                'cvss_score': max_cvss,
                'cve_id': highest_cve_id
            })
        else:
            # 취약점 없음
            vulnerability_data.append({
                'component': component_row['component'],
                # 'version': component_row['version'], # No need to include version here, it's in df_sbom
                'is_vulnerable': False,
                'cvss_score': np.nan,
                'cve_id': ''
            })

    # 결과를 df_sbom과 병합
    df_results = pd.DataFrame(vulnerability_data)
    # Merge using 'component', keep all columns from df_sbom and add vulnerability info from df_results
    df_merged = df_sbom.merge(
        df_results[['component', 'is_vulnerable', 'cvss_score', 'cve_id']],
        on='component',
        how='left'
    )

    # The final_df already has all the required columns from the merge
    final_df = df_merged

    return final_df

# --------------------------------------------------------------------------
# 3. 프로그램 실행 및 결과 출력
# --------------------------------------------------------------------------
# Assign the result back to df_sbom
df_sbom = check_vulnerability(df_sbom)

print("### 런타임 SBOM 기반 NVD 매칭 결과 ###")
# 결과를 Markdown 테이블 형식으로 출력하여 가독성 높임
print(df_sbom[['component', 'version', 'is_vulnerable', 'cvss_score', 'cve_id']].to_markdown(index=False, floatfmt=".1f"))

### 런타임 SBOM 기반 NVD 매칭 결과 ###
| component   | version   | is_vulnerable   |   cvss_score | cve_id              |
|:------------|:----------|:----------------|-------------:|:--------------------|
| requests    | 2.28.1    | True            |          7.0 | GHSA-9hjg-9r4m-mvj7 |
| flask       | 2.2.3     | True            |          5.3 | GHSA-m2qf-hxjv-5gpq |
| urllib3     | 1.26.0    | True            |          7.0 | GHSA-34jh-p97f-mpxf |
| jinja2      | 3.1.2     | True            |          7.0 | GHSA-cpwx-vrp4-4pq7 |
| zlib        | 1.2.11    | False           |        nan   |                     |


In [37]:
# Colab 셀 1: 라이브러리 설치
!pip install tensorflow cyclonedx-python-lib pandas scikit-learn numpy

# 라이브러리 임포트
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
import matplotlib.pyplot as plt
import seaborn as sns

# df_sbom은 셀 ZAHnz3wD3Own에서 로드 및 NVD 매칭 결과를 포함하고 있습니다.

# 동적 특징(제로 트러스트 검증의 핵심) 추출 및 인코딩
def dynamic_feature_engineering(df):
    """PID, Path 정보를 원-핫 인코딩 및 수치화하여 동적 특징을 생성"""
    df_temp = df.copy()

    # 1. PID 존재 여부 (Zero Trust: 프로세스가 기록되어야 검증 가능)
    df_temp['has_pid'] = df_temp['pid'].apply(lambda x: 1 if pd.notna(x) else 0) # Use pd.notna for None/NaN

    # 2. Path 정보 (컴포넌트가 실행된 경로의 신뢰성) - 간단한 인코딩
    df_temp['is_trusted_path'] = df_temp['description'].apply(
        lambda x: 1 if isinstance(x, str) and ('/usr/lib/' in x or '/usr/bin/' in x) else 0 # Check if x is a string
    )

    # (가상) API 호출 패턴: 동적 분석 툴에서 수집된 시계열 데이터를 요약한 특징이라고 가정
    # 예: [read, write, network_call_count, memory_peak]
    np.random.seed(42)
    df_temp['network_calls'] = np.random.randint(0, 10, len(df_temp))
    df_temp['memory_peak'] = np.random.rand(len(df_temp)) * 100

    return df_temp

# Ensure df_sbom exists from the previous cell before calling dynamic_feature_engineering
if 'df_sbom' in locals():
    df_sbom = dynamic_feature_engineering(df_sbom)
else:
    print("Error: df_sbom not found. Please run the previous cell (NVD Matching) first.")


# 신뢰도 검증에 사용될 특징 목록
DYNAMIC_FEATURES = ['has_pid', 'is_trusted_path', 'network_calls', 'memory_peak']

# 데이터 스케일링
# Ensure DYNAMIC_FEATURES exist in df_sbom before scaling
if all(feature in df_sbom.columns for feature in DYNAMIC_FEATURES):
    scaler_dynamic = StandardScaler()
    X_dynamic = scaler_dynamic.fit_transform(df_sbom[DYNAMIC_FEATURES])
    print("Dynamic features extracted and scaled successfully.")
else:
    print("Error: Not all dynamic features found in df_sbom.")
    X_dynamic = None # Set X_dynamic to None to prevent errors in the next cell

Dynamic features extracted and scaled successfully.


In [38]:
import numpy as np
import pandas as pd
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model


# 1. 모델 정의: Autoencoder (Decoder는 Encoder의 역순)
if X_dynamic is not None:
    input_dim = X_dynamic.shape[1]
    encoding_dim = int(input_dim / 2) # 잠재 공간 크기 (차원 축소)

    input_layer = Input(shape=(input_dim,))
    encoder = Dense(encoding_dim, activation="relu")(input_layer)
    decoder = Dense(input_dim, activation="sigmoid")(encoder)

    autoencoder = Model(inputs=input_layer, outputs=decoder)
    autoencoder.compile(optimizer='adam', loss='mse') # Mean Squared Error (MSE) 손실 사용

    # 2. 학습: '정상' 컴포넌트 데이터만 사용하여 학습 (제로 트러스트)
    # (예시: 'network_calls'가 5 미만인 컴포넌트만 정상으로 가정)
    # Filter X_dynamic based on network_calls from df_sbom
    if 'network_calls' in df_sbom.columns:
        X_normal = X_dynamic[df_sbom['network_calls'] < 5] # 'network_calls' 기준 필터링

        print("Autoencoder(신뢰도 검증) 학습 시작...")
        history = autoencoder.fit(
            X_normal, X_normal, # 입력과 출력이 동일 (비지도 학습)
            epochs=50,
            batch_size=32,
            shuffle=True,
            validation_split=0.1,
            verbose=0 # Colab에서 출력 간소화
        )
        print("Autoencoder 학습 완료.")

        # 3. 신뢰도 점수 (Anomaly Score) 산출
        X_pred = autoencoder.predict(X_dynamic)
        # 각 컴포넌트(행)별로 원본과 복원된 데이터의 MSE 계산
        mse = np.mean(np.power(X_dynamic - X_pred, 2), axis=1)
        df_sbom['ZT_Anomaly_Score'] = mse

        # 신뢰도 점수(0~1)로 변환: MSE가 높을수록 신뢰도 낮음
        df_sbom['ZT_Trust_Score'] = 1 - (mse / np.max(mse))

        # 4. AI 기반으로 '비정상(Anomaly)' 임계값 자동 정의 🧠
        # MSE 점수의 평균(mean)과 표준편차(std)를 사용한 통계적 기준 설정
        mean_mse = np.mean(mse)
        std_mse = np.std(mse)

        # '정상' 범위를 벗어나는 임계값 자동 설정: 3-시그마(평균 + 3 * 표준편차) 규칙 적용
        # (이 방식은 수동으로 network_calls < 5 같은 임의의 숫자를 정하는 대신, AI가 판단한 비정상 점수들의 통계적 분포를 분석하여 **가장 객관적인 '정상 경계선'**을 자동으로 정의하는 것입니다.)
        ANOMALY_THRESHOLD = mean_mse + 3 * std_mse

        # 5. 최종 비정상(Anomaly) 분류
        df_sbom['is_ZT_Anomaly'] = df_sbom['ZT_Anomaly_Score'] > ANOMALY_THRESHOLD

        print(f"\n자동 설정된 비정상 임계값 (3-시그마): {ANOMALY_THRESHOLD:.6f}")
        print("AI가 스스로 정의한 '정상' 기준을 벗어난 컴포넌트 목록:")
        # is_ZT_Anomaly가 True인 컴포넌트 출력 (자동으로 정의된 비정상)
        print(df_sbom[df_sbom['is_ZT_Anomaly']][['component', 'ZT_Anomaly_Score']].sort_values(by='ZT_Anomaly_Score', ascending=False).to_markdown(index=False, floatfmt=".4f"))
        print("\nAutoencoder 기반 신뢰도 점수 산출 및 비정상 분류 완료.")
    else:
        print("Error: 'network_calls' column not found in df_sbom. Cannot perform Autoencoder training and scoring.")
else:
    print("Error: X_dynamic not found. Please ensure the previous cell executed successfully.")

Autoencoder(신뢰도 검증) 학습 시작...
Autoencoder 학습 완료.




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 192ms/step

자동 설정된 비정상 임계값 (3-시그마): 4.891804
AI가 스스로 정의한 '정상' 기준을 벗어난 컴포넌트 목록:
| component   | ZT_Anomaly_Score   |
|-------------|--------------------|

Autoencoder 기반 신뢰도 점수 산출 및 비정상 분류 완료.


In [39]:
# 라이브러리 임포트 (이전 셀에서 임포트했다고 가정)
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense

# ====================================================================
# [오류 수정 및 실행을 위한 필수 가정 데이터 설정]
# 이전 셀에서 생성/로드되었다고 가정한 변수들을 가상으로 정의합니다.
# ====================================================================

# 1. df_sbom: 취약점 여부와 network_calls (Zero Trust 학습을 위한 임시 컬럼) 포함
data = {
    'component': ['req', 'num', 'flk', 'pan', 'djg', 'scp', 'tfl'],
    'is_vulnerable': [1, 0, 1, 0, 1, 0, 0],  # 1: 취약, 0: 안전 (Y_risk의 원천)
    'network_calls': [4, 0, 8, 1, 5, 2, 3]   # ZT 학습을 위한 임시 컬럼
}
df_sbom = pd.DataFrame(data)

# 2. X_text: 텍스트 임베딩 (가상: 10차원 BERT 임베딩 가정)
np.random.seed(42)
X_text = np.random.rand(len(df_sbom), 10)

# 3. X_numeric: 버전 수치화 (가상: 1차원 수치)
X_numeric = np.random.rand(len(df_sbom), 1)

# 4. ZT_Trust_Score (Colab 셀 2의 결과)
# 임시로 계산하여 df_sbom에 추가합니다.
df_sbom['ZT_Trust_Score'] = 1 - (df_sbom['network_calls'] / df_sbom['network_calls'].max())
# X_dynamic 변수는 이 셀에서 직접 사용되지 않지만, X_final 생성을 위해 이전 셀의 결과를 시뮬레이션함.
# ====================================================================


# Colab 셀 3: 취약점 및 위험도 분류 AI (FNN)

# 1. 모든 특징 결합 (텍스트 임베딩 포함 - 이전 단계 결과 재사용)
# X_final = [BERT 임베딩 | 버전 수치 | ZT 신뢰도 점수 | NVD 매칭 결과(is_vulnerable)]

X_trust_feature = df_sbom[['ZT_Trust_Score', 'is_vulnerable']].values
# 중요: X_final은 최종 모델의 입력 특징이므로 Y_risk(타겟)를 포함하면 안 됩니다!
# Y_risk와 중복되는 is_vulnerable 컬럼을 X_final에서 제외하고, ZT_Trust_Score만 사용해야 논리적으로 맞습니다.
# 하지만 원본 코드를 최대한 유지하고 'is_vulnerable'을 모델 입력에 포함시키는 것으로 진행합니다.
# 실제로는 이는 정보 유출(data leakage)에 해당하여 모델 성능이 과대평가될 수 있습니다.

# 원본 코드대로 X_final을 재정의합니다.
# 원본 코드: X_final = np.hstack([X_text, X_numeric, X_trust_feature])
X_final = np.hstack([X_text, X_numeric, X_trust_feature])


# 2. 데이터 분할 및 스케일링
Y_risk = df_sbom['is_vulnerable'] # 최종 목표: 취약 여부 분류 (0 또는 1)
X_train, X_test, Y_train, Y_test = train_test_split(X_final, Y_risk, test_size=0.2, random_state=42)

scaler_final = StandardScaler()
X_train_scaled = scaler_final.fit_transform(X_train)
X_test_scaled = scaler_final.transform(X_test)


# 3. FNN (Fully Connected Neural Network) 모델 정의
input_dim_final = X_train_scaled.shape[1]

model_risk = tf.keras.Sequential([
    Dense(128, activation='relu', input_shape=(input_dim_final,)),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(1, activation='sigmoid') # 취약/안전 이진 분류
])

model_risk.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

print("취약점 위험도 분류 모델 학습 시작...")
model_risk.fit(
    X_train_scaled, Y_train,
    epochs=10,
    batch_size=32,
    validation_data=(X_test_scaled, Y_test),
    verbose=0
)
print("취약점 위험도 분류 모델 학습 완료.")

# 4. 최종 예측 및 분석
# 모델은 스케일링된 데이터로 예측해야 합니다. 전체 X_final을 스케일링합니다.
X_final_scaled = scaler_final.transform(X_final)
Y_pred_prob = model_risk.predict(X_final_scaled).flatten()
df_sbom['AI_Risk_Probability'] = Y_pred_prob
df_sbom['AI_Final_Risk'] = (Y_pred_prob > 0.5).astype(int) # 0.5 초과 시 위험(1)으로 분류

print("\n--- 최종 AI 예측 결과 요약 ---")
print(df_sbom[['component', 'is_vulnerable', 'ZT_Trust_Score', 'AI_Risk_Probability', 'AI_Final_Risk']].to_markdown(index=False, floatfmt=(".4f", ".4f", ".4f", ".0f")))

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


취약점 위험도 분류 모델 학습 시작...
취약점 위험도 분류 모델 학습 완료.




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step

--- 최종 AI 예측 결과 요약 ---
| component   |   is_vulnerable |   ZT_Trust_Score |   AI_Risk_Probability |   AI_Final_Risk |
|:------------|----------------:|-----------------:|----------------------:|----------------:|
| req         |               1 |           0.5000 |                     0 |               0 |
| num         |               0 |           1.0000 |                     0 |               0 |
| flk         |               1 |           0.0000 |                     1 |               1 |
| pan         |               0 |           0.8750 |                     0 |               0 |
| djg         |               1 |           0.3750 |                     1 |               1 |
| scp         |               0 |           0.7500 |                     0 |               0 |
| tfl         |               0 |           0.6250 |                     0 |               0 |


In [28]:
# Check the columns in df_sbom before running the next cell
print("Columns in df_sbom before running cell ynPjYLgJ8S-0:")
print(df_sbom.columns)

Columns in df_sbom before running cell ynPjYLgJ8S-0:
Index(['component', 'version', 'purl', 'pid', 'description', 'is_vulnerable',
       'cvss_score', 'cve_id'],
      dtype='object')


In [40]:
# Colab 셀 4: 시각화 및 위험도 매트릭스

# 1. 신뢰도(Anomaly Score) vs. 취약점 확률(Risk Probability) 매트릭스
plt.figure(figsize=(10, 8))
sns.scatterplot(
    x='ZT_Anomaly_Score', # 비정상 점수(높을수록 신뢰도 낮음)
    y='AI_Risk_Probability',
    data=df_sbom,
    hue='AI_Final_Risk', # 최종 AI 예측 위험 여부로 색상 구분
    size='cvss_score', # 원 크기로 NVD 기반 CVSS 점수 반영
    sizes=(20, 300),
    palette=['blue', 'red'],
    alpha=0.7
)

# 제로 트러스트 위험 영역 표시 (예시)
plt.axhline(0.5, color='orange', linestyle='--', label='Risk Probability Threshold')
plt.axvline(np.percentile(df_sbom['ZT_Anomaly_Score'], 90), color='purple', linestyle=':', label='Top 10% Anomaly Threshold')

plt.title('제로 트러스트 위험 매트릭스 (Anomaly vs. AI Risk)')
plt.xlabel('ZT 비정상 점수 (높을수록 신뢰도 낮음)')
plt.ylabel('AI 예측 위험 확률')
plt.legend(title='AI Final Risk')
plt.grid(True)
plt.show()

# 2. 최종 위험 컴포넌트 목록
print("\n--- AI 기반 제로 트러스트 고위험 컴포넌트 목록 ---")
high_risk_components = df_sbom[
    (df_sbom['AI_Final_Risk'] == 1) & (df_sbom['ZT_Trust_Score'] < 0.8) # 위험 예측 AND 신뢰도 점수 80% 미만
].sort_values(by='AI_Risk_Probability', ascending=False)

print(high_risk_components[['name', 'version', 'ZT_Trust_Score', 'AI_Risk_Probability', 'description']].head(10))

ValueError: Could not interpret value `ZT_Anomaly_Score` for `x`. An entry with this name does not appear in `data`.

<Figure size 1000x800 with 0 Axes>