-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Alias 0.2.0 #97
Alias 0.2.0 #97
Changes from 3 commits
c40e132
50c7b71
0782dc0
9b61a4d
bcfab3a
0b5312a
9555056
6f71339
7e9261b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,27 +5,31 @@ | |
|
||
import os | ||
import re | ||
import hashlib | ||
import sys | ||
import json | ||
import shlex | ||
import hashlib | ||
from collections import defaultdict | ||
from six.moves import configparser | ||
|
||
from knack.log import get_logger | ||
from knack.util import CLIError | ||
|
||
from azext_alias import telemetry | ||
from azext_alias._const import ( | ||
GLOBAL_CONFIG_DIR, | ||
ALIAS_FILE_NAME, | ||
ALIAS_HASH_FILE_NAME, | ||
COLLIDED_ALIAS_FILE_NAME, | ||
PLACEHOLDER_REGEX, | ||
INCONSISTENT_INDEXING_ERROR, | ||
CONFIG_PARSING_ERROR, | ||
INSUFFICIENT_POS_ARG_ERROR, | ||
DEBUG_MSG, | ||
POS_ARG_DEBUG_MSG, | ||
COLLISION_CHECK_LEVEL_DEPTH | ||
COLLISION_CHECK_LEVEL_DEPTH, | ||
POS_ARG_DEBUG_MSG | ||
) | ||
from azext_alias.argument import ( | ||
build_pos_args_table, | ||
render_template | ||
) | ||
|
||
|
||
GLOBAL_ALIAS_PATH = os.path.join(GLOBAL_CONFIG_DIR, ALIAS_FILE_NAME) | ||
GLOBAL_ALIAS_HASH_PATH = os.path.join(GLOBAL_CONFIG_DIR, ALIAS_HASH_FILE_NAME) | ||
|
@@ -34,12 +38,24 @@ | |
logger = get_logger(__name__) | ||
|
||
|
||
def get_config_parser(): | ||
""" | ||
Disable configparser's interpolation function and return an instance of config parser. | ||
|
||
Returns: | ||
An instance of config parser with interpolation disabled. | ||
""" | ||
if sys.version_info.major == 3: | ||
return configparser.ConfigParser(interpolation=None) # pylint: disable=unexpected-keyword-arg | ||
return configparser.ConfigParser() | ||
|
||
|
||
class AliasManager(object): | ||
|
||
def __init__(self, **kwargs): | ||
self.alias_table = configparser.ConfigParser() | ||
self.alias_table = get_config_parser() | ||
self.kwargs = kwargs | ||
self.collided_alias = dict() | ||
self.collided_alias = defaultdict(list) | ||
self.reserved_commands = [] | ||
self.alias_config_str = '' | ||
self.alias_config_hash = '' | ||
|
@@ -81,7 +97,7 @@ def load_collided_alias(self): | |
collided_alias_str = collided_alias_file.read() | ||
try: | ||
self.collided_alias = json.loads(collided_alias_str if collided_alias_str else '{}') | ||
except Exception: # pylint: disable=broad-except | ||
except Exception: # pylint: disable=broad-except | ||
self.collided_alias = {} | ||
|
||
def detect_alias_config_change(self): | ||
|
@@ -136,42 +152,33 @@ def transform(self, args): | |
continue | ||
|
||
full_alias = self.get_full_alias(alias) | ||
num_pos_args = AliasManager.count_positional_args(full_alias) | ||
|
||
if self.alias_table.has_option(full_alias, 'command'): | ||
cmd_derived_from_alias = self.alias_table.get(full_alias, 'command') | ||
if not num_pos_args: | ||
logger.debug(DEBUG_MSG, alias, cmd_derived_from_alias) | ||
telemetry.set_alias_hit(full_alias) | ||
else: | ||
transformed_commands.append(alias) | ||
continue | ||
|
||
if num_pos_args: | ||
# Take arguments indexed from alias_index to alias_index + num_pos_args and inject | ||
# them as positional arguments into the command | ||
pos_args_iter = AliasManager.pos_args_iter(alias, args, alias_index, num_pos_args) | ||
pos_arg_debug_msg = POS_ARG_DEBUG_MSG.format(alias, cmd_derived_from_alias) | ||
for placeholder, pos_arg in pos_args_iter: | ||
if placeholder not in full_alias: | ||
raise CLIError(INCONSISTENT_INDEXING_ERROR.format(placeholder, full_alias)) | ||
|
||
cmd_derived_from_alias = cmd_derived_from_alias.replace(placeholder, pos_arg) | ||
pos_arg_debug_msg += "({}: {}) ".format(placeholder, pos_arg) | ||
# Skip the next arg because it has been already consumed as a positional argument above | ||
next(alias_iter) | ||
logger.debug(pos_arg_debug_msg) | ||
pos_args_table = build_pos_args_table(full_alias, args, alias_index) | ||
if pos_args_table: | ||
logger.debug(POS_ARG_DEBUG_MSG, full_alias, cmd_derived_from_alias, pos_args_table) | ||
transformed_commands += render_template(cmd_derived_from_alias, pos_args_table) | ||
|
||
# Invoke split() because the command derived from the alias might contain spaces | ||
transformed_commands += cmd_derived_from_alias.split() | ||
# Skip the next arg(s) because they have been already consumed as a positional argument above | ||
for pos_arg in pos_args_table: # pylint: disable=unused-variable | ||
next(alias_iter) | ||
else: | ||
logger.debug(DEBUG_MSG, full_alias, cmd_derived_from_alias) | ||
transformed_commands += shlex.split(cmd_derived_from_alias) | ||
|
||
return self.post_transform(transformed_commands) | ||
|
||
def build_collision_table(self, levels=COLLISION_CHECK_LEVEL_DEPTH): | ||
""" | ||
Build the collision table according to the alias configuration file against the entire command table. | ||
|
||
if the word collided with a reserved command. self.collided_alias is structured as: | ||
self.collided_alias is structured as: | ||
{ | ||
'collided_alias': [the command level at which collision happens] | ||
} | ||
|
@@ -193,8 +200,6 @@ def build_collision_table(self, levels=COLLISION_CHECK_LEVEL_DEPTH): | |
for level in range(1, levels + 1): | ||
collision_regex = r'^{}{}($|\s)'.format(r'([a-z\-]*\s)' * (level - 1), word.lower()) | ||
if list(filter(re.compile(collision_regex).match, self.reserved_commands)): | ||
if word not in self.collided_alias: | ||
self.collided_alias[word] = [] | ||
self.collided_alias[word].append(level) | ||
telemetry.set_collided_aliases(list(self.collided_alias.keys())) | ||
|
||
|
@@ -210,14 +215,15 @@ def get_full_alias(self, query): | |
""" | ||
if query in self.alias_table.sections(): | ||
return query | ||
|
||
return next((section for section in self.alias_table.sections() if section.split()[0] == query), '') | ||
|
||
def load_full_command_table(self): | ||
""" | ||
Perform a full load of the command table to get all the reserved command words. | ||
""" | ||
load_cmd_tbl_func = self.kwargs.get('load_cmd_tbl_func', None) | ||
if load_cmd_tbl_func: | ||
if callable(load_cmd_tbl_func): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This worries me a bit. In what situation this variable is not callable? If that's valid scenario it worths consider changing the variable name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think ...
load_cmd_tbl_func = self.kwargs.get('load_cmd_tbl_func', None)
if load_cmd_tbl_func is not None:
self.reserved_commands = list(load_cmd_tbl_func([]).keys())
... |
||
self.reserved_commands = list(load_cmd_tbl_func([]).keys()) | ||
telemetry.set_full_command_table_loaded() | ||
|
||
|
@@ -233,9 +239,6 @@ def post_transform(self, args): | |
|
||
post_transform_commands = [] | ||
for arg in args: | ||
# Trim leading and trailing quotes | ||
if arg and arg[0] == arg[-1] and arg[0] in '\'"': | ||
arg = arg[1:-1] | ||
post_transform_commands.append(os.path.expandvars(arg)) | ||
|
||
self.write_alias_config_hash() | ||
|
@@ -291,36 +294,3 @@ def process_exception_message(exception): | |
for replace_char in ['\t', '\n', '\\n']: | ||
exception_message = exception_message.replace(replace_char, '' if replace_char != '\t' else ' ') | ||
return exception_message.replace('section', 'alias') | ||
|
||
@staticmethod | ||
def pos_args_iter(alias, args, start_index, num_pos_args): | ||
""" | ||
Generate an tuple iterator ([0], [1]) where the [0] is the positional argument | ||
placeholder and [1] is the argument value. e.g. ('{0}', pos_arg_1) -> ('{1}', pos_arg_2) -> ... | ||
|
||
Args: | ||
alias: The current alias we are processing. | ||
args: The list of input commands. | ||
start_index: The index where we start selecting the positional arguments | ||
(one-index instead of zero-index). | ||
num_pos_args: The number of positional arguments that this alias has. | ||
""" | ||
pos_args = args[start_index: start_index + num_pos_args] | ||
if len(pos_args) != num_pos_args: | ||
raise CLIError(INSUFFICIENT_POS_ARG_ERROR.format(alias, num_pos_args, len(pos_args))) | ||
|
||
for i, pos_arg in enumerate(pos_args): | ||
yield ('{{{}}}'.format(i), pos_arg) | ||
|
||
@staticmethod | ||
def count_positional_args(arg): | ||
""" | ||
Count how many positional arguments ({0}, {1} ...) there are. | ||
|
||
Args: | ||
arg: The word which this function performs counting on. | ||
|
||
Returns: | ||
The number of placeholders in arg. | ||
""" | ||
return len(re.findall(PLACEHOLDER_REGEX, arg)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3 times the regex, 3 times the fun.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed all the complex regular expressions above and convert them into actual code/function. Though simple regular expressions are still used in some places to simplify implementation (e.g. find all numbers in a string).