From 56a9b4ce2bfbed4b674b146ee4388e9c361890f0 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Sun, 30 Jul 2023 21:40:17 +0200 Subject: [PATCH] improve flow * add support for FB 3390 * improve transfer visualization through tftp * isort, black, typing, remove unused imports --- fritzflash.py | 126 ++++++++++++++++++++++++++----------------------- simple_tftp.py | 28 ++++++----- 2 files changed, 83 insertions(+), 71 deletions(-) mode change 100755 => 100644 fritzflash.py diff --git a/fritzflash.py b/fritzflash.py old mode 100755 new mode 100644 index 4641ec1..c4cd002 --- a/fritzflash.py +++ b/fritzflash.py @@ -1,22 +1,22 @@ #! /usr/bin/env python3 import argparse -import ipaddress -from ipaddress import ( - IPv4Interface, - IPv4Address, - IPv6Interface, - IPv6Address, - AddressValueError, -) import platform import socket import time +from contextlib import contextmanager from ftplib import FTP +from ipaddress import ( + AddressValueError, + IPv4Address, + IPv4Interface, + IPv6Address, + IPv6Interface, + ip_address, + ip_interface, +) from pathlib import Path from subprocess import run -from contextlib import contextmanager - -from typing import List, ContextManager, Union +from typing import Generator, List, Union from simple_tftp import serve_file @@ -52,7 +52,7 @@ def __init__( break except socket.timeout: i += 1 - except OSError as e: + except OSError: time.sleep(1) i += 1 if i > max_retry: @@ -98,7 +98,9 @@ def reboot(self): @contextmanager -def set_ip(ipinterface: IPInterface, network_device: str) -> ContextManager[None]: +def set_ip( + ipinterface: IPInterface, network_device: str +) -> Generator[bool, None, None]: if IS_POSIX: output = run( ["ip", "addr", "add", ipinterface.with_prefixlen, "dev", network_device], @@ -115,8 +117,7 @@ def set_ip(ipinterface: IPInterface, network_device: str) -> ContextManager[None ipinterface.with_prefixlen, "dev", network_device, - ], - capture_output=True, + ] ) else: output = run( @@ -126,7 +127,7 @@ def set_ip(ipinterface: IPInterface, network_device: str) -> ContextManager[None "ipv4", "add", "address", - f'"{network_device}"', + f"{network_device}", f"{ipinterface.ip}", f"{ipinterface.netmask}", ], @@ -143,10 +144,9 @@ def set_ip(ipinterface: IPInterface, network_device: str) -> ContextManager[None "ipv4", "delete", "address", - f'"{network_device}"', + f"{network_device}", f"{ipinterface.ip}", - ], - capture_output=True, + ] ) @@ -180,7 +180,7 @@ def ssh(host: IPAddress, cmd: List[str], user: str = "root"): run(["ssh", *args, f"{user}@{host}", *cmd]).check_returncode() -def scp(host: IPAddress, file: Path, user: str = "root", target: Path = "/tmp/"): +def scp(host: IPAddress, file: Path, user: str = "root", target: Path = Path("/tmp/")): null_file = "/dev/null" if IS_POSIX else "NUL" args = ["-o", "StrictHostKeyChecking=no", "-o", f"UserKnownHostsFile={null_file}"] if scp_legacy_check(): @@ -197,23 +197,13 @@ def connection_refused_message(): ) -def start_message(ip_address): +def start_message(): print( "This program will help you installing OpenWRT or Gluon, a widely used Firmware for Freifunk networks, onto your AVM device.\n" "You can always find the most current version of this script at https://www.github.com/freifunk-darmstadt/fritz-tools\n\n" "It is strongly recommended to only connect your computer to the device you want to flash.\n" "Try to disable all other connections (Ethernet, WiFi/WLAN, VMs) if detection fails.\n" - "Sometimes an unmanaged switch between your AVM device and your computer is helpful.\n\n" - "If you run this program with according permission, it will configure IPs on your host automatically.\n" - "Otherwise, make sure you have assigned your PC a static IP Address in the Subnet of the device you want to flash.\n" - "The following example would be a completely fine option:\n" - ) - print("IP-Address: %s" % str(ipaddress.ip_address(ip_address) + 1)) - print("Subnet: 255.255.255.0") - print("Gateway: %s" % str(ipaddress.ip_address(ip_address))) - print("DNS Servers: Leave blank\n") - print( - "Once you're ready to flash, press enter, disconnect power from your AVM device and reconnect the power-supply." + "Sometimes an unmanaged switch between your AVM device and your computer is helpful." ) @@ -278,7 +268,7 @@ def autodiscover_avm_ip(): bytearray.fromhex("0000120101000000c0a8b20100000000"), ("255.255.255.255", 5035), ) - while 1: + while True: data, addr = receiver.recvfrom(64) if addr[0] == "192.168.178.1": print("\rFritzBox found at %s" % addr[0]) @@ -325,6 +315,9 @@ def determine_image_name(env_string): "gluon": ["avm-fritz-box-7312-sysupgrade.bin"], "openwrt": ["avm_fritz7312-squashfs-sysupgrade.bin"], }, + "193": { + "openwrt": ["avm_fritz3390-initramfs-kernel.bin"], + }, "196": { "gluon": ["avm-fritz-box-7360-v2-sysupgrade.bin"], "openwrt": ["avm_fritz7360sl-squashfs-sysupgrade.bin"], @@ -333,6 +326,15 @@ def determine_image_name(env_string): "gluon": ["avm-fritz-wlan-repeater-450e-sysupgrade.bin"], "openwrt": ["fritz450e-squashfs-sysupgrade.bin"], }, + "203": { + "openwrt": ["openwrt-lantiq-xrx200-avm_fritz7362sl-initramfs-kernel.bin"] + }, + "209": { + "openwrt": ["openwrt-lantiq-xrx200-avm_fritz7412-initramfs-kernel.bin"] + }, + "218": { + "openwrt": ["openwrt-lantiq-xrx200-avm_fritz7430-initramfs-kernel.bin"] + }, "219": { "gluon": ["avm-fritz-box-4020-sysupgrade.bin"], "openwrt": [ @@ -344,17 +346,6 @@ def determine_image_name(env_string): "gluon": ["avm-fritz-box-4040-bootloader.bin"], "openwrt": ["avm_fritzbox-4040-squashfs-eva.bin"], }, - "203": { - "openwrt": [ - "openwrt-lantiq-xrx200-avm_fritz7362sl-initramfs-kernel.bin" - ] - }, - "209": { - "openwrt": ["openwrt-lantiq-xrx200-avm_fritz7412-initramfs-kernel.bin"] - }, - "218": { - "openwrt": ["openwrt-lantiq-xrx200-avm_fritz7430-initramfs-kernel.bin"] - }, "236": {"openwrt": ["uboot-fritz7530.bin"]}, "244": {"openwrt": ["uboot-fritz1200.bin"]}, "247": { @@ -465,6 +456,7 @@ def perform_flash(ip, file): "openwrt-lantiq-xrx200-avm_fritz7412-initramfs-kernel.bin", "openwrt-lantiq-xrx200-avm_fritz7362sl-initramfs-kernel.bin", "openwrt-lantiq-xrx200-avm_fritz7430-initramfs-kernel.bin", + "avm_fritz3390-initramfs-kernel.bin", ]: size = file.stat().st_size assert size < 0x2000000 @@ -485,6 +477,7 @@ def perform_flash(ip, file): if file.name in [ "openwrt-lantiq-xrx200-avm_fritz7412-initramfs-kernel.bin", "openwrt-lantiq-xrx200-avm_fritz7430-initramfs-kernel.bin", + "avm_fritz3390-initramfs-kernel.bin", ]: ftp.voidcmd("SETENV linux_fs_start 0") ftp.voidcmd("SETENV memsize 0x%08x" % (addr)) @@ -504,12 +497,12 @@ def perform_bootloader_flash( imagefile: Path = None, flash_tftp: bool = None, ): - with set_ip(ipaddress.ip_interface("192.168.1.70/24"), args.device) as can_set_ip: + with set_ip(ip_interface("192.168.1.70/24"), args.device) as can_set_ip: if not can_set_ip: print("could not set ip to 192.168.1.70/24") print("make sure to run with admin privileges") exit(1) - target_host = ipaddress.ip_address("192.168.1.1") + target_host = ip_address("192.168.1.1") print("Waiting for Host to come up with IP Adress 192.168.1.1 ...") await_online(target_host) print("-> Host online.\nTransfering sysupgrade target firmware") @@ -549,7 +542,7 @@ def perform_bootloader_flash( def perform_tftp_flash(initramfsfile: Path, sysupgradefile: Path): - with set_ip(ipaddress.ip_interface("192.168.1.70/24"), args.device) as can_set_ip: + with set_ip(ip_interface("192.168.1.70/24"), args.device) as can_set_ip: if not can_set_ip: print("could not set ip to 192.168.1.70/24") print( @@ -566,9 +559,6 @@ def perform_tftp_flash(initramfsfile: Path, sysupgradefile: Path): parser = argparse.ArgumentParser( description="Flash Gluon image to AVM devices using EVA or/and TFTP." ) - parser.add_argument( - "--ip", type=str, help="IP Address of device. Autodiscovery if not specified." - ) parser.add_argument( "--image", type=str, @@ -605,18 +595,31 @@ def perform_tftp_flash(initramfsfile: Path, sysupgradefile: Path): print("If this device is a FB 7520/7530 or a FR 1200 write y") flash_tftp = input().lower().startswith("y") - start_message("192.168.178.1") - input() + start_message() - with set_ip(ipaddress.ip_interface("192.168.178.2/24"), args.device) as can_set_ip: + client_intf = ip_interface("192.168.178.2/24") + with set_ip(client_intf, args.device) as can_set_ip: if can_set_ip: - print("did set ip to 192.168.178.2/24") + print(f"did set ip to {client_intf}") else: - print("could not set ip to 192.168.178.2/24") + print(f"could not set ip to {client_intf}") + print( + "If you run this program with according permission, it will configure IPs on your host automatically.\n" + "Otherwise, make sure you have assigned your PC a static IP Address in the Subnet of the device you want to flash.\n" + "The following example would be a completely fine option:\n" + ) + print(f"IP-Address: {client_intf.ip}") + print(f"Subnet: {client_intf.netmask}") + print("Gateway: 192.168.178.1") + print("DNS Servers: Leave blank\n") + print( + "Once you're ready to flash, press enter, disconnect power from your AVM device and reconnect the power-supply." + ) + input() if args.ip: try: - ip = ipaddress.ip_address(args.ip) + ip = ip_address(args.ip) except AddressValueError: print(f"{args.ip} is not a valid IPv4 address!") exit(1) @@ -662,6 +665,13 @@ def perform_tftp_flash(initramfsfile: Path, sysupgradefile: Path): time.sleep(60) print("Device will come up for ~5s about now, but we still need to wait 30s") time.sleep(30) - perform_bootloader_flash(sysupgradefile, imagefile, flash_tftp) + if args.sysupgrade: + sysupgradefile = Path(args.sysupgrade) + print(f"flashing sysupgrade {sysupgradefile} through ssh") + perform_bootloader_flash(sysupgradefile, imagefile, flash_tftp) + else: + print( + "no sysupgrade provided, should run sysupgrade to persistently install firmware" + ) print("Finished flash procedure") - finish_message() \ No newline at end of file + finish_message() diff --git a/simple_tftp.py b/simple_tftp.py index b487d0a..9b22ea8 100644 --- a/simple_tftp.py +++ b/simple_tftp.py @@ -1,15 +1,17 @@ -import os.path, random, sys -from socket import socket, AF_INET, AF_INET6, SOCK_DGRAM, timeout -from pathlib import Path +#! /usr/bin/env python3 +import random +import socket +import sys from contextlib import contextmanager -from typing import Tuple, Generator, Literal +from pathlib import Path +from typing import Generator, Literal, Tuple UDP_IP = "0.0.0.0" UDP_PORT = 69 # TFTP Protocol Port (69) SOCK_TIMEOUT = 5 MAX_TIMEOUT_RETRIES = 5 -SESSIONS = dict() +SESSIONS: dict = dict() FILE_DIR = Path(__file__) # header opcode is 2 bytes @@ -80,7 +82,7 @@ def create_error_packet(error_code: TFTP_ERRORS_T) -> bytes: return bytes(err) -def read_file(block: int, file: Path) -> str: +def read_file(block: int, file: Path) -> bytes: offset = (block - 1) * 512 with file.open("rb") as f: f.seek(offset, 0) @@ -114,8 +116,8 @@ def get_random_port() -> int: @contextmanager -def create_udp_socket(ip=UDP_IP, port=UDP_PORT) -> Generator[socket, None, None]: - sock = socket(AF_INET, SOCK_DGRAM) # Internet # UDP +def create_udp_socket(ip=UDP_IP, port=UDP_PORT) -> Generator[socket.socket, None, None]: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.bind((ip, port)) try: yield sock @@ -160,7 +162,7 @@ def serve(port: int, file: Path, mode: TRANSFER_MODES_T) -> Tuple[bool, str]: return True, addr block += 1 if block % 256 == 0: - print(f"{block} / {size}") + print(f"{block} / {size}", end="") packet = create_data_packet(block, file, mode) sock.sendto(packet, addr) @@ -169,7 +171,7 @@ def serve(port: int, file: Path, mode: TRANSFER_MODES_T) -> Tuple[bool, str]: # Threads only handle incoming packets with ACK opcodes, send # 'Illegal TFTP Operation' ERROR packet for any other opcode. sock.sendto(create_error_packet(4), addr) - except timeout: + except socket.timeout: if session["consec_timeouts"] < MAX_TIMEOUT_RETRIES: session["consec_timeouts"] += 1 sock.sendto(session["packet"], session["addr"]) @@ -197,7 +199,7 @@ def serve_file( continue rfile, mode = decode_request_header(data) # Mail is deprecated - if not mode in TRANSFER_MODES: + if mode not in TRANSFER_MODES: server_sock.sendto(create_error_packet(0), addr) print(f"unsupported mode: {mode} was requested") continue @@ -212,7 +214,7 @@ def serve_file( yield serve(port, file, mode) -if __name__ == "__main__": +def main_tftp(): for success, host in serve_file(sys.argv[1]): if success: print(f"Successfully served file {sys.argv[1]} to Host {host}") @@ -220,7 +222,7 @@ def serve_file( input( "Press any key to serve to another Host, CTRL-c or CTRL-d to stop TFTP server." ) - except (KeyboardInterrupt, EOFError) as interrupt: + except (KeyboardInterrupt, EOFError): break else: print(f"Timeout serving file to Host {host}")