•  Incident Detection and Response

•  Vulnerability Management

•  Network Security

•  Malware Analysis and Reverse Engineering

•  Automation of Security Tools

•  User and Access Management

•  Phishing and Social Engineering Detection

•  Cloud Security

•  Data Protection and Privacy

•  Security Auditing and Compliance

•  Incident Forensics and Data Collection

•  Social Media and External Threat Monitoring


In [1]:
import warnings
warnings.filterwarnings('ignore') #ignore warnings

### MALWARE ANALYSIS

8192 Bytes in Chunk-Based Hashing:

Some scanning systems use chunk-based hashing (breaking the file into fixed-size chunks like 8 KB) to analyze files incrementally or detect specific patterns within the file. These chunk hashes can be used for partial analysis, deduplication, or incremental scanning.

In [3]:
!pip install pefile

Collecting pefile
  Downloading pefile-2024.8.26-py3-none-any.whl.metadata (1.4 kB)
Downloading pefile-2024.8.26-py3-none-any.whl (74 kB)
Installing collected packages: pefile
Successfully installed pefile-2024.8.26


In [4]:
import hashlib
import os
import re
import pefile

# Compute file hashes (MD5, SHA-1, SHA-256)
def compute_file_hashes(file_path):
    hashes = {'MD5': hashlib.md5(), 'SHA1': hashlib.sha1(), 'SHA256': hashlib.sha256()}
    try:
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                for hash_type in hashes.values():
                    hash_type.update(chunk)
        return {name: hash_type.hexdigest() for name, hash_type in hashes.items()}
    except Exception as e:
        print(f"Error reading file: {e}")
        return None

# Extract printable strings from a binary file
def extract_strings(file_path, min_length=4):
    try:
        with open(file_path, 'rb') as f:
            data = f.read()
        strings = re.findall(f"[ -~]{{{min_length},}}", data.decode(errors="ignore"))
        return strings
    except Exception as e:
        print(f"Error extracting strings: {e}")
        return []

# Check if the file is a Portable Executable (PE) and analyze its headers
def analyze_pe_file(file_path):
    try:
        pe = pefile.PE(file_path)
        print(f"--- PE File Analysis ---")
        print(f"Entry Point: 0x{pe.OPTIONAL_HEADER.AddressOfEntryPoint:X}")
        print(f"Image Base: 0x{pe.OPTIONAL_HEADER.ImageBase:X}")
        print("Sections:")
        for section in pe.sections:
            print(f"  {section.Name.decode().strip()}: VirtualSize={section.Misc_VirtualSize}, RawSize={section.SizeOfRawData}")
    except pefile.PEFormatError:
        print("Not a valid PE file.")
    except Exception as e:
        print(f"Error analyzing PE file: {e}")

# Main function for malware analysis
def perform_static_analysis(file_path):
    if not os.path.isfile(file_path):
        print(f"File not found: {file_path}")
        return

    print(f"Analyzing file: {file_path}")

    # Get file size and metadata
    file_size = os.path.getsize(file_path)
    print(f"File size: {file_size} bytes")

    # Compute file hashes
    hashes = compute_file_hashes(file_path)
    if hashes:
        print("--- File Hashes ---")
        for name, hash_value in hashes.items():
            print(f"{name}: {hash_value}")

    # Extract strings
    strings = extract_strings(file_path)
    print(f"\n--- Extracted Strings (Top 20) ---")
    for string in strings[:20]:  # Display top 20 strings for brevity
        print(string)

    # Analyze Portable Executable (PE) files
    print("\n--- PE File Information ---")
    analyze_pe_file(file_path)



In [7]:
# file_path = "/Users/ken.clements/code/SEAS8416/data/SampleRes.dll"  # Replace with the path to your file

file_path = "/Windows/System32/aclui.dll"  # Replace with the path to your file

perform_static_analysis(file_path)

Analyzing file: /Windows/System32/aclui.dll
File size: 573440 bytes
--- File Hashes ---
MD5: de72b7b3481e6aae9d3fcca8c2951b62
SHA1: 9cb16302631f61a5d3cea5289f49d6996cc506dc
SHA256: ca6fffa511c699037394faa5c70d3c97d9d6471d78eb0fb47c35430cfd27c8b8

--- Extracted Strings (Top 20) ---
L!This program cannot be run in DOS mode.
2Mv#v#v#v#w#
'd#v"
!w#Richv#
.text
`fothk
`.rdata
@.data
.pdata
@.didat
.rsrc
@.reloc
Ht$ UHl$H
H3HEpH8
HE`LH0
E3HHEh
HHE8
EXH(
HE@H 
LEPDM\HEH

--- PE File Information ---
--- PE File Analysis ---
Entry Point: 0xF980
Image Base: 0x180000000
Sections:
  .text   : VirtualSize=395916, RawSize=397312
  fothk   : VirtualSize=4096, RawSize=4096
  .rdata  : VirtualSize=116522, RawSize=118784
  .data   : VirtualSize=12352, RawSize=12288
  .pdata  : VirtualSize=18024, RawSize=20480
  .didat  : VirtualSize=2712, RawSize=4096
  .rsrc   : VirtualSize=2160, RawSize=4096
  .reloc  : VirtualSize=5748, RawSize=8192


### LOG ANALYSIS

1	^

	This asserts that the match must start at the beginning of the line.

2	(\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})

	This captures the timestamp in the log.

	\w{3} matches the first three characters of the month (e.g., Jun).

	\s+ matches one or more spaces between fields.

	\d{1,2} matches the day of the month (1 or 2 digits).

	\d{2}:\d{2}:\d{2} matches the time (hour:minute:second).

	This entire expression is placed inside parentheses () to create capture group 1 for extracting the timestamp.

3	\s.*?sshd\(pam_unix\)\[\d+\]

	\s matches a single space.

	.*? lazy match for any characters until finds sshd(pam_unix) (non-greedy to avoid overshooting).

	\(\) escapes the parentheses around pam_unix.

	\[\d+\] matches a bracketed process ID (one or more digits).

4	: authentication failure; .*? user=root

	: authentication failure; matches the literal string indicating a failed login attempt.
	
	.*? lazily matches any characters until the user=root string.

In [8]:
from datetime import datetime, timedelta
import re
import smtplib
from email.mime.text import MIMEText

LOG_FILE_PATH = '../data/Linux_2k.log'
ADMIN_EMAIL = 'kenclements@gwu.edu'  # Email address to notify
SMTP_SERVER = 'localhost'  # Adjust if using an external SMTP server

# Pattern to detect failed login attempts for root
FAIL_PATTERN = re.compile(r"^(\w{3}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s.*?sshd\(pam_unix\)\[\d+\]: authentication failure; .*? user=root")

# Store failed login timestamps
failed_logins = []

with open(LOG_FILE_PATH, 'r') as log_file:
    for line in log_file:
        match = FAIL_PATTERN.match(line)
        if match:
            # Extract and parse timestamp
            timestamp_str = match.group(1)
            timestamp = datetime.strptime(timestamp_str, "%b %d %H:%M:%S")
            current_year = datetime.now().year
            timestamp = timestamp.replace(year=current_year)  # Handle year ambiguity
            failed_logins.append(timestamp)

# Detect intervals with more than 5 failures within 10 minutes
if failed_logins:
    failed_logins.sort()
    alerts = []
    window_size = timedelta(minutes=10)

    for i in range(len(failed_logins)):
        window_end = failed_logins[i] + window_size
        count = sum(1 for ts in failed_logins[i:] if ts <= window_end)

        if count > 5:
            alerts.append((failed_logins[i], count))

# Display alert results and send email if necessary
if alerts:
    alert_message = "ALERT: Excessive failed root login attempts detected!\n"
    for alert_time, count in alerts:
        alert_message += f"Time: {alert_time.strftime('%Y-%m-%d %H:%M:%S')} - Count: {count}\n"
    print(alert_message)

    # Send email notification
    msg = MIMEText(alert_message)
    msg['Subject'] = 'Security Alert: Failed Root Login Attempts'
    msg['From'] = 'security-alert@yourdomain.com'
    msg['To'] = ADMIN_EMAIL

    try:
        with smtplib.SMTP(SMTP_SERVER) as server:
            server.sendmail('security-alert@gwu.edu', [ADMIN_EMAIL], msg.as_string())
        print(f"Alert email sent to {ADMIN_EMAIL}")
    except Exception as e:
        print(f"Failed to send email: {e}")
else:
    print("No excessive failed root login attempts detected.")


ALERT: Excessive failed root login attempts detected!
Time: 2025-06-15 02:04:59 - Count: 10
Time: 2025-06-15 02:04:59 - Count: 9
Time: 2025-06-15 02:04:59 - Count: 8
Time: 2025-06-15 02:04:59 - Count: 7
Time: 2025-06-15 02:04:59 - Count: 6
Time: 2025-06-22 03:17:26 - Count: 23
Time: 2025-06-22 03:17:26 - Count: 22
Time: 2025-06-22 03:17:35 - Count: 21
Time: 2025-06-22 03:17:36 - Count: 20
Time: 2025-06-22 03:17:36 - Count: 19
Time: 2025-06-22 03:17:45 - Count: 18
Time: 2025-06-22 03:17:46 - Count: 17
Time: 2025-06-22 03:17:46 - Count: 16
Time: 2025-06-22 03:17:52 - Count: 15
Time: 2025-06-22 03:17:55 - Count: 14
Time: 2025-06-22 03:17:56 - Count: 13
Time: 2025-06-22 03:17:56 - Count: 12
Time: 2025-06-22 03:18:02 - Count: 11
Time: 2025-06-22 03:18:05 - Count: 10
Time: 2025-06-22 03:18:06 - Count: 9
Time: 2025-06-22 03:18:06 - Count: 8
Time: 2025-06-22 03:18:10 - Count: 7
Time: 2025-06-22 03:18:12 - Count: 6
Time: 2025-06-23 23:30:03 - Count: 9
Time: 2025-06-23 23:30:03 - Count: 8
Time: 

https://www.ossec.net/docs/log_samples/

### EVENT CORRELATION

In [10]:
import re
from datetime import datetime

# Define a pattern to match typical log entries. Adjust this to match your log format.
LOG_PATTERN = re.compile(r'(?P<timestamp>\w{3}\s+\d{1,2}\s[\d:]+)\s(?P<hostname>\S+)\s(?P<process>\S+):\s(?P<message>.*)')

# Define keywords for login and other events to correlate
LOGIN_KEYWORDS = ['session opened for user', 'login']
OTHER_EVENT_KEYWORDS = ['sudo', 'failed password', 'file accessed']

# Define a function to parse a timestamp (assuming a standard log format, e.g., 'Jan  6 23:59:59')
def parse_timestamp(timestamp):
    try:
        return datetime.strptime(timestamp, '%b %d %H:%M:%S')
    except ValueError:
        return None

# Function to read and correlate events from a log file
def correlate_events(log_file_path):
    with open(log_file_path, 'r') as log_file:
        login_events = []
        correlated_events = []

        for line in log_file:
            match = LOG_PATTERN.match(line)
            if match:
                timestamp = parse_timestamp(match.group('timestamp'))
                process = match.group('process')
                message = match.group('message')

                # Check for login events
                if any(keyword in message.lower() for keyword in LOGIN_KEYWORDS):
                    login_events.append((timestamp, process, message))
                
                # Check for other events to correlate
                elif any(keyword in message.lower() for keyword in OTHER_EVENT_KEYWORDS):
                    event = (timestamp, process, message)
                    # Check if this event is close to any login event (e.g., within 5 minutes)
                    for login_event in login_events:
                        if abs((timestamp - login_event[0]).total_seconds()) <= 300:
                            correlated_events.append((login_event, event))

    # Print correlated events
    print("Correlated Events:")
    for login_event, other_event in correlated_events:
        print(f"Login Event: {login_event[0]} - {login_event[1]}: {login_event[2]}")
        print(f"Related Event: {other_event[0]} - {other_event[1]}: {other_event[2]}")
        print("---")

log_file_path = '../data/Linux_2k.log'
correlate_events(log_file_path)


Correlated Events:


### PORT SCANNING

In [11]:
import socket
import threading

def scan_target(ip, start_port, end_port):
    print(f"Scanning {ip} from port {start_port} to {end_port}")
    
    def scan_port(port):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket.setdefaulttimeout(1)  # Set timeout to 1 second
        result = sock.connect_ex((ip, port))  # Attempt to connect to the port
        if result == 0:
            print(f"Port {port} is open on {ip}")
        sock.close()

    for port in range(start_port, end_port + 1):
        thread = threading.Thread(target=scan_port, args=(port,))
        thread.start()

# Example usage (replace with the target IP and port range)
target_ip = "8.8.8.8"  # Replace with the IP address to scan
start_port = 1
end_port = 1024

scan_target(target_ip, start_port, end_port)


Scanning 8.8.8.8 from port 1 to 1024
Port 53 is open on 8.8.8.8


### ACCOUNT DEACTIVATION

In [None]:
import pwd
import os

# Function to disable a user account by appending ":!" to the password field in /etc/passwd
def disable_user_account(username):
    try:
        passwd_file = '../data/password.txt'
        backup_file = '../data/password.bak'
        
        # Backup the password file
        os.system(f'cp {passwd_file} {backup_file}')
        print(f"Backup created at {backup_file}")

        with open(passwd_file, 'r') as file:
            lines = file.readlines()

        with open(passwd_file, 'w') as file:
            for line in lines:
                print(username)
                if line.startswith(f"{username}"):
                    # Disable the account by replacing password with '!*' (or similar lock)
                    parts = line.split(":")
                    parts[1] = '!'
                    file.write(":".join(parts))
                else:
                    file.write(line)

        print(f"Account {username} has been disabled.")
    except KeyError:
        print(f"User {username} not found.")
    except Exception as e:
        print(f"Error: {e}")


# Disable user account example (replace 'username' with a real user)
username_to_disable = "joe"  # Replace with the target username
disable_user_account(username_to_disable)


Backup created at /Users/ken.clements/code/SEAS8416/src/password.bak
joe
Account joe has been disabled.


### FIREWALL BLOCKING

In [4]:
# Function to update firewall to block an IP address
def block_ip_address(ip_address):
    try:
        command = f"iptables -A INPUT -s {ip_address} -j DROP"
        os.system(command)
        print(f"Blocked IP address: {ip_address}")
    except Exception as e:
        print(f"Error blocking IP address {ip_address}: {e}")

def block_ip_address_macos(ip_address):
    try:
        command = f"sudo /usr/libexec/ApplicationFirewall/socketfilterfw --addblock {ip_address}"
        os.system(command)
        print(f"Blocked IP address: {ip_address} on macOS")
    except Exception as e:
        print(f"Error blocking IP address {ip_address} on macOS: {e}")

block_ip_address_macos('10.10.10.10')

Blocked IP address: 10.10.10.10 on macOS


sudo: a terminal is required to read the password; either use the -S option to read from standard input or configure an askpass helper
sudo: a password is required


### THREAT INTEL GATHERING

https://github.com/mitre-attack/mitreattack-python/

In [12]:
pip install mitreattack-python

Collecting mitreattack-python
  Downloading mitreattack_python-4.0.2-py3-none-any.whl.metadata (6.9 kB)
Collecting colour (from mitreattack-python)
  Downloading colour-0.1.5-py2.py3-none-any.whl.metadata (18 kB)
Collecting deepdiff (from mitreattack-python)
  Downloading deepdiff-8.5.0-py3-none-any.whl.metadata (8.2 kB)
Collecting drawsvg>=2.0.0 (from mitreattack-python)
  Downloading drawsvg-2.4.0-py3-none-any.whl.metadata (19 kB)
Collecting loguru (from mitreattack-python)
  Downloading loguru-0.7.3-py3-none-any.whl.metadata (22 kB)
Collecting openpyxl (from mitreattack-python)
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting pooch (from mitreattack-python)
  Downloading pooch-1.8.2-py3-none-any.whl.metadata (10 kB)
Collecting stix2 (from mitreattack-python)
  Downloading stix2-3.0.1-py2.py3-none-any.whl.metadata (10 kB)
Collecting tabulate (from mitreattack-python)
  Using cached tabulate-0.9.0-py3-none-any.whl.metadata (34 kB)
Collecting typer (from m

  DEPRECATION: Building 'antlr4-python3-runtime' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'antlr4-python3-runtime'. Discussion can be found at https://github.com/pypa/pip/issues/6334


In [13]:
!pip install OTXv2

Collecting OTXv2
  Downloading OTXv2-1.5.12-py3-none-any.whl.metadata (396 bytes)
Downloading OTXv2-1.5.12-py3-none-any.whl (16 kB)
Installing collected packages: OTXv2
Successfully installed OTXv2-1.5.12


In [14]:
from OTXv2 import OTXv2, IndicatorTypes

# Replace with your OTX API key
API_KEY = '635b5db4cd085e0fc25a959a8b5a7e3c200972df642468b7c1af1d9b78c64d4d'
otx = OTXv2(API_KEY, server='https://otx.alienvault.com/')


In [15]:
from OTXv2 import OTXv2, IndicatorTypes

# Query for a specific IP address (e.g., 8.8.8.8)
ip_address = "66.63.187.225"
result = otx.get_indicator_details_by_section(IndicatorTypes.IPv4, ip_address, 'general')

# Print the details
print(f"IP: {result.get('indicator')}")
print(f"Reputation: {result.get('reputation')}")
print(f"Country: {result.get('country_name')}")
print(f"Malware Activities: {result.get('pulse_info', {}).get('count', 0)} pulses")

print(f"Result: {result}")

if isinstance(result, dict) and 'pulse_info' in result:
    for pulse in result['pulse_info']['pulses']:
        print(f"Pulse: {pulse['name']} - {pulse['description']}")



IP: 66.63.187.225
Reputation: 0
Country: United States of America
Malware Activities: 17 pulses
Result: {'whois': 'http://whois.domaintools.com/66.63.187.225', 'reputation': 0, 'indicator': '66.63.187.225', 'type': 'IPv4', 'type_title': 'IPv4', 'base_indicator': {'id': 4019351765, 'indicator': '66.63.187.225', 'type': 'IPv4', 'title': '', 'description': '', 'content': '', 'access_type': 'public', 'access_reason': ''}, 'pulse_info': {'count': 17, 'pulses': [{'id': '67a908d5e56b2b72bec950c5', 'name': 'wikiworm', 'description': '', 'modified': '2025-03-11T19:04:45.862000', 'created': '2025-02-09T19:58:13.239000', 'tags': [], 'references': ['678e18fac3e3b92cb600dbec.xml'], 'public': 1, 'adversary': '', 'targeted_countries': [], 'malware_families': [], 'attack_ids': [], 'industries': [], 'TLP': 'white', 'cloned_from': None, 'export_count': 8, 'upvotes_count': 0, 'downvotes_count': 0, 'votes_count': 0, 'locked': False, 'pulse_source': 'web', 'validator_count': 0, 'comment_count': 0, 'followe

In [16]:
# Query for a specific domain (e.g., example.com)
domain = "fccfxejgtpqb.pw"
result = otx.get_indicator_details_by_section(IndicatorTypes.DOMAIN, domain, 'general')

# Print the details
print(f"Domain: {result.get('indicator')}")
print(f"Reputation: {result.get('reputation')}")
print(f"Malware Activities: {result.get('pulse_info', {}).get('count', 0)} pulses")


Domain: fccfxejgtpqb.pw
Reputation: None
Malware Activities: 12 pulses
