<a href="https://colab.research.google.com/github/RatneshRawat25/Networking/blob/main/TCP_Port_Scanner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import argparse
import socket
import threading
from queue import Queue
import time

# Shared lock held around every print() issued from a worker thread, so that
# result lines produced by concurrent scans are not interleaved on stdout.
print_lock = threading.Lock()

def grab_banner(sock):
    """
    Read up to 1024 bytes from an already-connected socket as a service banner.

    Args:
        sock: A connected socket. Its timeout is changed to 0.5 seconds as a
            side effect of this call.

    Returns:
        The received data decoded as UTF-8 (undecodable bytes dropped) with
        surrounding whitespace removed, or a human-readable placeholder
        string when no usable banner was obtained (timeout, empty response,
        or socket error).
    """
    try:
        # Many services wait for the client to speak first, so keep the wait short.
        sock.settimeout(0.5)
        raw = sock.recv(1024)
    except socket.timeout:
        return "Service detected, but no banner received (timeout)."
    except OSError:
        # Socket operations report failures (reset, bad descriptor, ...) as OSError.
        return "Service detected, but could not retrieve banner."

    banner = raw.decode('utf-8', errors='ignore').strip()
    if not banner:
        # recv() yields b"" when the peer closes without sending anything, and
        # a whitespace-only greeting strips down to nothing; report that
        # explicitly instead of returning a blank service description.
        return "Service detected, but banner was empty."
    return banner

def port_scan(target, port, timeout=1.0):
    """
    Try a TCP connection to ``target:port`` and report the port if it is open.

    When the connection succeeds, a banner is requested via ``grab_banner``
    and a "[+] Port ... is open" line is printed while holding ``print_lock``.
    Closed or unreachable ports produce no output. Errors are printed (also
    under ``print_lock``) rather than raised.

    Args:
        target: IPv4 address or hostname to connect to.
        port: TCP port number to probe.
        timeout: Seconds to wait for the connection attempt. Defaults to 1.0.

    Returns:
        None.
    """
    try:
        # IPv4/TCP socket. The context manager closes it on every exit path,
        # including when banner grabbing or printing raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Set the timeout on this socket only. socket.setdefaulttimeout()
            # affects sockets created *after* the call and changes
            # process-wide state, so it is not suitable here.
            s.settimeout(timeout)

            # connect_ex() returns 0 on success and an errno value otherwise.
            if s.connect_ex((target, port)) == 0:
                banner = grab_banner(s)
                with print_lock:
                    print(f"[+] Port {port:<5} is open   | Service: {banner}")

    except socket.error as e:
        with print_lock:
            print(f"[!] Socket error on port {port}: {e}")
    except Exception as e:
        # Last-resort guard so one failing port does not kill a worker thread.
        with print_lock:
            print(f"[!] An error occurred on port {port}: {e}")

def threader(q, target):
    """
    Worker loop: repeatedly take a port number from the queue and scan it.

    The loop never returns on its own; it blocks in ``q.get()`` whenever the
    queue is empty.

    Args:
        q: Queue of port numbers to scan.
        target: Host passed through to ``port_scan`` for every port.
    """
    while True:
        port = q.get()
        try:
            port_scan(target, port)
        finally:
            # Always acknowledge the item, even if the scan raised; otherwise
            # a q.join() waiting on this queue would block forever.
            q.task_done()

def _parse_ports(spec):
    """
    Expand a port specification into an ordered list of unique port numbers.

    Accepts a single port ("80"), a range ("1-1024"), or a comma-separated
    mix of both ("22,80,8000-8010").

    Raises:
        ValueError: If an item is not an integer, lies outside 1-65535, or is
            a range whose start is greater than its end.
    """
    ports = []
    for part in spec.split(','):
        first, dash, last = part.partition('-')
        start = int(first)
        end = int(last) if dash else start
        if not 1 <= start <= end <= 65535:
            raise ValueError(f"invalid port or range: {part.strip()!r}")
        ports.extend(range(start, end + 1))
    # dict.fromkeys() removes duplicates while preserving first-seen order.
    return list(dict.fromkeys(ports))


def main(argv=None):
    """
    Parse arguments and orchestrate the multi-threaded scan.

    Args:
        argv: List of command-line argument strings, e.g.
            ``['example.com', '-p', '22,80', '-t', '20']``. When omitted, a
            built-in demo argument list is used (scan 127.0.0.1, ports
            21-80), because inside Google Colab / Jupyter ``sys.argv`` holds
            the notebook kernel's own arguments. To run as a command-line
            script, call ``main(sys.argv[1:])``.

    Returns:
        None. Progress, results and errors are printed to stdout.
    """
    parser = argparse.ArgumentParser(description="A simple multi-threaded TCP port scanner with banner grabbing.")
    parser.add_argument("target", help="The target IP address or hostname to scan.")
    parser.add_argument("-p", "--ports", default="1-1024", help="Port range to scan (e.g., 1-1024, 80, 443). Defaults to 1-1024.")
    parser.add_argument("-t", "--threads", type=int, default=100, help="Number of threads to use. Defaults to 100.")

    # --- FOR GOOGLE COLAB / JUPYTER NOTEBOOK ---
    # The default below simulates running:
    #     python port_scanner.py 127.0.0.1 -p 21-80
    # Change the host or the port range here, or pass your own list to main().
    if argv is None:
        argv = ['127.0.0.1', '-p', '21-80']
    args = parser.parse_args(argv)

    target_host = args.target
    num_threads = args.threads

    # Resolve hostname to IP address for clarity in the report
    try:
        target_ip = socket.gethostbyname(target_host)
    except socket.gaierror:
        print(f"[!] Error: Hostname '{target_host}' could not be resolved.")
        return

    # --- Port Parsing Logic ---
    try:
        ports_to_scan = _parse_ports(args.ports)
    except ValueError:
        print("[!] Error: Invalid port specification. Use a range like '1-1024' or comma-separated ports like '80,443,8080'.")
        return

    # With zero (or negative) workers nothing would consume the queue and
    # q.join() below would wait forever, so reject that up front.
    if num_threads < 1:
        print("[!] Error: Number of threads must be at least 1.")
        return

    # More workers than ports would only sit idle.
    worker_count = min(num_threads, len(ports_to_scan))

    # --- Print Scan Information ---
    print("-" * 50)
    print(f"[*] Scanning Target: {target_host} ({target_ip})")
    print(f"[*] Time started: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"[*] Using {worker_count} threads.")
    print("-" * 50)

    # --- Threading Setup ---
    q = Queue()

    for _ in range(worker_count):
        t = threading.Thread(target=threader, args=(q, target_ip))
        t.daemon = True  # Allows main program to exit even if threads are running
        t.start()

    for port in ports_to_scan:
        q.put(port)

    q.join()  # Block until every queued port has been marked done by a worker

    print("-" * 50)
    print("[*] Scan Complete.")
    print("-" * 50)

# Start the scan when this code is executed directly (as a script or as a
# notebook cell), but not when it is imported as a module.
if __name__ == "__main__":
    main()



--------------------------------------------------
[*] Scanning Target: 127.0.0.1 (127.0.0.1)
[*] Time started: 2025-09-06 11:46:22
[*] Using 100 threads.
--------------------------------------------------
--------------------------------------------------
[*] Scan Complete.
--------------------------------------------------
