# CPE Format Converter (POC) example Jupyter Notebook

This is a proof of concept for a CPE Format Converter.

## Description

Due to time constraints, the construction logic and thought process of this Proof of Concept (PoC) will be initially presented in this Jupyter Notebook. Further development and maintenance will continue to implement more comprehensive functionalities, such as automation and a Web User Interface.

> 由於時間限制，此概念驗證（PoC）的建構邏輯與思路將以本 Jupyter Notebook 的形式初步呈現。後續將持續進行開發與維護，以實現自動化、Web 使用者介面等更完善的功能。

## Design

This Proof of Concept (PoC) aims to provide users with a tool to search for vulnerabilities associated with specific products by inputting Vendor, Product, and Version information. The tool will interact with the National Vulnerability Database (NVD) Common Vulnerabilities and Exposures (CVE) database to retrieve relevant CVE vulnerability data. Since the NVD Vulnerability API (CVE API) uses Common Platform Enumeration (CPE) for searching, user input needs to be converted into CPE format. However, while the CPE format has a standardized specification, there is no standard conversion process, and it cannot be guaranteed that the converted CPE is included in the NVD. To address this issue, we will use the NVD's CPE Directory List as a foundation to convert user input and obtain or generate the correct CPE, subsequently using the NVD Vulnerability API to retrieve further vulnerability data.

> 本概念驗證（PoC）旨在提供使用者一個工具，透過輸入廠商（Vendor）、產品（Product）和版本（Version）資訊，搜尋相關產品的漏洞（Vulnerability）。此工具將與美國國家漏洞資料庫（NVD）的通用漏洞披露（CVE）資料庫互動，取得相關的 CVE 漏洞資訊。由於 NVD 的漏洞應用程式介面（CVE API）使用通用平台列舉（CPE）進行搜尋，因此需要將使用者輸入轉換為 CPE 格式。然而，儘管 CPE 格式具有標準化的規範，卻缺乏標準的轉換流程，且無法確定轉換後的 CPE 是否已被 NVD 收錄。為解決此問題，我們將利用 NVD 提供的 CPE 目錄列表（CPE Directory List）作為基礎，進行使用者輸入的轉換，並取得或產生正確的 CPE，進而使用 NVD 的漏洞應用程式介面取得後續的漏洞資料。

## Usage

We will leverage the interactive features of Jupyter Notebook to provide detailed explanations of the operational process through segmented code, allowing users to reproduce the same operational results by executing this notebook. Please follow the subsequent operational instructions and code comments for adjustments and execution.

> 我們將利用 Jupyter Notebook 的互動式特性，以分段程式碼的方式詳細說明操作流程，並允許使用者透過執行本 Notebook 複現相同的操作結果。請依照後續的操作說明與程式碼註解進行調整與執行。

## Step 1: Get Offical CPE Directory List

Please manually download the CPE Directory List from the official National Vulnerability Database (NVD) website and place the file in the `data` directory of the project repository (Repo). If you have any questions about the operation, please refer to the README file in the `data` directory.

> 請手動自美國國家漏洞資料庫（NVD）官方網站下載 CPE 目錄列表（CPE Directory List），並將檔案放置於專案儲存庫（Repo）的 `data` 目錄中。若對操作方式有疑問，請參閱 `data` 目錄中的 README 檔案。

## Step 2: Decompression and Data Pre-processing

The Common Platform Enumeration (CPE) list provided by the National Vulnerability Database (NVD) is in XML format. While detailed and comprehensive, its structure is not ideal for direct import into Chromadb for subsequent operations. Therefore, we will filter and organize the XML content of the CPE list and output it in Comma-Separated Values (CSV) format.

(Converting to CSV format is a non-essential transitional step, primarily to facilitate the review of the organized XML data and accelerate data utilization and analysis.)

The resulting structured CSV data (or Dataframe) will be processed and imported into Chromadb in the subsequent steps.


> 由於美國國家漏洞資料庫（NVD）提供的通用平台列舉（CPE）列表為 XML 格式，其結構雖詳盡完整，但不利於直接匯入 Chromadb 進行後續操作。因此，我們將對 CPE 列表的 XML 內容進行篩選與整理，並轉換為逗號分隔值（CSV）格式輸出。
> 
> （轉換為 CSV 格式為一非必要的過渡步驟，主要目的是為了方便檢視整理後的 XML 資料，以加速資料的使用與分析。）
> 
> 轉換後的 CSV 結構化資料（或 Dataframe）將於後續步驟中進行計算並匯入 Chromadb。

In [3]:
from lxml import etree
import json
import csv
from typing import Dict, List, Iterator, Optional, Tuple
import logging
from pathlib import Path
import re
from dataclasses import dataclass, asdict
from contextlib import contextmanager


@dataclass
class CPEItem:
    """代表單個 CPE 項目的資料結構"""
    cpe22Uri: str
    cpe23Uri: str
    vendor: str
    product: str
    version: str
    title: str
    references: str


class CPEParser:
    """CPE XML 字典的解析器，用於將 CPE 項目轉換為結構化資料"""

    # CPE 命名空間定義
    NAMESPACES = {
        'cpe-23': 'http://scap.nist.gov/schema/cpe-extension/2.3',
        'def': 'http://cpe.mitre.org/dictionary/2.0',
        'xml': 'http://www.w3.org/XML/1998/namespace'
    }

    def __init__(self, xml_file_path: str):
        """初始化 CPE 解析器

        Args:
            xml_file_path (str): XML 檔案路徑
        """
        self.xml_file_path = Path(xml_file_path)
        self.logger = logging.getLogger(__name__)

        if not self.xml_file_path.exists():
            raise FileNotFoundError(f"找不到 XML 檔案: {xml_file_path}")

    @staticmethod
    def parse_cpe23_uri(cpe23uri: str) -> Dict[str, str]:
        """解析 CPE 2.3 URI 並提取各個組件

        Args:
            cpe23uri (str): CPE 2.3 URI 字串 (例如: cpe:2.3:a:vendor:product:version:...)

        Returns:
            Dict[str, str]: 包含廠商、產品和版本資訊的字典
        """
        # CPE 2.3 格式: cpe:2.3:part:vendor:product:version:update:edition:language:sw_edition:target_sw:target_hw:other
        components = cpe23uri.split(':')

        if len(components) < 6:
            return {"vendor": "", "product": "", "version": ""}

        return {
            'vendor': components[3],
            'product': components[4],
            'version': components[5]
        }

    @contextmanager
    def _open_xml_context(self) -> Iterator:
        """創建用於迭代 XML 檔案中 CPE 項目的上下文管理器

        Yields:
            Iterator: 用於迭代 XML 檔案中 CPE 項目的迭代器
        """
        try:
            context = etree.iterparse(
                str(self.xml_file_path),
                events=('end',),
                tag=f'{{{self.NAMESPACES["def"]}}}cpe-item'
            )
            yield context
        except etree.ParseError as e:
            self.logger.error(f"XML 解析錯誤: {e}")
            raise
        except Exception as e:
            self.logger.error(f"處理 XML 時發生未預期的錯誤: {e}")
            raise

    def _extract_item_data(self, item: etree._Element) -> Optional[CPEItem]:
        """從 XML 元素中提取 CPE 項目資料

        Args:
            item (etree._Element): 包含 CPE 項目資料的 XML 元素

        Returns:
            Optional[CPEItem]: 如果成功提取則返回 CPEItem 物件，否則返回 None
        """
        try:
            # 取得 CPE 2.3 URI
            cpe23_item = item.find('.//cpe-23:cpe23-item', self.NAMESPACES)
            if cpe23_item is None:
                return None

            cpe23uri = cpe23_item.get('name', '')
            if not cpe23uri:
                return None

            # 解析 CPE 組件
            cpe_components = self.parse_cpe23_uri(cpe23uri)

            # 取得標題
            title_elem = item.find(
                './/def:title[@xml:lang="en-US"]', self.NAMESPACES)
            title = title_elem.text if title_elem is not None else ""

            # 取得參考資料
            references = self._extract_references(item)

            # 建立 CPE 項目物件
            return CPEItem(
                cpe22Uri=item.get('name', ''),
                cpe23Uri=cpe23uri,
                vendor=cpe_components['vendor'],
                product=cpe_components['product'],
                version=cpe_components['version'],
                title=title,
                references=references
            )
        except Exception as e:
            self.logger.warning(f"提取 CPE 項目時發生錯誤: {e}")
            return None

    def _extract_references(self, item: etree._Element) -> str:
        """從 XML 元素中提取參考資料

        Args:
            item (etree._Element): 包含參考資料的 XML 元素

        Returns:
            str: 格式化的參考資料字串
        """
        refs = item.findall('.//def:reference', self.NAMESPACES)
        ref_texts = []

        for ref in refs:
            href = ref.get('href', '')
            ref_text = ref.text or ''

            if href and ref_text:
                ref_texts.append(f"{ref_text} ({href})")
            elif href:
                ref_texts.append(href)
            elif ref_text:
                ref_texts.append(ref_text)

        return ' '.join(ref_texts)

    def _clear_element_memory(self, item: etree._Element) -> None:
        """清理 XML 元素佔用的記憶體

        Args:
            item (etree._Element): 要清理的 XML 元素
        """
        item.clear()
        # 清理前面的同層級元素以釋放記憶體
        while item.getprevious() is not None:
            del item.getparent()[0]

    def parse_xml(self) -> List[Dict]:
        """解析整個 CPE XML 檔案

        Returns:
            List[Dict]: 解析後的 CPE 資料列表
        """
        self.logger.info(f"開始解析 XML 檔案: {self.xml_file_path}")
        cpe_items = []

        with self._open_xml_context() as context:
            for event, item in context:
                cpe_item = self._extract_item_data(item)
                if cpe_item:
                    cpe_items.append(asdict(cpe_item))

                self._clear_element_memory(item)

        self.logger.info(f"解析完成，共提取 {len(cpe_items)} 個 CPE 項目")
        return cpe_items

    def process_by_chunk(self, chunk_size: int = 1000) -> Iterator[List[Dict]]:
        """使用分批處理方式處理大型 XML 檔案

        Args:
            chunk_size (int, optional): 每批處理的項目數量. 預設為 1000.

        Yields:
            List[Dict]: 每批處理的 CPE 項目列表
        """
        self.logger.info(f"開始分批處理 XML 檔案 (每批 {chunk_size} 個項目)")
        items = []
        total_processed = 0

        with self._open_xml_context() as context:
            for event, item in context:
                cpe_item = self._extract_item_data(item)
                if cpe_item:
                    items.append(asdict(cpe_item))

                self._clear_element_memory(item)

                if len(items) >= chunk_size:
                    total_processed += len(items)
                    self.logger.debug(f"已處理 {total_processed} 個項目")
                    yield items
                    items = []

            if items:
                total_processed += len(items)
                self.logger.debug(f"已處理 {total_processed} 個項目")
                yield items

        self.logger.info(f"分批處理完成，總共處理 {total_processed} 個項目")

    def save_to_json(self, output_file: str, items: Optional[List[Dict]] = None) -> None:
        """將解析結果儲存為 JSON 格式

        Args:
            output_file (str): 輸出檔案路徑
            items (Optional[List[Dict]], optional): 要儲存的項目列表，如果為 None 則會解析 XML 檔案. 預設為 None.
        """
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            cpe_items = items if items is not None else self.parse_xml()

            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(cpe_items, f, ensure_ascii=False, indent=2)

            self.logger.info(
                f"已成功儲存 {len(cpe_items)} 個項目至 JSON 檔案: {output_path}")

        except Exception as e:
            self.logger.error(f"儲存 JSON 檔案時發生錯誤: {e}")
            raise

    def save_to_csv(self, output_file: str, items: Optional[List[Dict]] = None) -> None:
        """將解析結果儲存為 CSV 格式

        Args:
            output_file (str): 輸出檔案路徑
            items (Optional[List[Dict]], optional): 要儲存的項目列表，如果為 None 則會解析 XML 檔案. 預設為 None.
        """
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            cpe_items = items if items is not None else self.parse_xml()
            fieldnames = ['cpe22Uri', 'cpe23Uri', 'vendor', 'product',
                          'version', 'title', 'references']

            with open(output_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(cpe_items)

            self.logger.info(
                f"已成功儲存 {len(cpe_items)} 個項目至 CSV 檔案: {output_path}")

        except Exception as e:
            self.logger.error(f"儲存 CSV 檔案時發生錯誤: {e}")
            raise


def setup_logging(log_level=logging.INFO, log_file=None):
    """設定日誌記錄

    Args:
        log_level (int, optional): 日誌等級. 預設為 logging.INFO.
        log_file (str, optional): 日誌檔案路徑，如果為 None 則只會輸出到控制台. 預設為 None.
    """
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    # 建立日誌處理器
    handlers = []
    handlers.append(logging.StreamHandler())  # 控制台輸出

    if log_file:
        log_dir = Path(log_file).parent
        log_dir.mkdir(parents=True, exist_ok=True)
        handlers.append(logging.FileHandler(
            log_file, encoding='utf-8'))  # 檔案輸出

    # 設定根日誌記錄器
    logging.basicConfig(
        level=log_level,
        format=log_format,
        handlers=handlers
    )


def main():
    """主程式進入點"""
    # 設定日誌
    log_file = "../logs/cpe_parser.log"
    setup_logging(log_level=logging.INFO, log_file=log_file)
    logger = logging.getLogger(__name__)

    # 設定檔案路徑
    input_file = "../data/official-cpe-dictionary_v2.3.xml"
    output_dir = Path("../data/convert")
    output_dir.mkdir(parents=True, exist_ok=True)

    output_json = output_dir / "official-cpe-dictionary_v2.3_v2.json"
    output_csv = output_dir / "official-cpe-dictionary_v2.3_v2.csv"

    try:
        logger.info(f"開始處理 CPE 字典檔案: {input_file}")
        parser = CPEParser(input_file)

        # 使用分批處理解析 XML 檔案並儲存結果
        all_items = []
        total_processed = 0

        for chunk in parser.process_by_chunk(chunk_size=1000):
            all_items.extend(chunk)
            total_processed += len(chunk)
            logger.info(f"已處理 {total_processed} 個項目")

        # 儲存為 JSON 和 CSV
        parser.save_to_json(str(output_json), all_items)
        parser.save_to_csv(str(output_csv), all_items)

        logger.info("程式執行完成")

    except FileNotFoundError as e:
        logger.error(f"找不到檔案: {e}")
    except Exception as e:
        logger.error(f"程式執行失敗: {e}", exc_info=True)


if __name__ == "__main__":
    main()

2025-03-12 08:35:41,999 - __main__ - INFO - 開始處理 CPE 字典檔案: ../data/official-cpe-dictionary_v2.3.xml
2025-03-12 08:35:42,000 - __main__ - INFO - 開始分批處理 XML 檔案 (每批 1000 個項目)
2025-03-12 08:35:42,030 - __main__ - INFO - 已處理 1000 個項目
2025-03-12 08:35:42,058 - __main__ - INFO - 已處理 2000 個項目
2025-03-12 08:35:42,087 - __main__ - INFO - 已處理 3000 個項目
2025-03-12 08:35:42,116 - __main__ - INFO - 已處理 4000 個項目
2025-03-12 08:35:42,145 - __main__ - INFO - 已處理 5000 個項目
2025-03-12 08:35:42,174 - __main__ - INFO - 已處理 6000 個項目
2025-03-12 08:35:42,203 - __main__ - INFO - 已處理 7000 個項目
2025-03-12 08:35:42,233 - __main__ - INFO - 已處理 8000 個項目
2025-03-12 08:35:42,262 - __main__ - INFO - 已處理 9000 個項目
2025-03-12 08:35:42,291 - __main__ - INFO - 已處理 10000 個項目
2025-03-12 08:35:42,320 - __main__ - INFO - 已處理 11000 個項目
2025-03-12 08:35:42,349 - __main__ - INFO - 已處理 12000 個項目
2025-03-12 08:35:42,378 - __main__ - INFO - 已處理 13000 個項目
2025-03-12 08:35:42,408 - __main__ - INFO - 已處理 14000 個項目
2025-03-12 08:35:42,437 -