2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
@@ -47,6 +47,7 @@
from vulnerabilities.pipelines.v2_importers import (
elixir_security_importer as elixir_security_importer_v2,
)
from vulnerabilities.pipelines.v2_importers import euvd_importer as euvd_importer_v2
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
@@ -75,6 +76,7 @@
pysec_importer_v2.PyPIImporterPipeline,
xen_importer_v2.XenImporterPipeline,
curl_importer_v2.CurlImporterPipeline,
euvd_importer_v2.EUVDImporterPipeline,
oss_fuzz_v2.OSSFuzzImporterPipeline,
istio_importer_v2.IstioImporterPipeline,
postgresql_importer_v2.PostgreSQLImporterPipeline,
218 changes: 218 additions & 0 deletions vulnerabilities/pipelines/v2_importers/euvd_importer.py
@@ -0,0 +1,218 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import logging
import time
from datetime import datetime
from http import HTTPStatus
from typing import Iterable

import requests
from dateutil import parser as dateparser

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import SCORING_SYSTEMS

logger = logging.getLogger(__name__)


class EUVDImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""
EUVD (EU Vulnerability Database) Importer Pipeline

This pipeline imports security advisories from the European Union Vulnerability Database (EUVD).
"""

pipeline_id = "euvd_importer_v2"
spdx_license_expression = "LicenseRef-scancode-other-permissive"
license_url = "https://www.enisa.europa.eu/about-enisa/legal-notice/"
url = "https://euvdservices.enisa.europa.eu/api/search"

def __init__(self):
super().__init__()
self._cached_data = None

@classmethod
def steps(cls):
return (cls.collect_and_store_advisories,)

def fetch_data(self):
# Return cached data if already fetched
if self._cached_data is not None:
logger.info(f"Using cached data: {len(self._cached_data)} items")
return self._cached_data
Comment on lines +51 to +53

Collaborator: Why do we have _cached_data? Is it because the API returns repeated data?

Author: _cached_data prevents a second full API fetch. The base importer calls fetch_data() once to count advisories and again to iterate through them. Caching ensures both steps use the same dataset snapshot while avoiding duplicated network requests and API load.
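A minimal sketch of the calling pattern described above, assuming the base pipeline drives both steps through advisories_count() and collect_advisories() as they appear in this diff:

```python
# Hypothetical driver illustrating why fetch_data() runs twice (assumed base-pipeline behavior).
from vulnerabilities.pipelines.v2_importers.euvd_importer import EUVDImporterPipeline

pipeline = EUVDImporterPipeline()

# First call: advisories_count() triggers fetch_data(), which hits the API
# and fills _cached_data.
total = pipeline.advisories_count()

# Second call: collect_advisories() triggers fetch_data() again, which now
# returns _cached_data without any new network requests.
advisories = list(pipeline.collect_advisories())

# Both steps see the same snapshot; any difference in counts comes from
# advisories that failed to parse, not from a re-fetch.
print(total, len(advisories))
```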


headers = {"User-Agent": "VulnerableCode"}
all_items = []
page = 0
size = 100
max_retries = 100

logger.info(f"Fetching data from EUVD API: {self.url}")

while True:
Collaborator: We should avoid loops without a condition. Maybe looping over the total 452584 advisories is a good idea.
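A sketch of the bounded-pagination shape this suggests; the "total" field name in the response is an assumption, not confirmed EUVD API behavior:

```python
import math

import requests


def fetch_all_pages(url, size=100, timeout=30):
    """Paginate with an explicit bound instead of `while True` (sketch only)."""
    headers = {"User-Agent": "VulnerableCode"}
    response = requests.get(url, headers=headers, params={"size": size, "page": 0}, timeout=timeout)
    response.raise_for_status()
    data = response.json()
    items = data.get("items", [])
    total = data.get("total", len(items))  # "total" is a hypothetical field name
    last_page = math.ceil(total / size)
    for page in range(1, last_page):  # page 0 was fetched above
        response = requests.get(url, headers=headers, params={"size": size, "page": page}, timeout=timeout)
        response.raise_for_status()
        items.extend(response.json().get("items", []))
    return items
```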


retry_count = 0
success = False

while retry_count < max_retries and not success:
try:
params = {"size": size, "page": page}
response = requests.get(self.url, headers=headers, params=params, timeout=30)

if response.status_code != HTTPStatus.OK:
logger.error(f"API returned status {response.status_code} for page {page}")
retry_count += 1
if retry_count < max_retries:
sleep_time = min(10 * (2 ** min(retry_count - 1, 5)), 60)
Collaborator: Why this sleep_time? We run the importers multiple times; if one request fails, we can have just one retry. (Please avoid complex retry logic.)

Author: [screenshot attachment: "Screenshot from 2025-11-25 20-25-53"]

I added the retry logic because I faced some API failures during an importer run (in my case, network failures), but as you mentioned, we run importers multiple times, so it shouldn't be an issue. I will apply one retry on failure, as suggested.
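A sketch of what the single-retry version could look like under the reviewer's constraint; this is an illustration, not the code in this diff:

```python
import logging
from http import HTTPStatus

import requests

logger = logging.getLogger(__name__)


def fetch_page(url, page, size=100, timeout=30):
    """Fetch one page of results with exactly one retry on failure."""
    headers = {"User-Agent": "VulnerableCode"}
    params = {"size": size, "page": page}
    for attempt in (1, 2):  # the second pass is the single retry
        try:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            if response.status_code == HTTPStatus.OK:
                return response.json()
            logger.warning(f"Page {page} returned HTTP {response.status_code} (attempt {attempt})")
        except requests.exceptions.RequestException as e:
            logger.warning(f"Page {page} failed: {e} (attempt {attempt})")
    logger.error(f"Giving up on page {page} after one retry")
    return None
```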

logger.info(
f"Retrying page {page} in {sleep_time}s (attempt {retry_count}/{max_retries})"
)
time.sleep(sleep_time)
continue
else:
logger.error(f"Max retries reached for page {page}")
return all_items

data = response.json()
items = data.get("items", [])

if not items:
logger.info(f"No items in response for page {page}; stopping fetch.")
logger.info(
f"Fetch completed successfully. Total items collected: {len(all_items)}"
)

# Cache the fetched data for reuse
self._cached_data = all_items
logger.info(f"Cached {len(all_items)} items for reuse")

return all_items

all_items.extend(items)
logger.info(
f"Fetched page {page}: {len(items)} items (total: {len(all_items)})"
)
success = True
page += 1

except requests.exceptions.Timeout as e:
retry_count += 1
if retry_count < max_retries:
logger.warning(
f"Timeout on page {page}: {e}. Retrying in 10s (attempt {retry_count}/{max_retries})"
)
time.sleep(10)
else:
logger.error(f"Max retries reached for page {page} after timeout")
return all_items

except Exception as e:
retry_count += 1
if retry_count < max_retries:
logger.error(
f"Error fetching page {page}: {e}. Retrying in 10s (attempt {retry_count}/{max_retries})"
)
time.sleep(10)
else:
logger.error(f"Max retries reached for page {page}")
return all_items

def advisories_count(self) -> int:
return len(self.fetch_data())

def collect_advisories(self) -> Iterable[AdvisoryData]:
for raw_data in self.fetch_data():
try:
advisory = self.parse_advisory(raw_data)
if advisory:
yield advisory
except Exception as e:
Collaborator: Please avoid using general exceptions.
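A sketch of narrower handling for this spot, assuming malformed fields (missing keys, bad types, unparsable values) are the realistic failure modes of parse_advisory:

```python
# Suggested shape for the loop body above: catch only the assumed
# failure modes instead of the blanket Exception.
try:
    advisory = self.parse_advisory(raw_data)
    if advisory:
        yield advisory
except (KeyError, TypeError, ValueError) as e:
    logger.error(f"Failed to parse advisory: {e}")
    logger.debug(f"Raw data: {raw_data}")
    continue
```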

logger.error(f"Failed to parse advisory: {e}")
logger.debug(f"Raw data: {raw_data}")
continue

def parse_advisory(self, raw_data: dict) -> AdvisoryData:
advisory_id = raw_data.get("id", "")

aliases = [advisory_id] if advisory_id else []
aliases_str = raw_data.get("aliases", "")
if aliases_str:
cve_aliases = [alias.strip() for alias in aliases_str.split("\n") if alias.strip()]
aliases.extend(cve_aliases)

summary = raw_data.get("description", "")

date_published = None
date_str = raw_data.get("datePublished", "")
if date_str:
try:
date_published = dateparser.parse(date_str)
if date_published and date_published.tzinfo is None:
date_published = date_published.replace(
tzinfo=datetime.now().astimezone().tzinfo
)
except Exception as e:
logger.warning(f"Failed to parse date '{date_str}': {e}")

references = []
references_str = raw_data.get("references", "")
if references_str:
urls = [url.strip() for url in references_str.split("\n") if url.strip()]
for url in urls:
references.append(ReferenceV2(url=url))

if advisory_id:
advisory_url = f"https://euvd.enisa.europa.eu/vulnerability/{advisory_id}"
references.append(ReferenceV2(url=advisory_url))

severities = []
base_score = raw_data.get("baseScore")
base_score_version = raw_data.get("baseScoreVersion")
base_score_vector = raw_data.get("baseScoreVector")

if base_score and base_score_version:
scoring_system = self.get_scoring_system(base_score_version)
if scoring_system:
severity = VulnerabilitySeverity(
system=scoring_system,
value=str(base_score),
scoring_elements=base_score_vector or "",
)
severities.append(severity)

return AdvisoryData(
advisory_id=advisory_id,
aliases=aliases,
summary=summary,
references_v2=references,
affected_packages=[],
date_published=date_published,
url=advisory_url if advisory_id else "",
severities=severities,
original_advisory_text=json.dumps(raw_data, indent=2, ensure_ascii=False),
)

@staticmethod
def get_scoring_system(version: str):
version_map = {
"4.0": "cvssv4",
"3.1": "cvssv3.1",
"3.0": "cvssv3",
"2.0": "cvssv2",
}
system_key = version_map.get(version)
if system_key:
return SCORING_SYSTEMS.get(system_key)
logger.warning(f"Unknown CVSS version: {version}")
return None
124 changes: 124 additions & 0 deletions vulnerabilities/tests/pipelines/v2_importers/test_euvd_importer_v2.py
@@ -0,0 +1,124 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
from pathlib import Path
from unittest import TestCase
from unittest.mock import Mock
from unittest.mock import patch

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.euvd_importer import EUVDImporterPipeline

TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "euvd"


class TestEUVDImporterPipeline(TestCase):
@patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get")
def test_collect_advisories(self, mock_get):
"""Test collecting and parsing advisories from test data"""
sample1_path = TEST_DATA / "euvd_sample1.json"
sample2_path = TEST_DATA / "euvd_sample2.json"

sample1 = json.loads(sample1_path.read_text(encoding="utf-8"))
sample2 = json.loads(sample2_path.read_text(encoding="utf-8"))

mock_responses = [
Mock(status_code=200, json=lambda: sample1),
Mock(status_code=200, json=lambda: sample2),
Mock(status_code=200, json=lambda: {"items": []}),
]
mock_get.side_effect = mock_responses

pipeline = EUVDImporterPipeline()
advisories = list(pipeline.collect_advisories())

assert len(advisories) == 5

first = advisories[0]
assert isinstance(first, AdvisoryData)
assert first.advisory_id == "EUVD-2025-197757"
assert "EUVD-2025-197757" in first.aliases
assert "CVE-2025-13284" in first.aliases
assert first.summary == "ThinPLUS vulnerability that allows remote code execution"
assert first.date_published is not None
assert len(first.severities) == 1
assert first.severities[0].system.identifier == "cvssv3.1"
assert first.severities[0].value == "9.8"
Comment on lines +44 to +53

Collaborator: I think it would be easier if you test using util_tests.check_results_against_json(result, expected_file) with an expected file.
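A sketch of that pattern; the util_tests module path, the to_dict() serialization on AdvisoryData, and the expected-file name are all assumptions here:

```python
from vulnerabilities.tests import util_tests  # assumed module path

@patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get")
def test_collect_advisories_expected_file(self, mock_get):
    sample1 = json.loads((TEST_DATA / "euvd_sample1.json").read_text(encoding="utf-8"))
    mock_get.side_effect = [
        Mock(status_code=200, json=lambda: sample1),
        Mock(status_code=200, json=lambda: {"items": []}),
    ]
    pipeline = EUVDImporterPipeline()
    # to_dict() is assumed to exist on AdvisoryData for serialization.
    result = [adv.to_dict() for adv in pipeline.collect_advisories()]
    expected_file = TEST_DATA / "euvd_expected.json"  # hypothetical expected file
    util_tests.check_results_against_json(result, expected_file)
```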

assert (
first.severities[0].scoring_elements == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"
)

urls = [ref.url for ref in first.references_v2]
assert "https://nvd.nist.gov/vuln/detail/CVE-2025-13284" in urls
assert "https://euvd.enisa.europa.eu/vulnerability/EUVD-2025-197757" in urls

second = advisories[1]
assert second.advisory_id == "EUVD-2024-123456"
assert "CVE-2024-12345" in second.aliases
assert "CVE-2024-67890" in second.aliases
assert len([a for a in second.aliases if a.startswith("CVE-")]) == 2

urls = [ref.url for ref in second.references_v2]
assert "https://example.com/advisory1" in urls
assert "https://example.com/advisory2" in urls

third = advisories[2]
assert third.advisory_id == "EUVD-2023-999999"
assert third.severities[0].system.identifier == "cvssv3"
assert third.severities[0].value == "5.3"

fourth = advisories[3]
assert fourth.advisory_id == "EUVD-2022-555555"
assert fourth.summary == ""
assert fourth.severities[0].system.identifier == "cvssv2"
assert fourth.severities[0].value == "4.3"

fifth = advisories[4]
assert fifth.advisory_id == "EUVD-2021-111111"
assert len([a for a in fifth.aliases if a.startswith("CVE-")]) == 0
assert fifth.summary == "Advisory without CVE alias but with EUVD ID"

def test_get_scoring_system(self):
"""Test CVSS version to scoring system mapping"""
pipeline = EUVDImporterPipeline()

system_v4 = pipeline.get_scoring_system("4.0")
assert system_v4 is not None
assert system_v4.identifier == "cvssv4"

system_v31 = pipeline.get_scoring_system("3.1")
assert system_v31 is not None
assert system_v31.identifier == "cvssv3.1"

system_v3 = pipeline.get_scoring_system("3.0")
assert system_v3 is not None
assert system_v3.identifier == "cvssv3"

system_v2 = pipeline.get_scoring_system("2.0")
assert system_v2 is not None
assert system_v2.identifier == "cvssv2"

system_unknown = pipeline.get_scoring_system("unknown")
assert system_unknown is None

@patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get")
def test_advisories_count(self, mock_get):
"""Test counting advisories"""
sample_data = {"items": [{"id": "1"}, {"id": "2"}, {"id": "3"}]}
mock_responses = [
Mock(status_code=200, json=lambda: sample_data),
Mock(status_code=200, json=lambda: {"items": []}),
]
mock_get.side_effect = mock_responses

pipeline = EUVDImporterPipeline()
count = pipeline.advisories_count()

assert count == 3
34 changes: 34 additions & 0 deletions vulnerabilities/tests/test_data/euvd/euvd_sample1.json
@@ -0,0 +1,34 @@
{
"items": [
{
"id": "EUVD-2025-197757",
"aliases": "CVE-2025-13284",
"description": "ThinPLUS vulnerability that allows remote code execution",
"datePublished": "2025-01-09T01:00:00.000Z",
"baseScore": "9.8",
"baseScoreVersion": "3.1",
"baseScoreVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
"references": "https://nvd.nist.gov/vuln/detail/CVE-2025-13284"
},
{
"id": "EUVD-2024-123456",
"aliases": "CVE-2024-12345\nCVE-2024-67890",
"description": "Multiple vulnerabilities in authentication system",
"datePublished": "2024-12-15T10:30:00.000Z",
"baseScore": "7.5",
"baseScoreVersion": "3.1",
"baseScoreVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
"references": "https://example.com/advisory1\nhttps://example.com/advisory2"
},
{
"id": "EUVD-2023-999999",
"aliases": "CVE-2023-99999",
"description": "Denial of service vulnerability",
"datePublished": "2023-06-20T14:22:00.000Z",
"baseScore": "5.3",
"baseScoreVersion": "3.0",
"baseScoreVector": "CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:L",
"references": "https://security.example.org/2023-999999"
}
]
}