
get python params?


In [None]:
import ast
# import inspect


def get_parameter_names(source):
    """
    Extracts positional parameter names from the first function in `source`.

    The source text is parsed with the ast module and the first function
    definition reached by ast.walk (either `def` or `async def`) is inspected.
    Positional-only parameters are listed ahead of the regular ones so the
    result lines up with a positional call. `*args`, keyword-only parameters
    and `**kwargs` are not reported.

    Args:
        source: Python source code, as a string, containing the function.

    Returns:
        A list of parameter names as strings, in declaration order, or an
        empty list when the source defines no function.

    Raises:
        SyntaxError: If `source` is not valid Python.
    """
    tree = ast.parse(source)
    function_def = next(
        (node for node in ast.walk(tree)
         if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))),
        None,
    )

    if function_def is None:
        return []

    signature = function_def.args
    # posonlyargs only exists on Python 3.8+; fall back to nothing before that.
    positional_only = getattr(signature, "posonlyargs", [])
    return [arg.arg for arg in list(positional_only) + list(signature.args)]


# Exports of this cell, kept as a name -> object mapping (a dict rather than
# the usual list of names).
__all__ = dict(
    get_parameter_names=get_parameter_names,
)



### import notebook


In [None]:
import types
import ctypes
import os
import re

def safeurl(name):
    """Replace every run of non-word characters in `name` with one underscore."""
    non_word_run = re.compile(r'\W+')
    return non_word_run.sub('_', name)

# Cache of notebook modules that were already loaded, keyed by filename.
modules = dict()

def import_notebook(query_str, ctx):
    """Searches the Whoosh index and returns the exports of the retrieved code.

    The best search hit is loaded as a module: Python cells are executed in a
    fresh module namespace; C, C++ and Objective-C cells are compiled with
    build_dylib and opened through ctypes. Loaded modules are cached in
    `modules` under the notebook filename.

    Args:
        query_str: Query passed to search_whoosh to locate the cell.
        ctx: Mutable mapping (typically the caller's globals()) that receives
            every exported name.

    Returns:
        The module's `__all__` mapping of exported name -> object.

    Raises:
        ImportError: If the search returns no results.
        KeyError: If the loaded cell does not end up defining `__all__`
            (for example a Python cell without one, or a cell whose language
            is not handled here).
    """
    from Core import search_whoosh, build_dylib

    results = search_whoosh(query_str)

    if not results:
        raise ImportError(f"No matching notebook found for query: {query_str}")

    cell = results[0]
    filename = cell["filename"]

    # NOTE(review): the cache is keyed by notebook filename only, so different
    # cells of the same notebook appear to share one entry — confirm intent.
    if filename in modules:
        exports = vars(modules[filename])["__all__"]
        # A cache hit must still publish the names into this caller's ctx;
        # otherwise only the very first importer ever receives them.
        for name in exports:
            ctx[name] = exports[name]
        return exports

    module = types.ModuleType(filename)
    module.__file__ = filename
    module.import_notebook = import_notebook

    language = cell["language"]
    if language in ("c", "cpp", "objective-c"):
        build_dylib(cell["code"], filename, {})
        lib_name = safeurl(cell["questions"][0])
        build_directory = os.path.join(os.path.dirname(__file__), "../.build")
        library = ctypes.CDLL(os.path.join(build_directory, lib_name + '.dylib'))
        # Loading this helper notebook publishes its exports into globals()
        # (see the loop at the end); get_functions is presumably one of them.
        import_notebook("list c functions with python", ctx)
        functions = get_functions(cell["code"])
        module.__all__ = {}
        for signature in functions:
            func_name = signature.split('(')[0]
            module.__all__[func_name] = getattr(library, func_name)

    elif language == "python":
        # Executes code retrieved from the index in the module namespace;
        # only safe when the indexed notebooks are trusted.
        exec(cell["code"], module.__dict__)

    exports = vars(module)['__all__']
    for name in exports:
        value = exports[name]
        ctx[name] = value
        module.__dict__[name] = value
        globals()[name] = value

    modules[filename] = module

    return exports

# Exports of this cell, kept as a name -> object mapping.
__all__ = dict(
    import_notebook=import_notebook,
)



### build_dylib()


In [None]:
import os
import re
import subprocess

# Build artifacts go into a ".build" directory one level above this file.
BUILD_DIRECTORY = os.path.join(
    os.path.dirname(__file__),
    "../.build",
)
# Create the directory up front; an already existing one is left alone.
os.makedirs(BUILD_DIRECTORY, exist_ok=True)

def safeurl(name):
    """Turn `name` into a file-name-safe token.

    Each run of one or more non-word characters becomes a single underscore.
    """
    sanitized = re.sub(r'\W+', '_', string=name)
    return sanitized

def build_dylib(code, path_to_code, ctx):
    """Compiles an indexed C / C++ / Objective-C cell into a .dylib.

    The cell is looked up through search_whoosh, its source is written into
    BUILD_DIRECTORY, compiled to an object file and linked into
    `<lib_name>.dylib`. Each step is skipped when its output already exists
    and is not older than its input.

    Build settings are read from the cell's markdown as `KEY: value`,
    `KEY = value` or `KEY - value` lines with an upper-case key; only
    PKG_CONFIG is consumed below.

    Args:
        code: Unused; the source is taken from the search hit instead.
        path_to_code: Query handed to search_whoosh; the first hit is built.
        ctx: Unused.

    Raises:
        IndexError: If the search returns no hits.
        subprocess.CalledProcessError: If the compile or link command fails.
    """
    from Core import search_whoosh

    code_cell = search_whoosh(path_to_code)[0]
    language = code_cell['language']
    source_text = "".join(code_cell['code'])
    lib_name = safeurl(code_cell['questions'][0])
    lib_ext = ".cpp" if language == "cpp" else ".c"
    lib_path = os.path.join(BUILD_DIRECTORY, f"{lib_name}{lib_ext}")

    if not os.path.exists(lib_path) or os.path.getmtime(code_cell['filename']) > os.path.getmtime(lib_path):
        # Explicit encoding so non-ASCII source text is written consistently.
        with open(lib_path, "w", encoding="utf-8") as f:
            f.write(source_text)

    env = {}

    # The key group needs the "+" quantifier: without it only a single
    # character is captured and multi-letter keys such as PKG_CONFIG never
    # appear in env.
    for match in re.finditer(r'([A-Z_]+)\s*[:=-]+\s*(.*?)\s*(\n|$)', "".join(code_cell['markdown'])):
        env[match[1]] = match[2]

    if language == "cpp":
        env["CXX"] = os.getenv("CXX", "clang++").split()
        # NOTE(review): STD is populated here but never added to a compiler
        # command line below — confirm whether these flags should be passed.
        env["STD"] = os.getenv("STD", "-std=c++17 -stdlib=libc++").split()
    else:
        env["CXX"] = os.getenv("CXX", "clang").split()

    obj_path = os.path.join(BUILD_DIRECTORY, f"{lib_name}.o")
    if not os.path.exists(obj_path) or os.path.getmtime(lib_path) > os.path.getmtime(obj_path):
        mods = ["-x", "objective-c", "-fno-objc-arc"] if language == "objective-c" else []
        if "@import" in source_text:
            mods = ["-fmodules"] + mods

        cflags = []
        if "PKG_CONFIG" in env:
            result = subprocess.run(["pkg-config", "--cflags"] + env["PKG_CONFIG"].split(), capture_output=True, text=True)
            cflags = result.stdout.split()

        args = ["-c", lib_path, "-o", obj_path]
        subprocess.run(env["CXX"] + mods + cflags + args, check=True, env=os.environ)

    mod_path = os.path.join(BUILD_DIRECTORY, f"{lib_name}.dylib")
    if not os.path.exists(mod_path) or os.path.getmtime(obj_path) > os.path.getmtime(mod_path):
        libs = []
        if "PKG_CONFIG" in env:
            result = subprocess.run(["pkg-config", "--libs"] + env["PKG_CONFIG"].split(), capture_output=True, text=True)
            libs = result.stdout.split()

        # Dynamic-library flags are only added when the compiler looks like clang.
        mods = ["-dynamiclib", "-rdynamic"] if "clang" in env["CXX"][0] else []
        args = ["-o", mod_path]
        subprocess.run(env["CXX"] + [obj_path] + mods + libs + args, check=True, env=os.environ)

# Exports of this cell, kept as a name -> object mapping.
__all__ = dict(
    build_dylib=build_dylib,
)



### accumulate markdown


In [None]:
import os
import json
import re
import datetime

def get_questions(source, markdown):
    """Collects the question lines found in a cell's markdown and source.

    Every markdown line containing "?" counts as a question; a source line
    counts only when it also contains "how to" (case-insensitive). The text
    "how to", question marks and "#" runs are removed and the remainder is
    stripped.

    Returns:
        The questions ordered by length with the shortest one repeated at the
        end, or [''] when nothing was found.
    """
    question_line = re.compile(r'^.*\?.*$', re.IGNORECASE | re.MULTILINE)

    def scrub(line):
        # Drop the boilerplate pieces and surrounding whitespace.
        return re.sub(r'how to|\?|#+', '', line, flags=re.IGNORECASE).strip()

    found = []
    for line in question_line.findall(markdown):
        found.append(scrub(line))
    for line in question_line.findall(source):
        if 'how to' in line.lower():
            found.append(scrub(line))

    if not found:
        return ['']

    ordered = sorted(found, key=len)
    return ordered + [ordered[0]]

def accumulate_markdown(cells):
    """Accumulates markdown leading up to code cells.

    Walks the cells once; every code cell yields one entry holding the joined
    source of all cells located between the previous code cell and itself.

    Args:
        cells: Sequence of cell dicts. Every cell needs "cell_type" and
            "source"; code cells additionally need "id" and "language".

    Returns:
        A list with one dict per code cell containing "id", "language",
        "from" / "to" (slice bounds of the preceding cells within `cells`),
        "markdown" and "code".
    """
    result = []
    from_idx = 0

    # Positions are tracked with enumerate instead of cells.index(): index()
    # rescans the list for every code cell and returns the first *equal*
    # cell, which yields wrong bounds when two cells compare equal.
    for to_idx, cell in enumerate(cells):
        if cell["cell_type"] != "code":
            continue

        markdown = "\n".join("".join(m["source"]) for m in cells[from_idx:to_idx])
        result.append({
            "id": cell["id"],
            "language": cell["language"],
            "from": from_idx,
            "to": to_idx,
            "markdown": markdown,
            "code": "".join(cell["source"]),
        })
        from_idx = to_idx + 1

    return result

def cache_cells(filename):
    """Parses a Jupyter notebook, extracts relevant cells, and generates cache entries.

    Args:
        filename: Path to the notebook; it is made absolute before use.

    Returns:
        A list with one dict per code cell. Each dict carries "filename",
        "mtime" (the notebook's modification time as a datetime), "questions",
        "notebook" (the file's base name) and every key produced by
        accumulate_markdown. Because `**c` is expanded last, keys of the
        accumulated cell take precedence over the ones computed here.
    """
    # The docstring has to come first; placed after the import it was only a
    # discarded string expression.
    from Core import get_cells

    filename = os.path.abspath(filename)
    notebook = os.path.basename(filename)
    modified = datetime.datetime.fromtimestamp(os.path.getmtime(filename))

    cells = get_cells(filename)
    new_cache = accumulate_markdown(cells)

    return [{
        "id": f"{notebook}[{i}]",
        "filename": filename,
        "mtime": modified,
        "questions": get_questions(c["code"], c["markdown"]),
        "notebook": notebook,
        **c
    } for i, c in enumerate(new_cache)]


# Exports of this cell, kept as a name -> object mapping.
__all__ = dict(
    cache_cells=cache_cells,
    accumulate_markdown=accumulate_markdown,
    get_questions=get_questions,
)



### initialize database



In [None]:
import os
import json
import datetime
from whoosh.index import create_in
from whoosh.fields import Schema, TEXT, ID, DATETIME, KEYWORD
from whoosh.qparser import QueryParser, MultifieldParser
from whoosh.writing import AsyncWriter

# Define schema for Whoosh index
schema = Schema(
    markdown=TEXT(stored=True),
    language=TEXT(stored=True),
    mtime=DATETIME(stored=True),
    id=ID(stored=True),
    questions=TEXT(stored=True),
    filename=TEXT(stored=True),
    code=TEXT(stored=True),
)

# Open the index stored in ".cache", creating it when it is missing.
# Checking exists_in() rather than only the directory covers the case of a
# ".cache" directory that is present but holds no index yet, where open_dir
# would fail.
from whoosh.index import exists_in, open_dir

os.makedirs(".cache", exist_ok=True)
if exists_in(".cache"):
    index = open_dir(".cache")
else:
    index = create_in(".cache", schema)

def store_in_whoosh(cells):
    """Adds every cell that has a 'code' entry to the Whoosh index.

    All documents go through one AsyncWriter that is committed once at the
    end. The list of questions is stored as a single newline-separated string.
    """
    writer = AsyncWriter(index)
    for entry in (c for c in cells if 'code' in c):
        writer.add_document(
            markdown=entry["markdown"],
            language=entry["language"],
            mtime=entry["mtime"],
            id=entry["id"],
            questions="\n".join(entry["questions"]),
            filename=entry["filename"],
            code=entry["code"],
        )
    writer.commit()

# True until the first search has triggered a scan of the notebook tree.
first = True


def search_whoosh(question):
    """Searches the index for `question` in the filename and questions fields.

    The first call scans the directory above this file so the index is
    populated before it is queried.

    Returns:
        A list of dicts with "filename", "code", "language", "markdown" and
        "questions" (split back into a list) for each hit.
    """
    global first
    if first:
        scan_directory(os.path.join(os.path.dirname(__file__), '..'), 3)
        first = False

    with index.searcher() as searcher:
        query = MultifieldParser(["filename", "questions"], schema=schema).parse(question)
        found = []
        for hit in searcher.search(query):
            found.append({
                "filename": hit["filename"],
                "code": hit["code"],
                "language": hit["language"],
                "markdown": hit["markdown"],
                "questions": hit["questions"].split('\n'),
            })
        return found

def scan_directory(directory, limit):
    """Recursively scans a directory for notebooks and stores extracted cells in Whoosh index.

    A notebook is (re)indexed when the index holds no entry for it or when
    the stored mtime is older than the file's modification time. Stale
    entries are deleted before the fresh cells are stored.

    Args:
        directory: Root directory to walk.
        limit: Currently unused.
    """
    # The docstring has to come first; placed after the import it was only a
    # discarded string expression.
    from Core import cache_cells

    all_cells = []

    for root, _, files in os.walk(directory):
        # Files sitting directly in a dot-prefixed directory are skipped; the
        # walk itself still descends into its subdirectories.
        if os.path.basename(root).startswith("."):
            continue

        for file in files:
            if file.startswith(".") or not file.endswith(".ipynb"):
                continue

            notebook_path = os.path.join(root, file)
            absolute_path = os.path.abspath(notebook_path)
            query = QueryParser("filename", schema=schema).parse(absolute_path)
            modified = datetime.datetime.fromtimestamp(os.path.getmtime(absolute_path))

            # One searcher per notebook, closed by the with-block; previously
            # two searchers were opened here and never closed.
            with index.searcher() as searcher:
                results = searcher.search(query)
                outdated = not results or results[0]['mtime'] < modified
                if outdated:
                    print("replacing: ", notebook_path)
                    writer = AsyncWriter(index)
                    writer.delete_by_query(query, searcher)
                    writer.commit()

            if outdated:
                all_cells.extend(cache_cells(notebook_path))

    store_in_whoosh(all_cells)
    print(f"Stored {len(all_cells)} cells in Whoosh index.")


# Exports of this cell, kept as a name -> object mapping.
__all__ = dict(
    scan_directory=scan_directory,
    search_whoosh=search_whoosh,
)




### get_cells(notebook_path)


In [None]:
import json
import os

def get_cells(notebook_path, types=('*', 'code')):
    """Extract cells from a Jupyter Notebook with additional metadata.

    Args:
        notebook_path: Path to the .ipynb file; it is made absolute.
        types: Cell types to keep. A '*' entry keeps every cell, which is
            what the default does. The default is a tuple so no mutable
            object is shared between calls.

    Returns:
        A list of cell dicts. Each one is the original cell extended with
        "language" (the cell's VS Code languageId, else the kernel language,
        else the notebook's language_info name, else ""), "filename" (the
        absolute notebook path) and "id" ("<basename>[<cell index>]").
    """
    notebook_path = os.path.abspath(notebook_path)

    with open(notebook_path, 'r', encoding='utf-8') as f:
        notebook = json.load(f)

    metadata = notebook.get('metadata', {})
    kernel = metadata.get('kernelspec', {})
    notebook_name = os.path.basename(notebook_path)
    keep_all = '*' in types

    cells = []
    for i, cell in enumerate(notebook.get("cells", [])):
        if not (keep_all or cell.get("cell_type") in types):
            continue
        language = (cell.get("metadata", {}).get("vscode", {}).get("languageId") or
                    kernel.get("language") or
                    metadata.get("language_info", {}).get("name", ""))
        cells.append({
            **cell,
            "language": language,
            "filename": notebook_path,
            # The index counts all cells, including the filtered-out ones.
            "id": f"{notebook_name}[{i}]",
        })

    return cells

# Exports of this cell, kept as a name -> object mapping.
__all__ = dict(
    get_cells=get_cells,
)



### run()

run python cells?



In [None]:
import json
import os
import sys
import types

def run():
    """Command-line entry point: loads a notebook cell and calls its first exported callable.

    sys.argv[1] is the query / notebook path handed to search_whoosh and
    import_notebook; the remaining arguments are passed to the function.
    An argument may be given positionally or as `--<param>=<value>`.
    Parameters that receive no value are passed as None. Values are converted
    with the parameter's annotation when that annotation is callable.

    Exits with status 1 when no query is given or when the loaded exports
    contain no callable.
    """
    from Core import search_whoosh, import_notebook, get_parameter_names

    if len(sys.argv) < 2:
        print("Usage: python script.py <notebook_path> <function_args>")
        sys.exit(1)

    notebook_path = sys.argv[1]
    inputs = sys.argv[2:]

    # Import the notebook as a module
    results = search_whoosh(notebook_path)

    module = import_notebook(notebook_path, globals())
    print(module)

    # Find the first function in the module
    # TODO: support classes ?
    func = next((attr for attr in module.values() if callable(attr)), None)

    if func is None:
        print("No function found in the notebook.")
        sys.exit(1)
    print(func)

    # Extract parameters and map inputs
    # NOTE(review): the names come from the first function in the cell's
    # source, which is presumably the same callable as `func` — confirm.
    params = []
    if results[0]["language"] == 'python':
        params = get_parameter_names(results[0]['code'])
        print(params)

    # Separate "--param=value" options from positional arguments first, so a
    # named option is never consumed as a positional value for an earlier
    # parameter. Options that name no known parameter stay positional.
    named = {}
    positional = []
    for arg in inputs:
        key, sep, value = arg.partition("=")
        if sep and key.startswith("--") and key[2:] in params:
            # partition keeps any further "=" inside the value; the first
            # occurrence of an option wins.
            named.setdefault(key[2:], value)
        else:
            positional.append(arg)

    mapped_inputs = []
    for param in params:
        if param in named:
            mapped_inputs.append(named[param])
        else:
            mapped_inputs.append(positional.pop(0) if positional else None)

    # Convert types based on function annotations (if available)
    if hasattr(func, '__annotations__'):
        func_annotations = func.__annotations__
        print(func_annotations)
        for i, param in enumerate(params):
            converter = func_annotations.get(param)
            # Skip missing values and annotations that cannot be called
            # (for example string annotations).
            if callable(converter) and mapped_inputs[i] is not None:
                mapped_inputs[i] = converter(mapped_inputs[i])

    # Execute the function
    result = func(*mapped_inputs)
    print(result)

# Runs the command-line entry point when this cell executes under the module
# name "__run__".
# NOTE(review): the conventional script guard compares against "__main__";
# presumably the notebook runner sets "__run__" — TODO confirm.
if __name__ == "__run__":
    run()

# Exports of this cell, kept as a name -> object mapping.
__all__ = dict(
    run=run,
)
