In [14]:
import pandas as pd

# Load the generated DNS logs
df = pd.read_csv("../data/dns_logs.csv")


# Display the first few rows
df.head()


Unnamed: 0,timestamp,source_ip,queried_domain,response_code,process_name
0,2025-07-21 10:00:00,192.168.1.10,malicious.ru,SERVFAIL,cmd.exe
1,2025-07-21 10:00:20,192.168.1.25,google.com,NXDOMAIN,svchost.exe
2,2025-07-21 10:00:40,192.168.1.25,bbc.co.uk,NXDOMAIN,chrome.exe
3,2025-07-21 10:01:00,192.168.1.10,YXNkZmcxMjMuZXhhbXBsZS5jb20=,NXDOMAIN,cmd.exe
4,2025-07-21 10:01:20,192.168.1.15,google.com,NOERROR,chrome.exe


In [15]:
import re  # re = Regular Expression module (used to detect patterns in text)

# Create a list of known suspicious keywords often used in malicious domains
# These are often used by attackers to host malware, phishing pages, or exploit kits
suspicious_keywords = ['malicious', 'xyz', 'ru', 'top', 'cn']

# Regular Expression pattern to match domains that look like base64-encoded strings
# Explanation: 
# - ^ = start of string
# - [A-Za-z0-9+/]{16,} = match 16 or more characters that are upper/lower letters, numbers, +, or /
# This catches domains like 'YXNkZGFzZGZhc2RmYXNk' which are highly random or encoded ‚Äî suspicious!
base64_pattern = r'^[A-Za-z0-9+/]{16,}$'

# Convert the 'timestamp' column from plain text to real datetime format
# Why? Because you may want to filter logs by time later ‚Äî datetime format allows comparisons like < or >=
df['timestamp'] = pd.to_datetime(df['timestamp'])

# Step 1: Flag domains that look suspicious either by:
# - Containing a known bad keyword (e.g. "malicious", ".ru")
# - Matching our base64/random-looking pattern
suspicious_dns = df[
    df['queried_domain'].str.contains('|'.join(suspicious_keywords), case=False) |  # Check for bad keywords
    df['queried_domain'].apply(lambda d: bool(re.match(base64_pattern, d)))         # Check for randomness
]

# Step 2: Narrow it down ‚Äî keep only the ones that also failed to resolve
# - Why? Because failed lookups (NXDOMAIN, SERVFAIL) to bad-looking domains are strong indicators of malware testing out its C2 channels
suspicious_dns = suspicious_dns[
    suspicious_dns['response_code'].isin(['NXDOMAIN', 'SERVFAIL'])
]

# Step 3: Print the final suspicious results
# - Show relevant columns to keep the output clear and focused
if not suspicious_dns.empty:
    print("‚ö†Ô∏è Suspicious DNS queries detected:\n")
    print(suspicious_dns[['timestamp', 'source_ip', 'queried_domain', 'response_code', 'process_name']])
else:
    print("‚úÖ No suspicious DNS activity detected.")


‚ö†Ô∏è Suspicious DNS queries detected:

             timestamp     source_ip queried_domain response_code  \
0  2025-07-21 10:00:00  192.168.1.10   malicious.ru      SERVFAIL   
23 2025-07-21 10:07:40      10.0.0.5     stealer.cn      SERVFAIL   
24 2025-07-21 10:08:00  192.168.1.10   phishing.xyz      SERVFAIL   
39 2025-07-21 10:13:00    172.16.0.7   phishing.xyz      NXDOMAIN   
46 2025-07-21 10:15:20  192.168.1.10     stealer.cn      NXDOMAIN   
51 2025-07-21 10:17:00      10.0.0.5     stealer.cn      SERVFAIL   
57 2025-07-21 10:19:00  192.168.1.15   phishing.xyz      NXDOMAIN   
60 2025-07-21 10:20:00  192.168.1.15   phishing.xyz      SERVFAIL   
66 2025-07-21 10:22:00      10.0.0.5     stealer.cn      SERVFAIL   
73 2025-07-21 10:24:20  192.168.1.10   malicious.ru      NXDOMAIN   
75 2025-07-21 10:25:00  192.168.1.10   phishing.xyz      SERVFAIL   
78 2025-07-21 10:26:00  192.168.1.25   phishing.xyz      SERVFAIL   
89 2025-07-21 10:29:40  192.168.1.10   malicious.ru      SERVF

In [17]:
# DNS Rule 2 ‚Äì Repeated DNS queries to suspicious domains in short intervals

import pandas as pd
import re

# 1. Load the DNS logs (use the correct filename)
df = pd.read_csv('../data/dns_logs.csv')

# 2. Convert timestamp to datetime format for time calculations
df['timestamp'] = pd.to_datetime(df['timestamp'])

# 3. Define risky domain indicators and failure codes
RISKY_KEYWORDS = ['malicious', 'xyz', 'ru', 'top', 'cn']
FAIL_RCODES = ['NXDOMAIN', 'SERVFAIL']

# 4. Helper function: decide if a domain looks suspicious
def looks_risky_domain(domain):
    if pd.isna(domain):
        return False
    domain = str(domain).lower()
    return any(keyword in domain for keyword in RISKY_KEYWORDS)

# 5. Prepare alert list
alerts = []

# 6. Group logs by source IP and queried domain
for (src_ip, domain), group in df.groupby(['source_ip', 'queried_domain']):
    # Only check domains that are risky OR have repeated failures
    if not looks_risky_domain(domain) and not group['response_code'].astype(str).str.upper().isin(FAIL_RCODES).any():
        continue

    # Sort entries by timestamp
    group = group.sort_values('timestamp')

    # Check for multiple queries within 60 seconds
    for i in range(len(group) - 1):
        start_time = group.iloc[i]['timestamp']
        window = group[(group['timestamp'] >= start_time) &
                       (group['timestamp'] <= start_time + pd.Timedelta(seconds=60))]

        if len(window) >= 3:  # trigger alert if 3 or more queries in a minute
            alerts.append({
                'source_ip': src_ip,
                'domain': domain,
                'count': len(window),
                'first_seen': start_time,
                'last_seen': window.iloc[-1]['timestamp']
            })
            break  # avoid duplicate alerts for the same domain

# 7. Convert alerts to DataFrame
alerts_df = pd.DataFrame(alerts)

# 8. Display results
if not alerts_df.empty:
    print("‚ö† Repeated DNS query alerts:\n")
    print(alerts_df)
else:
    print("‚úÖ No repeated DNS query patterns detected.")


‚úÖ No repeated DNS query patterns detected.


In [23]:
# Rule 3 ‚Äì DNS Exfiltration Detection via Encoded Subdomains

import pandas as pd
import re

# 1. Load the DNS logs (correct path from 'notebooks/' to 'data/')
df = pd.read_csv("../data/dns_logs.csv")

# 2. Convert 'timestamp' to datetime format for sorting/filtering
df['timestamp'] = pd.to_datetime(df['timestamp'])

# 3. Define patterns and criteria for suspicious encoded subdomains
# Base64-like pattern: long random-looking strings
BASE64_PATTERN = r'^[A-Za-z0-9+/]{20,}={0,2}$'
# Hex-like pattern: sequences of only 0-9 and a-f (at least 20 chars)
HEX_PATTERN = r'^[0-9a-f]{20,}$'

# 4. Helper function to check if subdomain looks encoded
def is_encoded_label(label):
    return bool(re.match(BASE64_PATTERN, label)) or bool(re.match(HEX_PATTERN, label))

# 5. Prepare alert list
alerts = []

# 6. Loop through each DNS query and check for encoded subdomains
for _, row in df.iterrows():
    # Extract the full domain queried
    domain = str(row['queried_domain'])
    # Split into labels (subdomain parts before each dot)
    labels = domain.split('.')

    # Check each label (excluding the TLD and second-level domain)
    for label in labels[:-2]:
        if is_encoded_label(label):
            alerts.append({
                'timestamp': row['timestamp'],
                'source_ip': row['source_ip'],
                'queried_domain': domain,
                'response_code': row['response_code'],
                'process_name': row['process_name'],
                'encoded_label': label
            })
            break  # Avoid duplicate alerts for the same query

# 7. Convert alerts to DataFrame
alerts_df = pd.DataFrame(alerts)

# 8. Display results
if not alerts_df.empty:
    print("üö® Suspicious DNS exfiltration activity detected:")
    display(alerts_df)
else:
    print("‚úÖ No suspicious DNS exfiltration patterns detected.")


‚úÖ No suspicious DNS exfiltration patterns detected.
