From 9d577f4177d318ba68d7ebf2312f94863ea65c6a Mon Sep 17 00:00:00 2001 From: Ben Timby Date: Wed, 5 Jan 2022 00:51:39 -0500 Subject: [PATCH] Linting. --- Makefile | 1 + pywpas/__init__.py | 4 ++++ pywpas/const.py | 2 ++ pywpas/control.py | 11 ++++++++--- pywpas/interface.py | 30 +++++++++++++++++++++++------- pywpas/models.py | 33 +++++++++++++++++++++------------ pywpas/utils.py | 22 +++++++++++++--------- pywpas/version.py | 2 +- setup.py | 2 +- 9 files changed, 74 insertions(+), 33 deletions(-) diff --git a/Makefile b/Makefile index 48a1820..03cb013 100644 --- a/Makefile +++ b/Makefile @@ -8,6 +8,7 @@ deps: .venv .PHONY: test test: deps pipenv run coverage run -m unittest tests + pipenv run coverage report -m .PHONY: lint lint: deps diff --git a/pywpas/__init__.py b/pywpas/__init__.py index a329ec4..6d62b77 100644 --- a/pywpas/__init__.py +++ b/pywpas/__init__.py @@ -1,2 +1,6 @@ +"All components of the public interface" + from .control import Control from .models import Network + +__all__ = ['Control', 'Network'] diff --git a/pywpas/const.py b/pywpas/const.py index 4ac9a94..b8e9faf 100644 --- a/pywpas/const.py +++ b/pywpas/const.py @@ -1,3 +1,5 @@ +"Some constants for dealing with wpa_supplicant values" + STATUS_CONNECTED = 'connected' STATUS_CONNECTING = 'connecting' STATUS_INACTIVE = 'inactive' diff --git a/pywpas/control.py b/pywpas/control.py index 1c1a9c7..28d9ee6 100644 --- a/pywpas/control.py +++ b/pywpas/control.py @@ -1,9 +1,10 @@ +"Control interface for wpa_supplicant" + import os import logging from typing import List -from .models import Network from .interface import Interface from .utils import find_sockets @@ -14,7 +15,7 @@ DEFAULT_SOCK_PATH = os.environ.get('WPA_SOCK', '/var/run/wpa_supplicant') -class Control(object): +class Control: """ Control wpa_supplicant. """ @@ -26,6 +27,7 @@ def __del__(self): self.close() def close(self) -> None: + "Close all interfaces" if self._interfaces is None: return for iface in self._interfaces: @@ -33,18 +35,21 @@ def close(self) -> None: self._interfaces = None def interface(self, name: str) -> Interface: + "Get specific interface" for iface in self.interfaces: if iface.name == name: return iface - raise ValueError('Invalid interface name: %s' % name) + raise ValueError(f'Invalid interface name: {name}') def interface_names(self): + "List of interface names" return [ interface.name for interface in self.interfaces ] @property def interfaces(self) -> List[Interface]: + "List of interfaces" if self._interfaces is None: self._interfaces = [] for name in find_sockets(self._sock_path): diff --git a/pywpas/interface.py b/pywpas/interface.py index a9f22f9..ad0213b 100644 --- a/pywpas/interface.py +++ b/pywpas/interface.py @@ -1,3 +1,5 @@ +"Communication with wpa_supplicant interface" + import os import time import tempfile @@ -22,7 +24,7 @@ RECV_BUFFER_SIZE = 4096 -class _BackgroundScan(object): +class _BackgroundScan: """ High-level scan. @@ -34,6 +36,7 @@ def __init__(self, interface: 'Interface'): self._t = None def _scan(self, callback, timeout): + "Background scan thread entry point" networks, started = set(), time.time() self._interface.scan() while self._running and time.time() - started < timeout: @@ -44,6 +47,7 @@ def _scan(self, callback, timeout): callback(network) def start(self, callback: callable, timeout: float=SCAN_TIMEOUT): + "Start background scan" assert callable(callback), 'Callback must be callable' self._running = True self._t = threading.Thread( @@ -51,6 +55,7 @@ def start(self, callback: callable, timeout: float=SCAN_TIMEOUT): self._t.start() def stop(self): + "Stop background scan" if self._t is None: return self._running = False @@ -87,7 +92,7 @@ def stop(self): # WPS_ER_START WPS_NFC WPS_NFC_CONFIG_TOKEN WPS_NFC_TAG_READ WPS_NFC_TOKEN # WPS_PBC WPS_PIN WPS_REG # -class Interface(object): +class Interface: """ Handle a unix:// datagram connection for a given interface. """ @@ -108,10 +113,12 @@ def __del__(self): @property def name(self): + "This interface's name" return self._name @property def control(self) -> 'Control': + "The parent object which gives access to additional interfaces" return self._control def close(self) -> None: @@ -125,9 +132,8 @@ def close(self) -> None: try: os.remove(self._client_path) except FileNotFoundError: - LOGGER.warn('Error deleting client socket at: %s', + LOGGER.warning('Error deleting client socket at: %s', self._client_path) - pass self._client_path = None def _ensure_connection(self): @@ -183,51 +189,61 @@ def _send_and_recv(self, cmd: Union[str, bytes]) -> List[str]: return resp.split(b'\n') def ping(self) -> None: + "Connection test" resp = self._send_and_recv(b'PING') assert resp == [b'PONG'], 'Did not receive proper reply' def status(self) -> InterfaceStatus: + "Get interface status" status = InterfaceStatus.deserialize(self._send_and_recv('STATUS')) LOGGER.info('Interface status: %s', status) return status def scan(self) -> None: + "Start scanning" LOGGER.info('Scanning') self._send(b'SCAN') def background_scan(self, callback: callable, timeout: float=SCAN_TIMEOUT) -> None: + "Perform background scan on thread" scan = _BackgroundScan(self) scan.start(callback, timeout) return scan def results(self): + "Return scan results" networks = deserialize_networks(self._send_and_recv(b'SCAN_RESULTS')) for network in networks: LOGGER.info('Found network: %s', network) return networks def add_network(self, network: Network) -> None: - pass + "Add network profile" def networks(self) -> List[Network]: - pass + "List network profiles" def del_network(self, network: Network) -> None: + "Delete given network profile" self._send(b'REMOVE_NETWORK %s' % network.id) def clear_networks(self) -> None: + "Delete all network profiles" LOGGER.info('Removing all networks') self._send(b'REMOVE_NETWORK all') def connect(self, network: Network): - pass + "connect interface to given network" def disconnect(self) -> None: + "Disconnect interface" self._send(b'DISCONNECT') def config_write(self): + "Save running config to file" self._send('SAVE_CONFIG') def stop_ap(self): + "Stop access point" self._send('STOP_AP') diff --git a/pywpas/models.py b/pywpas/models.py index 90201ed..10c583c 100644 --- a/pywpas/models.py +++ b/pywpas/models.py @@ -1,3 +1,6 @@ +"Representation of wpa_supplicant constructs" +# pylint: disable=too-many-instance-attributes + from dataclasses import dataclass from typing import List @@ -7,10 +10,11 @@ @dataclass class InterfaceStatus: + "Represents a wifi interface status" bssibd: str frequency: int ssid: str - id: str + id: str # pylint: disable=invalid-name mode: str wpa_state: str pairwise_cipher: str @@ -26,20 +30,22 @@ def __str__(self): @staticmethod def deserialize(data): + "Deserialize wpa_supplicant form of interface status to object" kwargs = {} - for l in data: - k, v = l.split(b'=') - kwargs[safe_decode(k)] = safe_decode(v) + for line in data: + key, val = line.split(b'=') + kwargs[safe_decode(key)] = safe_decode(val) kwargs['wpa_state'] = STATUS_CONSTS[kwargs['wpa_state'].lower()] kwargs['frequency'] = int(kwargs.pop('freq')) return InterfaceStatus(**kwargs) def serialize(self): - pass + "Serialize interface status to wpa_supplicant form" @dataclass class Network: + "Represents a wifi network" bssid: str frequency: int signal_level: int @@ -47,12 +53,13 @@ class Network: ssid: str def __str__(self): - key_mgmt = '|'.join(self.key_mgmt) - return f'bssid={self.bssid}, freq={self.freq}, signal={self.signal},' \ - f' ssid={self.ssid}, key_mgmt={key_mgmt}, auth={auth}' + return f'bssid={self.bssid}, frequency={self.frequency}, ' \ + f'signal_level={self.signal_level}, flags={self.flags}, ' \ + f'ssid={self.ssid}' @staticmethod def deserialize(header, network): + "Deserialize wpa_supplicant form of network into object" kwargs = {} fields = safe_decode(header).split(' / ') fields = map(lambda x: x.strip().replace(' ', '_'), fields) @@ -68,13 +75,14 @@ def deserialize(header, network): return Network(**kwargs) def serialize(self): - key_mgmt = ','.join(self.key_mgmt) - data = f'\n{self.bssid}\t{self.freq}\t{self.signal}\t' \ - f'{key_mgmt}\t{self.ssid}' - return data.encode('utf-8') + "Serialize network into wpa_supplicant form" + return '\t'.join(( + self.bssid, self.frequency, self.signal_level, self.flags, self.ssid + )).encode('utf-8') def deserialize_networks(lines: str) -> List[Network]: + "Convert wpa_supplicant form of network list into objects" header = lines[0] return [ Network.deserialize(header, l) for l in lines[1:] @@ -82,6 +90,7 @@ def deserialize_networks(lines: str) -> List[Network]: def serialize_networks(networks): + "Convert list of networks into wpa_supplicant form" header = b'bssid / frequency / signal level / flags / ssid\n' networks = b'\n'.join([network.serialize() for network in networks]) return header + networks diff --git a/pywpas/utils.py b/pywpas/utils.py index 5a3d933..b0e9c48 100644 --- a/pywpas/utils.py +++ b/pywpas/utils.py @@ -1,19 +1,21 @@ +"Utility functions" + import os import tempfile import stat -from functools import wraps -from os.path import dirname, join as pathjoin +from os.path import join as pathjoin -def safe_decode(b): +def safe_decode(val): + "Try to decode bytes to str" try: - return b.decode('utf-8') + return val.decode('utf-8') except AttributeError: - return b + return val -def tempnam(dir: str, prefix: str='') -> str: +def tempnam(dir: str, prefix: str='') -> str: # pylint: disable=redefined-builtin """ Utility function. @@ -21,15 +23,17 @@ def tempnam(dir: str, prefix: str='') -> str: it creates a temporary path (that does not exist). For use as a socket address. """ - fd, path = tempfile.mkstemp(dir=dir, prefix=prefix) + fd, path = tempfile.mkstemp(dir=dir, prefix=prefix) # pylint: disable=invalid-name os.close(fd) os.remove(path) return path def is_sock(path): + "Checks if given path is a socket" return stat.S_ISSOCK(os.stat(path).st_mode) -def find_sockets(dir): - return [path for path in os.listdir(dir) if is_sock(pathjoin(dir, path))] +def find_sockets(path): + "Finds sockets in given directory" + return [n for n in os.listdir(path) if is_sock(pathjoin(path, n))] diff --git a/pywpas/version.py b/pywpas/version.py index b66a274..ed54900 100644 --- a/pywpas/version.py +++ b/pywpas/version.py @@ -1,2 +1,2 @@ -# Do not edit. See setup.py. +"Do not edit. See setup.py." __version__ = "0.9.0" diff --git a/setup.py b/setup.py index 12651ad..8439de0 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ readme = os.path.join(os.path.dirname(__file__), 'README.rst') with open(readme) as readme_file: long_description = readme_file.read() -version_msg = '# Do not edit. See setup.py.{nl}__version__ = "{ver}"{nl}' +version_msg = '"Do not edit. See setup.py.{nl}__version__ = "{ver}"{nl}"' version_py = pathjoin( dirname(__file__), package_name.replace('-', '_'), 'version.py')