Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
new: implemented led plugin (closes #522)
- Loading branch information
1 parent
09f80cc
commit 89450ec
Showing
2 changed files
with
181 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
from threading import Event | ||
import _thread | ||
import logging | ||
import time | ||
|
||
import pwnagotchi.plugins as plugins | ||
|
||
|
||
class Led(plugins.Plugin): | ||
__author__ = 'evilsocket@gmail.com' | ||
__version__ = '1.0.0' | ||
__license__ = 'GPL3' | ||
__description__ = 'This plugin blinks the PWR led with different patterns depending on the event.' | ||
|
||
def __init__(self): | ||
self._is_busy = False | ||
self._event = Event() | ||
self._event_name = None | ||
self._led_file = "/sys/class/leds/led0/brightness" | ||
self._delay = 200 | ||
|
||
# called when the plugin is loaded | ||
def on_loaded(self): | ||
self._led_file = "/sys/class/leds/led%d/brightness" % self.options['led'] | ||
self._delay = int(self.options['delay']) | ||
|
||
logging.info("[led] plugin loaded for %s" % self._led_file) | ||
self._on_event('loaded') | ||
_thread.start_new_thread(self._worker, ()) | ||
|
||
def _on_event(self, event): | ||
if not self._is_busy: | ||
self._event_name = event | ||
self._event.set() | ||
logging.debug("[led] event '%s' set", event) | ||
else: | ||
logging.debug("[led] skipping event '%s' because the worker is busy", event) | ||
|
||
def _led(self, on): | ||
with open(self._led_file, 'wt') as fp: | ||
fp.write(str(on)) | ||
|
||
def _blink(self, pattern): | ||
logging.debug("[led] using pattern '%s' ..." % pattern) | ||
for c in pattern: | ||
if c == ' ': | ||
self._led(0) | ||
else: | ||
self._led(1) | ||
time.sleep(self._delay / 1000.0) | ||
# reset | ||
self._led(1) | ||
|
||
def _worker(self): | ||
while True: | ||
self._event.wait() | ||
self._event.clear() | ||
self._is_busy = True | ||
|
||
try: | ||
if self._event_name in self.options['patterns']: | ||
pattern = self.options['patterns'][self._event_name] | ||
self._blink(pattern) | ||
else: | ||
logging.debug("[led] no pattern defined for %s" % self._event_name) | ||
except Exception as e: | ||
logging.exception("[led] error while blinking") | ||
|
||
finally: | ||
self._is_busy = False | ||
|
||
# called in manual mode when there's internet connectivity | ||
def on_internet_available(self, agent): | ||
self._on_event('internet_available') | ||
|
||
# called when everything is ready and the main loop is about to start | ||
def on_ready(self, agent): | ||
self._on_event('ready') | ||
|
||
# called when the AI finished loading | ||
def on_ai_ready(self, agent): | ||
self._on_event('ai_ready') | ||
|
||
# called when the AI starts training for a given number of epochs | ||
def on_ai_training_start(self, agent, epochs): | ||
self._on_event('ai_training_start') | ||
|
||
# called when the AI got the best reward so far | ||
def on_ai_best_reward(self, agent, reward): | ||
self._on_event('ai_best_reward') | ||
|
||
# called when the AI got the worst reward so far | ||
def on_ai_worst_reward(self, agent, reward): | ||
self._on_event('ai_worst_reward') | ||
|
||
# called when the status is set to bored | ||
def on_bored(self, agent): | ||
self._on_event('bored') | ||
|
||
# called when the status is set to sad | ||
def on_sad(self, agent): | ||
self._on_event('sad') | ||
|
||
# called when the status is set to excited | ||
def on_excited(self, agent): | ||
self._on_event('excited') | ||
|
||
# called when the status is set to lonely | ||
def on_lonely(self, agent): | ||
self._on_event('lonely') | ||
|
||
# called when the agent is rebooting the board | ||
def on_rebooting(self, agent): | ||
self._on_event('rebooting') | ||
|
||
# called when the agent is waiting for t seconds | ||
def on_wait(self, agent, t): | ||
self._on_event('wait') | ||
|
||
# called when the agent is sleeping for t seconds | ||
def on_sleep(self, agent, t): | ||
self._on_event('sleep') | ||
|
||
# called when the agent refreshed its access points list | ||
def on_wifi_update(self, agent, access_points): | ||
self._on_event('wifi_update') | ||
|
||
# called when the agent is sending an association frame | ||
def on_association(self, agent, access_point): | ||
self._on_event('association') | ||
|
||
# called when the agent is deauthenticating a client station from an AP | ||
def on_deauthentication(self, agent, access_point, client_station): | ||
self._on_event('deauthentication') | ||
|
||
# called when a new handshake is captured, access_point and client_station are json objects | ||
# if the agent could match the BSSIDs to the current list, otherwise they are just the strings of the BSSIDs | ||
def on_handshake(self, agent, filename, access_point, client_station): | ||
self._on_event('handshake') | ||
|
||
# called when an epoch is over (where an epoch is a single loop of the main algorithm) | ||
def on_epoch(self, agent, epoch, epoch_data): | ||
self._on_event('epoch') | ||
|
||
# called when a new peer is detected | ||
def on_peer_detected(self, agent, peer): | ||
self._on_event('peer_detected') | ||
|
||
# called when a known peer is lost | ||
def on_peer_lost(self, agent, peer): | ||
self._on_event('peer_lost') |