Skip to content

Commit

Permalink
Linting.
Browse files Browse the repository at this point in the history
  • Loading branch information
btimby committed Jan 5, 2022
1 parent 8de70ed commit 9d577f4
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 33 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ deps: .venv
.PHONY: test
test: deps
pipenv run coverage run -m unittest tests
pipenv run coverage report -m

.PHONY: lint
lint: deps
Expand Down
4 changes: 4 additions & 0 deletions pywpas/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
"All components of the public interface"

from .control import Control
from .models import Network

__all__ = ['Control', 'Network']
2 changes: 2 additions & 0 deletions pywpas/const.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"Some constants for dealing with wpa_supplicant values"

STATUS_CONNECTED = 'connected'
STATUS_CONNECTING = 'connecting'
STATUS_INACTIVE = 'inactive'
Expand Down
11 changes: 8 additions & 3 deletions pywpas/control.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"Control interface for wpa_supplicant"

import os
import logging

from typing import List

from .models import Network
from .interface import Interface
from .utils import find_sockets

Expand All @@ -14,7 +15,7 @@
DEFAULT_SOCK_PATH = os.environ.get('WPA_SOCK', '/var/run/wpa_supplicant')


class Control(object):
class Control:
"""
Control wpa_supplicant.
"""
Expand All @@ -26,25 +27,29 @@ def __del__(self):
self.close()

def close(self) -> None:
"Close all interfaces"
if self._interfaces is None:
return
for iface in self._interfaces:
iface.close()
self._interfaces = None

def interface(self, name: str) -> Interface:
"Get specific interface"
for iface in self.interfaces:
if iface.name == name:
return iface
raise ValueError('Invalid interface name: %s' % name)
raise ValueError(f'Invalid interface name: {name}')

def interface_names(self):
"List of interface names"
return [
interface.name for interface in self.interfaces
]

@property
def interfaces(self) -> List[Interface]:
"List of interfaces"
if self._interfaces is None:
self._interfaces = []
for name in find_sockets(self._sock_path):
Expand Down
30 changes: 23 additions & 7 deletions pywpas/interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"Communication with wpa_supplicant interface"

import os
import time
import tempfile
Expand All @@ -22,7 +24,7 @@
RECV_BUFFER_SIZE = 4096


class _BackgroundScan(object):
class _BackgroundScan:
"""
High-level scan.
Expand All @@ -34,6 +36,7 @@ def __init__(self, interface: 'Interface'):
self._t = None

def _scan(self, callback, timeout):
"Background scan thread entry point"
networks, started = set(), time.time()
self._interface.scan()
while self._running and time.time() - started < timeout:
Expand All @@ -44,13 +47,15 @@ def _scan(self, callback, timeout):
callback(network)

def start(self, callback: callable, timeout: float=SCAN_TIMEOUT):
"Start background scan"
assert callable(callback), 'Callback must be callable'
self._running = True
self._t = threading.Thread(
target=self._scan, args=(callback, timeout), daemon=True)
self._t.start()

def stop(self):
"Stop background scan"
if self._t is None:
return
self._running = False
Expand Down Expand Up @@ -87,7 +92,7 @@ def stop(self):
# WPS_ER_START WPS_NFC WPS_NFC_CONFIG_TOKEN WPS_NFC_TAG_READ WPS_NFC_TOKEN
# WPS_PBC WPS_PIN WPS_REG
#
class Interface(object):
class Interface:
"""
Handle a unix:// datagram connection for a given interface.
"""
Expand All @@ -108,10 +113,12 @@ def __del__(self):

@property
def name(self):
"This interface's name"
return self._name

@property
def control(self) -> 'Control':
"The parent object which gives access to additional interfaces"
return self._control

def close(self) -> None:
Expand All @@ -125,9 +132,8 @@ def close(self) -> None:
try:
os.remove(self._client_path)
except FileNotFoundError:
LOGGER.warn('Error deleting client socket at: %s',
LOGGER.warning('Error deleting client socket at: %s',
self._client_path)
pass
self._client_path = None

def _ensure_connection(self):
Expand Down Expand Up @@ -183,51 +189,61 @@ def _send_and_recv(self, cmd: Union[str, bytes]) -> List[str]:
return resp.split(b'\n')

def ping(self) -> None:
"Connection test"
resp = self._send_and_recv(b'PING')
assert resp == [b'PONG'], 'Did not receive proper reply'

def status(self) -> InterfaceStatus:
"Get interface status"
status = InterfaceStatus.deserialize(self._send_and_recv('STATUS'))
LOGGER.info('Interface status: %s', status)
return status

def scan(self) -> None:
"Start scanning"
LOGGER.info('Scanning')
self._send(b'SCAN')

def background_scan(self, callback: callable,
timeout: float=SCAN_TIMEOUT) -> None:
"Perform background scan on thread"
scan = _BackgroundScan(self)
scan.start(callback, timeout)
return scan

def results(self):
"Return scan results"
networks = deserialize_networks(self._send_and_recv(b'SCAN_RESULTS'))
for network in networks:
LOGGER.info('Found network: %s', network)
return networks

def add_network(self, network: Network) -> None:
pass
"Add network profile"

def networks(self) -> List[Network]:
pass
"List network profiles"

def del_network(self, network: Network) -> None:
"Delete given network profile"
self._send(b'REMOVE_NETWORK %s' % network.id)

def clear_networks(self) -> None:
"Delete all network profiles"
LOGGER.info('Removing all networks')
self._send(b'REMOVE_NETWORK all')

def connect(self, network: Network):
pass
"connect interface to given network"

def disconnect(self) -> None:
"Disconnect interface"
self._send(b'DISCONNECT')

def config_write(self):
"Save running config to file"
self._send('SAVE_CONFIG')

def stop_ap(self):
"Stop access point"
self._send('STOP_AP')
33 changes: 21 additions & 12 deletions pywpas/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"Representation of wpa_supplicant constructs"
# pylint: disable=too-many-instance-attributes

from dataclasses import dataclass
from typing import List

Expand All @@ -7,10 +10,11 @@

@dataclass
class InterfaceStatus:
"Represents a wifi interface status"
bssibd: str
frequency: int
ssid: str
id: str
id: str # pylint: disable=invalid-name
mode: str
wpa_state: str
pairwise_cipher: str
Expand All @@ -26,33 +30,36 @@ def __str__(self):

@staticmethod
def deserialize(data):
"Deserialize wpa_supplicant form of interface status to object"
kwargs = {}
for l in data:
k, v = l.split(b'=')
kwargs[safe_decode(k)] = safe_decode(v)
for line in data:
key, val = line.split(b'=')
kwargs[safe_decode(key)] = safe_decode(val)
kwargs['wpa_state'] = STATUS_CONSTS[kwargs['wpa_state'].lower()]
kwargs['frequency'] = int(kwargs.pop('freq'))
return InterfaceStatus(**kwargs)

def serialize(self):
pass
"Serialize interface status to wpa_supplicant form"


@dataclass
class Network:
"Represents a wifi network"
bssid: str
frequency: int
signal_level: int
flags: str
ssid: str

def __str__(self):
key_mgmt = '|'.join(self.key_mgmt)
return f'bssid={self.bssid}, freq={self.freq}, signal={self.signal},' \
f' ssid={self.ssid}, key_mgmt={key_mgmt}, auth={auth}'
return f'bssid={self.bssid}, frequency={self.frequency}, ' \
f'signal_level={self.signal_level}, flags={self.flags}, ' \
f'ssid={self.ssid}'

@staticmethod
def deserialize(header, network):
"Deserialize wpa_supplicant form of network into object"
kwargs = {}
fields = safe_decode(header).split(' / ')
fields = map(lambda x: x.strip().replace(' ', '_'), fields)
Expand All @@ -68,20 +75,22 @@ def deserialize(header, network):
return Network(**kwargs)

def serialize(self):
key_mgmt = ','.join(self.key_mgmt)
data = f'\n{self.bssid}\t{self.freq}\t{self.signal}\t' \
f'{key_mgmt}\t{self.ssid}'
return data.encode('utf-8')
"Serialize network into wpa_supplicant form"
return '\t'.join((
self.bssid, self.frequency, self.signal_level, self.flags, self.ssid
)).encode('utf-8')


def deserialize_networks(lines: str) -> List[Network]:
"Convert wpa_supplicant form of network list into objects"
header = lines[0]
return [
Network.deserialize(header, l) for l in lines[1:]
]


def serialize_networks(networks):
"Convert list of networks into wpa_supplicant form"
header = b'bssid / frequency / signal level / flags / ssid\n'
networks = b'\n'.join([network.serialize() for network in networks])
return header + networks
22 changes: 13 additions & 9 deletions pywpas/utils.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
"Utility functions"

import os
import tempfile
import stat

from functools import wraps
from os.path import dirname, join as pathjoin
from os.path import join as pathjoin


def safe_decode(b):
def safe_decode(val):
"Try to decode bytes to str"
try:
return b.decode('utf-8')
return val.decode('utf-8')
except AttributeError:
return b
return val


def tempnam(dir: str, prefix: str='') -> str:
def tempnam(dir: str, prefix: str='') -> str: # pylint: disable=redefined-builtin
"""
Utility function.
Creates a temporary file, but removes and closes the file. In effect
it creates a temporary path (that does not exist). For use as a socket
address.
"""
fd, path = tempfile.mkstemp(dir=dir, prefix=prefix)
fd, path = tempfile.mkstemp(dir=dir, prefix=prefix) # pylint: disable=invalid-name
os.close(fd)
os.remove(path)
return path


def is_sock(path):
"Checks if given path is a socket"
return stat.S_ISSOCK(os.stat(path).st_mode)


def find_sockets(dir):
return [path for path in os.listdir(dir) if is_sock(pathjoin(dir, path))]
def find_sockets(path):
"Finds sockets in given directory"
return [n for n in os.listdir(path) if is_sock(pathjoin(path, n))]
2 changes: 1 addition & 1 deletion pywpas/version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Do not edit. See setup.py.
"Do not edit. See setup.py."
__version__ = "0.9.0"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
readme = os.path.join(os.path.dirname(__file__), 'README.rst')
with open(readme) as readme_file:
long_description = readme_file.read()
version_msg = '# Do not edit. See setup.py.{nl}__version__ = "{ver}"{nl}'
version_msg = '"Do not edit. See setup.py.{nl}__version__ = "{ver}"{nl}"'
version_py = pathjoin(
dirname(__file__), package_name.replace('-', '_'), 'version.py')

Expand Down

0 comments on commit 9d577f4

Please sign in to comment.