# <h1 style="text-align:center;">**First Data Analysis:**</h1>

---

### **Project Presentation**

This Jupyter notebook defines the methodology for **ingestion**, **processing**, and **enrichment** of data for the ANSSI alert project. The objective is to consolidate disparate sources (RSS feeds, threat metrics, and weakness repositories) into a single, actionable data structure for our cybersecurity application.

### **Methodological Approach**

The workflow is broken down into three critical phases:

1. **Initial Collection and Transformation**:
   * Retrieval of security bulletins via the **ANSSI RSS feed**.
   * Structuring raw data into a first **Pandas** DataFrame.

2. **API Enrichment (Threat Vector)**:
   * Extraction of **CVE** (Common Vulnerabilities and Exposures) identifiers.
   * API calls to integrate **EPSS** (Exploit Prediction Scoring System) scores, which estimate the probability that a vulnerability will be exploited.

3. **Business Consolidation (MITRE Repository)**:
   * Querying the **MITRE API** to retrieve contextual metadata (descriptions, CWE weakness types).
   * Creation of a second normalized DataFrame based on the `CVE` primary key.

### **Expected Result**

The process concludes with the **merging** of the two datasets. We obtain a final enriched DataFrame, offering a 360° view of each alert: from its official publication to its technical severity score, thus constituting the data engine of our application.

---

In [1]:
import feedparser # To retrieve data from an RSS feed
import numpy as np
import pandas as pd
import time
import datetime
import requests # To make API requests
import re

# **I - ANSSI Data Retrieval:**
ANSSI publishes alerts and advisories on security vulnerabilities. We retrieve its RSS feed, which `feedparser` returns as a `FeedParserDict`:

In [2]:
anssi_feed = feedparser.parse("https://www.cert.ssi.gouv.fr/feed/")

The data is in list form under the "entries" key. Let's take the time to look at how the data is presented.

In [None]:
if anssi_feed.entries:
    for k,v in anssi_feed.entries[1].items():
        print(f"[{k} | {type(v)}] -> {v}", end = "\n")

To summarize, each entry of the RSS feed contains:

| Element | Type | Description |
| --- | --- | --- |
| **`title`** | `str` | The title of the alert. Here, it contains the product name and the initial publication date. |
| **`title_detail`** | `dict` | Contains metadata about the title: its format (`text/plain`), language, and source URL (`base`). |
| **`links`** | `list` | A list of dictionaries containing associated links. It often contains the link to the official HTML page. |
| **`link`** | `str` | The direct URL to the alert on the CERT-FR site (shortcut of the first link in `links`). |
| **`summary`** | `str` | A text summary (often with HTML). |
| **`summary_detail`** | `dict` | Technical details of the summary. |
| **`id`** | `str` | The unique identifier of the entry (often identical to the URL). |
| **`guidislink`** | `bool` | Indicates if the identifier (`id`) is a usable URL link. |
| **`published`** | `str` | The publication date in text format. |
| **`published_parsed`** | `time.struct_time` | The date parsed into a `time.struct_time` object, which makes it easy to sort publications by year or month. |

---

Some of this data is of no use to us: `summary`, `guidislink`, `id`, and `links` are dropped. For `title_detail` and `summary_detail`, we keep only the relevant part (the `value` field).

In [4]:
for i in range(len(anssi_feed.entries)):
    # Some data is stored in sub-lists or sub-dictionaries, we retrieve only part of this data
    if "title_detail" in anssi_feed.entries[i] and type(anssi_feed.entries[i]["title_detail"]) == feedparser.util.FeedParserDict:
        anssi_feed.entries[i]["title"] = anssi_feed.entries[i]["title_detail"]["value"]

    if "summary_detail" in anssi_feed.entries[i] and type(anssi_feed.entries[i]["summary_detail"]) == feedparser.util.FeedParserDict:
        anssi_feed.entries[i]["summary_detail"] = anssi_feed.entries[i]["summary_detail"]["value"]

    if "published_parsed" in anssi_feed.entries[i]:
        anssi_feed.entries[i]["published"] = pd.to_datetime(datetime.datetime.fromtimestamp(time.mktime(anssi_feed.entries[i]["published_parsed"]))) # Transformation of the date to datetime format
        del anssi_feed.entries[i]["published_parsed"] # We prefer to keep only the date in datetime type and keep published as the key name

    # We remove what we don't need:
    if "summary" in anssi_feed.entries[i]:
        del anssi_feed.entries[i]["summary"]
    if "id" in anssi_feed.entries[i]:
        del anssi_feed.entries[i]["id"]
    if "guidislink" in anssi_feed.entries[i]:
        del anssi_feed.entries[i]["guidislink"]
    if "title_detail" in anssi_feed.entries[i]:
        del anssi_feed.entries[i]["title_detail"]
    if "links" in anssi_feed.entries[i]:
        del anssi_feed.entries[i]["links"]
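The same clean-up can be sketched on a plain dictionary, without touching the live feed. This is only an illustration: `clean_entry`, `DROPPED_KEYS`, and the sample values are hypothetical, and the date is built directly from the first six fields of the `struct_time` instead of going through `time.mktime`.

```python
import datetime
import time

# Keys removed from each entry (the same ones the notebook deletes)
DROPPED_KEYS = ("summary", "id", "guidislink", "title_detail", "links", "published_parsed")

def clean_entry(entry):
    """Return a cleaned copy of a feed entry (hypothetical helper)."""
    cleaned = dict(entry)
    if isinstance(cleaned.get("title_detail"), dict):
        cleaned["title"] = cleaned["title_detail"]["value"]
    if isinstance(cleaned.get("summary_detail"), dict):
        cleaned["summary_detail"] = cleaned["summary_detail"]["value"]
    if "published_parsed" in cleaned:
        # The first six fields are year, month, day, hour, minute, second
        cleaned["published"] = datetime.datetime(*cleaned["published_parsed"][:6])
    for key in DROPPED_KEYS:
        cleaned.pop(key, None)
    return cleaned

# Made-up entry mimicking the structure of a feed entry
sample = {
    "title": "Example bulletin",
    "summary": "<p>text</p>",
    "summary_detail": {"type": "text/html", "value": "text"},
    "published": "Tue, 14 May 2024 08:30:00 +0000",
    "published_parsed": time.struct_time((2024, 5, 14, 8, 30, 0, 1, 135, 0)),
}
cleaned_sample = clean_entry(sample)
print(cleaned_sample)
```

Working on a copy keeps the original entry intact, which makes the cell safe to re-run.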

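We can now load this data into a DataFrame. The cell below displays it in descending index order. Note that this sorts by index, not by the `published` column, and that `sort_index` returns a new object: since the result is not assigned, `anssi_df` itself keeps its original order.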

In [5]:
anssi_df = pd.DataFrame.from_dict(anssi_feed.entries)
anssi_df.sort_index(ascending=False)

There are two types of ANSSI publications, alerts and advisories. Let's create a column that categorizes the publications based on the link.

In [6]:
conditions = [
    anssi_df["link"].str.contains("alerte", case=False, na=False),
    anssi_df["link"].str.contains("avis", case=False, na=False)
]
anssi_df["type_publication"] = np.select(conditions, ["alerte", "avis"], default=None)

anssi_df = anssi_df.dropna(subset=["type_publication"]) # We remove everything that is neither an advisory nor an alert
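Matching on a substring could misfire if "avis" or "alerte" appeared elsewhere in a URL. A stricter variant, sketched below, looks only at the first path segment. It assumes that CERT-FR links carry the publication type in that position; `publication_type` is a hypothetical helper and the bulletin identifiers are made up.

```python
from urllib.parse import urlparse

def publication_type(link):
    """Classify a link from its first path segment (hypothetical helper)."""
    if not isinstance(link, str):
        return None
    segments = [s for s in urlparse(link).path.split("/") if s]
    first = segments[0].lower() if segments else ""
    return first if first in ("alerte", "avis") else None

# Illustrative URLs; the bulletin identifiers are placeholders
print(publication_type("https://www.cert.ssi.gouv.fr/alerte/CERTFR-0000-ALE-000/"))
print(publication_type("https://www.cert.ssi.gouv.fr/avis/CERTFR-0000-AVI-0000/"))
print(publication_type("https://www.cert.ssi.gouv.fr/actualite/CERTFR-0000-ACT-000/"))
```

Such a function could be applied with `anssi_df["link"].map(publication_type)` in place of the `np.select` call.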

In [7]:
anssi_df.head()

# **II - Retrieval of the CVE (Common Vulnerabilities and Exposures):**

The CVE is a unique key that identifies a vulnerability. The ANSSI feed does not directly provide this identifier, which is why we query the JSON version of each alert/advisory page (its URL followed by `/json/`) to get the CVEs.

In [8]:
session = requests.Session() # Creating a session helps reduce execution time during many requests

def get_cve(anssi_url): # The function was optimized with Gemini Pro
    if not isinstance(anssi_url, str) or not anssi_url.strip(): # Ensuring the validity of the input argument
        return []

    target_url = anssi_url.rstrip("/") + "/json/" # Retrieving the json of the page
    
    try:
        # Using the global session
        response = session.get(target_url, timeout=5) 
        
        if response.status_code == 200: # If the request succeeds
            # Method 1: using REGEX
            # return list(set(re.findall(r"CVE-\d{4}-\d{4,7}", response.text)) )# set() for deduplication, list() for the final format
        
            # Method 2: Going through the "cves" key (an empty list if the key is missing):
            return [ v["name"] for v in response.json().get("cves", []) ]
            
    except requests.RequestException: # Network errors (timeout, connection failure...); non-200 responses such as 404 fall through to the final return
        print("Nothing was found on ", anssi_url)
        return []
    
    return []

print(anssi_df["link"].iloc[0]) # Example link (.iloc because the row labelled 0 may have been dropped above)
print(get_cve(anssi_df["link"].iloc[0])) # Testing the function for an ANSSI alert
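The regex approach left commented out as "Method 1" can be tried offline on a string. The sketch below uses the same pattern, but deduplicates with `dict.fromkeys` instead of `set()` so that the order of first appearance is preserved. `extract_cves` and the sample text are hypothetical.

```python
import re

CVE_PATTERN = re.compile(r"CVE-\d{4}-\d{4,7}")

def extract_cves(text):
    """Return the CVE identifiers found in text, deduplicated, in order of first appearance."""
    return list(dict.fromkeys(CVE_PATTERN.findall(text)))

# Made-up text containing a repeated identifier
sample_text = "Fixes CVE-2024-0001 and CVE-2023-12345; see also CVE-2024-0001."
print(extract_cves(sample_text))
```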

We just have to apply this function to the entire dataframe to create a CVE column.

In [9]:
anssi_df["cve"] = anssi_df["link"].transform(get_cve) # Returns the list of CVEs

Some publications have no CVE. This happens when ANSSI publishes recommendations on good practices or calls for vigilance, which reference no vulnerability. We still keep these rows in our dataframe.

In [10]:
anssi_df[anssi_df["cve"].str.len() == 0] # Rows without CVE

An ANSSI alert/advisory often refers to several CVEs, but in the end we want a DataFrame with one row per CVE rather than one row per alert/advisory.
We therefore unpack the lists so that each CVE gets its own row; one ANSSI alert can then span several rows.
To do this, we use the `explode` method of DataFrames.

In [11]:
anssi_df = anssi_df.explode("cve")
anssi_df = anssi_df.reset_index(drop=True) # We reset the index because explode "duplicates" the indices. The drop argument removes the old index
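To see what `explode` does with lists of different lengths, here is a small sketch on a toy DataFrame. The titles and CVE identifiers are placeholders, not real data.

```python
import pandas as pd

toy = pd.DataFrame({
    "title": ["Bulletin A", "Bulletin B", "Bulletin C"],
    "cve": [["CVE-0000-0001", "CVE-0000-0002"], ["CVE-0000-0003"], []],
})

# One row per list element; an empty list becomes a single row with NaN
exploded = toy.explode("cve").reset_index(drop=True)
print(exploded)
```

Bulletin A is repeated on two rows, and Bulletin C, which has no CVE, is kept with a missing value rather than being dropped.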

Here are the first 5 elements of the ANSSI alert dataframe.

In [12]:
anssi_df.head()

# **III - Enrichment of the ANSSI dataframe with the EPSS score using the EPSS API:**
The EPSS score is an estimate, between 0 and 1, of the probability that a vulnerability will be exploited in the wild within the next 30 days.

In [13]:
epss_api_url = "https://api.first.org/data/v1/epss?cve=" 

def get_epss_data(cve): 
    target_url = epss_api_url + cve
    
    try:
        res = session.get(target_url, timeout=5)

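        if res.status_code != 200:
            print(res.status_code)
            return np.nan # Same "missing" marker as the other failure paths, so the column stays numeric

        data = res.json()
        
        epss_data = data.get("data", [])

        if epss_data:
            return float(epss_data[0]["epss"]) # A score of 0.0 is a valid value and is kept as is

        return np.nan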
    except Exception:
        print("Nothing was found for ", cve)
        return np.nan

print(anssi_df["cve"][0]) # A CVE identifier
print(get_epss_data(anssi_df["cve"][0])) # Testing the function for 1 CVE
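The parsing step can be checked without any network call by separating it from the request. The sketch below assumes a payload with a `data` list whose items carry an `epss` field as a string, which is the shape `get_epss_data` relies on. `parse_epss`, the other fields, and all values are made up for illustration.

```python
import json
import math

def parse_epss(payload):
    """Extract the EPSS score from a decoded response, or NaN if there is none."""
    rows = payload.get("data", [])
    if not rows:
        return math.nan
    return float(rows[0]["epss"])

# Made-up payload; only "data" and "epss" are used by the parser
raw = '{"status": "OK", "data": [{"cve": "CVE-0000-0001", "epss": "0.00042", "percentile": "0.05"}]}'
print(parse_epss(json.loads(raw)))
print(parse_epss({"data": []}))
```

In this sketch a score of exactly 0 would be returned as `0.0` rather than being treated as missing.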

In [14]:
anssi_df["epss_score"] = anssi_df["cve"].transform(lambda x: get_epss_data(x) if (pd.notna(x)) else np.nan) # We apply the function only if the CVE is not null

In [15]:
anssi_df.head()

# **IV - Retrieval of MITRE Data:**
We will create a second dataframe containing all the data provided by the MITRE API.

By looking more closely at how the MITRE system and its [API](https://cveawg.mitre.org/api-docs/) work, we found that a CVE can be in one of three states:

| State | Definition |
| :--- | :--- |
| **`RESERVED`** | A CVE number has been assigned to an organization (a vendor or a researcher), but the details of the vulnerability are not yet public. |
| **`PUBLISHED`** | The vulnerability is official, technical details are available, and the analysis process is complete (or in progress). |
| **`REJECTED`** | The CVE has been cancelled. This happens if the vulnerability was a duplicate of another, if it was a reporting error, or if the vulnerability turned out not to be one. |

The MITRE API `https://cveawg.mitre.org/api/cve/{CVE}` returns the record of the CVE, including its state, if the CVE is not reserved. If it is reserved, the call returns a 404 error (Not Found).

It is possible to check the state of a CVE after a 404 response with `https://cveawg.mitre.org/api/cve-id/{CVE}`.
However, we skip these calls, for performance reasons and because they are not needed: a 404 response or a state other than `PUBLISHED` is enough to filter out reserved and rejected CVEs.
We remove rejected CVEs because they are no longer relevant, and reserved CVEs because no information on the vulnerability is available, so there is nothing to report to our end user.

In [16]:
mitre_api_url = "https://cveawg.mitre.org/api/cve/"

def get_mitre_data(cve):
    if pd.isna(cve):
        return {}
    
    target_url = mitre_api_url + cve
    
    try:
        res = session.get(target_url, timeout=5)
        if res.status_code != 200:
            print(f"[ERROR-{res.status_code}] {cve}")

            return {}

        data = res.json()

        # We verify the state of the CVE
        cveMetadata = data.get("cveMetadata", {})
        if cveMetadata != {}:
            if cveMetadata["state"] != "PUBLISHED": # If it is not published, we ignore it
                return {}

        cna = data.get("containers", {}).get("cna", {})
        
        # Secure extraction of the description
        descriptions = cna.get("descriptions", [])
        desc = descriptions[0].get("value", None) if descriptions else None

        # Secure extraction of the CWE
        problem_types = cna.get("problemTypes", [])
        cwe_id = np.nan
        cwe_desc = np.nan
        
        if problem_types:
            # We often take the first listed problem type
            desc_list = problem_types[0].get("descriptions", [])
            if desc_list:
                cwe_id = desc_list[0].get("cweId", np.nan)
                cwe_desc = desc_list[0].get("description", np.nan)

        metrics = cna.get("metrics", [])
        cvss_score = None
        if metrics != []:
            metrics = metrics[0]
            for k in metrics.keys():
                if "cvss" in k.lower():
                    cvss_score = float(metrics[k]["baseScore"])
                    break

        # Construction of the final dictionary
        return {
            "cve": cve,
            "cwe": cwe_id,
            "cwe_desc": cwe_desc,
            "cvss_score": cvss_score,
            "mitre_desc": desc,
            "affected_product": [ # By list comprehension method
                {
                    "vendor": prod.get("vendor"),
                    "product": prod.get("product"),
                    "versions": [v.get("version") for v in prod.get("versions", []) if v.get("status") == "affected"]
                }
                for prod in cna.get("affected", [])
            ]
        }
    except Exception:
        return {}

print(anssi_df["cve"][3])
print(get_mitre_data(anssi_df["cve"][3]))
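`get_mitre_data` inspects only the first entry of `metrics`, so the score stays empty when that entry holds no CVSS data, even if a later entry does. A possible variant, sketched below, scans every entry. `first_cvss_score` is a hypothetical helper and the sample container is made up.

```python
def first_cvss_score(cna):
    """Return the first CVSS base score found in any metrics entry, else None."""
    for metric in cna.get("metrics", []):
        for key, value in metric.items():
            if "cvss" in key.lower() and isinstance(value, dict) and "baseScore" in value:
                return float(value["baseScore"])
    return None

# Made-up container: the first metrics entry carries no CVSS data
sample_cna = {
    "metrics": [
        {"other": {"type": "example", "content": {}}},
        {"cvssV3_1": {"baseScore": 7.5, "baseSeverity": "HIGH"}},
    ]
}
print(first_cvss_score(sample_cna))
print(first_cvss_score({}))
```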

In [17]:
# Singlethread version
# mitre_data = [ get_mitre_data(cve) for cve in anssi_df["cve"] ] # This execution takes time because it makes requests for each row of the df

In [18]:
"""
This is multithreading which we won't keep in the final version of the notebook because it's not the place for it.
"""

# Multithread version

from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm # To see the progress bar

# 1. We retrieve the list of unique CVEs to avoid unnecessary calls
liste_cves = anssi_df['cve'].unique().tolist()

# 2. Configuration of the number of threads (ex: 10 to 20)
# Too many threads can cause API blocking (Rate Limiting)
MAX_WORKERS = 15 

print(f"Retrieving {len(liste_cves)} CVEs in progress...")

# 3. Parallel execution
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # We use tqdm to follow progress
    mitre_data = list(tqdm(executor.map(get_mitre_data, liste_cves), total=len(liste_cves)))
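The pattern above can be tried without hitting the API. In this sketch, `fake_lookup` is a hypothetical stand-in for `get_mitre_data` that only sleeps briefly, and the CVE identifiers are placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_lookup(cve):
    """Stand-in for a network call (hypothetical): sleeps briefly, then returns a record."""
    time.sleep(0.01)
    return {"cve": cve}

cves = [f"CVE-0000-{i:04d}" for i in range(20)]

with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fake_lookup, cves))

# executor.map yields results in input order, whatever order the threads finish in
print([r["cve"] for r in results] == cves)
```

Because the results come back in input order, they can be turned into a DataFrame directly, with no re-sorting step.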



In [19]:
mitre_df = pd.DataFrame(mitre_data) # We transform our data into a df 
mitre_df = mitre_df.dropna(subset=['cve']) # This line removes rejected and reserved CVEs
mitre_df = mitre_df.reset_index(drop=True)

mitre_df.head() # We display the first 5 rows

By proceeding in this way, only published CVEs that represent a real threat remain in our df. 

# **V - Creation of the alert database:**
We now have everything needed to build the final DataFrame for our application. Here is the structure we want:

| Column Name | Definition & Details |
| :--- | :--- |
| **`anssi_title`** | Title of the ANSSI alert/advisory |
| **`anssi_link`** | Link to the ANSSI alert/advisory |
| **`anssi_desc`** | Description of the ANSSI alert/advisory |
| **`anssi_published`** | Publication date of the ANSSI alert/advisory |
| **`cve`** | Unique identifier of the table, unique number referring to the vulnerability |
| **`epss_score`** | EPSS Score, Probability that the vulnerability is exploited |
| **`cwe`** | The CWE (Common Weakness Enumeration) is a universal classification system that lists security weaknesses in software and hardware. |
| **`cwe_desc`** | CWE Description |
| **`cvss_score`** | CVSS Score, severity of the vulnerability |
| **`mitre_desc`** | Description returned by the Mitre API |
| **`affected_product`** | List of affected products, each with its vendor name, product name, and vulnerable versions |


We will create this dataframe by merging mitre_df and anssi_df on the CVE column values.

In [20]:
anssi_df = anssi_df.rename(columns={"title": "anssi_title", "link": "anssi_link", "summary_detail": "anssi_desc", "published": "anssi_published"}) # Renaming by name rather than by position, so the result does not depend on column order

DB = anssi_df.merge(mitre_df, on='cve', how='left')
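The effect of `how='left'` can be checked on toy data: every ANSSI row is kept, and rows with no matching MITRE record get missing values. In the sketch below all values are placeholders, and `indicator=True` is added only to show where each row comes from.

```python
import pandas as pd

left = pd.DataFrame({
    "anssi_title": ["Bulletin A", "Bulletin A", "Bulletin B"],
    "cve": ["CVE-0000-0001", "CVE-0000-0002", None],
})
right = pd.DataFrame({
    "cve": ["CVE-0000-0001"],
    "cvss_score": [7.5],
})

# The _merge column reports "both" or "left_only" for each row
merged = left.merge(right, on="cve", how="left", indicator=True)
print(merged)
```

Rows marked `left_only` correspond to publications without a CVE, or to CVEs that were filtered out of the MITRE data.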

In [21]:
def set_severity(cvss_score):
    if pd.isna(cvss_score):
        return np.nan
    
    if cvss_score >= 9:
        return "Critical"
    elif cvss_score >= 7:
        return "High"
    elif cvss_score >= 4:
        return "Medium"
    else:
        return "Low"

DB["base_severity"] = DB["cvss_score"].transform(set_severity)
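The same thresholds can be written as a lookup table, which makes the boundaries easy to check. This is a sketch of an equivalent formulation, not a replacement for `set_severity`; `severity_label`, `THRESHOLDS`, and `LABELS` are hypothetical names.

```python
import bisect
import math

THRESHOLDS = [4.0, 7.0, 9.0]
LABELS = ["Low", "Medium", "High", "Critical"]

def severity_label(score):
    """Map a CVSS score to a label, using the same cut-offs as set_severity."""
    if score is None or math.isnan(score):
        return None
    # bisect_right counts how many thresholds are <= score
    return LABELS[bisect.bisect_right(THRESHOLDS, score)]

for value in (3.9, 4.0, 6.9, 7.0, 8.9, 9.0):
    print(value, severity_label(value))
```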

In [22]:
DB

---

# **VI - Interpretation, analysis, and visualization of data:**

In [23]:
import matplotlib.pyplot as plt
import seaborn as sns

## 1) Histogram of vulnerability severity:

### *a. Result Formatting:*

In [24]:
plt.title("Vulnerability Severity Distribution", fontsize=16, pad=20, fontweight='bold')
plt.xlabel("")
plt.ylabel("Occurrence")

### *c. Interpretation:*
We noted three observations:

- **Patching Priority**: Remediation efforts must focus on Red Hat / Linux, which not only suffers the most bulletins but also the most technically varied vulnerabilities.

- **Risk Vector #1**: Availability (DoS via resources) is the most frequent risk, but Memory Corruption is the deepest risk (often allowing remote code execution).

- **Data Observation**: It is interesting to note that the correlation matrix displays figures (e.g., 75 for Red Hat/CWE-22) much higher than the global occurrences graph. This suggests that a single bulletin can contain many CVEs, thus multiplying the actual impact per vulnerability type.