# Build Your Own Firewall

Some sessions will be delivered using these notebooks within Teams assignments. This is an experiment but it should mean we explore can explore data and code whilst taking notes and storing your work.

- Assignment : Build Your Own Firewall
- 
Open firewall.ipynb
- **Trust** it
- Hit **Continue** in the top rightf
Click "Shift + Enter" to pass though each block
ed)

You will be pressing "Shift + Enter" a lot. There are blank blocks between slides thou you just need to "Shift + Enter" through

## Set up
Press "Shift + Enter" to run the code that will install the libraries you will need.
This is done in such a way that this code should run in several enviroment and when run on a Linux machine can work as a real firewall.

In [None]:
import os
import sys
import time
from collections import defaultdict

LIVE = os.uname().sysname == 'Linux' and 'codespaces' not in os.uname().nodename
PYOLITE = os.uname().sysname == 'Emscripten'
try:
    import micropip
    await micropip.install(["pyoliteutils"])
except:
    pass
    
WHITELISTFILE = "whitelist.txt"
BLACKLISTFILE = "blacklist.txt"
LOGFILE = "logs.txt"

if PYOLITE:
    from pyoliteutils import *
    GITFOLDERURL = "https://raw.githubusercontent.com/UTCSheffield/Y10-Computing-GCSE-jupyter/main/content/Firewall/"
    await get_file_from_url(GITFOLDERURL+WHITELISTFILE)
    await get_file_from_url(GITFOLDERURL+BLACKLISTFILE)
    await get_file_from_url(GITFOLDERURL+LOGFILE, force=True)
    LINUX = False

if LIVE:
    from scapy.all import sniff, IP, TCP

# What is a Firewall?

In [None]:
mermaid("""
mindmap
  )What is a Firewall?(
    Idea 1
    Idea 2
        Related Thought
""")

[Video answer](https://www.youtube.com/watch?v=UfyF6CvL4Ts&t=94s)

## Scaffolding

- Loading a text file of IPs into an array / list
- Logging the firewalls activity so we can check it later

In [None]:
THRESHOLD = 100 # 100 requests per second is too much
DEBUG = True
DEFAULTALLOW = True # Allow unknnown traffic (you would normally dow this)

# Read IPs from a file
def read_ip_file(filename):
    with open(filename, "r") as file:
        ips = [line.strip() for line in file]
    return set(ips)

# Log events to a file
def log_event(message):
    if (DEBUG):
        print(message)
    
    if (LIVE):
        log_folder = "logs"
        os.makedirs(log_folder, exist_ok=True)
        timestamp = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        log_file = os.path.join(log_folder, f"log_{timestamp}.txt")
        
        with open(log_file, "a") as file:
            file.write(f"{message}\n")
    else:
        timestamp = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        
        with open(LOGFILE, "a") as file:
            file.write(f"{timestamp} : {message}\n")

## Some checks

Because of python rules we need to define functions that do things before we try to use them

In [None]:
# Check for Nimda worm signature
def is_nimda_worm(packet):
    if packet.haslayer(TCP) and packet[TCP].dport == 80:
        payload = packet[TCP].payload
        return "GET /scripts/root.exe" in str(payload)
    return False

# Check a Scapy Packet and block if needed
def packet_callback(packet):
    # Check for Nimda worm signature
    if is_nimda_worm(packet):
        print(f"Blocking Nimda source IP: {src_ip}")
        if LIVE:
            os.system(f"iptables -A INPUT -s {src_ip} -j DROP")
        log_event(f"Blocking Nimda source IP: {src_ip}")
        return False

    return is_allowed(src_ip=packet[IP].src, dst_ip=packet[IP].dst, dport=packet[TCP].dport)

# Check if a packet trying to access a Port is allowed
def is_port_allowed(dport=None):
    return True

# Check if a packet is allowed to go to a destination machine
def is_dst_ip_allowed(dst_ip=None):
    return True

## Main Checking function

In [None]:
#Check if a packet is Allowed (if not its blocked)
def is_allowed(src_ip=None, dst_ip=None, dport=None):
    if src_ip==None:
        return False

    # Check if the port is allowed
    if not is_port_allowed(dport):
        return False

    # Check if IP is in the whitelist
    if src_ip in whitelist_ips:
        return True

    # Check if IP is in the blacklist
    if src_ip in blacklist_ips:
        if LIVE: # If we are running this live on a real linux machine
            os.system(f"iptables -A INPUT -s {src_ip} -j DROP") # Add this to our real firewall rules
        log_event(f"Blocking blacklisted IP: {src_ip}")
        return False

    # Check if IP is in the blocklist
    if src_ip in blocked_ips:
        if LIVE:
            os.system(f"iptables -A INPUT -s {src_ip} -j DROP")
        log_event(f"Blocking blocked IP: {src_ip}")
        return False

    # Count the packets coming from an IP
    packet_count[src_ip] += 1

    current_time = time.time()
    time_interval = current_time - start_time[0]
    
    # In pyolite there aren't delays, in the real world we can check every second
    if PYOLITE or time_interval >=1 :
        for ip, count in packet_count.items():
            if PYOLITE :
                packet_rate = count
            else:    
                packet_rate = count / time_interval
            #print("packet_rate", packet_rate)
            if (packet_rate > THRESHOLD) and ip not in blocked_ips:
                print(f"Blocking IP: {ip}, packet rate: {packet_rate}")
                if LIVE:
                    os.system(f"iptables -A INPUT -s {ip} -j DROP")
                log_event(f"Blocking IP: {ip}, packet rate: {packet_rate}")
                blocked_ips.add(ip)
                return False
        
        if not PYOLITE :
            packet_count.clear()
            start_time[0] = current_time

    if not is_dst_ip_allowed(dst_ip):
        return False
        
    if DEFAULTALLOW :
        log_event(f"Allowing IP: {src_ip}")
        return True
    log_event(f"Blocking IP: {src_ip}")
    return False #True

## Load the whitelists and blacklists

In [None]:
# Import whitelist and blacklist IPs    
whitelist_ips = read_ip_file("whitelist.txt")
blacklist_ips = read_ip_file("blacklist.txt")

if DEBUG :
    print("Whitelist IPs: ", whitelist_ips)
    print("Blacklist IPs: ", blacklist_ips)
    

## Test it

In [None]:
if not LIVE : # Run Tests
    packet_count = defaultdict(int)
    start_time = [time.time()]
    blocked_ips = set()

    ## Whitelist
    assert is_allowed(src_ip="127.0.0.1", dst_ip="127.0.0.1", dport=666) == True

    ## Blacklist
    assert is_allowed(src_ip="198.51.100.255") == False


    ## Not on a list
    assert is_allowed(src_ip="127.0.0.2") == DEFAULTALLOW

    ## Not on a list but about to DoS us
    assert is_allowed("127.0.0.3") == DEFAULTALLOW

    # Making 100 requests really fast
    for i in range(100):
        is_allowed(src_ip="127.0.0.3")
    
    # Has DoSed us
    assert is_allowed(src_ip="127.0.0.3") == False


## Task : Add Port Blocking

In [None]:
def is_port_allowed(dport=None):
    if dport in [22]:
        return False
    return DEFAULTALLOW

if not LIVE : # Run Tests
    packet_count = defaultdict(int)
    start_time = [time.time()]
    blocked_ips = set()

    assert is_allowed(src_ip="127.0.0.1", dst_ip="127.0.0.1", dport=80) == True
    assert is_allowed(src_ip="127.0.0.1", dst_ip="127.0.0.1", dport=22) == False
    assert is_allowed(src_ip="127.0.0.1", dst_ip="127.0.0.1", dport=666) == DEFAULTALLOW


- Look at this article https://www.makeuseof.com/vulnerable-ports-check-when-pentesting/
- Add tests for all the ports it says are a risk but we do want to run a webserver
- Make your firewall block those ports

## Task : Add Destination filtering

In [None]:
# Check if a packet is allowed to go to a destination machine
def is_dst_ip_allowed(dst_ip=None):
    

    return DEFAULTALLOW

if not LIVE : # Run Tests
    packet_count = defaultdict(int)
    start_time = [time.time()]
    blocked_ips = set()

    ## Whitelist
    assert is_allowed(src_ip="127.0.0.1", dst_ip="127.0.0.1", dport=80) == True
    assert is_allowed(src_ip="127.0.0.1", dst_ip="8.8.8.8", dport=80) == False
    assert is_allowed(src_ip="127.0.0.1", dst_ip="127.240.0.1", dport=80) == DEFAULTALLOW



## Testing for the scapy / Linux version

In [None]:
if LIVE: # run the tests using the scapy wrappers
    ## Whitelist
    assert packet_callback(IP(src="127.0.0.1", dst="127.0.0.1")/TCP(dport=666)) == True

    ## Blacklist
    assert packet_callback(IP(src="198.51.100.255", dst="127.0.0.1")/TCP(dport=666)) == False


    ## Not on a list
    assert packet_callback(IP(src="127.0.0.2", dst="127.0.0.1")/TCP(dport=666)) == DEFAULTALLOW

    ## Not on a list but about to DoS us
    assert packet_callback(IP(src="127.0.1.3", dst="127.0.0.1")/TCP(dport=666)) == DEFAULTALLOW

    # DoSing
    for i in range(50):
        packet_callback(IP(src="127.0.1.3", dst="127.0.0.1")/TCP(dport=666))
        time.sleep(0.01)
    
    # Has DoSed us
    assert packet_callback(IP(src="127.0.1.3", dst="127.0.0.1")/TCP(dport=666)) == False


### Show the Logs

In [None]:
with open(LOGFILE) as f:
    print(f.read())

## Make it really work

You can use the "Export As" to get this as a python file.
You need to :-
- Delete the the mindmap
- Delete any line starting including ```await```
- The rest should work on a Linux Machine

In [None]:
# This code will make the firewall actually function on a properly set up Linux machine
if __name__ == "__main__" and LIVE:
    packet_count = defaultdict(int)
    start_time = [time.time()]
    blocked_ips = set()
    if os.geteuid() != 0:
        print("This script requires root privileges.")
        sys.exit(1)
    print("Monitoring network traffic...")
    sniff(filter="ip", prn=packet_callback)



## What are the differences between what we have and a real one?

- Rules Order
- IP Ranges
- Content Filters
- .....

## Save and turn in