Skip to content

Commit

Permalink
feat: show argument infos for dynamic variables imports
Browse files Browse the repository at this point in the history
  • Loading branch information
d-biehl committed May 17, 2023
1 parent 3720109 commit 94b21fb
Show file tree
Hide file tree
Showing 4 changed files with 372 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,9 @@ def get_variables_doc(
command_line_variables: Optional[Dict[str, Optional[Any]]] = None,
variables: Optional[Dict[str, Optional[Any]]] = None,
) -> VariablesDoc:
from robot.libdocpkg.robotbuilder import KeywordDocBuilder
from robot.output import LOGGER
from robot.running.handlers import _PythonHandler
from robot.utils.importer import Importer
from robot.variables.filesetter import PythonImporter, YamlImporter

Expand All @@ -1523,6 +1525,7 @@ def get_variables_doc(
try:
with _std_capture() as std_capturer:
import_name = find_variables(name, working_dir, base_dir, command_line_variables, variables)
get_variables = None

if import_name.lower().endswith((".yaml", ".yml")):
source = import_name
Expand All @@ -1543,6 +1546,9 @@ def __init__(self, var_file: Any) -> None:
def import_variables(self, path: str, args: Optional[Tuple[Any, ...]] = None) -> Any:
return self._get_variables(self.var_file, args)

def is_dynamic(self) -> bool:
return bool(self._is_dynamic(self.var_file))

module_importer = Importer("variable file", LOGGER)

if get_robot_version() >= (5, 0):
Expand All @@ -1555,6 +1561,9 @@ def import_variables(self, path: str, args: Optional[Tuple[Any, ...]] = None) ->

importer = MyPythonImporter(libcode)

if importer.is_dynamic():
get_variables = getattr(libcode, "get_variables", None) or getattr(libcode, "getVariables", None)

# TODO: add type information of the value including dict key names and member names
vars: List[ImportedVariableDefinition] = [
ImportedVariableDefinition(
Expand All @@ -1572,14 +1581,54 @@ def import_variables(self, path: str, args: Optional[Tuple[Any, ...]] = None) ->
for name, value in importer.import_variables(import_name, args)
]

return VariablesDoc(
libdoc = VariablesDoc(
name=stem,
source=source or module_spec.origin if module_spec is not None else import_name,
module_spec=module_spec,
variables=vars,
stdout=std_capturer.getvalue(),
python_path=sys.path,
)

if get_variables is not None:

class VarHandler(_PythonHandler):
def _get_name(self, handler_name: Any, handler_method: Any) -> Any:
return get_variables.__name__ if get_variables is not None else ""

def _get_initial_handler(self, library: Any, name: Any, method: Any) -> Any:
return None

vars_initializer = VarHandler(libdoc, get_variables.__name__, get_variables)

libdoc.inits = KeywordStore(
keywords=[
KeywordDoc(
name=libdoc.name,
args=[KeywordArgumentDoc.from_robot(a) for a in kw[0].args],
doc=kw[0].doc,
tags=list(kw[0].tags),
source=kw[0].source,
line_no=kw[0].lineno if kw[0].lineno is not None else -1,
col_offset=-1,
end_col_offset=-1,
end_line_no=-1,
type="library",
libname=libdoc.name,
libtype=libdoc.type,
longname=f"{libdoc.name}.{kw[0].name}",
is_initializer=True,
arguments=ArgumentSpec.from_robot_argument_spec(kw[1].arguments),
parent=libdoc.digest,
)
for kw in [
(KeywordDocBuilder().build_keyword(k), k)
for k in [KeywordWrapper(vars_initializer, libdoc.source or "")]
]
]
)

return libdoc
except (SystemExit, KeyboardInterrupt, IgnoreEasterEggLibraryWarning):
raise
except BaseException as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1770,11 +1770,11 @@ async def complete_VariablesImport( # noqa: N802
position: Position,
context: Optional[CompletionContext],
) -> Union[List[CompletionItem], CompletionList, None]:
from robot.parsing.lexer.tokens import Token
from robot.parsing.model.statements import VariablesImport
from robot.parsing.lexer.tokens import Token as RobotToken
from robot.parsing.model.statements import Statement, VariablesImport

import_node = cast(VariablesImport, node)
import_token = import_node.get_token(Token.VARIABLES)
import_token = import_node.get_token(RobotToken.VARIABLES)

if import_token is None:
return []
Expand All @@ -1784,85 +1784,191 @@ async def complete_VariablesImport( # noqa: N802

import_token_index = import_node.tokens.index(import_token)

if len(import_node.tokens) > import_token_index + 2:
name_token = import_node.tokens[import_token_index + 2]
if not position.is_in_range(r := range_from_token(name_token)):
return None
async def complete_import() -> Optional[List[CompletionItem]]:
if len(import_node.tokens) > import_token_index + 2:
name_token = import_node.tokens[import_token_index + 2]
if not position.is_in_range(r := range_from_token(name_token)):
return None

elif len(import_node.tokens) > import_token_index + 1:
name_token = import_node.tokens[import_token_index + 1]
if position.is_in_range(r := range_from_token(name_token)):
if whitespace_at_begin_of_token(name_token) > 1:
ws_b = whitespace_from_begin_of_token(name_token)
r.start.character += 2 if ws_b and ws_b[0] != "\t" else 1
elif len(import_node.tokens) > import_token_index + 1:
name_token = import_node.tokens[import_token_index + 1]
if position.is_in_range(r := range_from_token(name_token)):
if whitespace_at_begin_of_token(name_token) > 1:
ws_b = whitespace_from_begin_of_token(name_token)
r.start.character += 2 if ws_b and ws_b[0] != "\t" else 1

if not position.is_in_range(r):
if not position.is_in_range(r):
return None
else:
return None
else:
return None
else:
return None
else:
return None

pos = position.character - r.start.character
text_before_position = str(name_token.value)[:pos].lstrip()
pos = position.character - r.start.character
text_before_position = str(name_token.value)[:pos].lstrip()

if text_before_position != "" and all(c == "." for c in text_before_position):
return None
if text_before_position != "" and all(c == "." for c in text_before_position):
return None

last_separator_index = (
len(text_before_position)
- next((i for i, c in enumerate(reversed(text_before_position)) if c in ["/", os.sep]), -1)
- 1
)
last_separator_index = (
len(text_before_position)
- next((i for i, c in enumerate(reversed(text_before_position)) if c in ["/", os.sep]), -1)
- 1
)

first_part = (
text_before_position[
: last_separator_index + (1 if text_before_position[last_separator_index] in ["/", os.sep] else 0)
first_part = (
text_before_position[
: last_separator_index + (1 if text_before_position[last_separator_index] in ["/", os.sep] else 0)
]
if last_separator_index < len(text_before_position)
else None
)

try:
complete_list = await self.namespace.imports_manager.complete_variables_import(
first_part if first_part else None,
str(self.document.uri.to_path().parent),
await self.namespace.get_resolvable_variables(nodes_at_position, position),
)
if not complete_list:
return None
except (SystemExit, KeyboardInterrupt, asyncio.CancelledError):
raise
except BaseException:
return None

if text_before_position == "":
r.start.character = position.character
else:
r.start.character += last_separator_index + 1 if last_separator_index < len(text_before_position) else 0

return [
CompletionItem(
label=e.label,
kind=CompletionItemKind.FILE
if e.kind in [CompleteResultKind.VARIABLES]
else CompletionItemKind.FILE
if e.kind in [CompleteResultKind.FILE]
else CompletionItemKind.FOLDER
if e.kind in [CompleteResultKind.FOLDER]
else None,
detail=e.kind.value,
sort_text=f"030_{e}",
insert_text_format=InsertTextFormat.PLAIN_TEXT,
text_edit=TextEdit(range=r, new_text=e.label) if r is not None else None,
data=CompletionItemData(
document_uri=str(self.document.uri),
type=e.kind.name,
name=((first_part) if first_part is not None else "") + e.label,
),
)
for e in complete_list
]
if last_separator_index < len(text_before_position)
else None
)

try:
complete_list = await self.namespace.imports_manager.complete_variables_import(
first_part if first_part else None,
str(self.document.uri.to_path().parent),
await self.namespace.get_resolvable_variables(nodes_at_position, position),
)
if not complete_list:
async def complete_arguments() -> Optional[List[CompletionItem]]:
if (
import_node.name is None
or position <= range_from_token(import_node.get_token(RobotToken.NAME)).extend(end_character=1).end
):
return None
except (SystemExit, KeyboardInterrupt, asyncio.CancelledError):
raise
except BaseException:

with_name_token = next((v for v in import_node.tokens if v.value == "WITH NAME"), None)
if with_name_token is not None and position >= range_from_token(with_name_token).start:
return None

if context is None or context.trigger_kind != CompletionTriggerKind.INVOKED:
return []

kw_node = cast(Statement, node)

tokens_at_position = get_tokens_at_position(kw_node, position)

if not tokens_at_position:
return None

token_at_position = tokens_at_position[-1]

if token_at_position.type not in [RobotToken.ARGUMENT, RobotToken.EOL, RobotToken.SEPARATOR]:
return None

if (
token_at_position.type == RobotToken.EOL
and len(tokens_at_position) > 1
and tokens_at_position[-2].type == RobotToken.KEYWORD
):
return None

token_at_position_index = kw_node.tokens.index(token_at_position)

argument_token_index = token_at_position_index
while argument_token_index >= 0 and kw_node.tokens[argument_token_index].type != RobotToken.ARGUMENT:
argument_token_index -= 1

argument_token: Optional[RobotToken] = None
if argument_token_index >= 0:
argument_token = kw_node.tokens[argument_token_index]

completion_range = range_from_token(argument_token or token_at_position)
completion_range.end = range_from_token(token_at_position).end
if (w := whitespace_at_begin_of_token(token_at_position)) > 0:
if w > 1 and range_from_token(token_at_position).start.character + 1 < position.character:
completion_range.start = position
elif completion_range.start != position:
return None
else:
if "=" in (argument_token or token_at_position).value:
equal_index = (argument_token or token_at_position).value.index("=")
if completion_range.start.character + equal_index < position.character:
return None

completion_range.end.character = completion_range.start.character + equal_index + 1
else:
completion_range.end = position

try:
libdoc = await self.namespace.get_imported_variables_libdoc(import_node.name, import_node.args)

except (SystemExit, KeyboardInterrupt, asyncio.CancelledError):
raise
except BaseException as e:
self._logger.exception(e)
return None

if libdoc is not None:
init = next((v for v in libdoc.inits.values()), None)

if init:
return [
CompletionItem(
label=f"{e.name}=",
kind=CompletionItemKind.VARIABLE,
sort_text=f"010{i}_{e.name}",
filter_text=e.name,
insert_text_format=InsertTextFormat.PLAIN_TEXT,
text_edit=TextEdit(range=completion_range, new_text=f"{e.name}="),
data=CompletionItemData(
document_uri=str(self.document.uri),
type="Argument",
name=e.name,
),
)
for i, e in enumerate(init.args)
if e.kind
not in [
KeywordArgumentKind.VAR_POSITIONAL,
KeywordArgumentKind.VAR_NAMED,
KeywordArgumentKind.NAMED_ONLY_MARKER,
KeywordArgumentKind.POSITIONAL_ONLY_MARKER,
]
]

return None

if text_before_position == "":
r.start.character = position.character
else:
r.start.character += last_separator_index + 1 if last_separator_index < len(text_before_position) else 0
result = await complete_import() or []
# TODO this is not supported in robotframework, but it would be nice to have
# result.extend(await complete_arguments() or [])

return [
CompletionItem(
label=e.label,
kind=CompletionItemKind.FILE
if e.kind in [CompleteResultKind.VARIABLES]
else CompletionItemKind.FILE
if e.kind in [CompleteResultKind.FILE]
else CompletionItemKind.FOLDER
if e.kind in [CompleteResultKind.FOLDER]
else None,
detail=e.kind.value,
sort_text=f"030_{e}",
insert_text_format=InsertTextFormat.PLAIN_TEXT,
text_edit=TextEdit(range=r, new_text=e.label) if r is not None else None,
data=CompletionItemData(
document_uri=str(self.document.uri),
type=e.kind.name,
name=((first_part) if first_part is not None else "") + e.label,
),
)
for e in complete_list
]
return result # noqa: RET504

async def _complete_KeywordCall_or_Fixture( # noqa: N802
self,
Expand Down
Loading

0 comments on commit 94b21fb

Please sign in to comment.