# Port Scanner mit Python


In [None]:
import socket

start_port = 0
end_port = 150
dst_host = "127.0.0.1"
verbose = False

open_ports = []
for testport in range(start_port, end_port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(0.5)
    try:
        sock.connect((dst_host, int(testport)))
        open_ports.append(testport)
        print(f"Offener Port: {testport}")
        sock.shutdown(socket.SHUT_RD)

    except Exception as e:
        if verbose == 1: print(f"Geschlossener Port: {testport} Expection: {e}")
    
    finally:
        sock.close()

open_ports

## 1. Inputs <br>
### Helper-Input Funktion für Eingabe von: <br>
Host-IP -> dst_host <br>
Erster Port -> start_port <br>
Letzter Port -> end_port<br>
Ausgabetyp -> verbose

def get_input(typ: type, text: str, std_value: any):
    var = input(text)
    
    if not var:
        return std_value
    if typ != str:
        try:
            return typ(var)
        except ValueError:
            print(f"Falsche Eingabe! Geben Sie eine etwas vom Typ {typ} ein.")
            return get_input(typ, text, std_value)
    else:
        return var

In [None]:
from helper import get_input

dst_host = "127.0.0.1"
start_port = 0
end_port = 1025

print("Input startet... (keine Eingabe = Standart Werte)")
dst_host = get_input(str, f"Gebe die Host-IP ein({dst_host}): ", dst_host)
start_port = get_input(int, f"Gebe den Start Port ein({start_port}): ", start_port)
end_port = get_input(int, f"Gebe den End Port ein({end_port}): ", end_port)

# Ausgabe
(dst_host, start_port, end_port)

## 2. Port Scan
### find_all_async() Funktion aus port_scanner.py

In [None]:
from concurrent import futures
import socket

# Checkt ob ein TCP-Port auf einem Host-System erreichbar/offen ist.
def port_checker(dst_host=str, testport=int, verbose=False) -> tuple[bool, int]:
    print(f'Port Checker on {testport} started...')

    port_open = False

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(0.5)
    val = sock.connect_ex((dst_host, testport))
    
    if val == 0:
        port_open = True
        print(f"Offener Port: {testport}")
    sock.close()

    if verbose and not port_open: print(f"Geschlossener Port: {testport}")

    print(f'Port Checker on {testport} finished...')
    return port_open, testport


# Asynchrone Funktionen brauchen eine Funktion, die den Aufruf managed
# -> hier find_all_async()
open_ports=[]

try:
    # sock.connect() versucht eine Verbindung mit der angegebenen IP/Port herzustellen
    hostinfo = socket.gethostbyaddr(dst_host)
    print(f"Hostname: {hostinfo[0]}, Aliaslist: {hostinfo[1]}, Ipaddrlist: {hostinfo[2]}")
    # wird keine Exception ausgelöst geht es hier weiter
    # close() schließt Verbindung und beendet Socket

    # Erzeugung der Asynchronen Prozesse https://docs.python.org/3/library/concurrent.futures.html
    with futures.ThreadPoolExecutor(max_workers=2000) as executor:
        # erzeugt eine Liste mit den Typen list[Future[tuple[bool, int]]] , also wird der Wert noch erwartet, da die Werte
        # vom Typ Future sind.
        results = [
            executor.submit(port_checker, dst_host=dst_host, testport=i) for i in range(start_port, end_port+1)
        ]
        # Sage dem executor das gewartet werden soll, bis ein Ergebnis zurückkommt
        executor.shutdown(wait=True, cancel_futures=False)
        
        # Wenn thread fertig ist wird der Port der Liste open_ports hinzugefügt
        # muss nicht in genauer Reihenfolge passieren
        for thread in futures.as_completed(results):
            # checken ob der Port offen ist -> Übergebene Bool-Variable
            if thread.result()[0]: open_ports.append(thread.result()[1])

        
#Exception wenn Host nicht erreichbar ist.
except Exception as e:
    print(f"Der Host existiert nicht. Exception: {e}")


# Sortiere Ports für aufsteigende Auflistung
open_ports.sort()
# Gebe sortierte Ports aus
print(f'open_ports returned: {open_ports}')


## 3. Services Dict
### get_services_dict() Funktion aus helper.py

In [None]:
# Öffne Datei und speichere sie in Variable
fileread = open("C:\Windows\System32\drivers\etc\services", "r").read()
# Erstelle eine Liste der einzelnen Zeilen der Services-Datei
lines_string = fileread.splitlines()

# erstelle das Dictionary mit {port: service}
services_dict = {}
# Laufe durch alle Elemente in lines_string
for line in lines_string:
    # Splite line beim Hashtag
    split_by_hashtag = line.split(sep="#", maxsplit=2)[0]
    # Splite neue Line in 3 Teile bei Leerzeichen
    split_in_3 = split_by_hashtag.split(maxsplit=3)
    if len(split_in_3) > 1:
        # Trenne zweite Spalte bei "/" auf
        split_port_protc = split_in_3[1].split(sep="/")

        # es wird getestet ob bereits ein Eintrag für den Port existiert
        # https://stackoverflow.com/questions/68193407/how-do-i-check-whether-an-open-port-is-tcp-or-udp
        if int(split_port_protc[0]) not in services_dict.keys():
            # trage key, value par in Dictionary ein
            services_dict[int(split_port_protc[0])] = split_in_3[0]

services_dict

## 4. Logs erzeugen

In [None]:
from helper import create_file_name
import logging 

# Erzeuge Logger und dessen File
file_name = create_file_name()
logging.basicConfig(filename=file_name, encoding='utf-8', level=logging.DEBUG)

In [None]:
from datetime import datetime


logging.info(f"""   Zeitstempel: {datetime.now().strftime("%H:%M:%S")}, ANGRIF auf: {dst_host}""")

# Ordne offenen Ports einem Service zu und schreibe in die Logger Datei
for port in open_ports:
    try:
        logging.info(f"   Offener Port: {port}, Protokoll: tcp, Protokoll: {services_dict.get(port)}")
    except TypeError as e:
        logging.error(f"   Port {port} offen, aber nicht in Protokoll: {e}")

print(f"Logs wurden unter dem Namen {file_name} erstellt")

