Skip to content

Commit

Permalink
fix: add more helper functions and tests for network configuration wh…
Browse files Browse the repository at this point in the history
…en running on the raspberry pi
  • Loading branch information
Stefano Bertelli committed May 15, 2024
1 parent 2938d82 commit a5deac9
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 15 deletions.
58 changes: 50 additions & 8 deletions rotary_controller_python/network/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import datetime
import json
import shutil
import subprocess
import logging
import os

from rotary_controller_python.network import models
from rotary_controller_python.network.models import RfkillStatus

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -84,11 +86,51 @@ def render_interfaces(configuration: models.NetworkInterface, output_file: str =
def reload_interfaces():
log.info("Call ifreload to reconfigure the network settings")

result = subprocess.run(["which", "ifreload"], timeout=1, text=True)
if result.returncode != 0:
message = "ifreload is not available on this system, install ifupdown2 to allow dynamic configuration changes"
log.error(message)
return message

reload = subprocess.run(["ifreload", "-a"])
print(result.stdout)
try:
reload = subprocess.run(["/usr/sbin/ifreload", "-a"], capture_output=True)
return reload.stdout
except Exception as e:
log.error(e.__str__())


def read_wlan_status():
try:
result = subprocess.run(["/usr/sbin/rfkill", "-J", "--output-all"], capture_output=True)
result.check_returncode()
json_data = result.stdout
rfkill_data = json.loads(json_data)
wlan_data = [v for k,v in rfkill_data.items()][0]
wlan_data = [item for item in wlan_data if item['type'] == 'wlan']
if len(wlan_data) == 0:
raise Exception("No wlan detected")
if len(wlan_data) > 1:
raise Exception("More than one wlan detected")

wlan_data = wlan_data[0]
result = RfkillStatus(**wlan_data)
return result

except Exception as e:
log.error(e.__str__())


def enable_wlan(device_id: int):
try:
result = subprocess.run(["/usr/sbin/rfkill", "unblock", f"{device_id}"], capture_output=True)
result.check_returncode()
log.info(f"Device {device_id} successfully unblocked")
return result

except Exception as e:
log.error(e.__str__())


def disable_wlan(device_id: int):
try:
result = subprocess.run(["/usr/sbin/rfkill", "block", f"{device_id}"], capture_output=True)
result.check_returncode()
log.info(f"Device {device_id} successfully blocked")
return result

except Exception as e:
log.error(e.__str__())
10 changes: 9 additions & 1 deletion rotary_controller_python/network/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from pydantic import BaseModel
from pydantic import BaseModel, Field


class Wireless(BaseModel):
Expand All @@ -15,3 +15,11 @@ class NetworkInterface(BaseModel):
netmask: Optional[int]
gateway: Optional[str]
wireless: Optional[Wireless]


class RfkillStatus(BaseModel):
id: int
type: str
device: str
soft: str
hard: str
48 changes: 42 additions & 6 deletions rotary_controller_python/network/test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,45 @@
import pprint
import unittest
from unittest.mock import patch, MagicMock

from jinja2 import Environment, FileSystemLoader, select_autoescape
from rotary_controller_python import network

from rotary_controller_python.network.models import Wireless, NetworkInterface
from rotary_controller_python.network import reload_interfaces
class TestNetwork(unittest.TestCase):
def test_ifreload(self):
result = network.reload_interfaces()

def test_ifreload():
result = reload_interfaces()
@patch('subprocess.run')
def test_read_wlan_status(self, mock_run):
# Example JSON output from rfkill command
example_output = """
{
"": [
{"id":0, "type":"wlan", "device":"phy0", "type-desc":"Wireless LAN", "soft":"unblocked", "hard":"unblocked"},
{"id":1, "type":"bluetooth", "device":"hci0", "type-desc":"Bluetooth", "soft":"unblocked", "hard":"unblocked"}
]
}
"""

# Mock the subprocess.run to return the example output
mock_run.return_value = MagicMock(
stdout=example_output.encode('utf-8'),
returncode=0
)

from rotary_controller_python.network import read_wlan_status
from rotary_controller_python.network.models import RfkillStatus

# Call the function to test
result = read_wlan_status()

# Check that the subprocess.run was called with the correct arguments
mock_run.assert_called_with(["/usr/sbin/rfkill", "-J", "--output-all"], capture_output=True)

# Check that the result is as expected
expected_result = RfkillStatus(
device="phy0",
id=0,
type="wlan",
soft="unblocked",
hard="unblocked"
)
self.assertEqual(result, expected_result)

0 comments on commit a5deac9

Please sign in to comment.