Skip to content

Commit

Permalink
improve flow
Browse files Browse the repository at this point in the history
    * add support for FB 3390
    * improve transfer visualization through tftp
    * isort, black, typing, remove unused imports
  • Loading branch information
maurerle committed Jul 30, 2023
1 parent 7ed24a6 commit 56a9b4c
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 71 deletions.
126 changes: 68 additions & 58 deletions fritzflash.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
#! /usr/bin/env python3
import argparse
import ipaddress
from ipaddress import (
IPv4Interface,
IPv4Address,
IPv6Interface,
IPv6Address,
AddressValueError,
)
import platform
import socket
import time
from contextlib import contextmanager
from ftplib import FTP
from ipaddress import (
AddressValueError,
IPv4Address,
IPv4Interface,
IPv6Address,
IPv6Interface,
ip_address,
ip_interface,
)
from pathlib import Path
from subprocess import run
from contextlib import contextmanager

from typing import List, ContextManager, Union
from typing import Generator, List, Union

from simple_tftp import serve_file

Expand Down Expand Up @@ -52,7 +52,7 @@ def __init__(
break
except socket.timeout:
i += 1
except OSError as e:
except OSError:
time.sleep(1)
i += 1
if i > max_retry:
Expand Down Expand Up @@ -98,7 +98,9 @@ def reboot(self):


@contextmanager
def set_ip(ipinterface: IPInterface, network_device: str) -> ContextManager[None]:
def set_ip(
ipinterface: IPInterface, network_device: str
) -> Generator[bool, None, None]:
if IS_POSIX:
output = run(
["ip", "addr", "add", ipinterface.with_prefixlen, "dev", network_device],
Expand All @@ -115,8 +117,7 @@ def set_ip(ipinterface: IPInterface, network_device: str) -> ContextManager[None
ipinterface.with_prefixlen,
"dev",
network_device,
],
capture_output=True,
]
)
else:
output = run(
Expand All @@ -126,7 +127,7 @@ def set_ip(ipinterface: IPInterface, network_device: str) -> ContextManager[None
"ipv4",
"add",
"address",
f'"{network_device}"',
f"{network_device}",
f"{ipinterface.ip}",
f"{ipinterface.netmask}",
],
Expand All @@ -143,10 +144,9 @@ def set_ip(ipinterface: IPInterface, network_device: str) -> ContextManager[None
"ipv4",
"delete",
"address",
f'"{network_device}"',
f"{network_device}",
f"{ipinterface.ip}",
],
capture_output=True,
]
)


Expand Down Expand Up @@ -180,7 +180,7 @@ def ssh(host: IPAddress, cmd: List[str], user: str = "root"):
run(["ssh", *args, f"{user}@{host}", *cmd]).check_returncode()


def scp(host: IPAddress, file: Path, user: str = "root", target: Path = "/tmp/"):
def scp(host: IPAddress, file: Path, user: str = "root", target: Path = Path("/tmp/")):
null_file = "/dev/null" if IS_POSIX else "NUL"
args = ["-o", "StrictHostKeyChecking=no", "-o", f"UserKnownHostsFile={null_file}"]
if scp_legacy_check():
Expand All @@ -197,23 +197,13 @@ def connection_refused_message():
)


def start_message(ip_address):
def start_message():
print(
"This program will help you installing OpenWRT or Gluon, a widely used Firmware for Freifunk networks, onto your AVM device.\n"
"You can always find the most current version of this script at https://www.github.com/freifunk-darmstadt/fritz-tools\n\n"
"It is strongly recommended to only connect your computer to the device you want to flash.\n"
"Try to disable all other connections (Ethernet, WiFi/WLAN, VMs) if detection fails.\n"
"Sometimes an unmanaged switch between your AVM device and your computer is helpful.\n\n"
"If you run this program with according permission, it will configure IPs on your host automatically.\n"
"Otherwise, make sure you have assigned your PC a static IP Address in the Subnet of the device you want to flash.\n"
"The following example would be a completely fine option:\n"
)
print("IP-Address: %s" % str(ipaddress.ip_address(ip_address) + 1))
print("Subnet: 255.255.255.0")
print("Gateway: %s" % str(ipaddress.ip_address(ip_address)))
print("DNS Servers: Leave blank\n")
print(
"Once you're ready to flash, press enter, disconnect power from your AVM device and reconnect the power-supply."
"Sometimes an unmanaged switch between your AVM device and your computer is helpful."
)


Expand Down Expand Up @@ -278,7 +268,7 @@ def autodiscover_avm_ip():
bytearray.fromhex("0000120101000000c0a8b20100000000"),
("255.255.255.255", 5035),
)
while 1:
while True:
data, addr = receiver.recvfrom(64)
if addr[0] == "192.168.178.1":
print("\rFritzBox found at %s" % addr[0])
Expand Down Expand Up @@ -325,6 +315,9 @@ def determine_image_name(env_string):
"gluon": ["avm-fritz-box-7312-sysupgrade.bin"],
"openwrt": ["avm_fritz7312-squashfs-sysupgrade.bin"],
},
"193": {
"openwrt": ["avm_fritz3390-initramfs-kernel.bin"],
},
"196": {
"gluon": ["avm-fritz-box-7360-v2-sysupgrade.bin"],
"openwrt": ["avm_fritz7360sl-squashfs-sysupgrade.bin"],
Expand All @@ -333,6 +326,15 @@ def determine_image_name(env_string):
"gluon": ["avm-fritz-wlan-repeater-450e-sysupgrade.bin"],
"openwrt": ["fritz450e-squashfs-sysupgrade.bin"],
},
"203": {
"openwrt": ["openwrt-lantiq-xrx200-avm_fritz7362sl-initramfs-kernel.bin"]
},
"209": {
"openwrt": ["openwrt-lantiq-xrx200-avm_fritz7412-initramfs-kernel.bin"]
},
"218": {
"openwrt": ["openwrt-lantiq-xrx200-avm_fritz7430-initramfs-kernel.bin"]
},
"219": {
"gluon": ["avm-fritz-box-4020-sysupgrade.bin"],
"openwrt": [
Expand All @@ -344,17 +346,6 @@ def determine_image_name(env_string):
"gluon": ["avm-fritz-box-4040-bootloader.bin"],
"openwrt": ["avm_fritzbox-4040-squashfs-eva.bin"],
},
"203": {
"openwrt": [
"openwrt-lantiq-xrx200-avm_fritz7362sl-initramfs-kernel.bin"
]
},
"209": {
"openwrt": ["openwrt-lantiq-xrx200-avm_fritz7412-initramfs-kernel.bin"]
},
"218": {
"openwrt": ["openwrt-lantiq-xrx200-avm_fritz7430-initramfs-kernel.bin"]
},
"236": {"openwrt": ["uboot-fritz7530.bin"]},
"244": {"openwrt": ["uboot-fritz1200.bin"]},
"247": {
Expand Down Expand Up @@ -465,6 +456,7 @@ def perform_flash(ip, file):
"openwrt-lantiq-xrx200-avm_fritz7412-initramfs-kernel.bin",
"openwrt-lantiq-xrx200-avm_fritz7362sl-initramfs-kernel.bin",
"openwrt-lantiq-xrx200-avm_fritz7430-initramfs-kernel.bin",
"avm_fritz3390-initramfs-kernel.bin",
]:
size = file.stat().st_size
assert size < 0x2000000
Expand All @@ -485,6 +477,7 @@ def perform_flash(ip, file):
if file.name in [
"openwrt-lantiq-xrx200-avm_fritz7412-initramfs-kernel.bin",
"openwrt-lantiq-xrx200-avm_fritz7430-initramfs-kernel.bin",
"avm_fritz3390-initramfs-kernel.bin",
]:
ftp.voidcmd("SETENV linux_fs_start 0")
ftp.voidcmd("SETENV memsize 0x%08x" % (addr))
Expand All @@ -504,12 +497,12 @@ def perform_bootloader_flash(
imagefile: Path = None,
flash_tftp: bool = None,
):
with set_ip(ipaddress.ip_interface("192.168.1.70/24"), args.device) as can_set_ip:
with set_ip(ip_interface("192.168.1.70/24"), args.device) as can_set_ip:
if not can_set_ip:
print("could not set ip to 192.168.1.70/24")
print("make sure to run with admin privileges")
exit(1)
target_host = ipaddress.ip_address("192.168.1.1")
target_host = ip_address("192.168.1.1")
print("Waiting for Host to come up with IP Adress 192.168.1.1 ...")
await_online(target_host)
print("-> Host online.\nTransfering sysupgrade target firmware")
Expand Down Expand Up @@ -549,7 +542,7 @@ def perform_bootloader_flash(


def perform_tftp_flash(initramfsfile: Path, sysupgradefile: Path):
with set_ip(ipaddress.ip_interface("192.168.1.70/24"), args.device) as can_set_ip:
with set_ip(ip_interface("192.168.1.70/24"), args.device) as can_set_ip:
if not can_set_ip:
print("could not set ip to 192.168.1.70/24")
print(
Expand All @@ -566,9 +559,6 @@ def perform_tftp_flash(initramfsfile: Path, sysupgradefile: Path):
parser = argparse.ArgumentParser(
description="Flash Gluon image to AVM devices using EVA or/and TFTP."
)
parser.add_argument(
"--ip", type=str, help="IP Address of device. Autodiscovery if not specified."
)
parser.add_argument(
"--image",
type=str,
Expand Down Expand Up @@ -605,18 +595,31 @@ def perform_tftp_flash(initramfsfile: Path, sysupgradefile: Path):
print("If this device is a FB 7520/7530 or a FR 1200 write y")
flash_tftp = input().lower().startswith("y")

start_message("192.168.178.1")
input()
start_message()

with set_ip(ipaddress.ip_interface("192.168.178.2/24"), args.device) as can_set_ip:
client_intf = ip_interface("192.168.178.2/24")
with set_ip(client_intf, args.device) as can_set_ip:
if can_set_ip:
print("did set ip to 192.168.178.2/24")
print(f"did set ip to {client_intf}")
else:
print("could not set ip to 192.168.178.2/24")
print(f"could not set ip to {client_intf}")
print(
"If you run this program with according permission, it will configure IPs on your host automatically.\n"
"Otherwise, make sure you have assigned your PC a static IP Address in the Subnet of the device you want to flash.\n"
"The following example would be a completely fine option:\n"
)
print(f"IP-Address: {client_intf.ip}")
print(f"Subnet: {client_intf.netmask}")
print("Gateway: 192.168.178.1")
print("DNS Servers: Leave blank\n")
print(
"Once you're ready to flash, press enter, disconnect power from your AVM device and reconnect the power-supply."
)
input()

if args.ip:
try:
ip = ipaddress.ip_address(args.ip)
ip = ip_address(args.ip)
except AddressValueError:
print(f"{args.ip} is not a valid IPv4 address!")
exit(1)
Expand Down Expand Up @@ -662,6 +665,13 @@ def perform_tftp_flash(initramfsfile: Path, sysupgradefile: Path):
time.sleep(60)
print("Device will come up for ~5s about now, but we still need to wait 30s")
time.sleep(30)
perform_bootloader_flash(sysupgradefile, imagefile, flash_tftp)
if args.sysupgrade:
sysupgradefile = Path(args.sysupgrade)
print(f"flashing sysupgrade {sysupgradefile} through ssh")
perform_bootloader_flash(sysupgradefile, imagefile, flash_tftp)
else:
print(
"no sysupgrade provided, should run sysupgrade to persistently install firmware"
)
print("Finished flash procedure")
finish_message()
finish_message()
28 changes: 15 additions & 13 deletions simple_tftp.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import os.path, random, sys
from socket import socket, AF_INET, AF_INET6, SOCK_DGRAM, timeout
from pathlib import Path
#! /usr/bin/env python3
import random
import socket
import sys
from contextlib import contextmanager
from typing import Tuple, Generator, Literal
from pathlib import Path
from typing import Generator, Literal, Tuple

UDP_IP = "0.0.0.0"
UDP_PORT = 69 # TFTP Protocol Port (69)

SOCK_TIMEOUT = 5
MAX_TIMEOUT_RETRIES = 5
SESSIONS = dict()
SESSIONS: dict = dict()
FILE_DIR = Path(__file__)

# header opcode is 2 bytes
Expand Down Expand Up @@ -80,7 +82,7 @@ def create_error_packet(error_code: TFTP_ERRORS_T) -> bytes:
return bytes(err)


def read_file(block: int, file: Path) -> str:
def read_file(block: int, file: Path) -> bytes:
offset = (block - 1) * 512
with file.open("rb") as f:
f.seek(offset, 0)
Expand Down Expand Up @@ -114,8 +116,8 @@ def get_random_port() -> int:


@contextmanager
def create_udp_socket(ip=UDP_IP, port=UDP_PORT) -> Generator[socket, None, None]:
sock = socket(AF_INET, SOCK_DGRAM) # Internet # UDP
def create_udp_socket(ip=UDP_IP, port=UDP_PORT) -> Generator[socket.socket, None, None]:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP
sock.bind((ip, port))
try:
yield sock
Expand Down Expand Up @@ -160,7 +162,7 @@ def serve(port: int, file: Path, mode: TRANSFER_MODES_T) -> Tuple[bool, str]:
return True, addr
block += 1
if block % 256 == 0:
print(f"{block} / {size}")
print(f"{block} / {size}", end="")

packet = create_data_packet(block, file, mode)
sock.sendto(packet, addr)
Expand All @@ -169,7 +171,7 @@ def serve(port: int, file: Path, mode: TRANSFER_MODES_T) -> Tuple[bool, str]:
# Threads only handle incoming packets with ACK opcodes, send
# 'Illegal TFTP Operation' ERROR packet for any other opcode.
sock.sendto(create_error_packet(4), addr)
except timeout:
except socket.timeout:
if session["consec_timeouts"] < MAX_TIMEOUT_RETRIES:
session["consec_timeouts"] += 1
sock.sendto(session["packet"], session["addr"])
Expand Down Expand Up @@ -197,7 +199,7 @@ def serve_file(
continue
rfile, mode = decode_request_header(data)
# Mail is deprecated
if not mode in TRANSFER_MODES:
if mode not in TRANSFER_MODES:
server_sock.sendto(create_error_packet(0), addr)
print(f"unsupported mode: {mode} was requested")
continue
Expand All @@ -212,15 +214,15 @@ def serve_file(
yield serve(port, file, mode)


if __name__ == "__main__":
def main_tftp():
for success, host in serve_file(sys.argv[1]):
if success:
print(f"Successfully served file {sys.argv[1]} to Host {host}")
try:
input(
"Press any key to serve to another Host, CTRL-c or CTRL-d to stop TFTP server."
)
except (KeyboardInterrupt, EOFError) as interrupt:
except (KeyboardInterrupt, EOFError):
break
else:
print(f"Timeout serving file to Host {host}")

0 comments on commit 56a9b4c

Please sign in to comment.