Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add derivative to latcontrol #71

Merged
merged 8 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions common/colors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
class COLORS:
def __init__(self):
self.HEADER = '\033[95m'
self.OKBLUE = '\033[94m'
self.CBLUE = '\33[44m'
self.BOLD = '\033[1m'
self.CITALIC = '\33[3m'
self.OKGREEN = '\033[92m'
self.CWHITE = '\33[37m'
self.ENDC = '\033[0m' + self.CWHITE
self.UNDERLINE = '\033[4m'
self.PINK = '\33[38;5;207m'
self.PRETTY_YELLOW = self.BASE(220)

self.RED = '\033[91m'
self.PURPLE_BG = '\33[45m'
self.YELLOW = '\033[93m'
self.BLUE_GREEN = self.BASE(85)

self.FAIL = self.RED
self.INFO = self.PURPLE_BG
self.SUCCESS = self.OKGREEN
self.PROMPT = self.YELLOW
self.DBLUE = '\033[36m'
self.CYAN = self.BASE(39)
self.WARNING = '\033[33m'

def BASE(self, col): # seems to support more colors
return '\33[38;5;{}m'.format(col)

def BASEBG(self, col): # seems to support more colors
return '\33[48;5;{}m'.format(col)


COLORS = COLORS()


def opParams_warning(msg):
print('{}opParams WARNING: {}{}'.format(COLORS.WARNING, msg, COLORS.ENDC))


def opParams_error(msg):
print('{}opParams ERROR: {}{}'.format(COLORS.FAIL, msg, COLORS.ENDC))
172 changes: 172 additions & 0 deletions common/op_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#!/usr/bin/env python3
import os
import json
from common.colors import opParams_error as error
from common.colors import opParams_warning as warning
try:
from common.realtime import sec_since_boot
except ImportError:
import time
sec_since_boot = time.time
warning("Using python time.time() instead of faster sec_since_boot")

travis = False


class ValueTypes:
number = [float, int]
none_or_number = [type(None), float, int]


class Param:
def __init__(self, default=None, allowed_types=[], description=None, live=False, hidden=False):
self.default = default
if not isinstance(allowed_types, list):
allowed_types = [allowed_types]
self.allowed_types = allowed_types
self.description = description
self.hidden = hidden
self.live = live
self._create_attrs()

def is_valid(self, value):
if not self.has_allowed_types:
return True
return type(value) in self.allowed_types

def _create_attrs(self): # Create attributes and check Param is valid
self.has_allowed_types = isinstance(self.allowed_types, list) and len(self.allowed_types) > 0
self.has_description = self.description is not None
self.is_list = list in self.allowed_types
if self.has_allowed_types:
assert type(self.default) in self.allowed_types, 'Default value type must be in specified allowed_types!'
if self.is_list:
self.allowed_types.remove(list)


class opParams:
def __init__(self):
"""
To add your own parameter to opParams in your fork, simply add a new entry in self.fork_params, instancing a new Param class with at minimum a default value.
The allowed_types and description args are not required but highly recommended to help users edit their parameters with opEdit safely.
- The description value will be shown to users when they use opEdit to change the value of the parameter.
- The allowed_types arg is used to restrict what kinds of values can be entered with opEdit so that users can't crash openpilot with unintended behavior.
(setting a param intended to be a number with a boolean, or viceversa for example)
Limiting the range of floats or integers is still recommended when `.get`ting the parameter.
When a None value is allowed, use `type(None)` instead of None, as opEdit checks the type against the values in the arg with `isinstance()`.
- Finally, the live arg tells both opParams and opEdit that it's a live parameter that will change. Therefore, you must place the `op_params.get()` call in the update function so that it can update.

Here's an example of a good fork_param entry:
self.fork_params = {'camera_offset': Param(default=0.06, allowed_types=VT.number)} # VT.number allows both floats and ints
"""

VT = ValueTypes()
self.fork_params = {'derivative_tune': Param(1., VT.number, 'Gain for derivative, set to 0 to disable', live=True),

'op_edit_live_mode': Param(False, bool, 'This parameter controls which mode opEdit starts in. It should be hidden from the user with the hide key', hidden=True)}

self._params_file = "/data/op_params.json"
self._last_read_time = sec_since_boot()
self.read_frequency = 2.5 # max frequency to read with self.get(...) (sec)
self._to_delete = [] # a list of params you want to delete (unused)
self._run_init() # restores, reads, and updates params

def _run_init(self): # does first time initializing of default params
self.params = self._get_all_params(default=True) # in case file is corrupted
if travis:
return

to_write = False
if os.path.isfile(self._params_file):
if self._read():
to_write = self._add_default_params() # if new default data has been added
to_write |= self._delete_old() # or if old params have been deleted
else: # don't overwrite corrupted params, just print
error("Can't read op_params.json file")
else:
to_write = True # user's first time running a fork with op_params, write default params

if to_write:
self._write()
os.chmod(self._params_file, 0o764)

def get(self, key=None, force_live=False): # any params you try to get MUST be in fork_params
param_info = self.param_info(key)
self._update_params(param_info, force_live)

if key is None:
return self._get_all_params()

self._check_key_exists(key, 'get')
value = self.params[key]
if param_info.is_valid(value): # always valid if no allowed types, otherwise checks to make sure
return value # all good, returning user's value

warning('User\'s value type is not valid! Returning default') # somehow... it should always be valid
return param_info.default # return default value because user's value of key is not in allowed_types to avoid crashing openpilot

def put(self, key, value):
self._check_key_exists(key, 'put')
if not self.param_info(key).is_valid(value):
raise Exception('opParams: Tried to put a value of invalid type!')
self.params.update({key: value})
self._write()

def delete(self, key): # todo: might be obsolete. remove?
if key in self.params:
del self.params[key]
self._write()

def param_info(self, key):
if key in self.fork_params:
return self.fork_params[key]
return Param()

def _check_key_exists(self, key, met):
if key not in self.fork_params or key not in self.params:
raise Exception('opParams: Tried to {} an unknown parameter! Key not in fork_params'.format(met))

def _add_default_params(self):
added = False
for key, param in self.fork_params.items():
if key not in self.params:
self.params[key] = param.default
added = True
elif not param.is_valid(self.params[key]):
warning('Value type of user\'s {} param not in allowed types, replacing with default!'.format(key))
self.params[key] = param.default
added = True
return added

def _delete_old(self):
deleted = False
for param in self._to_delete:
if param in self.params:
del self.params[param]
deleted = True
return deleted

def _get_all_params(self, default=False, return_hidden=False):
if default:
return {k: p.default for k, p in self.fork_params.items()}
return {k: self.params[k] for k, p in self.fork_params.items() if k in self.params and (not p.hidden or return_hidden)}

def _update_params(self, param_info, force_live):
if force_live or param_info.live: # if is a live param, we want to get updates while openpilot is running
if not travis and sec_since_boot() - self._last_read_time >= self.read_frequency: # make sure we aren't reading file too often
if self._read():
self._last_read_time = sec_since_boot()

def _read(self):
try:
with open(self._params_file, "r") as f:
self.params = json.loads(f.read())
return True
except Exception as e:
error(e)
return False

def _write(self):
if not travis:
with open(self._params_file, "w") as f:
f.write(json.dumps(self.params, indent=2)) # can further speed it up by remove indentation but makes file hard to read
Loading