diff --git a/rotary_controller_python/network/__init__.py b/rotary_controller_python/network/__init__.py index 5e8f6bc..bb8f2e0 100644 --- a/rotary_controller_python/network/__init__.py +++ b/rotary_controller_python/network/__init__.py @@ -1,10 +1,12 @@ import datetime +import json import shutil import subprocess import logging import os from rotary_controller_python.network import models +from rotary_controller_python.network.models import RfkillStatus log = logging.getLogger(__name__) @@ -84,11 +86,51 @@ def render_interfaces(configuration: models.NetworkInterface, output_file: str = def reload_interfaces(): log.info("Call ifreload to reconfigure the network settings") - result = subprocess.run(["which", "ifreload"], timeout=1, text=True) - if result.returncode != 0: - message = "ifreload is not available on this system, install ifupdown2 to allow dynamic configuration changes" - log.error(message) - return message - - reload = subprocess.run(["ifreload", "-a"]) - print(result.stdout) + try: + reload = subprocess.run(["/usr/sbin/ifreload", "-a"], capture_output=True) + return reload.stdout + except Exception as e: + log.error(e.__str__()) + + +def read_wlan_status(): + try: + result = subprocess.run(["/usr/sbin/rfkill", "-J", "--output-all"], capture_output=True) + result.check_returncode() + json_data = result.stdout + rfkill_data = json.loads(json_data) + wlan_data = [v for k,v in rfkill_data.items()][0] + wlan_data = [item for item in wlan_data if item['type'] == 'wlan'] + if len(wlan_data) == 0: + raise Exception("No wlan detected") + if len(wlan_data) > 1: + raise Exception("More than one wlan detected") + + wlan_data = wlan_data[0] + result = RfkillStatus(**wlan_data) + return result + + except Exception as e: + log.error(e.__str__()) + + +def enable_wlan(device_id: int): + try: + result = subprocess.run(["/usr/sbin/rfkill", "unblock", f"{device_id}"], capture_output=True) + result.check_returncode() + log.info(f"Device {device_id} successfully unblocked") + return result + + except Exception as e: + log.error(e.__str__()) + + +def disable_wlan(device_id: int): + try: + result = subprocess.run(["/usr/sbin/rfkill", "block", f"{device_id}"], capture_output=True) + result.check_returncode() + log.info(f"Device {device_id} successfully blocked") + return result + + except Exception as e: + log.error(e.__str__()) diff --git a/rotary_controller_python/network/models.py b/rotary_controller_python/network/models.py index b421a24..3d30166 100644 --- a/rotary_controller_python/network/models.py +++ b/rotary_controller_python/network/models.py @@ -1,6 +1,6 @@ from typing import Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field class Wireless(BaseModel): @@ -15,3 +15,11 @@ class NetworkInterface(BaseModel): netmask: Optional[int] gateway: Optional[str] wireless: Optional[Wireless] + + +class RfkillStatus(BaseModel): + id: int + type: str + device: str + soft: str + hard: str diff --git a/rotary_controller_python/network/test.py b/rotary_controller_python/network/test.py index 8399733..937e5f1 100644 --- a/rotary_controller_python/network/test.py +++ b/rotary_controller_python/network/test.py @@ -1,9 +1,45 @@ -import pprint +import unittest +from unittest.mock import patch, MagicMock -from jinja2 import Environment, FileSystemLoader, select_autoescape +from rotary_controller_python import network -from rotary_controller_python.network.models import Wireless, NetworkInterface -from rotary_controller_python.network import reload_interfaces +class TestNetwork(unittest.TestCase): + def test_ifreload(self): + result = network.reload_interfaces() -def test_ifreload(): - result = reload_interfaces() + @patch('subprocess.run') + def test_read_wlan_status(self, mock_run): + # Example JSON output from rfkill command + example_output = """ +{ + "": [ + {"id":0, "type":"wlan", "device":"phy0", "type-desc":"Wireless LAN", "soft":"unblocked", "hard":"unblocked"}, + {"id":1, "type":"bluetooth", "device":"hci0", "type-desc":"Bluetooth", "soft":"unblocked", "hard":"unblocked"} + ] +} +""" + + # Mock the subprocess.run to return the example output + mock_run.return_value = MagicMock( + stdout=example_output.encode('utf-8'), + returncode=0 + ) + + from rotary_controller_python.network import read_wlan_status + from rotary_controller_python.network.models import RfkillStatus + + # Call the function to test + result = read_wlan_status() + + # Check that the subprocess.run was called with the correct arguments + mock_run.assert_called_with(["/usr/sbin/rfkill", "-J", "--output-all"], capture_output=True) + + # Check that the result is as expected + expected_result = RfkillStatus( + device="phy0", + id=0, + type="wlan", + soft="unblocked", + hard="unblocked" + ) + self.assertEqual(result, expected_result)