# kernel

> IPythonKernel based on pystata
- order: 14

The latest documentation for implementing a wrapper kernel is [here](https://jupyter-client.readthedocs.io/en/latest/wrapperkernels.html), but the current code deviates from those instructions (which call for inheriting from [kernelbase.Kernel](https://github.com/ipython/ipykernel/blob/main/ipykernel/kernelbase.py)), instead inheriting from the IPython kernel implementation, [IPythonKernel](https://github.com/ipython/ipykernel/blob/main/ipykernel/ipkernel.py).

In [None]:
#| default_exp kernel
%load_ext autoreload
%autoreload 2

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from nbstata.config import Config
from nbstata.misc_utils import print_red
from nbstata.stata import set_global
from nbstata.stata_more import user_expression
from nbstata.inspect import get_inspect
from nbstata.stata_session import StataSession
from nbstata.completions import CompletionsManager
from nbstata.cell import Cell
import nbstata # for __version__
from fastcore.basics import patch_to
from ipykernel.ipkernel import IPythonKernel

For a diagram of the main dependencies among the principal nbstata modules, [click here](https://hugetim.github.io/nbstata/dev_docs_index.html).

In [None]:
#| export
class PyStataKernel(IPythonKernel):
    """A jupyter kernel based on pystata"""
    implementation = 'nbstata'
    implementation_version = nbstata.__version__
    language_info = {
        'name': 'stata',
        'version': '17',
        'mimetype': 'text/x-stata',
        'file_extension': '.do',
    }
    banner = "nbstata: a Jupyter kernel for Stata based on pystata"
    help_links = [
        {
            "text": "Stata Documentation",
            "url": "https://www.stata.com/features/documentation/",
        },
        {
            "text": "nbstata Help",
            "url": "https://hugetim.github.io/nbstata/",
        },
    ]

    # for communication from Quarto
    def comm_open(self, stream, ident, msg):
        msg = msg['content']
        if msg['target_name'] == "quarto_kernel_setup":
            for key, value in msg['data']['options']['params'].items():
                set_global(key, value)
            # here, msg['data']['options'] has all quarto setup options
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.stata_ready = False
        self.ipydatagrid_height_set = False
        self.shell.execution_count = 0
        self.inspect_output = "Stata not yet initialized."
        self.nbstata_config = Config()
        self.stata_session = StataSession()
        self.completions = CompletionsManager(self.stata_session)
        self.inspect_output = ""
        self.shell_handlers['comm_open'] = self.comm_open

## Config and Stata initialization
Stata is initialized during the execution of the first cell (in `PyStataKernel.do_execute` below) so that any configuration errors can be displayed in the notebook.

In [None]:
#| export
@patch_to(PyStataKernel)
def init_session(self):
    self.nbstata_config.process_config_file()
    self.nbstata_config.init_stata()
    self.stata_ready = True

In [None]:
#| export
def _stata_error_reply(ename, evalue, execution_count=None):
    reply_content = {
        'status': "error",
        "traceback": [],
        "ename": ename,
        "evalue": evalue,
    }
    if execution_count is not None:
        reply_content['execution_count'] = execution_count
    return reply_content

In [None]:
#| export
_missing_stata_message = (
    "pystata path not found\n"
    "A Stata 17+ installation is required to use the nbstata Stata kernel. "
    "If you already have Stata 17+ installed, "
    "please specify its path in your configuration file."
)

In [None]:
#| hide
print(_missing_stata_message)

pystata path not found
A Stata 17+ installation is required to use the nbstata Stata kernel. If you already have Stata 17+ installed, please specify its path in your configuration file.


In [None]:
#| export
def _handle_stata_import_error(err, silent, execution_count):
    if not silent:
        print_red(f"ModuleNotFoundError: {_missing_stata_message}")
    return _stata_error_reply(
        ename = "ModuleNotFoundError", 
        evalue = _missing_stata_message, 
        execution_count = execution_count,
    )

In [None]:
#| export
def _handle_stata_init_error(err, silent, execution_count):
    reply_content = _stata_error_reply(
        ename = "Stata init error", 
        evalue = str(err), 
        execution_count = execution_count,
    )
    if not silent:
        print_red(reply_content['evalue'])
    return reply_content

## Stata code execution (and error display)

In [None]:
#| hide
#| eval: False
from nbstata.config import launch_stata
from nbstata.stata import run_direct

In [None]:
#| hide
#| eval: False
launch_stata(splash=False)

In [None]:
#| export
def print_stata_error(text):
    lines = text.splitlines()
    if len(lines) >= 2 and lines[-2] == lines[-1]:
        lines.pop(-1) # remove duplicate error code glitch in pystata.stata.run multi-line (ex. below)
    if len(lines) > 2:
        print("\n".join(lines[:-2]))
    print_red("\n".join(lines[-2:]))

In [None]:
from textwrap import dedent

In [None]:
#| hide
#| eval: False
try:
    run_direct(dedent("""\
        disp 1
        disp error"""), echo=False)
except SystemError as err:
    print(str(err))


. disp 1
1

. disp error
error not found
r(111);
r(111);



In [None]:
#| hide
#| eval: False
try:
    run_direct(dedent("""\
        disp 1
        disp error"""), echo=False)
except SystemError as err:
    print_stata_error(str(err))


. disp 1
1

. disp error
[31merror not found
r(111);[0m


In [None]:
print_stata_error(dedent("""\
    output prior to error
    error message
    error code
    """))

output prior to error
[31merror message
error code[0m


In [None]:
#| hide
print_stata_error("one line")

[31mone line[0m


In [None]:
#| export
def _handle_stata_error(err, silent=False, execution_count=None):
    reply_content = _stata_error_reply(
        ename = "Stata error", 
        evalue = str(err), 
        execution_count = execution_count,
    )
    if not silent:
        print_stata_error(reply_content['evalue'])
    return reply_content

In [None]:
#| export
def _format_user_obj(user_expr_output):
    return dict(
        status='ok',
        data={'text/plain': user_expr_output},
        metadata={},
    )

In [None]:
#| export
def _user_expressions(expressions):
    results = {}
    for key, expr in expressions.items():
        try:
            value = _format_user_obj(user_expression(expr))
        except Exception as err:
            value = _stata_error_reply(
                ename = "Stata user expression error",
                evalue = str(err)
            )
            print_red(value['evalue'])
        results[key] = value
    return results

In [None]:
#| eval: False
_user_expressions({"1": "5 + 1"})

{'1': {'status': 'ok', 'data': {'text/plain': '6'}, 'metadata': {}}}

In [None]:
#| eval: False
_user_expressions({"1": "sam"})

[31msam not found

Invalid Stata '[%fmt] [=]exp' display expression: sam[0m


{'1': {'status': 'error',
  'traceback': [],
  'ename': 'Stata user expression error',
  'evalue': "sam not found\n\nInvalid Stata '[%fmt] [=]exp' display expression: sam"}}

In [None]:
#| hide
#| eval: False
_user_expressions({})

{}

In [None]:
#| export
@patch_to(PyStataKernel)
def post_do_hook(self):
    self.inspect_output = ""

In [None]:
#| export
@patch_to(PyStataKernel)
def do_execute(self, code, silent,
               store_history=True, user_expressions=None, allow_stdin=False):
    """Execute Stata code cell"""
    if not self.stata_ready:
        try:
            self.init_session() # do this here so config error messages displayed in notebook
        except OSError as err:
            return _handle_stata_init_error(err, silent, self.execution_count)
        except ModuleNotFoundError as err: # this should almost always be preempted by OSErrors now
            return _handle_stata_import_error(err, silent, self.execution_count)
        
    self.shell.execution_count += 1
    is_setup_cell = code.strip() == "56ed7992-3715-4d16-a6c0-e5f98c12799d"       
    if is_setup_cell:
        code = ""
        # If this is a setup cell, quarto will ignore the output
        # (except for the metadata). Still, `execute_result` needs
        # something to send, so we send an empty string
        self.send_response(self.iopub_socket, 'execute_result', {
            'data': {"text/plain": ""}, 
            'metadata': {'quarto': {'daemonize': False}},
            'execution_count': self.shell.execution_count,
            })
    
    code_cell = Cell(self, code, silent)
    try:
        code_cell.run()
    except SystemError as err:
        return _handle_stata_error(err, silent, self.execution_count)
    self.post_do_hook()
    return {
        'status': "ok",
        'execution_count': self.execution_count,
        'payload': [],
        'user_expressions': _user_expressions(user_expressions or {}),
    }

## Other kernel capabilities

In [None]:
#| export
@patch_to(PyStataKernel)
def do_inspect(self, code, cursor_pos, detail_level=0, omit_sections=()):
    """Display Stata 'describe' output (regardless of cursor position)"""
    if self.stata_ready:
        if not self.inspect_output:
            self.inspect_output = get_inspect(code, cursor_pos, detail_level, omit_sections)
        data = {'text/plain': self.inspect_output}
    else:
        data = {}
    return {"status": "ok", "data": data, "metadata": {}, "found": True}

In [None]:
#| export
@patch_to(PyStataKernel)
def do_complete(self, code, cursor_pos):
    """Provide context-aware tab-autocomplete suggestions"""
    if self.stata_ready:
        cursor_start, cursor_end, matches = self.completions.do(
            code,
            cursor_pos,
        )
    else:
        cursor_start = cursor_end = cursor_pos
        matches = []
    return {
        'status': "ok",
        'cursor_start': cursor_start,
        'cursor_end': cursor_end,
        'metadata': {},
        'matches': matches,
    }

In [None]:
#| export
@patch_to(PyStataKernel)
def do_is_complete(self, code):
    """Overrides IPythonKernel with kernelbase default"""
    return {"status": "unknown"}

In [None]:
#| export
@patch_to(PyStataKernel)
def do_history(
    self,
    hist_access_type,
    output,
    raw,
    session=None,
    start=None,
    stop=None,
    n=None,
    pattern=None,
    unique=False,
):
    """Overrides IPythonKernel with kernelbase default"""
    return {"status": "ok", "history": []}

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()