From aa8a91dd1f1c2d3d2d3a2c605d7429758cad56e1 Mon Sep 17 00:00:00 2001 From: shubhobm Date: Wed, 4 Mar 2026 14:07:25 +0530 Subject: [PATCH 1/4] Refine URL connector title/description and report-id handling --- avidtools/connectors/url.py | 64 +++++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/avidtools/connectors/url.py b/avidtools/connectors/url.py index ca9d991..77a6fba 100644 --- a/avidtools/connectors/url.py +++ b/avidtools/connectors/url.py @@ -4,6 +4,7 @@ import os import json +import re from datetime import date from typing import Any, Optional import requests @@ -125,9 +126,9 @@ def _create_ai_prompt(self, scraped_data: dict) -> str: {{ "data_type": "AVID", "data_version": "string (optional)", - "metadata": {{ - "report_id": "string (required, format: AVID-YYYY-R-XXXX)" - }}, + "metadata": {{ + "report_id": "string (optional; omit for unpublished reports)" + }}, "affects": {{ "developer": ["list of developer organizations of the model/system involved"], "deployer": ["list of deployer organizations"], @@ -138,14 +139,14 @@ def _create_ai_prompt(self, scraped_data: dict) -> str: }} ] }}, - "problemtype": {{ + "problemtype": {{ "classof": "AIID Incident|ATLAS Case Study|CVE Entry|LLM Evaluation|Third-party Report|Undefined (required, default: Third-party Report)", "type": "Issue|Advisory|Measurement|Detection (optional)", "description": {{ "lang": "eng", - "value": "description text" + "value": "short title (required, concise, ideally <= 12 words)" }} - }}, + }}, "metrics": [ {{ "name": "metric name", @@ -201,10 +202,10 @@ def _create_ai_prompt(self, scraped_data: dict) -> str: Content: {scraped_data['text'][:15000]} -Please analyze this content and extract relevant information to create an AVID report. Return ONLY a valid JSON object with the following structure (all fields are optional except those marked as required): +Please analyze this content and extract relevant information to create an AVID report. Return ONLY a valid JSON object with the following structure (all fields are optional unless explicitly noted): {{ - "report_id": "string (required)", + "report_id": "string (optional; omit if unknown or unpublished)", "affects": {{ "developer": ["list of developers"], "deployer": ["list of deployers"], @@ -220,12 +221,12 @@ def _create_ai_prompt(self, scraped_data: dict) -> str: "type": "Issue|Advisory|Measurement|Detection", "description": {{ "lang": "eng", - "value": "description text" + "value": "short title (required, concise, ideally <= 12 words)" }} }}, - "description": {{ + "description": {{ "lang": "eng", - "value": "high-level description" + "value": "full descriptive summary paragraph" }}, "references": [ {{ @@ -239,13 +240,47 @@ def _create_ai_prompt(self, scraped_data: dict) -> str: Important guidelines: - Be specific and accurate - Extract actual names, organizations, and technical details from the content -- For the report_id, use format AVID-{date.today().year}-R-XXXX where XXXX is a random 4-digit number +- problemtype.description.value MUST be a short title, not a paragraph. Summarize the actual issue concisely, do NOT mention the reporter, or that it's from a URL or article. You can mention the product as needed. Focus on the core vulnerability or issue. +- description.value should contain the detailed narrative description - MUST include the source URL ({scraped_data['url']}) in references with an appropriate label -- If information is not available in the content, omit that field entirely (except report_id which is required) +- If information is not available in the content, omit that field entirely - Return ONLY the JSON object, no additional text or explanation """ return prompt + def _normalize_problemtype_title( + self, parsed_data: dict[str, Any], scraped_data: dict[str, Any] + ) -> dict[str, Any]: + """Ensure problemtype.description.value is a short title, not a long description.""" + problemtype = parsed_data.get("problemtype") + if not isinstance(problemtype, dict): + return parsed_data + + description_obj = problemtype.get("description") + if not isinstance(description_obj, dict): + return parsed_data + + current_title = description_obj.get("value") + page_title = (scraped_data.get("title") or "").strip() + + if not isinstance(current_title, str) or not current_title.strip(): + if page_title: + description_obj["value"] = page_title[:120].strip() + return parsed_data + + normalized = " ".join(current_title.split()) + + too_long = len(normalized) > 120 or len(normalized.split()) > 16 + if too_long and page_title: + normalized = page_title + + normalized = re.split(r"[.!?]", normalized, maxsplit=1)[0].strip() + if len(normalized) > 120: + normalized = normalized[:117].rstrip() + "..." + + description_obj["value"] = normalized + return parsed_data + def _parse_ai_response(self, response_text: str) -> dict[str, Any]: """ Parse the AI response and extract JSON. @@ -473,6 +508,9 @@ def create_report_from_url(self, url: str, max_retries: int = 2) -> Report: # Step 4: Parse AI response parsed_data = self._parse_ai_response(ai_response) + parsed_data = self._normalize_problemtype_title( + parsed_data, scraped_data + ) print("Successfully parsed AI response") # Step 5: Build Report object From 90656824f829034edb125e83bedb47b34c83a273 Mon Sep 17 00:00:00 2001 From: shubhobm Date: Wed, 4 Mar 2026 14:26:22 +0530 Subject: [PATCH 2/4] Improve URL credit extraction consistency --- avidtools/connectors/url.py | 231 ++++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) diff --git a/avidtools/connectors/url.py b/avidtools/connectors/url.py index 77a6fba..dddb510 100644 --- a/avidtools/connectors/url.py +++ b/avidtools/connectors/url.py @@ -7,6 +7,7 @@ import re from datetime import date from typing import Any, Optional +from urllib.parse import urlparse import requests from bs4 import BeautifulSoup from openai import OpenAI, AsyncOpenAI @@ -242,6 +243,7 @@ def _create_ai_prompt(self, scraped_data: dict) -> str: - Extract actual names, organizations, and technical details from the content - problemtype.description.value MUST be a short title, not a paragraph. Summarize the actual issue concisely, do NOT mention the reporter, or that it's from a URL or article. You can mention the product as needed. Focus on the core vulnerability or issue. - description.value should contain the detailed narrative description +- Credit guidance: include the research team/person and article author when available; include company only when a research team/person is not identified - MUST include the source URL ({scraped_data['url']}) in references with an appropriate label - If information is not available in the content, omit that field entirely - Return ONLY the JSON object, no additional text or explanation @@ -449,6 +451,11 @@ def _build_report_from_json(self, data: dict) -> Report: date_str = data["reported_date"] reported_date = date.fromisoformat(date_str) + # Build credit + credit = None + if "credit" in data and isinstance(data["credit"], list): + credit = [LangValue(**entry) for entry in data["credit"]] + # Build the report report = Report( metadata=metadata, @@ -457,11 +464,234 @@ def _build_report_from_json(self, data: dict) -> Report: description=description, impact=impact, references=references, + credit=credit, reported_date=reported_date, ) return report + def _infer_credit_from_url(self, url: str) -> str: + """Infer a default credit string from the source URL hostname.""" + hostname = urlparse(url).hostname or "" + hostname = hostname.lower().strip() + if hostname.startswith("www."): + hostname = hostname[4:] + return hostname or url + + def _normalized_hostname(self, url: str) -> str: + hostname = (urlparse(url).hostname or "").lower().strip() + if hostname.startswith("www."): + hostname = hostname[4:] + return hostname + + def _extract_author_person(self, scraped_data: dict[str, Any]) -> Optional[str]: + """Extract an article author person from JSON-LD, metadata, or byline text.""" + try: + soup = BeautifulSoup(scraped_data.get("html", ""), "html.parser") + except Exception: + soup = None + + candidates: list[str] = [] + + if soup is not None: + for script in soup.find_all("script", attrs={"type": "application/ld+json"}): + raw = (script.string or script.get_text() or "").strip() + if not raw: + continue + try: + payload = json.loads(raw) + except Exception: + continue + + def collect_authors(node: Any) -> None: + if isinstance(node, dict): + author = node.get("author") + if isinstance(author, dict): + name = author.get("name") + if isinstance(name, str): + candidates.append(name) + elif isinstance(author, list): + for item in author: + if isinstance(item, dict) and isinstance(item.get("name"), str): + candidates.append(item["name"]) + elif isinstance(item, str): + candidates.append(item) + elif isinstance(author, str): + candidates.append(author) + + for value in node.values(): + collect_authors(value) + elif isinstance(node, list): + for item in node: + collect_authors(item) + + collect_authors(payload) + + meta_keys = [ + ("name", "author"), + ("property", "article:author"), + ("name", "parsely-author"), + ("name", "dc.creator"), + ("name", "twitter:creator"), + ] + for attr, key in meta_keys: + tag = soup.find("meta", attrs={attr: key}) + if tag and tag.get("content"): + candidates.append(tag.get("content", "").strip()) + + byline_nodes = soup.select('[class*="author" i], [class*="byline" i], [rel="author"]') + for node in byline_nodes[:8]: + text = node.get_text(" ", strip=True) + if text: + candidates.append(text) + + text_snippet = (scraped_data.get("text") or "")[:12000] + byline_patterns = [ + r"\bBy\s+([A-Z][A-Za-z'\-]+(?:\s+[A-Z][A-Za-z'\-]+){1,3})\b", + r"\bWritten\s+by\s+([A-Z][A-Za-z'\-]+(?:\s+[A-Z][A-Za-z'\-]+){1,3})\b", + r"\bAuthor\s*:?\s*([A-Z][A-Za-z'\-]+(?:\s+[A-Z][A-Za-z'\-]+){1,3})\b", + ] + for pattern in byline_patterns: + for match in re.finditer(pattern, text_snippet): + candidates.append(match.group(1).strip()) + + for candidate in candidates: + value = candidate.strip(" @") + value = re.sub(r"^(by|written by|author:)\s+", "", value, flags=re.IGNORECASE).strip() + value = " ".join(value.split()) + if not value: + continue + if self._looks_like_person_name(value): + return value + return None + + def _extract_research_entity(self, scraped_data: dict[str, Any], url: str) -> Optional[str]: + """Extract research team/person entity from page metadata or article text.""" + hostname = self._normalized_hostname(url) + text = "\n".join( + [ + scraped_data.get("title", "") or "", + (scraped_data.get("text", "") or "")[:12000], + ] + ) + patterns = [ + r"\b([A-Z][A-Za-z0-9&'\-]+(?:\s+[A-Z][A-Za-z0-9&'\-]+){0,3}\s+Research)\b", + r"\b([A-Z][A-Za-z0-9&'\-]+(?:\s+[A-Z][A-Za-z0-9&'\-]+){0,3}\s+Security\s+Team)\b", + r"\b([A-Z][A-Za-z0-9&'\-]+(?:\s+[A-Z][A-Za-z0-9&'\-]+){0,3}\s+Labs)\b", + ] + for pattern in patterns: + match = re.search(pattern, text) + if match: + found = match.group(1).strip() + if hostname == "wiz.io" and "wiz" in found.lower(): + return "Wiz Research" + return found + + if hostname == "wiz.io": + return "Wiz Research" + + return None + + def _extract_company_credit(self, scraped_data: dict[str, Any], url: str) -> str: + """Extract organization/company name from page metadata, with hostname fallback.""" + try: + soup = BeautifulSoup(scraped_data.get("html", ""), "html.parser") + except Exception: + soup = None + + if soup is not None: + meta_keys = [ + ("property", "og:site_name"), + ("name", "application-name"), + ("name", "publisher"), + ] + for attr, key in meta_keys: + tag = soup.find("meta", attrs={attr: key}) + if tag and tag.get("content"): + content = tag.get("content", "").strip() + if content: + return content + + return self._infer_credit_from_url(url) + + def _looks_like_person_name(self, value: str) -> bool: + """Heuristic to identify person-name style values.""" + text = value.strip() + lowered = text.lower() + if "." in lowered: + return False + if any( + token in lowered + for token in [ + "research", + "team", + "security", + "labs", + "inc", + "corp", + "company", + "browser", + "blog", + "media", + ] + ): + return False + if not re.match(r"^[A-Z][A-Za-z'\-]+(?:\s+[A-Z][A-Za-z'\-]+){1,3}$", text): + return False + parts = text.split() + return 2 <= len(parts) <= 4 + + def _looks_like_research_entity(self, value: str) -> bool: + """Heuristic to identify team/person research entity values.""" + lowered = value.strip().lower() + return any( + token in lowered + for token in [ + "research", + "team", + "security", + "labs", + ] + ) + + def _apply_credit_defaults(self, report: Report, scraped_data: dict[str, Any], url: str) -> None: + """Assign credits using policy: + 1) company blog + 2) research team/person (if present, suppress company) + 3) article author person + """ + existing_values = [ + entry.value.strip() + for entry in (report.credit or []) + if entry.value and entry.value.strip() + ] + + research_entity = self._extract_research_entity(scraped_data, url) + if not research_entity: + for value in existing_values: + if self._looks_like_research_entity(value): + research_entity = value + break + + author_person = self._extract_author_person(scraped_data) + if not author_person: + for value in existing_values: + if self._looks_like_person_name(value) and value != research_entity: + author_person = value + break + + company_blog = self._extract_company_credit(scraped_data, url) + + ordered: list[str] = [] + if research_entity: + ordered.append(research_entity) + if author_person and author_person not in ordered: + ordered.append(author_person) + if not research_entity and company_blog and company_blog not in ordered: + ordered.append(company_blog) + + report.credit = [LangValue(lang="eng", value=value) for value in ordered] + def create_report_from_url(self, url: str, max_retries: int = 2) -> Report: """ Scrape a URL and create an AVID report using AI. @@ -515,6 +745,7 @@ def create_report_from_url(self, url: str, max_retries: int = 2) -> Report: # Step 5: Build Report object report = self._build_report_from_json(parsed_data) + self._apply_credit_defaults(report, scraped_data, url) print(f"Created AVID report: {report.metadata.report_id if report.metadata else 'N/A'}") return report From e98043d6b83938b591a3ad4ba9f0eb0aa615538e Mon Sep 17 00:00:00 2001 From: shubhobm Date: Wed, 4 Mar 2026 14:40:59 +0530 Subject: [PATCH 3/4] Improve URL connector type guidance and extraction --- avidtools/connectors/url.py | 61 +++++++++++++++++++++++++++++++++++ avidtools/datamodels/enums.py | 2 +- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/avidtools/connectors/url.py b/avidtools/connectors/url.py index dddb510..f01a9bf 100644 --- a/avidtools/connectors/url.py +++ b/avidtools/connectors/url.py @@ -242,6 +242,12 @@ def _create_ai_prompt(self, scraped_data: dict) -> str: - Be specific and accurate - Extract actual names, organizations, and technical details from the content - problemtype.description.value MUST be a short title, not a paragraph. Summarize the actual issue concisely, do NOT mention the reporter, or that it's from a URL or article. You can mention the product as needed. Focus on the core vulnerability or issue. +- problemtype.type MUST be one of: Issue, Advisory, Measurement, Detection +- Type definitions (from AVID database): + - Issue: qualitative evaluation based on a single sample or handful of samples + - Advisory: qualitative evaluation based on multiple Incidents + - Measurement: quantitative evaluation with associated data and metric + - Detection: a Measurement deemed critical by a threshold or statistical test - description.value should contain the detailed narrative description - Credit guidance: include the research team/person and article author when available; include company only when a research team/person is not identified - MUST include the source URL ({scraped_data['url']}) in references with an appropriate label @@ -283,6 +289,58 @@ def _normalize_problemtype_title( description_obj["value"] = normalized return parsed_data + def _normalize_problemtype_type( + self, parsed_data: dict[str, Any], scraped_data: dict[str, Any] + ) -> dict[str, Any]: + """Ensure problemtype.type is always a valid TypeEnum value.""" + problemtype = parsed_data.get("problemtype") + if not isinstance(problemtype, dict): + parsed_data["problemtype"] = { + "classof": ClassEnum.third_party.value, + "type": TypeEnum.issue.value, + "description": {"lang": "eng", "value": (scraped_data.get("title") or "Issue")[:120]}, + } + return parsed_data + + raw_type = problemtype.get("type") + normalized_raw_type = None + if isinstance(raw_type, str) and raw_type.strip(): + mapping = { + "issue": TypeEnum.issue.value, + "advisory": TypeEnum.advisory.value, + "measurement": TypeEnum.measurement.value, + "detection": TypeEnum.detection.value, + } + normalized_raw_type = mapping.get(raw_type.strip().lower()) + + classof = str(problemtype.get("classof") or "").lower() + text = " ".join( + [ + scraped_data.get("title") or "", + (scraped_data.get("text") or "")[:5000], + ] + ).lower() + url = str(scraped_data.get("url") or "").lower() + + if "cve entry" in classof or "advisory" in classof: + inferred = TypeEnum.advisory.value + elif any(k in text or k in url for k in ["benchmark", "evaluation", "measured", "metric", "score", "accuracy", "recall", "precision"]): + inferred = TypeEnum.measurement.value + elif any(k in text or k in url for k in ["detect", "detector", "detection", "flagged", "classifier"]): + inferred = TypeEnum.detection.value + elif any(k in text or k in url for k in ["cve-", "vulnerability", "vulnerable", "security bulletin", "advisory", "exploit", "rce", "xss", "csrf", "injection", "auth bypass", "data leak", "exposed", "critical vulnerability"]): + inferred = TypeEnum.advisory.value + else: + inferred = TypeEnum.issue.value + + if normalized_raw_type == TypeEnum.issue.value and inferred == TypeEnum.advisory.value: + problemtype["type"] = TypeEnum.advisory.value + elif normalized_raw_type: + problemtype["type"] = normalized_raw_type + else: + problemtype["type"] = inferred + return parsed_data + def _parse_ai_response(self, response_text: str) -> dict[str, Any]: """ Parse the AI response and extract JSON. @@ -741,6 +799,9 @@ def create_report_from_url(self, url: str, max_retries: int = 2) -> Report: parsed_data = self._normalize_problemtype_title( parsed_data, scraped_data ) + parsed_data = self._normalize_problemtype_type( + parsed_data, scraped_data + ) print("Successfully parsed AI response") # Step 5: Build Report object diff --git a/avidtools/datamodels/enums.py b/avidtools/datamodels/enums.py index b280174..5867647 100644 --- a/avidtools/datamodels/enums.py +++ b/avidtools/datamodels/enums.py @@ -89,7 +89,7 @@ class ClassEnum(str, Enum): class TypeEnum(str, Enum): - """All report/vulnerability types.""" + """All report/vulnerability types. Source: https://avidml.org/database/""" issue = "Issue" advisory = "Advisory" From 72b0c9d95dc6be80e5c8739506a7a5c3285fa3a9 Mon Sep 17 00:00:00 2001 From: shubhobm Date: Wed, 4 Mar 2026 14:53:07 +0530 Subject: [PATCH 4/4] Fix URL connector typing and add make ruff target --- Makefile | 4 +++- avidtools/connectors/url.py | 20 ++++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index fd1f756..cd8f3ce 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: all setup setup-submodules update-submodules install update clean test lint format help +.PHONY: all setup setup-submodules update-submodules install update clean test lint format help ruff # Default target all: help @@ -44,6 +44,8 @@ linting: poetry run ruff check avidtools @echo "ruff check complete" +ruff: linting + # Run linter lint: typecheck linting diff --git a/avidtools/connectors/url.py b/avidtools/connectors/url.py index f01a9bf..2a1c101 100644 --- a/avidtools/connectors/url.py +++ b/avidtools/connectors/url.py @@ -542,6 +542,14 @@ def _normalized_hostname(self, url: str) -> str: hostname = hostname[4:] return hostname + def _meta_content_as_text(self, tag: Any) -> Optional[str]: + """Return meta content as text when available and string-like.""" + raw_content = tag.get("content") if tag is not None else None + if isinstance(raw_content, str): + content = raw_content.strip() + return content or None + return None + def _extract_author_person(self, scraped_data: dict[str, Any]) -> Optional[str]: """Extract an article author person from JSON-LD, metadata, or byline text.""" try: @@ -594,8 +602,9 @@ def collect_authors(node: Any) -> None: ] for attr, key in meta_keys: tag = soup.find("meta", attrs={attr: key}) - if tag and tag.get("content"): - candidates.append(tag.get("content", "").strip()) + content = self._meta_content_as_text(tag) + if content: + candidates.append(content) byline_nodes = soup.select('[class*="author" i], [class*="byline" i], [rel="author"]') for node in byline_nodes[:8]: @@ -665,10 +674,9 @@ def _extract_company_credit(self, scraped_data: dict[str, Any], url: str) -> str ] for attr, key in meta_keys: tag = soup.find("meta", attrs={attr: key}) - if tag and tag.get("content"): - content = tag.get("content", "").strip() - if content: - return content + content = self._meta_content_as_text(tag) + if content: + return content return self._infer_credit_from_url(url)