# Threat Hunting with MSTICPy

This notebook demonstrates threat-hunting capabilities using the MSTICPy library.

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# MSTICPy imports
import msticpy as mp
from msticpy.data import QueryProvider
# NOTE(review): the wildcard import below hides which names later cells take
# from msticpy.nbtools; prefer explicit imports once the names that are
# actually needed have been confirmed.
from msticpy.nbtools import *
from msticpy.common.timespan import TimeSpan

# Confirm the environment is ready and record which MSTICPy release is in use
print(
    "Libraries imported successfully!",
    f"MSTICPy version: {mp.__version__}",
    sep="\n",
)

In [None]:
# Read the sample hunting dataset and convert its timestamp column to datetimes
df = pd.read_csv('/shared/hunt.csv').assign(
    timestamp=lambda frame: pd.to_datetime(frame['timestamp'])
)

print(f"Loaded {len(df)} threat hunting records")
# Preview the first rows as the cell's rich output
df.head()

In [None]:
# Frequency tables for the main categorical columns.
# Maps each column to the header printed above its counts; the leading
# newline on later headers separates consecutive tables.
distribution_headers = {
    'threat_type': "Threat Type Distribution:",
    'severity': "\nSeverity Distribution:",
    'status': "\nStatus Distribution:",
}

for column_name, header in distribution_headers.items():
    print(header)
    print(df[column_name].value_counts())

In [None]:
# Four-panel overview of the hunting data on a 2x2 grid
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax_types, ax_severity), (ax_timeline, ax_analysts) = axes

# Top-left: number of records per threat type
threat_type_counts = df['threat_type'].value_counts()
threat_type_counts.plot.bar(ax=ax_types)
ax_types.set_title('Threat Types')
ax_types.tick_params(axis='x', rotation=45)

# Top-right: share of records per severity level
severity_counts = df['severity'].value_counts()
severity_counts.plot.pie(ax=ax_severity)
ax_severity.set_title('Severity Levels')

# Bottom-left: records per calendar day
daily_counts = df.set_index('timestamp')['threat_type'].resample('D').count()
daily_counts.plot.line(ax=ax_timeline)
ax_timeline.set_title('Threats Over Time')

# Bottom-right: number of records handled by each analyst
analyst_counts = df['analyst'].value_counts()
analyst_counts.plot.bar(ax=ax_analysts)
ax_analysts.set_title('Threats by Analyst')
ax_analysts.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

In [None]:
# IP address analysis
# NOTE(review): IoCExtract is imported here but not used in this cell; the
# import is kept in case later cells rely on it.
from msticpy.transform.iocextract import IoCExtract

# Extract unique IP addresses.
# Missing values are dropped first: pandas reads empty CSV fields as float
# NaN, and np.unique cannot sort an object array that mixes NaN with strings
# (it raises TypeError). A missing value is not an address anyway.
source_ips = df['source_ip'].dropna().unique()
dest_ips = df['destination_ip'].dropna().unique()
all_ips = np.concatenate([source_ips, dest_ips])
# np.unique de-duplicates across both columns and returns the values sorted
unique_ips = np.unique(all_ips)

# Only a short preview is printed so that large datasets do not flood the output
preview_limit = 10

print(f"Found {len(unique_ips)} unique IP addresses:")
for ip in unique_ips[:preview_limit]:
    print(f"  {ip}")

remaining = len(unique_ips) - preview_limit
if remaining > 0:
    print(f"  ... and {remaining} more")

In [None]:
# Drill into every record whose severity is exactly 'Critical'
is_critical = df['severity'] == 'Critical'
critical_threats = df[is_critical]

print(f"Critical Threats Analysis ({len(critical_threats)} records):")
print("="*50)

# One block of fields per record, closed by a dashed separator line
record_separator = "-" * 40
for _, threat in critical_threats.iterrows():
    detail_lines = [
        f"Timestamp: {threat['timestamp']}",
        f"Type: {threat['threat_type']}",
        f"Source: {threat['source_ip']} -> Destination: {threat['destination_ip']}",
        f"Status: {threat['status']}",
        f"Details: {threat['details']}",
        record_separator,
    ]
    print("\n".join(detail_lines))

In [None]:
# Generate threat hunting report
#
# Builds a small summary dict from `df`, prints it, and writes the same lines
# to a text file. Written so that an empty dataset (or a column that is
# entirely null) produces 'N/A' entries instead of raising.
REPORT_PATH = '/shared/threat_report.txt'
NOT_AVAILABLE = 'N/A'


def _most_frequent(series, default=NOT_AVAILABLE):
    """Return the most frequent non-null value of ``series``.

    ``value_counts`` drops nulls and sorts by descending count, so the first
    index entry is the most common value. ``default`` is returned when there
    are no non-null values (indexing ``[0]`` would raise IndexError).
    """
    counts = series.value_counts()
    return counts.index[0] if len(counts) else default


# min()/max() yield NaT when there are no valid timestamps; guard before
# calling .date() on them.
first_seen = df['timestamp'].min()
last_seen = df['timestamp'].max()
if pd.isna(first_seen) or pd.isna(last_seen):
    date_range = NOT_AVAILABLE
else:
    date_range = f"{first_seen.date()} to {last_seen.date()}"

report = {
    'total_threats': len(df),
    # Summing the boolean mask counts matching rows without copying the frame
    'critical_threats': int((df['severity'] == 'Critical').sum()),
    'confirmed_threats': int((df['status'] == 'Confirmed').sum()),
    # nunique() ignores missing values, so a blank IP field is not counted
    # as an address
    'unique_source_ips': df['source_ip'].nunique(),
    'unique_dest_ips': df['destination_ip'].nunique(),
    'date_range': date_range,
    'most_common_threat': _most_frequent(df['threat_type']),
    'most_active_analyst': _most_frequent(df['analyst']),
}

# Format once so the console output and the saved file cannot drift apart
report_lines = [
    f"{key.replace('_', ' ').title()}: {value}"
    for key, value in report.items()
]

print("🔍 THREAT HUNTING SUMMARY REPORT")
print("=" * 40)
for line in report_lines:
    print(line)

# Save report to file (explicit encoding so the result does not depend on the
# platform's default locale)
with open(REPORT_PATH, 'w', encoding='utf-8') as f:
    f.write("THREAT HUNTING SUMMARY REPORT\n")
    f.write("=" * 40 + "\n")
    f.writelines(f"{line}\n" for line in report_lines)

print(f"\n📄 Report saved to {REPORT_PATH}")