# ETL Pipeline for Cyber Threat Intelligence Aggregation

Data Extraction from AbuseIPDB and VirusTotal APIs

In [16]:
import requests
import logging

In [17]:
# Direct API keys (from AbuseIPDB API and VirusTotal API)
ABUSEIPDB_API_KEY = '7293c9adfec5ae491651890e2ec3ddc43d4f8f3474051fc6c20a442aa27237a1ae53814a1446f145'  
VIRUSTOTAL_API_KEY = 'dd4f9d70190ae332a2749b2ef0413d9b15c6300ff711fc7c124cb6d417ff57dc'

In [18]:
# Set up logging
logging.basicConfig(level=logging.INFO)

In [19]:
# Function to fetch data from AbuseIPDB
def fetch_abuseipdb_data(ip):
    url = f'https://api.abuseipdb.com/api/v2/check'
    headers = {
        'Key': ABUSEIPDB_API_KEY,
        'Accept': 'application/json'
    }
    params = {
        'ipAddress': ip,
        'maxAgeInDays': '90'  
    }

    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()  
        data = response.json()  
        return data
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching data from AbuseIPDB: {e}")
        return None

In [20]:
# Function to fetch data from VirusTotal
def fetch_virustotal_data(ip):
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
    headers = {
        'x-apikey': VIRUSTOTAL_API_KEY
    }

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  
        data = response.json()  
        return data
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching data from VirusTotal: {e}")
        return None

In [21]:
# Check the fetching data using random IP address
def fetch_and_process_data(ip):
    logging.info(f"Fetching data for IP: {ip}")
    abuseipdb_data = fetch_abuseipdb_data(ip)
    virustotal_data = fetch_virustotal_data(ip)

    if not abuseipdb_data or not virustotal_data:
        logging.warning(f"Skipping IP {ip} due to API errors.")
        return None

    # Print or process the fetched data
    if abuseipdb_data:
        logging.info("AbuseIPDB Data: %s", abuseipdb_data)

    if virustotal_data:
        logging.info("VirusTotal Data: %s", virustotal_data)

    return abuseipdb_data, virustotal_data

# Random IP address
fetch_and_process_data('8.8.8.8')


INFO:root:Fetching data for IP: 8.8.8.8
INFO:root:AbuseIPDB Data: {'data': {'ipAddress': '8.8.8.8', 'isPublic': True, 'ipVersion': 4, 'isWhitelisted': True, 'abuseConfidenceScore': 0, 'countryCode': 'US', 'usageType': 'Content Delivery Network', 'isp': 'Google LLC', 'domain': 'google.com', 'hostnames': ['dns.google'], 'isTor': False, 'totalReports': 206, 'numDistinctUsers': 54, 'lastReportedAt': '2025-03-28T16:01:42+00:00'}}
INFO:root:VirusTotal Data: {'data': {'id': '8.8.8.8', 'type': 'ip_address', 'links': {'self': 'https://www.virustotal.com/api/v3/ip_addresses/8.8.8.8'}, 'attributes': {'whois_date': 1741786692, 'continent': 'NA', 'last_https_certificate_date': 1743185206, 'whois': 'NetRange: 8.8.8.0 - 8.8.8.255\nCIDR: 8.8.8.0/24\nNetName: GOGL\nNetHandle: NET-8-8-8-0-2\nParent: NET8 (NET-8-0-0-0-0)\nNetType: Direct Allocation\nOriginAS: \nOrganization: Google LLC (GOGL)\nRegDate: 2023-12-28\nUpdated: 2023-12-28\nRef: https://rdap.arin.net/registry/ip/8.8.8.0\nOrgName: Google LLC\nO

({'data': {'ipAddress': '8.8.8.8',
   'isPublic': True,
   'ipVersion': 4,
   'isWhitelisted': True,
   'abuseConfidenceScore': 0,
   'countryCode': 'US',
   'usageType': 'Content Delivery Network',
   'isp': 'Google LLC',
   'domain': 'google.com',
   'hostnames': ['dns.google'],
   'isTor': False,
   'totalReports': 206,
   'numDistinctUsers': 54,
   'lastReportedAt': '2025-03-28T16:01:42+00:00'}},
 {'data': {'id': '8.8.8.8',
   'type': 'ip_address',
   'links': {'self': 'https://www.virustotal.com/api/v3/ip_addresses/8.8.8.8'},
   'attributes': {'whois_date': 1741786692,
    'continent': 'NA',
    'last_https_certificate_date': 1743185206,
    'whois': 'NetRange: 8.8.8.0 - 8.8.8.255\nCIDR: 8.8.8.0/24\nNetName: GOGL\nNetHandle: NET-8-8-8-0-2\nParent: NET8 (NET-8-0-0-0-0)\nNetType: Direct Allocation\nOriginAS: \nOrganization: Google LLC (GOGL)\nRegDate: 2023-12-28\nUpdated: 2023-12-28\nRef: https://rdap.arin.net/registry/ip/8.8.8.0\nOrgName: Google LLC\nOrgId: GOGL\nAddress: 1600 Amph

Data Cleaning and Transformation

In [22]:
import pandas as pd
import re
from datetime import datetime

In [23]:
# Function to validate IP addresses (basic validation for IPv4)
def validate_ip(ip):
    """Validate if the given string is a valid IP address (IPv4)"""
    return bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip))

In [24]:
# Function to standardize and clean data
def clean_data(abuseipdb_data, virustotal_data):
    # Combine data from both APIs into a list
    combined_data = []

    # Extract relevant data from AbuseIPDB (fields: ipAddress, lastReportedAt, totalReports)
    if abuseipdb_data and 'data' in abuseipdb_data:
        abuse_data = abuseipdb_data['data']
        combined_data.append({
            'ip_address': abuse_data.get('ipAddress', ''),
            'timestamp': abuse_data.get('lastReportedAt', ''),
            'reports_count': abuse_data.get('totalReports', 0),
            'abuse_confidence_score': abuse_data.get('abuseConfidenceScore', 0),
            'country_code': abuse_data.get('countryCode', ''),
            'usage_type': abuse_data.get('usageType', ''),
            'is_public': abuse_data.get('isPublic', False),
            'domain': abuse_data.get('domain', '')
        })

    # Extract relevant data from VirusTotal (fields: ip_address, last_analysis_date)
    if virustotal_data and 'data' in virustotal_data:
        virustotal_data = virustotal_data['data']
        # Convert Unix timestamp to human-readable format for VirusTotal's last_analysis_date
        last_analysis_date = virustotal_data['attributes'].get('last_analysis_date', None)
        if last_analysis_date:
            last_analysis_date = datetime.utcfromtimestamp(last_analysis_date).strftime('%Y-%m-%dT%H:%M:%S+00:00')

        combined_data.append({
            'ip_address': virustotal_data.get('id', ''),
            'timestamp': last_analysis_date,  # Use the converted timestamp
            'last_https_certificate': virustotal_data['attributes'].get('last_https_certificate', {}).get('cert_signature', {}).get('signature', ''),
            'reputation': virustotal_data['attributes'].get('reputation', 0),
            'last_analysis_stats': virustotal_data['attributes'].get('last_analysis_stats', {}),
        })

    # Convert the combined data into a pandas DataFrame
    df = pd.DataFrame(combined_data)

    # Remove duplicates based on 'ip_address'
    df = df.drop_duplicates(subset=['ip_address'])

    # Validate IP address format and clean the 'ip_address' column
    df['ip_address'] = df['ip_address'].apply(lambda x: x if validate_ip(x) else None)

    # Normalize the 'timestamp' column into a consistent datetime format
    def standardize_timestamp(timestamp):
        try:
            # Attempt to parse timestamp (ISO 8601 format, e.g., "2025-03-28T08:02:57+00:00")
            return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S%z") if timestamp else None
        except (ValueError, TypeError):
            return None

    df['timestamp'] = df['timestamp'].apply(standardize_timestamp)

    # Ensure consistency in field names
    df = df.rename(columns={'ip_address': 'IP Address', 'timestamp': 'Timestamp'})

    return df

In [25]:
# Example data from AbuseIPDB and VirusTotal (replace with actual data)
abuseipdb_data = {
    "data": {
        "ipAddress": "8.8.8.8",
        "isPublic": True,
        "ipVersion": 4,
        "isWhitelisted": True,
        "abuseConfidenceScore": 0,
        "countryCode": "US",
        "usageType": "Content Delivery Network",
        "isp": "Google LLC",
        "domain": "google.com",
        "hostnames": ["dns.google"],
        "isTor": False,
        "totalReports": 205,
        "numDistinctUsers": 54,
        "lastReportedAt": "2025-03-28T08:02:57+00:00"
    }
}

virustotal_data = {
    "data": {
        "id": "8.8.8.8",
        "type": "ip_address",
        "attributes": {
            "last_analysis_date": 1743155612,  # Example Unix timestamp
            "reputation": 548,
            "last_analysis_stats": {
                "malicious": 0,
                "suspicious": 0,
                "undetected": 31,
                "harmless": 63,
                "timeout": 0
            }
        }
    }
}

In [26]:
# Clean the data
cleaned_data = clean_data(abuseipdb_data, virustotal_data)

# Display the cleaned data
print("Cleaned Data: ")
print(cleaned_data)

Cleaned Data: 
  IP Address                 Timestamp  reports_count  abuse_confidence_score  \
0    8.8.8.8 2025-03-28 08:02:57+00:00          205.0                     0.0   

  country_code                usage_type is_public      domain  \
0           US  Content Delivery Network      True  google.com   

  last_https_certificate  reputation last_analysis_stats  
0                    NaN         NaN                 NaN  


Insert Cleaned Data into SQL Database

In [27]:
import pandas as pd
import sqlite3
import os

In [28]:
# Path to SQLite database (use a valid local path)
db_path = 'cyber_threats.db'

In [29]:
# Check if the database file exists. If not, it will be created.
if not os.path.exists(db_path):
    print(f"{db_path} not found. Creating a new database.")

In [30]:
# Connect to SQLite (this will create the database file if it doesn't exist)
conn = sqlite3.connect(db_path)  

In [31]:
# Create a cursor object to interact with the database
cursor = conn.cursor()

In [32]:
# Create the table to store the data (replace with appropriate columns based on your cleaned data)
cursor.execute('''
CREATE TABLE IF NOT EXISTS threat_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ip_address TEXT,
    timestamp TEXT,
    reports_count INTEGER,
    abuse_confidence_score INTEGER,
    country_code TEXT,
    usage_type TEXT,
    is_public BOOLEAN,
    domain TEXT,
    reputation INTEGER,
    last_analysis_stats TEXT
);
''')

<sqlite3.Cursor at 0x1feb6f843c0>

In [33]:
# Commit the transaction
conn.commit()

In [34]:
# Assuming 'cleaned_data' is your DataFrame from the previous cleaning process
cleaned_data = pd.DataFrame({
    'IP Address': ['8.8.8.8'],
    'Timestamp': ['2025-03-28T08:02:57+00:00'],
    'reports_count': [205],
    'abuse_confidence_score': [0],
    'country_code': ['US'],
    'usage_type': ['Content Delivery Network'],
    'is_public': [True],
    'domain': ['google.com'],
    'reputation': [548],
    'last_analysis_stats': [{'malicious': 0, 'suspicious': 0, 'undetected': 31, 'harmless': 63}]
})

In [35]:
# Reopen the connection to insert data
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

In [36]:
# Insert data into the SQL table using iterrows()
for _, row in cleaned_data.iterrows():
    cursor.execute('''
    INSERT INTO threat_data (ip_address, timestamp, reports_count, abuse_confidence_score, country_code, usage_type, is_public, domain, reputation, last_analysis_stats)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', 
    (row['IP Address'], row['Timestamp'], row['reports_count'], row['abuse_confidence_score'],
     row['country_code'], row['usage_type'], row['is_public'], row['domain'],
     row['reputation'], str(row['last_analysis_stats']))  # Convert dictionary to string
)

In [37]:
# Commit the transaction
conn.commit()

In [38]:
# Query the data to check if it was inserted properly
cursor.execute('SELECT * FROM threat_data LIMIT 5')
rows = cursor.fetchall()

In [39]:
# Print the first 5 rows
for row in rows:
    print(row)

# Load the data from the database into a pandas DataFrame
query = "SELECT * FROM threat_data"
df_from_sql = pd.read_sql(query, conn)

(1, '8.8.8.8', '2025-03-28T08:02:57+00:00', 205, 0, 'US', 'Content Delivery Network', 1, 'google.com', 548, "{'malicious': 0, 'suspicious': 0, 'undetected': 31, 'harmless': 63}")
(2, '8.8.8.8', '2025-03-28T08:02:57+00:00', 205, 0, 'US', 'Content Delivery Network', 1, 'google.com', 548, "{'malicious': 0, 'suspicious': 0, 'undetected': 31, 'harmless': 63}")
(3, '8.8.8.8', '2025-03-28T08:02:57+00:00', 205, 0, 'US', 'Content Delivery Network', 1, 'google.com', 548, "{'malicious': 0, 'suspicious': 0, 'undetected': 31, 'harmless': 63}")
(4, '8.8.8.8', '2025-03-28T08:02:57+00:00', 205, 0, 'US', 'Content Delivery Network', 1, 'google.com', 548, "{'malicious': 0, 'suspicious': 0, 'undetected': 31, 'harmless': 63}")
(5, '8.8.8.8', '2025-03-28T08:02:57+00:00', 205, 0, 'US', 'Content Delivery Network', 1, 'google.com', 548, "{'malicious': 0, 'suspicious': 0, 'undetected': 31, 'harmless': 63}")


In [40]:
# Save the DataFrame to a CSV file 
csv_path = 'threat_data_export.csv' 
df_from_sql.to_csv(csv_path, index=False)

# print the data export
print(f"Data exported to {csv_path}")

# Close the connection
conn.close()

Data exported to threat_data_export.csv
