# SMB Lateral Movement Burst

**Detect SMB scanning/lateral movement and contain source host.**

This notebook is a **SOC training lab + operational playbook**. It includes:

- Background and objective
- Mock dataset you can run locally (safe, synthetic)
- Splunk SPL, Sentinel KQL, Elastic KQL/EQL examples
- SOAR-style automated response templates
- Exercises + solutions

⚠️ **Safety note**: Response commands here are defensive and should be executed only with proper approval.


## Objective

**Goal:** Investigate and respond to **SMB Lateral Movement Burst**.

**Success criteria:** You can reproduce the detection using the mock dataset and describe containment actions.

**Dataset used:** `proxy_logs.csv`


## Lab Setup (Run This First)

Load helper functions and the mock dataset.


In [None]:
import pandas as pd
from pathlib import Path

DATA_DIR = Path("mock_data")

def load_csv(name: str) -> pd.DataFrame:
    return pd.read_csv(DATA_DIR / name)

def show(df, n=10):
    display(df.head(n))


In [None]:
df = load_csv('proxy_logs.csv')
show(df)
print('Rows:', len(df))


## Run the Detection (Mock Data)

This cell demonstrates the detection logic on the synthetic dataset.


In [None]:
# Demo detection logic
import pandas as pd

# Adjust these filters to test different scenarios
print('Demo logic:', 'Distinct SMB targets (dest port 445) from a source in 10 minutes exceeds threshold.')

# Example implementation per dataset:
if 'proxy_logs.csv' == 'auth_signins.csv':
    # impossible travel / MFA patterns
    user='<USER>'
    d=df.copy()
    if user != '<USER>':
        d=d[d['UserPrincipalName']==user]
    # Sort and show unique locations
    d['TimeGenerated']=pd.to_datetime(d['TimeGenerated'])
    d=d.sort_values('TimeGenerated')
    display(d[['TimeGenerated','UserPrincipalName','IPAddress','Location','ResultType','Roles']])

elif 'proxy_logs.csv' == 'device_process.csv':
    d=df[df['ProcessCommandLine'].str.contains('-enc|-encodedcommand|FromBase64String', case=False, na=False)]
    display(d)

elif 'proxy_logs.csv' == 'device_file.csv':
    d=df[df['ActionType'].isin(['FileDeleted','FileRenamed'])]
    display(d)

elif 'proxy_logs.csv' == 'proxy_logs.csv':
    d=df[df['DestinationHostName'].isin(['drive.google.com','dropbox.com','box.com','onedrive.live.com','mega.nz'])]
    display(d)

elif 'proxy_logs.csv' == 'dns_events.csv':
    d=df[df['DomainAgeDays']<=30]
    display(d)

elif 'proxy_logs.csv' == 'office_activity.csv':
    display(df)

elif 'proxy_logs.csv' == 'iam_events.csv':
    display(df)

elif 'proxy_logs.csv' == 'vpn_auth.csv':
    d=df[(df['Result']=='Success') & (df['DeviceCompliant']==False)]
    display(d)


## SIEM Queries (Copy/Paste)

Use these in production SIEMs. You will need to adjust field names and table/index names.


In [None]:
print('### Sentinel KQL (DeviceNetworkEvents)\n```kql\nDeviceNetworkEvents\n| where RemotePort == 445\n| summarize Targets=dcount(RemoteIP) by DeviceName, bin(TimeGenerated, 10m)\n| where Targets >= 10\n```\n### Splunk SPL\n```spl\nindex=network dest_port=445 | bucket _time span=10m | stats dc(dest_ip) as targets by src_ip _time | where targets>=10\n```\n### Elastic KQL\n```text\nevent.category:network AND destination.port:445\n```\n')


## Automated Response (SOAR-style)

These cells provide **ready-to-adapt templates** for Sentinel playbooks, Splunk SOAR, and Elastic actions.


In [None]:
# SOAR-style automation templates (pseudo / ready-to-adapt)

sentinel_logic_app = r'''
# Microsoft Sentinel Playbook (Logic App) - High level steps
# 1) Trigger: When Sentinel alert is created (incident created)
# 2) Actions:
#    - Get entities (user, host, ip, file hash)
#    - Enrich: Risky sign-ins, user info, device risk, TI lookup
#    - Contain: Disable user OR revoke sessions OR isolate device (conditional)
#    - Notify: Teams/Email to SOC channel + ticket creation
#    - Document: Add comments to incident + tags + severity update
'''

splunk_soar = r'''
# Splunk SOAR (Phantom) playbook - High level steps
# - on_start: collect artifacts (user/ip/host/hash)
# - enrich: WHOIS/TI, AD lookup, GeoIP, EDR context
# - decision: if high confidence -> containment actions
# - actions: disable_user, block_ip, isolate_host, add_case_notes
'''

elastic_actions = r'''
# Elastic Security - Rule actions (conceptual)
# - Add action connectors: Email, Slack, Jira, Webhook
# - For high severity: trigger a webhook to SOAR to isolate host / revoke tokens
'''
print("SOAR templates loaded (pseudo/ready-to-adapt).")


## Response Commands (Defensive)

Isolate source host, block SMB outbound temporarily, reset credentials if compromise suspected.

Replace placeholders before executing.


In [None]:
# Example defensive commands (placeholders)
# Azure AD token revoke:
# Revoke-AzureADUserAllRefreshToken -ObjectId <USER_OBJECT_ID>

# AD disable:
# Disable-ADAccount -Identity <USER>

# MDE isolate device (requires tooling/permissions):
# Invoke-MDATPDeviceIsolation -MachineId <DEVICE_ID>


## Exercises

1) Change `<USER>`/`<HOST>` placeholders and rerun the mock detection.
2) Identify at least two false positive causes.
3) Propose one tuning improvement (allowlist/threshold/correlation).
4) Write a short escalation note (who/what/when/impact/next steps).


## Solutions (Example)

✅ **False positives:** corporate VPN egress, admin maintenance windows, deployment tooling.

✅ **Tuning:** require device mismatch or risky sign-in for impossible travel; exclude jump hosts for SMB bursts; require external domain forward.

✅ **Escalation note template:**
- Summary:
- Entities:
- Evidence:
- Containment:
- Recommended next steps:
