In [None]:
# | default_exp parse_module

In [None]:
# | export

import ast
import os
import re
from dataclasses import dataclass
from pathlib import Path

import pandas as pd
from nbdev.export import get_config

In [None]:
# | export


def extract_module_only(package_module_name):
    """Return the last component of a dotted module path.

    Args:
        package_module_name: A module name, optionally qualified with one or
            more package names (e.g. ``"module"``, ``"pkg.module"`` or
            ``"pkg.sub.module"``).

    Returns:
        The text after the final ``"."``, or the input unchanged when it
        contains no dot.
    """
    # rsplit with maxsplit=1 copes with any nesting depth; unpacking the result
    # of a plain split(".") into two names fails as soon as there are two dots.
    return package_module_name.rsplit(".", 1)[-1]

In [None]:
# Fixtures for the extract_module_only checks: one package-qualified name and
# one bare module name.
pkg_module_name = "test.module"
module_name = "module"
# Dots swapped for "/" separators; module_name has no dots, so this is "module".
path_sep_module_name = module_name.replace(".", "/")

In [None]:
# Both the bare and the package-qualified name must reduce to "module".
assert "module" == extract_module_only(module_name)
assert "module" == extract_module_only(pkg_module_name)

In [None]:
# | export


def extract_step_code(
    module_path: Path,
    export_comments=("#|export", "#|exporti", "#|exports"),
    remove_comment_lines=True,
):
    """Collect the source text of every ``#|export_step <name>`` section in a file.

    Each line is lower-cased and stripped of spaces before it is compared with
    the markers, so ``# | export_step Fit`` opens the step ``"fit"``. A step
    runs until the next step marker, or until a line starting with one of
    ``export_comments``, which closes the step without opening a new one.

    Args:
        module_path: Path of the Python file to scan.
        export_comments: Marker prefixes (in the normalised, space-free form)
            that end the current step. Any iterable of strings is accepted.
        remove_comment_lines: When true, lines whose first character is ``#``
            are dropped from the collected code. When false they are kept,
            including the ``export_step`` marker line itself.

    Returns:
        A dict mapping step name to the concatenated source of that step, in
        order of first appearance in the file.
    """
    step_marker = "#|export_step"
    # str.startswith only accepts a str or a tuple of str.
    if not isinstance(export_comments, str):
        export_comments = tuple(export_comments)

    collected = {}
    active_step = None
    # Python source is UTF-8 by default, so do not rely on the locale encoding.
    with open(module_path, "r", encoding="utf-8") as module_file:
        for line in module_file:
            trimmed_line = line.lower().replace(" ", "")
            if trimmed_line.startswith(step_marker):
                active_step = trimmed_line.split(step_marker)[1].strip()
            elif trimmed_line.startswith(export_comments):
                active_step = None
            # Only column-0 comments are dropped; indented ones stay in the code.
            if remove_comment_lines and line.startswith("#"):
                continue
            if active_step:
                collected.setdefault(active_step, []).append(line)
    return {name: "".join(lines) for name, lines in collected.items()}

In [None]:
# Run the extractor against the fixture at <lib_path>/test/test_multistep.py.
test_module = os.path.join(get_config().path("lib_path"), "test", "test_multistep.py")
step_code = extract_step_code(test_module)
step_names = step_code.keys()
# Step names must come back in exactly this order.
assert ["first", "preprocess", "fit", "evaluate"] == list(step_names)
# For every step, the substring "def" must start at exactly one position in
# the extracted code.
assert all(
    [
        len([i for i in range(len(sc)) if sc.startswith("def", i)]) == 1
        for sc in step_code.values()
    ]
)

In [None]:
# Run the extractor against the fixture at <lib_path>/test/test_export.py.
# NOTE(review): the recorded output after this cell shows an AssertionError, so
# at least one assertion below failed when the notebook was last run; which
# one is not visible here.
test_module = os.path.join(get_config().path("lib_path"), "test", "test_export.py")
step_code = extract_step_code(test_module)
step_names = step_code.keys()
assert ["first", "preprocess", "train", "last"] == list(step_names)
# With the default remove_comment_lines=True the step must not open with "#".
assert not step_code["first"].startswith("#")
# With comment removal disabled the step must open with a "#" line.
assert extract_step_code(test_module, remove_comment_lines=False)["first"].startswith(
    "#"
)
# For every step, the substring "def" must start at exactly one position in
# the extracted code.
assert all(
    [
        len([i for i in range(len(sc)) if sc.startswith("def", i)]) == 1
        for sc in step_code.values()
    ]
)

AssertionError: 

In [None]:
# | export


class FuncLister(ast.NodeVisitor):
    """AST visitor that records the details of a step's function definition.

    After ``visit(tree)`` the instance carries, for the last function that is
    not nested inside another function:

    * ``name`` - the function name
    * ``docstring`` - its docstring, or ``None``
    * ``args`` - the ``ast.arg`` nodes of its regular positional parameters
    * ``arg_names`` - the names of those parameters
    * ``has_return`` - whether a ``return`` statement belongs to it

    Helper functions defined *inside* the step function are ignored, so they
    cannot overwrite the recorded name or arguments, and their ``return``
    statements are not counted. None of the first four attributes is set when
    the tree contains no ``def``; callers rely on that to detect a missing
    function.
    """

    has_return = False
    # Number of enclosing function definitions around the node being visited.
    _depth = 0

    def visit_Return(self, node):
        # depth <= 1 means the return is not inside a nested helper function.
        if self._depth <= 1:
            self.has_return = True

    def visit_FunctionDef(self, node):
        if self._depth == 0:
            self.name = node.name
            self.docstring = ast.get_docstring(node)
            self.args = node.args.args
            self.arg_names = [a.arg for a in node.args.args]
        self._depth += 1
        try:
            self.generic_visit(node)
        finally:
            self._depth -= 1


import pprint

# Module-wide pretty-printer used to render FuncDetails instances.
pp = pprint.PrettyPrinter(width=120, indent=4, compact=True)


@dataclass
class FuncDetails:
    """Plain record describing one parsed step function."""

    name: str  # function name
    docstring: str  # function docstring; the tests in this file also pass None
    args: str  # parameter names, as a single string
    has_return: bool  # whether the function contains a return statement
    return_stmt: str  # text of the returned expression
    code: str  # full source text of the step

    def __repr__(self):
        # One header line with the key fields, then the stripped source,
        # rendered through the shared pretty-printer.
        summary = f"FuncDetails(name={self.name},args={self.args},has_return={self.has_return}):\n{self.code.strip()}"
        return pp.pformat(summary)

In [None]:
# The source is only embedded in the repr as text, never parsed, so it does
# not need to be valid Python 3.
some_func = """
def some_func():
    print 1
"""
# The repr is the pretty-printed (hence quoted and escaped) summary string,
# with the surrounding whitespace of the code stripped.
assert (
    FuncDetails("a", None, "an_arg", True, "return True", some_func).__repr__()
    == "'FuncDetails(name=a,args=an_arg,has_return=True):\\ndef some_func():\\n    print 1'"
)

In [None]:
# | export


def extract_return_stmt(func_name, code):
    """Return the variable name used by the first ``return`` statement in ``code``.

    The code is scanned line by line. Only a line whose first token is the
    keyword ``return`` counts, so identifiers that merely begin with those
    letters (``returned = ...``, ``return_code = ...``) are ignored.

    Args:
        func_name: Name of the function being inspected; used only in the
            error message.
        code: Source text of the function.

    Returns:
        The returned variable name, or ``None`` when no ``return`` line exists.

    Raises:
        NotImplementedError: If the first ``return`` returns anything other
            than a plain variable name (a literal, a call, an expression, or
            nothing at all).
    """
    return_stmt = None
    for line in code.splitlines():
        # \b stops "return" from matching the start of a longer identifier.
        match = re.match(r"return\b(.*)", line.strip())
        if match:
            return_stmt = match.group(1).strip()
            break
    if return_stmt is None:
        return None
    is_named_variable = bool(re.fullmatch(r"[a-zA-Z_][a-zA-Z0-9_]*", return_stmt))
    if not is_named_variable:
        raise NotImplementedError(
            f"Inline return statements are not supported. Assign the return value of {func_name} to a variable before returning."
        )
    return return_stmt

In [None]:
# Fixture: function that returns a named variable bound to a one-key dict literal.
named_return = """
def preprocess(conn, model_level, min_date, traffic_percent):
    data = get_utterances(conn, model_level, min_date, traffic_percent)
    button_filter = get_button_responses_filter(conn)
    user_texts = data[~data.Utterance.isin(button_filter)].copy()
    documents = {"some_field": user_texts.Utterance.tolist()}
    return documents
"""

# Fixture: function that returns a named variable bound to a multi-line dict
# literal with three keys.
multiple_key_return = """
def evaluate(model):
    topic_words, word_scores, topic_nums = model.get_topics(model.get_num_topics())

    topic_contains_non_empty_words = all([len(tw) > 0 for tw in topic_words])
    word_scores_in_range = word_scores.min() >= 0.0 and word_scores.max() <= 1.0
    as_many_items_as_topics = (
        model.get_num_topics() == len(topic_words) == word_scores.shape[0]
    )
    word_summaries = (
        topic_contains_non_empty_words
        and word_scores_in_range
        and as_many_items_as_topics
    )
    # You can add artifacts in a step that will be saved to block storage. Add the paths to the file on the local filesystem
    # and the artifact will be uploaded to remote storage.
    sample_df = pd.DataFrame(
        {"a": model.get_topic_sizes()[0], "b": model.get_topic_sizes()[1]}
    )
    sample_df.to_csv("/tmp/dataframe_artifact.csv", index=False)
    artifacts = ["/tmp/dataframe_artifact.csv"]
    # You can add step metrics too this time just add a list of 3-tuples where tuple order = (name, value, step)
    metrics = [("mae", 100, 0), ("mae", 67, 1), ("mae", 32, 2)]
    results = {
        "word_summaries": word_summaries,
        "artifacts": artifacts,
        "metrics": metrics,
    }
    return results
"""
# Fixture: function that returns an inline expression rather than a variable.
unnamed_return = """
def fit(documents, workers=workers, speed="fast-learn"):
    return {Top2Vec(documents, workers=workers, speed=speed)}
"""

# Fixture: function that returns a numeric literal.
number_return = """
def fit(documents, workers=workers, speed="fast-learn"):
    return 1
"""

In [None]:
# Fixture: function with a docstring and two annotated parameters, no return.
valid_code_block = """
def train(input_path: Path, model_path: Path):
    \"""Function docs\"""
    import time
    import pandas as pd
    print(f'Training {model_path} on {input_path}...')
    time.sleep(1)
"""

# Fixture: same shape as above but without a docstring, and calling
# time.slurp instead of time.sleep.
invalid_code_block = """
def train(input_path: Path, model_path: Path):
    import time
    import pandas as pd
    print(f'Training {model_path} on {input_path}...')
    time.slurp(1)
"""

In [None]:
# A named variable is returned as-is; a function without a return yields None.
assert extract_return_stmt("train", named_return) == "documents"
assert extract_return_stmt("train", valid_code_block) is None
# Literal and inline-expression returns must be rejected. The else branch makes
# the check fail when no exception is raised; a bare try/except would pass
# silently in that case.
for unsupported_block in (number_return, unnamed_return):
    try:
        extract_return_stmt("train", unsupported_block)
    except NotImplementedError as e:
        assert e is not None
    else:
        raise AssertionError("expected NotImplementedError for an inline return")

In [None]:
# | export


def parse_step(step_code: str):
    """Parse the source of one step into a ``FuncDetails`` record.

    Args:
        step_code: Source text expected to contain the step's function.

    Returns:
        A ``FuncDetails`` with the function name, docstring, comma-joined
        parameter names, return flag, returned variable name and the
        unmodified source.

    Raises:
        ValueError: If no function definition is found in ``step_code``.
    """
    syntax_tree = ast.parse(step_code)
    visitor = FuncLister()
    visitor.visit(syntax_tree)

    # The visitor only gains a ``name`` attribute once it has seen a def.
    if "name" not in vars(visitor):
        raise ValueError("Step must have a single valid function; check step definition")

    func_name = visitor.name
    joined_args = ",".join(visitor.arg_names)
    return FuncDetails(
        name=func_name,
        docstring=visitor.docstring,
        args=joined_args,
        has_return=visitor.has_return,
        return_stmt=extract_return_stmt(func_name, step_code),
        code=step_code,
    )

In [None]:
# | export


def extract_return_var_names(step):
    """Return the keys of the dict literal assigned to the step's return variable.

    The step source is parsed with ``ast``. This avoids the false positives of
    a textual scan for ``":"``: later ``for``/``if`` lines, keys of nested
    dicts, comments containing colons, and quote characters left in the keys.

    Args:
        step: Object exposing ``code`` (the step's source text) and
            ``return_stmt`` (the returned variable name, or ``None``).

    Returns:
        The top-level keys, in source order and converted to ``str``, of the
        last dict literal assigned to the return variable. Keys that are not
        literals (variables, calls, ``**`` unpacking) are skipped. An empty
        list is returned when there is no return variable, the code cannot be
        parsed, or the variable is never assigned a dict literal.
    """
    return_var = step.return_stmt
    if not return_var:
        return []
    try:
        tree = ast.parse(step.code)
    except SyntaxError:
        return []

    dict_assignments = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Assign):
            targets = node.targets
        elif isinstance(node, ast.AnnAssign):
            targets = [node.target]
        else:
            continue
        assigns_return_var = any(
            isinstance(target, ast.Name) and target.id == return_var
            for target in targets
        )
        if assigns_return_var and isinstance(node.value, ast.Dict):
            dict_assignments.append(node)
    if not dict_assignments:
        return []

    # ast.walk is breadth-first, so select the textually last assignment explicitly.
    last_assignment = max(dict_assignments, key=lambda n: (n.lineno, n.col_offset))
    keys = []
    for key_node in last_assignment.value.keys:
        if key_node is None:  # "**other" unpacking has no key node
            continue
        try:
            keys.append(str(ast.literal_eval(key_node)))
        except (ValueError, TypeError):
            continue  # key is not a literal; its value is unknown statically
    return keys

In [None]:
# The reported names are the keys of the dict assigned to the returned
# variable, in source order.
assert ["some_field"] == extract_return_var_names(parse_step(named_return))
assert ["word_summaries", "artifacts", "metrics"] == extract_return_var_names(
    parse_step(multiple_key_return)
)

In [None]:
# parse_step exposes the returned variable name via return_stmt.
assert "documents" == parse_step(named_return).return_stmt

In [None]:
# A function without a return statement still yields its name, its
# comma-joined parameter names and its source text.
func_dets = parse_step(valid_code_block)
assert func_dets.name == "train"
assert func_dets.args == ",".join(["input_path", "model_path"])
assert not func_dets.has_return
assert isinstance(func_dets.code, str)

In [None]:
# | export


def extract_steps(module_path: Path):
    """Parse every exported step of a module.

    Args:
        module_path: Path of the Python file holding the step sections.

    Returns:
        A list with one ``parse_step`` result per step, in the order the
        steps are returned by ``extract_step_code``.
    """
    code_by_step = extract_step_code(module_path)
    return [parse_step(source) for source in code_by_step.values()]

In [None]:
# Smoke run: parse every step of the fixture module currently bound to test_module.
extract_steps(test_module)

In [None]:
# | export


def _convert_return_stmt(numbered_step):
    number, step = numbered_step
    step["return_stmt"] = "" if not step["return_stmt"] else step["return_stmt"]
    return number, step