<a href="https://colab.research.google.com/github/M-EEKSHITHKUMAR/PortScanner-AutoFirewall/blob/main/Untitled4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Cell 1: Install required packages
!pip install pandas tqdm

print("Installed pandas and tqdm.")


Installed pandas and tqdm.


In [2]:
# Cell 2: Configure targets and scanning options
from google.colab import files
import socket, ipaddress, json, os, threading, time
from tqdm import tqdm
import pandas as pd

# --- EDIT THESE ---
# Enter targets as a comma separated list. Can be IPs or hostnames you own/are allowed to scan.
TARGETS_INPUT = "127.0.0.1"   # e.g., "192.168.1.10,example.com,10.0.0.5"
PORT_RANGE = "1-1024"         # e.g., "20-1024" or "22" or "80,443,8080"
THREADS = 200                 # number of worker threads (don't set too high)
TIMEOUT = 0.5                 # socket timeout in seconds
SIMULATE_ON_FAILURE = True    # If scanning fails (network restrictions), simulate results for demo

# convenience: parse targets and ports
def parse_targets(s):
    return [t.strip() for t in s.split(",") if t.strip()]

def parse_ports(pstr):
    ports = set()
    for part in pstr.split(","):
        part = part.strip()
        if "-" in part:
            a,b = part.split("-")
            ports.update(range(int(a), int(b)+1))
        else:
            if part:
                ports.add(int(part))
    return sorted(ports)

TARGETS = parse_targets(TARGETS_INPUT)
PORTS = parse_ports(PORT_RANGE)

print("Targets:", TARGETS)
print("Number of ports to check:", len(PORTS))
print("Thread workers:", THREADS)


Targets: ['127.0.0.1']
Number of ports to check: 1024
Thread workers: 200


In [3]:
# Cell 3: Multi-threaded TCP port scanner
import queue, socket, time
from tqdm import tqdm

def scan_host_ports(host, ports, timeout=0.5, workers=100):
    """
    Returns list of dicts: [{host, port, status, service_guess}]
    status: 'open' | 'closed' | 'filtered' | 'error'
    """
    results = []
    q = queue.Queue()
    for p in ports:
        q.put(p)

    lock = threading.Lock()
    stop_on_errors = False

    def worker():
        nonlocal results
        while not q.empty():
            try:
                port = q.get_nowait()
            except queue.Empty:
                return
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(timeout)
                    code = s.connect_ex((host, port))
                    # connect_ex returns 0 if open
                    if code == 0:
                        status = "open"
                    else:
                        # code values can indicate filtered or closed; we treat non-zero as closed/filtered
                        status = "closed"
            except Exception as e:
                status = "error"
            # best-effort service guess by common ports:
            service_guess = common_port_service(port)
            with lock:
                results.append({"host": host, "port": port, "status": status, "service_guess": service_guess})
            q.task_done()

    # start threads
    threads = []
    for i in range(min(workers, max(1, len(ports)))):
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()
        threads.append(t)

    # progress bar wait
    while any(t.is_alive() for t in threads):
        time.sleep(0.05)

    return results

# helper service guesses
COMMON_PORTS = {
    20:"ftp-data",21:"ftp",22:"ssh",23:"telnet",25:"smtp",53:"dns",80:"http",110:"pop3",
    123:"ntp",143:"imap",161:"snmp",443:"https",587:"smtp-submission",3306:"mysql",3389:"rdp",
    5900:"vnc",8080:"http-proxy"
}
def common_port_service(p):
    return COMMON_PORTS.get(p, "")


In [4]:
# Cell 4: Run scan across targets and ports. Will simulate if network blocked (optional).
all_results = []
actual_scan_success = True

for target in TARGETS:
    try:
        # try to resolve DNS first
        resolved = socket.gethostbyname(target)
    except Exception as e:
        print(f"Could not resolve {target}: {e}")
        # skip or continue with provided string (will likely fail)
        resolved = target

    try:
        print(f"\nScanning {target} ({resolved}) ...")
        res = scan_host_ports(resolved, PORTS, timeout=TIMEOUT, workers=THREADS)
        # if we get no results, consider failure
        if not res:
            raise RuntimeError("No scan results (possible network restrictions).")
        all_results.extend(res)
        print(f"Completed scan for {target}, checked {len(res)} ports.")
    except Exception as e:
        print(f"Scan failed for {target}: {e}")
        actual_scan_success = False
        if SIMULATE_ON_FAILURE:
            # produce simulated results for demo purposes
            import random
            print("Simulating results for demo...")
            sim = []
            for p in PORTS:
                # make a few common ports open
                if p in (22,80,443,3306) or random.random() < 0.02:
                    status="open"
                else:
                    status="closed"
                sim.append({"host": target, "port": p, "status": status, "service_guess": common_port_service(p)})
            all_results.extend(sim)

# Convert to DataFrame
df = pd.DataFrame(all_results)
print("\nScan summary (head):")
display(df.head())

# Basic aggregation
open_ports_df = df[df['status']=="open"].sort_values(['host','port'])
print(f"\nTotal open ports found: {len(open_ports_df)}")
display(open_ports_df)



Scanning 127.0.0.1 (127.0.0.1) ...
Completed scan for 127.0.0.1, checked 1024 ports.

Scan summary (head):


Unnamed: 0,host,port,status,service_guess
0,127.0.0.1,1,closed,
1,127.0.0.1,2,closed,
2,127.0.0.1,3,closed,
3,127.0.0.1,4,closed,
4,127.0.0.1,5,closed,



Total open ports found: 0


Unnamed: 0,host,port,status,service_guess


In [5]:
# Cell 5: Save CSV of raw results and provide a simple summary
df.to_csv("port_scan_results.csv", index=False)
print("Saved port_scan_results.csv")

summary = []
for host in sorted(df['host'].unique()):
    sub = df[df['host']==host]
    open_count = len(sub[sub['status']=="open"])
    top_open = list(sub[sub['status']=="open"].sort_values("port")['port'].astype(str).head(10))
    summary.append({"host": host, "total_ports_checked": len(sub), "open_ports_count": open_count, "open_ports_sample": ",".join(top_open)})

summary_df = pd.DataFrame(summary)
display(summary_df)
summary_df.to_csv("port_scan_summary.csv", index=False)
print("Saved port_scan_summary.csv")


Saved port_scan_results.csv


Unnamed: 0,host,total_ports_checked,open_ports_count,open_ports_sample
0,127.0.0.1,1024,0,


Saved port_scan_summary.csv


In [6]:
# Cell 6: Produce firewall rule suggestions for open ports
# We will produce:
#  - UFW deny commands to block incoming connections to discovered open ports
#  - iptables commands (DROP)
#  - A JSON file containing suggested rules

rules = []
for _, row in df[df['status']=="open"].iterrows():
    host = row['host']
    p = int(row['port'])
    svc = row['service_guess'] or ""
    rules.append({
        "host": host,
        "port": p,
        "service": svc,
        "ufw_command": f"sudo ufw deny proto tcp from any to {host} port {p}",
        "iptables_command": f"sudo iptables -A INPUT -p tcp --dport {p} -j DROP"
    })

# Save JSON
with open("firewall_suggestions.json", "w") as f:
    json.dump(rules, f, indent=2)
print("Saved firewall_suggestions.json with", len(rules), "entries.")

# Also write shell scripts for UFW and iptables
with open("apply_ufw_rules.sh", "w") as f:
    f.write("#!/bin/bash\n# Run these on the target host (with sudo) if you want to apply rules\n")
    for r in rules:
        f.write(r["ufw_command"] + "\n")
with open("apply_iptables_rules.sh", "w") as f:
    f.write("#!/bin/bash\n# Run these on the target host (with sudo) if you want to apply rules\n")
    for r in rules:
        f.write(r["iptables_command"] + "\n")

print("Saved apply_ufw_rules.sh and apply_iptables_rules.sh (download and run on target host with care).")


Saved firewall_suggestions.json with 0 entries.
Saved apply_ufw_rules.sh and apply_iptables_rules.sh (download and run on target host with care).


In [7]:
# Cell 7: Generate a sample Terraform snippet (AWS network ACL deny rules) as a .tf.json file
# NOTE: Security groups in AWS are allow-only. To model deny rules we use Network ACL entries (stateless).
# This JSON is a template; you must adapt VPC/subnet/acl IDs before using in production.

# helper to convert port to rule number (Network ACL ruleNumber must be unique 1-32766)
def rule_number_from_idx(i):
    return 100 + i

nacl_entries = []
i = 0
for r in rules:
    port = r['port']
    # cover a single port (tcp)
    entry = {
        "rule_number": rule_number_from_idx(i),
        "protocol": "6",  # TCP
        "rule_action": "deny",
        "egress": False,
        "cidr_block": "0.0.0.0/0",
        "from_port": port,
        "to_port": port
    }
    nacl_entries.append(entry)
    i += 1

tf_template = {
  "resource": {
    "aws_network_acl_rule": {
    }
  }
}

# Build resources keyed uniquely
for idx, entry in enumerate(nacl_entries):
    res_name = f"deny_port_{entry['from_port']}"
    tf_template['resource']['aws_network_acl_rule'][res_name] = {
        "network_acl_id": "${aws_network_acl.example.id}",  # placeholder - adapt in your TF
        "rule_number": entry['rule_number'],
        "egress": entry['egress'],
        "protocol": entry['protocol'],
        "rule_action": entry['rule_action'],
        "cidr_block": entry['cidr_block'],
        "from_port": entry['from_port'],
        "to_port": entry['to_port']
    }

with open("aws_nacl_deny_ports.tf.json", "w") as f:
    json.dump(tf_template, f, indent=2)

print("Saved aws_nacl_deny_ports.tf.json — adapt placeholders before applying with Terraform.")


Saved aws_nacl_deny_ports.tf.json — adapt placeholders before applying with Terraform.


In [8]:
# Cell 8: Offer downloads (Colab)
from google.colab import files as cf
files_to_download = ["port_scan_results.csv", "port_scan_summary.csv", "firewall_suggestions.json",
                     "apply_ufw_rules.sh", "apply_iptables_rules.sh", "aws_nacl_deny_ports.tf.json"]
for fn in files_to_download:
    if os.path.exists(fn):
        print("Prepared for download:", fn)
        cf.download(fn)
    else:
        print("Missing (not created):", fn)


Prepared for download: port_scan_results.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Prepared for download: port_scan_summary.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Prepared for download: firewall_suggestions.json


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Prepared for download: apply_ufw_rules.sh


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Prepared for download: apply_iptables_rules.sh


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Prepared for download: aws_nacl_deny_ports.tf.json


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>