In [1]:
#Required Libraries
import requests
import feedparser
from pymongo import MongoClient
from datetime import datetime
import time
import schedule

In [2]:
# ==== MongoDB Setup ====
try:
    client = MongoClient("mongodb+srv://rskissan:HZIXkw1D5XOUxaS2@osintunctruc.p5itk5s.mongodb.net/?retryWrites=true&w=majority")
    db = client["osint_db"]
    
    # Collections for each source
    newsapi_collection = db["newsapi_data"]
    reddit_collection = db["reddit_data"]
    rss_collection = db["rss_data"]
    rapidapi_collection = db["rapidapi_data"]
    
    print(" Connected to MongoDB successfully")
except Exception as e:
    print(f" Error connecting to MongoDB: {e}")


 Connected to MongoDB successfully


In [3]:
keywords = [
    "cybersecurity", "infosec", "threat intelligence", "vulnerability", "exploit", "zero-day", "APT",
    "penetration testing", "network security", "firewall breach", "incident response",
    "ransomware", "phishing", "DDoS", "SQL injection", "man-in-the-middle attack",
    "social engineering", "supply chain attack", "malvertising", "brute force attack", 
    "credential stuffing", "malware", "spyware", "adware", "trojan", "worm", "rootkit", 
    "keylogger", "RAT", "ICS malware", "botnet", "backdoor", "CVE", "Indicators of Compromise", 
    "IOC", "MD5 hash", "SHA256 hash", "IP address blacklist", "malicious domains", "malicious URLs",
    "threat actor", "hacker group", "APT group", "state-sponsored attack", "hacktivist", 
    "cyber espionage", "black hat", "white hat", "grey hat", "deepfake", 
    "AI-generated malware", "zero-trust architecture", "cloud security breach", "IoT security", 
    "blockchain security", "quantum computing threat", "Metasploit", "Burp Suite", "Nmap", 
    "Wireshark", "Cobalt Strike", "Shodan", "VirusTotal", "AbuseIPDB", "ThreatCrowd"
]


In [4]:
NEWSAPI_KEY = "a287317e0ca94d568dd9fb178361187d"

def fetch_and_store_news(keyword):
    try:
        logging.info(f"Starting NewsAPI collection for keyword: {keyword}")
        url = f"https://newsapi.org/v2/everything?q={keyword}&apiKey={NEWSAPI_KEY}&pageSize=100"
        
        response = requests.get(url)
        response.raise_for_status()
        
        articles = response.json().get("articles", [])
        new_records = 0
        
        for article in articles:
            doc = {
                "source": "newsapi",
                "timestamp": article.get("publishedAt"),
                "text": article.get("title", "") + ". " + (article.get("description", "") or ""),
                "meta": {
                    "author": article.get("author"),
                    "source_name": article.get("source", {}).get("name"),
                    "url": article.get("url"),
                    "query_used": keyword
                },
                "fetched_at": datetime.utcnow().isoformat()
            }
            
            # Check for duplicate entries using URL
            if newsapi_collection.count_documents({"meta.url": article.get("url")}) == 0:
                newsapi_collection.insert_one(doc)
                new_records += 1
        
        logging.info(f" Successfully collected {new_records} new articles for keyword: {keyword}")

    except Exception as e:
        logging.error(f" Error collecting data from NewsAPI for keyword '{keyword}': {e}")

In [5]:
import praw
from datetime import datetime

# ==== PRAW Reddit API Authentication ====
try:
    reddit = praw.Reddit(
        client_id="6abqWObNapOZUTboI38SMA",
        client_secret="jvqQN0e285EOhw3pFWRkMsV7BN_lZA",
        user_agent="osint_tool_v1"
    )
    print(" Connected to Reddit successfully")
except Exception as e:
    print(f" Error connecting to Reddit: {e}")

# ==== Reddit Subreddits to Monitor ====
subreddits_to_monitor = ["netsec", "cybersecurity", "hacking", "blueteamsec", "malware"]

# ==== PRAW Data Collection ====
def collect_from_praw():
    try:
        for subreddit_name in subreddits_to_monitor:
            subreddit = reddit.subreddit(subreddit_name)
            for post in subreddit.new(limit=100):  # Fetch latest 100 posts per subreddit
                doc = {
                    "source": "reddit_praw",
                    "timestamp": datetime.utcfromtimestamp(post.created_utc).isoformat(),
                    "text": post.title + ". " + (post.selftext or ""),
                    "meta": {
                        "author": post.author.name if post.author else "N/A",
                        "url": post.url,
                        "subreddit": post.subreddit.display_name,
                        "score": post.score,
                        "num_comments": post.num_comments,
                        "post_id": post.id
                    },
                    "fetched_at": datetime.utcnow().isoformat()
                }

                # Check if the post already exists in the collection
                if reddit_collection.count_documents({"meta.post_id": post.id}) == 0:
                    reddit_collection.insert_one(doc)
                    print(f" New post saved from subreddit: {subreddit_name}")

    except Exception as e:
        print(f" Error collecting data from PRAW: {e}")

#collect_from_praw()


 Connected to Reddit successfully


In [6]:
rss_urls = ["https://krebsonsecurity.com/feed/", "https://threatpost.com/feed/"]

def collect_from_rss():
    try:
        for url in rss_urls:
            feed = feedparser.parse(url)
            for entry in feed.entries:
                # Check if the entry already exists based on the link
                if rss_collection.count_documents({"meta.url": entry.link}) == 0:
                    doc = {
                        "source": "rss",
                        "timestamp": entry.published,
                        "text": entry.title + ". " + entry.summary,
                        "meta": {
                            "author": entry.get("author", "N/A"),
                            "url": entry.link
                        },
                        "fetched_at": datetime.utcnow().isoformat()
                    }
                    rss_collection.insert_one(doc)
                    logging.info(f" New RSS article inserted: {entry.title}")
                else:
                    logging.info(f" Skipped duplicate RSS article: {entry.title}")

        print("RSS data collection complete.")
        
    except Exception as e:
        logging.error(f" Error collecting data from RSS feeds: {e}")



In [7]:
import logging
from datetime import datetime, timedelta

def collect_from_rapidapi(ioc_type, ioc_value):
    try:
        # Skip if the same IOC was collected in the last 24h
        existing = rapidapi_collection.find_one({
            "ioc_type": ioc_type,
            "ioc_value": ioc_value,
            "timestamp": {"$gte": (datetime.utcnow() - timedelta(hours=24)).isoformat()}
        })
        if existing:
            logging.info(f" Skipping {ioc_type}:{ioc_value} (already collected within 24h)")
            return

        url_map = {
            "ip": "https://ioc-search.p.rapidapi.com/rapid/v1/ioc/search/ip",
            "domain": "https://ioc-search.p.rapidapi.com/rapid/v1/ioc/search/domain",
            "url": "https://ioc-search.p.rapidapi.com/rapid/v1/ioc/search/url",
            "hash": "https://ioc-search.p.rapidapi.com/rapid/v1/ioc/search/hash"
        }
        url = url_map.get(ioc_type)
        if not url:
            logging.warning(f" Invalid IOC type: {ioc_type}")
            return

        headers = {
            "x-rapidapi-key": "0ea5fe4d7fmsh0b2b8346715525cp1263f3jsn707750ed0fb2",
            "x-rapidapi-host": "ioc-search.p.rapidapi.com"
        }

        response = requests.get(url, headers=headers, params={"query": ioc_value})
        response.raise_for_status()
        data = response.json()

        if not data.get("data"):
            logging.info(f" No new data for {ioc_type}: {ioc_value}")
            return

        doc = {
            "source": "rapidapi",
            "ioc_type": ioc_type,
            "ioc_value": ioc_value,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data,
            "fetched_at": datetime.utcnow().isoformat()
        }

        rapidapi_collection.insert_one(doc)
        logging.info(f" Stored new data for {ioc_type}: {ioc_value}")

    except Exception as e:
        logging.error(f" Error collecting {ioc_type}:{ioc_value} → {e}")


In [8]:
import schedule
import time
import logging
from datetime import datetime
import os

# ==== Setting Up Logging Properly ====

# Clear previous handlers if they exist
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# Define the log file path
log_file_path = os.path.join(os.getcwd(), 'pipeline_logs.log')

# Apply logging configuration with proper flushing
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file_path, mode='a'),
        logging.StreamHandler()  # Outputs to console too
    ]
)

def log_start(function_name):
    logging.info(f"Starting function: {function_name}")
    print(f" Starting function: {function_name} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

def log_end(function_name):
    logging.info(f"Finished function: {function_name}")
    print(f"✅ Finished function: {function_name} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# === List of IOC Types and Values to Collect ===
ioc_indicators = [
    ("ip", "117.131.215.118"),
    ("domain", "example.com"),
    ("url", "https://malicious-site.com"),
    ("hash", "44d88612fea8a8f36de82e1278abb02f")
]

def run_newsapi_fetch():
    try:
        log_start("NewsAPI Daily Fetch")
        for keyword in keywords:
            fetch_and_store_news(keyword)
        log_end("NewsAPI Daily Fetch")
    except Exception as e:
        logging.error(f" Error in run_newsapi_fetch: {e}")

# === IOC Collection - Hourly Job
def run_others():
    try:
        log_start("IOC Hourly Collection")

        log_start("collect_from_praw")
        collect_from_praw()
        log_end("collect_from_praw")

        log_start("collect_from_rss")
        collect_from_rss()
        log_end("collect_from_rss")

        for ioc_type, ioc_value in ioc_indicators:
            log_start(f"collect_from_rapidapi ({ioc_type}: {ioc_value})")
            collect_from_rapidapi(ioc_type, ioc_value)
            log_end(f"collect_from_rapidapi ({ioc_type}: {ioc_value})")

        log_end("IOC Hourly Collection")
    except Exception as e:
        logging.error(f" Error in run_ioc_collection: {e}")


# === Run Immediately Once at Startup
logging.info("Running startup fetch (NewsAPI + Others)...")
run_newsapi_fetch()
run_others()
# === Scheduler Rules
schedule.every().day.at("00:00").do(run_newsapi_fetch)  # Daily at midnight
schedule.every().hour.do(run_others)             # ⏱Hourly

# === Start Loop
while True:
    try:
        schedule.run_pending()
        time.sleep(60)
    except Exception as e:
        logging.error(f"Error in scheduler loop: {e}")
        print(f" Error in scheduler loop: {e}")


2025-04-09 22:55:26,445 - INFO - Running startup fetch (NewsAPI + Others)...
2025-04-09 22:55:26,449 - INFO - Starting function: NewsAPI Daily Fetch
2025-04-09 22:55:26,454 - INFO - Starting NewsAPI collection for keyword: cybersecurity


 Starting function: NewsAPI Daily Fetch at 2025-04-09 22:55:26


2025-04-09 22:55:29,762 - INFO -  Successfully collected 0 new articles for keyword: cybersecurity
2025-04-09 22:55:29,768 - INFO - Starting NewsAPI collection for keyword: infosec
2025-04-09 22:55:32,796 - INFO -  Successfully collected 2 new articles for keyword: infosec
2025-04-09 22:55:32,803 - INFO - Starting NewsAPI collection for keyword: threat intelligence
2025-04-09 22:55:35,903 - INFO -  Successfully collected 3 new articles for keyword: threat intelligence
2025-04-09 22:55:35,910 - INFO - Starting NewsAPI collection for keyword: vulnerability
2025-04-09 22:55:38,436 - INFO -  Successfully collected 2 new articles for keyword: vulnerability
2025-04-09 22:55:38,441 - INFO - Starting NewsAPI collection for keyword: exploit
2025-04-09 22:55:41,492 - INFO -  Successfully collected 3 new articles for keyword: exploit
2025-04-09 22:55:41,498 - INFO - Starting NewsAPI collection for keyword: zero-day
2025-04-09 22:55:44,490 - INFO -  Successfully collected 6 new articles for keywor

✅ Finished function: NewsAPI Daily Fetch at 2025-04-09 22:56:52
 Starting function: IOC Hourly Collection at 2025-04-09 22:56:52
 Starting function: collect_from_praw at 2025-04-09 22:56:52


2025-04-09 22:57:06,728 - INFO - Finished function: collect_from_praw
2025-04-09 22:57:06,732 - INFO - Starting function: collect_from_rss


✅ Finished function: collect_from_praw at 2025-04-09 22:57:06
 Starting function: collect_from_rss at 2025-04-09 22:57:06


2025-04-09 22:57:07,414 - INFO -  Skipped duplicate RSS article: Patch Tuesday, April 2025 Edition
2025-04-09 22:57:07,437 - INFO -  Skipped duplicate RSS article: Cyber Forensic Expert in 2,000+ Cases Faces FBI Probe
2025-04-09 22:57:07,464 - INFO -  Skipped duplicate RSS article: How Each Pillar of the 1st Amendment is Under Attack
2025-04-09 22:57:07,489 - INFO -  Skipped duplicate RSS article: When Getting Phished Puts You in Mortal Danger
2025-04-09 22:57:07,515 - INFO -  Skipped duplicate RSS article: Arrests in Tap-to-Pay Scheme Powered by Phishing
2025-04-09 22:57:07,541 - INFO -  Skipped duplicate RSS article: DOGE to Fired CISA Staff: Email Us Your Personal Data
2025-04-09 22:57:07,568 - INFO -  Skipped duplicate RSS article: ClickFix: How to Infect Your PC in Three Easy Steps
2025-04-09 22:57:07,607 - INFO -  Skipped duplicate RSS article: Microsoft: 6 Zero-Days in March 2025 Patch Tuesday
2025-04-09 22:57:07,638 - INFO -  Skipped duplicate RSS article: Alleged Co-Founder of

RSS data collection complete.
✅ Finished function: collect_from_rss at 2025-04-09 22:57:08
 Starting function: collect_from_rapidapi (ip: 117.131.215.118) at 2025-04-09 22:57:08
✅ Finished function: collect_from_rapidapi (ip: 117.131.215.118) at 2025-04-09 22:57:08
 Starting function: collect_from_rapidapi (domain: example.com) at 2025-04-09 22:57:08


2025-04-09 22:57:08,671 - INFO -  Skipping domain:example.com (already collected within 24h)
2025-04-09 22:57:08,674 - INFO - Finished function: collect_from_rapidapi (domain: example.com)
2025-04-09 22:57:08,676 - INFO - Starting function: collect_from_rapidapi (url: https://malicious-site.com)
2025-04-09 22:57:08,714 - INFO -  Skipping url:https://malicious-site.com (already collected within 24h)
2025-04-09 22:57:08,719 - INFO - Finished function: collect_from_rapidapi (url: https://malicious-site.com)
2025-04-09 22:57:08,724 - INFO - Starting function: collect_from_rapidapi (hash: 44d88612fea8a8f36de82e1278abb02f)


✅ Finished function: collect_from_rapidapi (domain: example.com) at 2025-04-09 22:57:08
 Starting function: collect_from_rapidapi (url: https://malicious-site.com) at 2025-04-09 22:57:08
✅ Finished function: collect_from_rapidapi (url: https://malicious-site.com) at 2025-04-09 22:57:08
 Starting function: collect_from_rapidapi (hash: 44d88612fea8a8f36de82e1278abb02f) at 2025-04-09 22:57:08


2025-04-09 22:57:08,911 - INFO -  Skipping hash:44d88612fea8a8f36de82e1278abb02f (already collected within 24h)
2025-04-09 22:57:08,917 - INFO - Finished function: collect_from_rapidapi (hash: 44d88612fea8a8f36de82e1278abb02f)
2025-04-09 22:57:08,923 - INFO - Finished function: IOC Hourly Collection


✅ Finished function: collect_from_rapidapi (hash: 44d88612fea8a8f36de82e1278abb02f) at 2025-04-09 22:57:08
✅ Finished function: IOC Hourly Collection at 2025-04-09 22:57:08


KeyboardInterrupt: 