In [1]:
%pip install feedparser==6.0.11 stix2==3.0.1

Collecting feedparser
  Downloading feedparser-6.0.11-py3-none-any.whl.metadata (2.4 kB)
Collecting stix2
  Downloading stix2-3.0.1-py2.py3-none-any.whl.metadata (10 kB)
Collecting sgmllib3k (from feedparser)
  Downloading sgmllib3k-1.0.0.tar.gz (5.8 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting simplejson (from stix2)
  Downloading simplejson-3.19.3-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.2 kB)
Collecting stix2-patterns>=1.2.0 (from stix2)
  Downloading stix2_patterns-2.0.0-py2.py3-none-any.whl.metadata (8.3 kB)
Collecting antlr4-python3-runtime~=4.9.0 (from stix2-patterns>=1.2.0->stix2)
  Downloading antlr4-python3-runtime-4.9.3.tar.gz (117 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m117.0/117.0 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading feedparser-6.0.11-py3-none-any.whl (81 kB)
[2K   [90m━━━━━

In [35]:
import feedparser
import json
import uuid
from stix2 import Bundle, IntrusionSet
from datetime import datetime, timezone
import requests
import logging

# Send ERROR-and-above records from the root logger to a log file in the
# current working directory, one timestamped line per record.
logging.basicConfig(
    level=logging.ERROR,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename="rss_scraper_errors.log",
)

def log_error(feed_url, message):
    """Record a feed-related failure at ERROR level on the root logger.

    Args:
        feed_url (str): URL of the feed the failure relates to.
        message (str): Description of what went wrong.
    """
    # Lazy %-style arguments produce the same text as the formatted string.
    logging.error("Feed: %s - %s", feed_url, message)

def fetch_feed_content(feed_url, timeout=10):
    """
    Fetch the content of an RSS feed.

    Args:
        feed_url (str): The URL of the RSS feed.
        timeout (float | tuple): Seconds to wait for the server before giving
            up, passed straight to `requests.get`. Without a timeout a stalled
            server would block the whole run indefinitely.

    Returns:
        str: The raw feed content or None if fetching fails (non-200 status,
        timeout, or any other exception raised while fetching).
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    try:
        response = requests.get(feed_url, headers=headers, timeout=timeout)
        response.encoding = 'utf-8'  # Ensure proper encoding
        if response.status_code != 200:
            log_error(feed_url, f"HTTP Status Code: {response.status_code}")
            print(f"Failed to fetch feed: {feed_url}. HTTP Status: {response.status_code}")
            return None
        return response.text
    except Exception as e:
        # Best-effort fetch: every failure is logged and reported, never raised,
        # so one bad feed cannot abort the processing of the others.
        log_error(feed_url, f"Exception during fetch: {e}")
        print(f"Error fetching feed: {feed_url}. Exception: {e}")
        return None

def parse_feed(feed_content):
    """
    Hand raw feed text to feedparser and return its result.

    Args:
        feed_content (str): The raw feed content.

    Returns:
        The object returned by `feedparser.parse`, or None if that call
        raised an exception (the exception is printed, not re-raised).
    """
    try:
        parsed = feedparser.parse(feed_content)
    except Exception as exc:
        print(f"Error parsing feed content. Exception: {exc}")
        parsed = None
    return parsed

def retry_feed_parsing(feed_url, retries=3):
    """
    Retry fetching and parsing an RSS feed.

    Each attempt fetches the feed and parses it; the first result that is
    non-empty and not flagged `bozo` by feedparser is returned.

    Args:
        feed_url (str): The URL of the RSS feed.
        retries (int): The number of attempts to make in total.

    Returns:
        dict: The parsed feed or None if all attempts fail.
    """
    for attempt in range(1, retries + 1):
        print(f"Attempt {attempt} to fetch and parse feed: {feed_url}")
        feed_content = fetch_feed_content(feed_url)
        if feed_content:
            feed = parse_feed(feed_content)
            if feed and not feed.bozo:  # Ensure the feed is valid
                return feed
        # Only announce a retry when another attempt will actually follow;
        # after the last attempt the failure message below is printed instead.
        if attempt < retries:
            print(f"Retrying feed: {feed_url}")
    log_error(feed_url, "Failed to fetch or parse after retries.")
    print(f"Failed to parse feed after {retries} attempts: {feed_url}")
    return None

def scrape_rss_feed_to_stix(feed_url):
    """
    Scrape an RSS feed and transform the data into STIX 2.1 JSON format.

    Every entry with a parseable `published` date becomes one STIX
    `IntrusionSet` whose `first_seen`, `created` and `modified` timestamps are
    all set to that date (converted to UTC). Entries without a date, or with a
    date that cannot be parsed, are reported on stdout and skipped.

    Args:
        feed_url (str): The URL of the RSS feed.

    Returns:
        str: JSON string in STIX format or None if the feed could not be
        fetched and parsed.
    """
    # Local import so this function stays self-contained; the module-level
    # imports only bring in `datetime` and `timezone`.
    from email.utils import parsedate_to_datetime

    feed = retry_feed_parsing(feed_url)
    if not feed:
        return None

    # Collect STIX objects
    stix_objects = []

    for entry in feed.entries:
        # `.get` avoids an AttributeError on entries that carry no title.
        title = entry.get("title", "Unknown Title")

        raw_date = entry.get("published", None)
        if not raw_date:
            print(f"Skipping entry without a published date: {title}")
            continue

        # Parse published date
        try:
            published_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %z")
        except ValueError:
            # `%z` already covers "+0000", so a second strptime with a literal
            # "+0000" could never succeed. Fall back to the RFC 2822 parser
            # instead, which also understands zone names such as "GMT".
            try:
                published_date = parsedate_to_datetime(raw_date)
            except (TypeError, ValueError):
                print(f"Failed to parse date for entry: {title}")
                continue

        # A naive result carries no offset; treat it as UTC rather than letting
        # astimezone() silently assume the machine's local time zone.
        if published_date.tzinfo is None:
            published_date = published_date.replace(tzinfo=timezone.utc)
        published_date = published_date.astimezone(timezone.utc)
        timestamp = published_date.isoformat().replace("+00:00", "Z")

        intrusion_set = IntrusionSet(
            id=f"intrusion-set--{uuid.uuid4()}",
            name=title,
            description=entry.get("summary", "No description available."),
            first_seen=timestamp,
            created=timestamp,
            modified=timestamp,
            resource_level="unknown",
            primary_motivation="unknown",
            aliases=[]
        )
        stix_objects.append(intrusion_set)

    # Create a STIX Bundle
    stix_bundle = Bundle(objects=stix_objects)
    return stix_bundle.serialize(pretty=True)

def process_multiple_feeds(feed_urls):
    """
    Convert each feed to a STIX bundle and write it to its own JSON file.

    The output file is created in the current working directory and named
    after the host part of the feed URL. Feeds for which no bundle could be
    produced are skipped.

    Args:
        feed_urls (list): List of RSS feed URLs.
    """
    for feed_url in feed_urls:
        print(f"Processing feed: {feed_url}")
        bundle_json = scrape_rss_feed_to_stix(feed_url)
        if not bundle_json:
            continue

        # Host part of the URL: text after the scheme separator, up to the
        # first path slash.
        host = feed_url.split("//")[-1].split("/")[0]
        filename = f"stix_bundle_{host}.json"
        with open(filename, "w") as out_file:
            out_file.write(bundle_json)
        print(f"STIX bundle saved to '{filename}'.")

if __name__ == "__main__":
    # List of RSS feed URLs
    # Each URL is processed independently by process_multiple_feeds, which
    # writes one bundle file per feed.
    rss_feed_urls = [
        "https://feeds.feedburner.com/TheHackersNews?format=xml",
        "https://www.wired.com/feed/category/security/latest/rss",
        "https://www.bleepingcomputer.com/feed/"
    ]

    process_multiple_feeds(rss_feed_urls)


Processing feed: https://feeds.feedburner.com/TheHackersNews?format=xml
Attempt 1 to fetch and parse feed: https://feeds.feedburner.com/TheHackersNews?format=xml
STIX bundle saved to 'stix_bundle_feeds.feedburner.com.json'.
Processing feed: https://www.wired.com/feed/category/security/latest/rss
Attempt 1 to fetch and parse feed: https://www.wired.com/feed/category/security/latest/rss
STIX bundle saved to 'stix_bundle_www.wired.com.json'.
Processing feed: https://www.bleepingcomputer.com/feed/
Attempt 1 to fetch and parse feed: https://www.bleepingcomputer.com/feed/
STIX bundle saved to 'stix_bundle_www.bleepingcomputer.com.json'.


rss_scraper/
├── __init__.py
├── config.py             
├── main.py               
├── scraper.py            
├── stix_converter.py     
├── tests/                
│   ├── __init__.py
│   ├── test_scraper.py   
│   ├── test_stix_converter.py  
├── logs/                 
│   ├── rss_scraper_errors.log
├── stix_bundles/         
├── requirements.txt      
├── setup.py              


In [None]:
# scraper.py:

#Handles all RSS-related functionality, including fetching, parsing, and retry logic.

In [None]:
import requests
import feedparser
import logging

# Configure logging
# The log file's parent directory is not created automatically when the file
# is opened, so make sure "logs/" exists first; otherwise the basicConfig call
# fails with FileNotFoundError on a fresh checkout.
import os

os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename="logs/rss_scraper_errors.log",
    level=logging.ERROR,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

def fetch_feed_content(feed_url, timeout=10):
    """Fetch the raw content of an RSS feed.

    Makes an HTTP GET request to the given RSS feed URL and retrieves the content as text.
    Handles HTTP status codes and logs errors for failed requests.

    Args:
        feed_url (str): The URL of the RSS feed.
        timeout (float | tuple, optional): Seconds to wait for the server before
            giving up, passed straight to `requests.get`. Defaults to 10. Without
            a timeout a stalled server would block the caller indefinitely.

    Returns:
        str: The raw feed content as a string if the request is successful.
        None: If the response status is not 200, the request times out, or any
            other exception occurs.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
    try:
        response = requests.get(feed_url, headers=headers, timeout=timeout)
        response.encoding = 'utf-8'
        if response.status_code != 200:
            logging.error(f"Feed: {feed_url} - HTTP Status Code: {response.status_code}")
            return None
        return response.text
    except Exception as e:
        # Best-effort fetch: failures are logged and reported as None so the
        # retry loop in the caller can decide what to do next.
        logging.error(f"Feed: {feed_url} - Exception: {e}")
        return None

def parse_feed(feed_content):
    """Turn raw feed text into feedparser's structured result.

    Delegates to `feedparser.parse`; any exception raised by that call is
    logged at ERROR level and reported to the caller as None.

    Args:
        feed_content (str): The raw feed content as a string.

    Returns:
        feedparser.FeedParserDict: The object returned by `feedparser.parse`.
        None: If `feedparser.parse` raised an exception.
    """
    try:
        parsed = feedparser.parse(feed_content)
    except Exception as exc:
        logging.error(f"Exception parsing feed content: {exc}")
        parsed = None
    return parsed

def retry_feed_parsing(feed_url, retries=3):
    """Fetch and parse a feed, trying again on failure.

    Makes up to `retries` attempts. An attempt succeeds when content was
    fetched, parsing produced a non-empty result, and feedparser did not flag
    the result as `bozo`. Every failed attempt is logged at ERROR level.

    Args:
        feed_url (str): The URL of the RSS feed.
        retries (int, optional): The maximum number of attempts. Defaults to 3.

    Returns:
        feedparser.FeedParserDict: The parsed feed from the first successful attempt.
        None: If every attempt fails.
    """
    attempt_number = 0
    while attempt_number < retries:
        attempt_number += 1
        raw_content = fetch_feed_content(feed_url)
        # Skip parsing entirely when nothing was fetched.
        parsed = parse_feed(raw_content) if raw_content else None
        if parsed and not parsed.bozo:
            return parsed
        logging.error(f"Retry {attempt_number} failed for feed: {feed_url}")
    return None


In [None]:
# stix_converter.py:

#Handles conversion of RSS data into STIX format.

In [None]:
import uuid
from datetime import datetime, timezone
from stix2 import Bundle, IntrusionSet

def convert_entry_to_stix(entry):
    """Convert an RSS feed entry to a STIX IntrusionSet object.

    Parses the provided RSS feed entry and converts it into a STIX `IntrusionSet` object.
    The function generates a unique identifier and uses the entry's published date
    (converted to UTC) for the `first_seen`, `created` and `modified` timestamps.

    Args:
        entry (dict): A dictionary representing an RSS feed entry, typically containing
                      fields like 'title', 'summary', and 'published'.

    Returns:
        stix2.IntrusionSet: A STIX IntrusionSet object containing the converted entry data.
        None: If the entry has no `published` value or the value cannot be parsed
            as a date.
    """
    # Local import so this function stays self-contained; the module-level
    # imports only bring in `datetime` and `timezone`.
    from email.utils import parsedate_to_datetime

    raw_date = entry.get("published", None)
    if not raw_date:
        # Without this guard a missing date fell through to `.isoformat()` on
        # None and raised AttributeError instead of returning None.
        return None

    try:
        published_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %z")
    except ValueError:
        # Fall back to the RFC 2822 parser, which also accepts zone names such
        # as "GMT" that `%z` rejects.
        try:
            published_date = parsedate_to_datetime(raw_date)
        except (TypeError, ValueError):
            return None

    # A naive result carries no offset; treat it as UTC rather than letting
    # astimezone() silently assume the machine's local time zone.
    if published_date.tzinfo is None:
        published_date = published_date.replace(tzinfo=timezone.utc)
    published_date = published_date.astimezone(timezone.utc)
    timestamp = published_date.isoformat().replace("+00:00", "Z")

    return IntrusionSet(
        id=f"intrusion-set--{uuid.uuid4()}",
        name=entry.get("title", "Unknown Title"),
        description=entry.get("summary", "No description available."),
        first_seen=timestamp,
        created=timestamp,
        modified=timestamp,
        resource_level="unknown",
        primary_motivation="unknown",
        aliases=[]
    )

def convert_feed_to_stix(feed):
    """Convert an entire RSS feed into a STIX Bundle.

    Processes all entries in the provided RSS feed and converts them into STIX `IntrusionSet` objects.
    Combines all the converted objects into a single STIX `Bundle`. Entries that
    `convert_entry_to_stix` cannot convert (it returns None for them) are left
    out of the bundle.

    Args:
        feed (feedparser.FeedParserDict): A parsed RSS feed object, typically containing
                                          a list of entries in `feed.entries`.

    Returns:
        str: A serialized STIX Bundle containing all the converted entries in JSON format.
    """
    converted = (convert_entry_to_stix(entry) for entry in feed.entries if entry)
    # Drop the None placeholders so only real STIX objects reach the Bundle.
    stix_objects = [stix_object for stix_object in converted if stix_object is not None]
    return Bundle(objects=stix_objects).serialize(pretty=True)


In [None]:
# config.py:

#Stores program configurations such as RSS feed URLs and retry settings.

In [None]:
# Configuration for RSS feeds
# URLs that main.py iterates over; each feed is fetched, converted and saved
# as its own STIX bundle file.
RSS_FEEDS = [
    "https://feeds.feedburner.com/TheHackersNews?format=xml",
    "https://www.wired.com/feed/category/security/latest/rss",
    "https://www.bleepingcomputer.com/feed/"
]

# Retry settings
# Passed by main.py to retry_feed_parsing as the maximum number of attempts
# per feed.
RETRY_COUNT = 3


In [None]:
# main.py:

#The entry point for the program that coordinates fetching, parsing, and saving data.

In [None]:
import os
from scraper import retry_feed_parsing
from stix_converter import convert_feed_to_stix
from config import RSS_FEEDS, RETRY_COUNT

def save_stix_to_file(stix_json, filename, output_dir="stix_bundles"):
    """Save a STIX JSON string to a file.

    Creates the output directory if it does not exist, and writes the STIX JSON
    string to the specified file inside it.

    Args:
        stix_json (str): The serialized STIX JSON string to save.
        filename (str): The name of the file to save the JSON content.
        output_dir (str, optional): Directory the file is written into.
            Defaults to "stix_bundles".

    Side Effects:
        Creates `output_dir` if it doesn't exist.
        Writes the STIX JSON string to `output_dir/filename`, replacing any
        existing file of that name.
        Prints the path of the written file.

    Example:
        >>> save_stix_to_file(stix_json, "stix_bundle_example.json")
        STIX bundle saved to stix_bundles/stix_bundle_example.json
    """
    os.makedirs(output_dir, exist_ok=True)
    filepath = os.path.join(output_dir, filename)
    # Explicit UTF-8 keeps the output independent of the platform's default
    # encoding, which may not be able to represent every character.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(stix_json)
    print(f"STIX bundle saved to {filepath}")

def process_feeds():
    """Fetch every configured feed, convert it to STIX, and write it to disk.

    Walks the URLs in `RSS_FEEDS`. Each feed is fetched and parsed through
    `retry_feed_parsing` (with `RETRY_COUNT` attempts), converted with
    `convert_feed_to_stix`, and handed to `save_stix_to_file` under a file
    name derived from the host part of the URL. A feed that cannot be
    fetched or parsed is reported on stdout and skipped.

    Side Effects:
        Prints progress messages.
        Writes one STIX JSON bundle per successfully parsed feed via
        `save_stix_to_file`.

    Example:
        If `RSS_FEEDS` contains the following URLs:
        - "https://feeds.feedburner.com/TheHackersNews?format=xml"
        - "https://www.bleepingcomputer.com/feed/"

        The bundles are saved under the file names:
        - `stix_bundle_feeds.feedburner.com.json`
        - `stix_bundle_www.bleepingcomputer.com.json`
    """
    for feed_url in RSS_FEEDS:
        print(f"Processing feed: {feed_url}")
        parsed_feed = retry_feed_parsing(feed_url, RETRY_COUNT)
        if not parsed_feed:
            print(f"Failed to process feed: {feed_url}")
            continue

        bundle_json = convert_feed_to_stix(parsed_feed)
        # Host part of the URL: text after the scheme separator, up to the
        # first path slash.
        host = feed_url.split("//")[-1].split("/")[0]
        save_stix_to_file(bundle_json, f"stix_bundle_{host}.json")

if __name__ == "__main__":
    # Run the full pipeline when this module is executed as a script.
    process_feeds()

In [None]:
# Add a setup.py file for packaging.

In [None]:
from setuptools import setup, find_packages

setup(
    name="rss_scraper",
    version="1.0.0",
    # The code is imported as top-level modules (`from scraper import ...`,
    # `from config import ...`), so they must be listed here explicitly;
    # find_packages() only discovers directories that contain __init__.py.
    # Without this the `main:process_feeds` entry point cannot be imported
    # after installation.
    py_modules=["main", "scraper", "stix_converter", "config"],
    # Keep the test suite out of the installed distribution.
    packages=find_packages(exclude=["tests", "tests.*"]),
    install_requires=[
        "feedparser",
        "stix2",
        "requests",
    ],
    entry_points={
        "console_scripts": [
            "rss-scraper=main:process_feeds",
        ]
    },
)

In [None]:
#Test for parse_feed

#This test ensures that the parse_feed function correctly parses valid RSS feed
#content and handles invalid content gracefully.

#tests/test_scraper.py

In [None]:
import unittest
from scraper import parse_feed

class TestParseFeed(unittest.TestCase):
    """Unit tests for the `parse_feed` function.

    This test suite verifies that the `parse_feed` function correctly parses valid
    RSS feed content into a structured object and copes with content that holds
    no usable feed data.
    """

    def test_parse_valid_feed(self):
        """Test parsing of a valid RSS feed.

        Verifies that `parse_feed` successfully parses a well-formed RSS feed string
        and extracts the correct feed and entry information.

        Example:
            Feed Content:
            - Title: "Example Feed"
            - Entry Title: "Test Entry"
            - Entry Link: "https://example.com/test-entry"
            - Entry Description: "Test Description"
            - Entry Publication Date: "Mon, 01 Jan 2024 10:00:00 +0000"

        Assertions:
            - The parsed feed object is not `None`.
            - The feed's title matches the expected value.
            - The number of entries matches the expected value.
            - The first entry's title matches the expected value.
        """
        valid_feed_content = """<?xml version="1.0" encoding="UTF-8" ?>
        <rss version="2.0">
            <channel>
                <title>Example Feed</title>
                <item>
                    <title>Test Entry</title>
                    <link>https://example.com/test-entry</link>
                    <description>Test Description</description>
                    <pubDate>Mon, 01 Jan 2024 10:00:00 +0000</pubDate>
                </item>
            </channel>
        </rss>"""
        parsed_feed = parse_feed(valid_feed_content)
        self.assertIsNotNone(parsed_feed)
        self.assertEqual(parsed_feed.feed.title, "Example Feed")
        self.assertEqual(len(parsed_feed.entries), 1)
        self.assertEqual(parsed_feed.entries[0].title, "Test Entry")

    def test_parse_invalid_feed(self):
        """Test parsing of content that is not a usable RSS feed.

        `parse_feed` returns `None` only when `feedparser.parse` raises, and
        feedparser is a lenient parser that reports problems on the result
        object instead of raising. Content without any feed data therefore
        yields a result object with no entries rather than `None`.

        Example:
            Feed Content:
            - "<rss><invalid></invalid></rss>"

        Assertions:
            - The parsed feed object is not `None`.
            - The parsed feed contains no entries.
        """
        invalid_feed_content = "<rss><invalid></invalid></rss>"
        parsed_feed = parse_feed(invalid_feed_content)
        self.assertIsNotNone(parsed_feed)
        self.assertEqual(len(parsed_feed.entries), 0)

if __name__ == "__main__":
    # Discover and run the tests in this module when executed as a script.
    unittest.main()



In [None]:
#Test for convert_entry_to_stix

#This test ensures that convert_entry_to_stix correctly converts RSS entries
#into STIX objects.

#File: tests/test_stix_converter.py

In [None]:
import unittest
from stix_converter import convert_entry_to_stix

class TestConvertEntryToStix(unittest.TestCase):
    """Unit tests for the `convert_entry_to_stix` function.

    This test suite verifies that the `convert_entry_to_stix` function correctly
    converts RSS feed entries into STIX IntrusionSet objects and handles edge cases
    such as missing required fields.
    """

    def test_convert_valid_entry(self):
        """Test conversion of a valid RSS entry to a STIX object.

        Verifies that a properly formatted RSS entry is converted into a valid STIX
        IntrusionSet object with correctly mapped fields.

        Example:
            Entry:
            - Title: "Test Entry"
            - Summary: "This is a test entry."
            - Published: "Mon, 01 Jan 2024 10:00:00 +0000"
            - Link: "https://example.com/test-entry"

        Assertions:
            - The returned STIX object is not `None`.
            - The object's name matches the entry's title.
            - The object's description matches the entry's summary.
            - The serialized `first_seen` timestamp is the published date in
              ISO 8601 format with a "Z" suffix.
        """
        import json

        entry = {
            "title": "Test Entry",
            "summary": "This is a test entry.",
            "published": "Mon, 01 Jan 2024 10:00:00 +0000",
            "link": "https://example.com/test-entry",
        }
        stix_object = convert_entry_to_stix(entry)
        self.assertIsNotNone(stix_object)
        self.assertEqual(stix_object.name, "Test Entry")
        self.assertEqual(stix_object.description, "This is a test entry.")
        # stix2 stores timestamps as datetime objects, which have no
        # `.endswith`; the "Z" suffix only exists in the serialized JSON.
        first_seen = json.loads(stix_object.serialize())["first_seen"]
        self.assertTrue(first_seen.startswith("2024-01-01T10:00:00"))
        self.assertTrue(first_seen.endswith("Z"))

    def test_convert_entry_missing_published_date(self):
        """Test conversion of an RSS entry missing a published date.

        Verifies that `convert_entry_to_stix` returns `None` when the RSS entry does
        not contain a valid `published` field.

        Example:
            Entry:
            - Title: "Test Entry Without Date"
            - Summary: "No date provided."
            - Published: (missing)

        Assertions:
            - The returned STIX object is `None`.
        """
        entry = {
            "title": "Test Entry Without Date",
            "summary": "No date provided.",
        }
        stix_object = convert_entry_to_stix(entry)
        self.assertIsNone(stix_object)

if __name__ == "__main__":
    # Discover and run the tests in this module when executed as a script.
    unittest.main()


In [None]:
# requirements.txt

feedparser==6.0.10      # For parsing RSS feeds
stix2==3.0.0           # For creating STIX-compliant objects
requests==2.31.0       # For making HTTP requests to fetch RSS feed content
unittest-xml-reporting==3.2.0  # For enhanced unittest reporting (optional)
