diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 82ee4525a..54b97adc2 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -47,6 +47,7 @@ from vulnerabilities.pipelines.v2_importers import ( elixir_security_importer as elixir_security_importer_v2, ) +from vulnerabilities.pipelines.v2_importers import euvd_importer as euvd_importer_v2 from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2 from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2 from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2 @@ -75,6 +76,7 @@ pysec_importer_v2.PyPIImporterPipeline, xen_importer_v2.XenImporterPipeline, curl_importer_v2.CurlImporterPipeline, + euvd_importer_v2.EUVDImporterPipeline, oss_fuzz_v2.OSSFuzzImporterPipeline, istio_importer_v2.IstioImporterPipeline, postgresql_importer_v2.PostgreSQLImporterPipeline, diff --git a/vulnerabilities/pipelines/v2_importers/euvd_importer.py b/vulnerabilities/pipelines/v2_importers/euvd_importer.py new file mode 100644 index 000000000..c1ac7549d --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/euvd_importer.py @@ -0,0 +1,218 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. 
#

import json
import logging
import time
from datetime import datetime
from datetime import timezone
from http import HTTPStatus
from typing import Iterable
from typing import List
from typing import Optional

import requests
from dateutil import parser as dateparser

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import SCORING_SYSTEMS

logger = logging.getLogger(__name__)


class EUVDImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    EUVD (EU Vulnerability Database) Importer Pipeline

    This pipeline imports security advisories from the European Union Vulnerability Database (EUVD).

    The search endpoint in ``url`` is read page by page until a page without
    items is returned. The downloaded records are kept on the instance so that
    ``advisories_count()`` and ``collect_advisories()`` work on the same data
    and the API is only queried once per pipeline instance.
    """

    pipeline_id = "euvd_importer_v2"
    spdx_license_expression = "LicenseRef-scancode-other-permissive"
    license_url = "https://www.enisa.europa.eu/about-enisa/legal-notice/"
    url = "https://euvdservices.enisa.europa.eu/api/search"

    # Pagination and retry settings; override in a subclass to tune them.
    page_size = 100  # value sent as the "size" query parameter
    max_retries = 100  # attempts per page before giving up
    request_timeout = 30  # seconds, passed to requests.get
    retry_delay = 10  # seconds; base delay between attempts
    max_retry_delay = 60  # seconds; cap for the exponential backoff

    # Maps the "baseScoreVersion" value of a record to a SCORING_SYSTEMS key.
    CVSS_VERSION_TO_SYSTEM = {
        "4.0": "cvssv4",
        "3.1": "cvssv3.1",
        "3.0": "cvssv3",
        "2.0": "cvssv2",
    }

    def __init__(self, *args, **kwargs):
        """
        Create the pipeline with an empty download cache.

        Positional and keyword arguments are forwarded unchanged to the base
        pipeline constructor so that this class can be instantiated with
        whatever arguments the base class accepts.
        """
        super().__init__(*args, **kwargs)
        self._cached_data = None

    @classmethod
    def steps(cls):
        """Return the pipeline steps to run, in order."""
        return (cls.collect_and_store_advisories,)

    def _fetch_page(self, page: int) -> Optional[list]:
        """
        Return the list of items of result page ``page``.

        Return an empty list when the page holds no items and ``None`` when
        the page could not be fetched after ``max_retries`` attempts.

        Only network errors, non-200 responses and undecodable or unexpected
        JSON payloads are retried; any other exception is a programming error
        and is left to propagate.
        """
        headers = {"User-Agent": "VulnerableCode"}
        params = {"size": self.page_size, "page": page}

        for attempt in range(1, self.max_retries + 1):
            try:
                response = requests.get(
                    self.url,
                    headers=headers,
                    params=params,
                    timeout=self.request_timeout,
                )
                if response.status_code == HTTPStatus.OK:
                    data = response.json()
                    if not isinstance(data, dict):
                        raise ValueError(
                            f"unexpected payload of type {type(data).__name__}"
                        )
                    return data.get("items") or []

                logger.error(f"API returned status {response.status_code} for page {page}")
                # Exponential backoff for HTTP errors: 10s, 20s, 40s, then capped.
                sleep_time = min(
                    self.retry_delay * (2 ** min(attempt - 1, 5)),
                    self.max_retry_delay,
                )
            except requests.exceptions.Timeout as e:
                logger.warning(f"Timeout on page {page}: {e}")
                sleep_time = self.retry_delay
            except (requests.exceptions.RequestException, ValueError) as e:
                logger.error(f"Error fetching page {page}: {e}")
                sleep_time = self.retry_delay

            if attempt < self.max_retries:
                logger.info(
                    f"Retrying page {page} in {sleep_time}s "
                    f"(attempt {attempt}/{self.max_retries})"
                )
                time.sleep(sleep_time)

        logger.error(f"Max retries reached for page {page}")
        return None

    def fetch_data(self):
        """
        Return the list of raw advisory records fetched from the EUVD API.

        Pages are requested starting at page 0 until an empty page is
        returned. When a page cannot be fetched, the records collected so far
        are returned. In both cases the result is cached on the instance, so
        later calls return the same list without any network access.
        """
        # Return cached data if already fetched
        if self._cached_data is not None:
            logger.info(f"Using cached data: {len(self._cached_data)} items")
            return self._cached_data

        logger.info(f"Fetching data from EUVD API: {self.url}")

        all_items = []
        page = 0
        while True:
            items = self._fetch_page(page)
            if items is None:
                logger.error(
                    f"Giving up on page {page}; keeping {len(all_items)} items fetched so far"
                )
                break
            if not items:
                logger.info(f"No items in response for page {page}; stopping fetch.")
                logger.info(
                    f"Fetch completed successfully. Total items collected: {len(all_items)}"
                )
                break

            all_items.extend(items)
            logger.info(f"Fetched page {page}: {len(items)} items (total: {len(all_items)})")
            page += 1

        # Cache on every exit path so the advisory count and the collected
        # advisories are always computed from the same records.
        self._cached_data = all_items
        logger.info(f"Cached {len(all_items)} items for reuse")
        return all_items

    def advisories_count(self) -> int:
        """Return the number of raw records available from ``fetch_data()``."""
        return len(self.fetch_data())

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield one AdvisoryData per raw record that can be parsed.

        Records that fail to parse or that ``parse_advisory`` rejects are
        logged and skipped so that one bad record does not stop the import.
        """
        for raw_data in self.fetch_data():
            try:
                advisory = self.parse_advisory(raw_data)
            except Exception as e:
                # Best effort on purpose: keep importing the remaining records.
                logger.error(f"Failed to parse advisory: {e}")
                logger.debug(f"Raw data: {raw_data}")
                continue
            if advisory:
                yield advisory

    @staticmethod
    def _split_lines(value) -> List[str]:
        """
        Return the non-empty, stripped entries of a newline-separated string.

        A missing or empty ``value`` gives an empty list. A non-string
        iterable is accepted too and is treated as already split.
        """
        if not value:
            return []
        parts = value.split("\n") if isinstance(value, str) else value
        cleaned = (str(part).strip() for part in parts)
        return [part for part in cleaned if part]

    @staticmethod
    def _parse_date(date_str) -> Optional[datetime]:
        """
        Return a timezone-aware datetime parsed from ``date_str`` or None.

        A value without timezone information is treated as UTC so that the
        result does not depend on the timezone of the machine running the
        import.
        """
        if not date_str:
            return None
        try:
            parsed = dateparser.parse(date_str)
        except Exception as e:
            logger.warning(f"Failed to parse date '{date_str}': {e}")
            return None
        if parsed and parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def parse_advisory(self, raw_data: dict) -> Optional[AdvisoryData]:
        """
        Return an AdvisoryData built from one raw EUVD record.

        Return None when the record has no ``id``, since every other field is
        keyed on it. The fields read from ``raw_data`` are ``id``, ``aliases``
        and ``references`` (newline-separated strings), ``description``,
        ``datePublished``, ``baseScore``, ``baseScoreVersion`` and
        ``baseScoreVector``.
        """
        advisory_id = str(raw_data.get("id") or "").strip()
        if not advisory_id:
            logger.warning("Skipping EUVD record without an id")
            return None

        advisory_url = f"https://euvd.enisa.europa.eu/vulnerability/{advisory_id}"

        # dict.fromkeys removes duplicates while keeping the original order.
        aliases = list(
            dict.fromkeys([advisory_id, *self._split_lines(raw_data.get("aliases"))])
        )

        reference_urls = dict.fromkeys(
            [*self._split_lines(raw_data.get("references")), advisory_url]
        )
        references = [ReferenceV2(url=url) for url in reference_urls]

        severities = []
        base_score = raw_data.get("baseScore")
        base_score_version = raw_data.get("baseScoreVersion")
        base_score_vector = raw_data.get("baseScoreVector")

        # Only a missing or empty score is skipped; a numeric score of 0 is kept.
        has_score = base_score is not None and base_score != ""
        if has_score and base_score_version:
            scoring_system = self.get_scoring_system(base_score_version)
            if scoring_system:
                severities.append(
                    VulnerabilitySeverity(
                        system=scoring_system,
                        value=str(base_score),
                        scoring_elements=base_score_vector or "",
                    )
                )

        return AdvisoryData(
            advisory_id=advisory_id,
            aliases=aliases,
            summary=raw_data.get("description") or "",
            references_v2=references,
            affected_packages=[],
            date_published=self._parse_date(raw_data.get("datePublished")),
            url=advisory_url,
            severities=severities,
            original_advisory_text=json.dumps(raw_data, indent=2, ensure_ascii=False),
        )

    @staticmethod
    def get_scoring_system(version: str):
        """
        Return the SCORING_SYSTEMS entry for a CVSS ``version`` or None.

        ``version`` is converted to a string and stripped before the lookup,
        so ``3.1`` and ``" 3.1 "`` resolve like ``"3.1"``. An unknown version
        is logged and gives None.
        """
        normalized = str(version).strip()
        system_key = EUVDImporterPipeline.CVSS_VERSION_TO_SYSTEM.get(normalized)
        if system_key:
            return SCORING_SYSTEMS.get(system_key)
        logger.warning(f"Unknown CVSS version: {version}")
        return None
#

import json
from pathlib import Path
from unittest import TestCase
from unittest.mock import Mock
from unittest.mock import patch

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.euvd_importer import EUVDImporterPipeline

TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "euvd"


class TestEUVDImporterPipeline(TestCase):
    """Tests for the EUVD importer that run against mocked HTTP responses."""

    @patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get")
    def test_collect_advisories(self, mock_get):
        """Test collecting and parsing advisories from test data"""
        # Two pages of fixture data followed by the empty page that ends paging.
        payloads = [
            json.loads((TEST_DATA / file_name).read_text(encoding="utf-8"))
            for file_name in ("euvd_sample1.json", "euvd_sample2.json")
        ]
        payloads.append({"items": []})
        mock_get.side_effect = [
            Mock(status_code=200, json=lambda payload=payload: payload) for payload in payloads
        ]

        def cve_alias_count(advisory):
            return sum(1 for alias in advisory.aliases if alias.startswith("CVE-"))

        def reference_urls(advisory):
            return [reference.url for reference in advisory.references_v2]

        importer = EUVDImporterPipeline()
        collected = list(importer.collect_advisories())

        assert len(collected) == 5
        adv1, adv2, adv3, adv4, adv5 = collected

        # First record: a single CVE alias and a CVSS 3.1 score.
        assert isinstance(adv1, AdvisoryData)
        assert adv1.advisory_id == "EUVD-2025-197757"
        assert "EUVD-2025-197757" in adv1.aliases
        assert "CVE-2025-13284" in adv1.aliases
        assert adv1.summary == "ThinPLUS vulnerability that allows remote code execution"
        assert adv1.date_published is not None
        assert len(adv1.severities) == 1
        severity = adv1.severities[0]
        assert severity.system.identifier == "cvssv3.1"
        assert severity.value == "9.8"
        assert severity.scoring_elements == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"

        found_urls = reference_urls(adv1)
        assert "https://nvd.nist.gov/vuln/detail/CVE-2025-13284" in found_urls
        assert "https://euvd.enisa.europa.eu/vulnerability/EUVD-2025-197757" in found_urls

        # Second record: newline-separated aliases and references.
        assert adv2.advisory_id == "EUVD-2024-123456"
        assert "CVE-2024-12345" in adv2.aliases
        assert "CVE-2024-67890" in adv2.aliases
        assert cve_alias_count(adv2) == 2

        found_urls = reference_urls(adv2)
        assert "https://example.com/advisory1" in found_urls
        assert "https://example.com/advisory2" in found_urls

        # Third record: CVSS 3.0.
        assert adv3.advisory_id == "EUVD-2023-999999"
        assert adv3.severities[0].system.identifier == "cvssv3"
        assert adv3.severities[0].value == "5.3"

        # Fourth record: empty description and CVSS 2.0.
        assert adv4.advisory_id == "EUVD-2022-555555"
        assert adv4.summary == ""
        assert adv4.severities[0].system.identifier == "cvssv2"
        assert adv4.severities[0].value == "4.3"

        # Fifth record: no CVE alias at all.
        assert adv5.advisory_id == "EUVD-2021-111111"
        assert cve_alias_count(adv5) == 0
        assert adv5.summary == "Advisory without CVE alias but with EUVD ID"

    def test_get_scoring_system(self):
        """Test CVSS version to scoring system mapping"""
        importer = EUVDImporterPipeline()

        expected_identifiers = (
            ("4.0", "cvssv4"),
            ("3.1", "cvssv3.1"),
            ("3.0", "cvssv3"),
            ("2.0", "cvssv2"),
        )
        for cvss_version, identifier in expected_identifiers:
            scoring_system = importer.get_scoring_system(cvss_version)
            assert scoring_system is not None
            assert scoring_system.identifier == identifier

        assert importer.get_scoring_system("unknown") is None

    @patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get")
    def test_advisories_count(self, mock_get):
        """Test counting advisories"""
        first_page = {"items": [{"id": "1"}, {"id": "2"}, {"id": "3"}]}
        empty_page = {"items": []}
        mock_get.side_effect = [
            Mock(status_code=200, json=lambda: first_page),
            Mock(status_code=200, json=lambda: empty_page),
        ]

        importer = EUVDImporterPipeline()

        assert importer.advisories_count() == 3
"baseScoreVersion": "2.0", + "baseScoreVector": "AV:N/AC:M/Au:N/C:N/I:P/A:N", + "references": "" + }, + { + "id": "EUVD-2021-111111", + "aliases": "", + "description": "Advisory without CVE alias but with EUVD ID", + "datePublished": "2021-11-05T16:45:00.000Z", + "baseScore": "6.5", + "baseScoreVersion": "3.1", + "baseScoreVector": "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:N/A:N", + "references": "https://euvd.example.org/2021-111111" + } + ] +}