# Cyber Threat Analyzer (CTA) - Part 1: Parsing

This notebook details the process of ingesting and parsing a raw, unstructured system log into a clean Pandas DataFrame.

In [4]:
import pandas as pd
from pathlib import Path
import re

print("Import Complete")

Import Complete


## Step 1: Parsing the Log File

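The log file uses a custom format: TIMESTAMP [LOG_LEVEL] MESSAGE. The fields are not separated by a consistent delimiter, and the message itself can contain spaces, brackets, and punctuation, so pd.read_csv cannot split the lines into columns reliably. Instead, we build a regular expression (regex) that matches this pattern and captures the three fields: timestamp, log level, and message.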

In [5]:
# Define the 'regex' for a single log line telling Python *exactly* what pattern to look for.
#
#    ^         # Start of the line
#    (\S+)     # Capture Group 1: One or more non-space characters (the timestamp)
#    \s+       # Match one or more spaces (the gap after the timestamp)
#    \[        # Match a literal opening bracket '['
#    (\w+)     # Capture Group 2: One or more "word" characters (the log level)
#    \]        # Match a literal closing bracket ']'
#    \s+       # Match one or more spaces (the gap after the log level)
#    (.*)      # Capture Group 3: "Capture everything else" (the message)
#    $         # End of the line
#
log_pattern = re.compile(r'^(\S+)\s+\[(\w+)\]\s+(.*)$')

# Define the path to the log file using the 'pathlib' library.
log_file_path = Path.cwd().parent / "data" / "system.log"

# Create empty list to store structured data
data = []

# Open the log file and loop through it, line by line
with open(log_file_path, 'r') as f:
    for line in f:
        # Try to match the regex against the current line
        match = log_pattern.match(line)

        # If the line matched, extract the captured groups.
        # Lines that do not match are skipped silently.
        if match:
            data.append({
                'timestamp': match.group(1),
                'log_level': match.group(2),
                'message': match.group(3).strip()
            })

# Create Pandas DataFrame from the list of dictionaries
df = pd.DataFrame(data)

print(f"--- CTA Parser finished. Found {len(data)} log entries. ---")

--- CTA Parser finished. Found 8 log entries. ---
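
The loop above drops any line that does not match the pattern, so a malformed or multi-line entry would disappear without notice. The sketch below shows one way to keep track of such lines. The helper name parse_lines and the sample lines are assumptions made for illustration; they are not part of the original notebook or log file.

```python
import re

log_pattern = re.compile(r'^(\S+)\s+\[(\w+)\]\s+(.*)$')

def parse_lines(lines):
    """Hypothetical helper: return (parsed_rows, skipped_lines)."""
    rows, skipped = [], []
    for line_number, line in enumerate(lines, start=1):
        match = log_pattern.match(line)
        if match:
            rows.append({
                'timestamp': match.group(1),
                'log_level': match.group(2),
                'message': match.group(3).strip(),
            })
        elif line.strip():
            # Keep non-blank lines that failed to match so they can be inspected
            skipped.append((line_number, line.rstrip('\n')))
    return rows, skipped

# Made-up sample input: one well-formed line and one malformed line
sample_lines = [
    "2023-10-27T14:01:03 [INFO] System startup complete.\n",
    "this line has no timestamp or level\n",
]
rows, skipped = parse_lines(sample_lines)
print(f"Parsed {len(rows)} entries, skipped {len(skipped)} lines")
```

Passing an open file object instead of sample_lines would work the same way, since the helper only iterates over lines.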


## Step 2: Verify the DataFrame

Let's use df.head() to check the first 5 rows and confirm the parser worked as expected.

In [6]:
df.head()

,timestamp,log_level,message
0,2023-10-27T14:01:03,INFO,System startup complete.
1,2023-10-27T14:02:15,INFO,User 'admin' logged in from 192.168.1.100
2,2023-10-27T14:02:45,WARNING,Disk space low on /var. 85% used.
3,2023-10-27T14:03:10,ERROR,Failed to connect to database [db-01]. IP 10.0...
4,2023-10-27T14:03:12,INFO,Retrying connection...
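
The parser stores timestamps as plain strings. As a further sanity check, the sketch below confirms that a few of the values shown above parse as ISO 8601 and appear in chronological order. It uses only the standard library, and the list of timestamps is copied by hand from the output rather than read from df.

```python
from datetime import datetime

# Timestamp strings as they appear in the output above
timestamps = [
    "2023-10-27T14:01:03",
    "2023-10-27T14:02:15",
    "2023-10-27T14:02:45",
]

# fromisoformat raises ValueError if a string is not a valid ISO 8601 timestamp
parsed = [datetime.fromisoformat(ts) for ts in timestamps]

# Log entries are expected to be in chronological order
is_sorted = all(earlier <= later for earlier, later in zip(parsed, parsed[1:]))
print(is_sorted)
```

On the DataFrame itself, pd.to_datetime(df['timestamp']) performs the same conversion for the whole column.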


## Next Steps

Now that we have clean data, the next step is to enrich it by calling the AbuseIPDB API...

In [8]:
import os
from dotenv import load_dotenv # Import the new library

# This line automatically finds the .env file in your project's root
# and loads the variables found inside it into your environment
# for this specific notebook session.
load_dotenv()

# Now, the standard os.environ.get() should find the key!
api_key = os.environ.get('ABUSEIPDB_KEY')

if api_key:
    print("✅ Success! API Key loaded successfully from .env file.")
    # Optional: Verify the key looks right
    # print(f"   Key starts with: {api_key[:5]}... and ends with: {api_key[-5:]}")
else:
    print("❌ Error: API Key not found.")
    print("   1. Did you create the `.env` file in the 'cta' root folder?")
    print("   2. Does the `.env` file contain 'ABUSEIPDB_KEY=your-key'?")
    print("   3. Did you install 'python-dotenv' (`pip install python-dotenv`)?")
    print("   4. Did you run the `load_dotenv()` command in this cell?")

✅ Success! API Key loaded successfully from .env file.
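
The cell above prints a message when the key is missing, but later cells would still run with api_key set to None. A minimal sketch of a fail-fast alternative follows. The helper name require_env and the variable name CTA_DEMO_KEY are hypothetical and used only for this demonstration.

```python
import os

def require_env(name):
    """Hypothetical helper: return the variable's value or raise if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value

# Demonstration with a made-up variable, so no real key is needed
os.environ['CTA_DEMO_KEY'] = 'dummy-value'
demo_value = require_env('CTA_DEMO_KEY')

# A missing variable stops execution with a clear error instead of returning None
os.environ.pop('CTA_DEMO_MISSING', None)
try:
    require_env('CTA_DEMO_MISSING')
    missing_raised = False
except RuntimeError:
    missing_raised = True

print(demo_value, missing_raised)
```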


In [9]:
import os
import requests
import json # To pretty-print the result
from dotenv import load_dotenv

# --- 1. Load the API Key (using dotenv) ---
load_dotenv()
api_key = os.environ.get('ABUSEIPDB_KEY')

# --- 2. Define the Function to Check an IP ---
def check_ip(ip_address, key):
    """
    Calls the AbuseIPDB API to check a given IP address.
    Returns the JSON response from the API.
    """
    if not key:
        print("API Key not loaded. Cannot check IP.")
        return None # Return nothing if the key isn't loaded

    # Define the API endpoint and parameters
    url = 'https://api.abuseipdb.com/api/v2/check'
    params = {
        'ipAddress': ip_address,
        'maxAgeInDays': '90', # How far back to look for reports
        'verbose': True # Ask for more details if available
    }
    headers = {
        'Accept': 'application/json',
        'Key': key
    }

    print(f"Checking IP: {ip_address}...")
    try:
        # Set a timeout so a stalled connection cannot hang the notebook indefinitely
        response = requests.get(url=url, headers=headers, params=params, timeout=10)
        response.raise_for_status() # Raise an error for bad status codes (4xx or 5xx)

        # If successful, parse the JSON response
        report = response.json()
        return report['data'] # Return just the 'data' part of the response

    except requests.exceptions.RequestException as e:
        print(f"  Error during API request: {e}")
        return None
    except Exception as e:
        print(f"  An unexpected error occurred: {e}")
        return None

# --- 3. Test the function with a known bad IP ---
test_ip = '1.2.3.4' # A commonly reported IP for testing
if api_key:
    ip_data = check_ip(test_ip, api_key)

    # --- 4. Print the results nicely ---
    if ip_data:
        print("\n--- API Report Received ---")
        print(f"  IP Address: {ip_data.get('ipAddress')}")
        print(f"  Country: {ip_data.get('countryCode')}")
        print(f"  Abuse Score: {ip_data.get('abuseConfidenceScore')}%")
        print(f"  Total Reports: {ip_data.get('totalReports')}")
        print(f"  ISP: {ip_data.get('isp')}")
        # print("\nFull Report:") # Uncomment to see everything
        # print(json.dumps(ip_data, indent=2))
    else:
        print("\n--- Failed to get API report ---")
else:
    print("API Key not loaded, skipping test.")

Checking IP: 1.2.3.4...

--- API Report Received ---
  IP Address: 1.2.3.4
  Country: AU
  Abuse Score: 39%
  Total Reports: 38
  ISP: APNIC Debogon Project
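
Every call to check_ip sends a request to the API, and the same address can appear in many log lines. The sketch below shows one way to look up each address only once by remembering earlier results. The wrapper make_cached_lookup is hypothetical, and fake_lookup is a stand-in for the real API call so the example runs offline.

```python
def make_cached_lookup(lookup):
    """Hypothetical wrapper: remember results so each IP is looked up only once."""
    cache = {}

    def cached_lookup(ip_address):
        if ip_address not in cache:
            cache[ip_address] = lookup(ip_address)
        return cache[ip_address]

    return cached_lookup

# Stand-in for the real API call; it records which IPs were actually requested
calls = []

def fake_lookup(ip_address):
    calls.append(ip_address)
    return {'ipAddress': ip_address, 'abuseConfidenceScore': 0}

cached = make_cached_lookup(fake_lookup)
for ip in ['10.0.0.5', '10.0.0.5', '1.2.3.4']:
    cached(ip)

# Three lookups were requested, but only two distinct IPs reached fake_lookup
print(len(calls))
```

In the notebook, the wrapped function could be lambda ip: check_ip(ip, api_key). Note that this simple version also caches a None result from a failed request, which may or may not be what you want.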


In [10]:
import re

def extract_ip(message):
    """
    Uses regex to find the first IPv4 address in a string.
    Returns the IP address string if found, otherwise None.
    """
    # Regex pattern for an IPv4-shaped string
    # \b matches word boundaries to avoid partial matches
    # (?:\d{1,3}\.){3} matches three groups of (1-3 digits followed by a dot)
    # \d{1,3} matches the final 1-3 digits
    # Note: octet ranges are not checked, so '999.999.999.999' would also match
    ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'

    match = re.search(ip_pattern, message) # Find the first match in the message

    if match:
        return match.group(0) # Return the matched IP string
    else:
        return None # Return None if no IP was found
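
The pattern in extract_ip checks only the shape of the string, so a value such as 999.1.1.1 would be returned as if it were an address. A minimal sketch of a stricter variant follows, using the standard-library ipaddress module to reject out-of-range candidates. The name extract_valid_ip and the test messages are assumptions for illustration.

```python
import ipaddress
import re

ip_candidate_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')

def extract_valid_ip(message):
    """Hypothetical variant of extract_ip that skips out-of-range candidates."""
    for candidate in ip_candidate_pattern.finditer(message):
        try:
            # ip_address() raises ValueError for invalid input, e.g. octets above 255
            return str(ipaddress.ip_address(candidate.group(0)))
        except ValueError:
            continue
    return None

print(extract_valid_ip("Bad value 999.1.1.1, real address 10.0.0.5"))
print(extract_valid_ip("System startup complete."))
```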

In [11]:
# Test cases
test_message_with_ip = "Failed login for user 'root' from IP 1.2.3.4"
test_message_no_ip = "System startup complete."

# Run the function
found_ip = extract_ip(test_message_with_ip)
no_ip = extract_ip(test_message_no_ip)

print(f"IP found in first message: {found_ip}")
print(f"IP found in second message: {no_ip}")

IP found in first message: 1.2.3.4
IP found in second message: None


In [12]:
# Create the new 'ip_address' column
# .apply() calls extract_ip on each message; rows without an IP get None.
df['ip_address'] = df['message'].apply(extract_ip)

print("Created 'ip_address' column.")

# Display the DataFrame to see the new column!
df.head(10) # Show more rows to potentially see IPs and None values

Created 'ip_address' column.


,timestamp,log_level,message,ip_address
0,2023-10-27T14:01:03,INFO,System startup complete.,
1,2023-10-27T14:02:15,INFO,User 'admin' logged in from 192.168.1.100,192.168.1.100
2,2023-10-27T14:02:45,WARNING,Disk space low on /var. 85% used.,
3,2023-10-27T14:03:10,ERROR,Failed to connect to database [db-01]. IP 10.0...,10.0.0.5
4,2023-10-27T14:03:12,INFO,Retrying connection...,
5,2023-10-27T14:03:42,ERROR,Connection to [db-01] timed out. IP 10.0.0.5,10.0.0.5
6,2023-10-27T14:05:01,ERROR,Failed login for user 'root' from IP 1.2.3.4,1.2.3.4
7,2023-10-27T14:05:02,ERROR,Failed login for user 'guest' from IP 99.88.77.66,99.88.77.66
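
Several of the extracted addresses fall in private ranges (192.168.x.x and 10.x.x.x). These are not routable on the public internet, so a reputation lookup for them is unlikely to be informative. The sketch below filters the list down to unique, non-private addresses before any API calls are made. The list is copied by hand from the table above, and the variable names are illustrative.

```python
import ipaddress

# IP addresses as extracted in the table above (including the duplicate 10.0.0.5)
extracted_ips = ['192.168.1.100', '10.0.0.5', '10.0.0.5', '1.2.3.4', '99.88.77.66']

# dict.fromkeys removes duplicates while preserving first-seen order
unique_ips = list(dict.fromkeys(extracted_ips))

# Keep only addresses outside the private ranges
public_ips = [ip for ip in unique_ips if not ipaddress.ip_address(ip).is_private]

print(public_ips)
```

On the DataFrame, df['ip_address'].dropna().unique() would provide the starting list in place of extracted_ips.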
