**Goal:** Implement a macro facility in Python. Do it at the syntax level (_à la_ LISP) rather than the text level (as with `#define` in C and derivatives).

**Why is this useful?** It allows for more code reuse. In my [DBussy](https://github.com/ldo/dbussy/) Python wrapper for `libdbus`, I try to provide both synchronous and asynchronous alternatives for performing the same function. Here’s one example:

    def get_proxy_interface(self, destination, path, interface, timeout = DBUS.TIMEOUT_USE_DEFAULT) :
        "sends an Introspect request to the specified bus name and object path" \
        " (if interface is not an Interface object or the name of one of the standard" \
        " interfaces), and generates a client-side proxy interface for that interface."
        if isinstance(interface, Introspection.Interface) :
            definition = interface
            interface = definition.name
        elif isinstance(interface, str) :
            if interface in dbus.standard_interfaces :
                definition = dbus.standard_interfaces[interface]
            else :
                introspection = self.introspect(destination, path, timeout)
                interfaces = introspection.interfaces_by_name
                if interface not in interfaces :
                    raise dbus.DBusError \
                      (
                        DBUS.ERROR_UNKNOWN_INTERFACE,
                            "peer “%s” object “%s” does not understand interface “%s”"
                        %
                            (destination, path, interface)
                      )
                #end if
                definition = interfaces[interface]
            #end if
        else :
            raise TypeError("interface must be an Interface or name of one")
        #end if
        return \
            def_proxy_interface \
              (
                name = interface,
                kind = INTERFACE.CLIENT,
                introspected = definition,
                is_async = False
              )
    #end get_proxy_interface

    async def get_proxy_interface_async(self, destination, path, interface, timeout = DBUS.TIMEOUT_USE_DEFAULT) :
        "sends an Introspect request to the specified bus name and object path" \
        " (if interface is not an Interface object or the name of one of the standard" \
        " interfaces), and generates a client-side proxy interface for that interface."
        assert self.loop != None, "no event loop to attach coroutine to"
        if isinstance(interface, Introspection.Interface) :
            definition = interface
            interface = definition.name
        elif isinstance(interface, str) :
            if interface in dbus.standard_interfaces :
                definition = dbus.standard_interfaces[interface]
            else :
                introspection = await self.introspect_async(destination, path, timeout)
                interfaces = introspection.interfaces_by_name
                if interface not in interfaces :
                    raise dbus.DBusError \
                      (
                        DBUS.ERROR_UNKNOWN_INTERFACE,
                            "peer “%s” object “%s” does not understand interface “%s”"
                        %
                            (destination, path, interface)
                      )
                #end if
                definition = interfaces[interface]
            #end if
        else :
            raise TypeError("interface must be an Interface or name of one")
        #end if
        return \
            def_proxy_interface \
              (
                name = interface,
                kind = INTERFACE.CLIENT,
                introspected = definition,
                is_async = True
              )
    #end get_proxy_interface_async

You can see how close the two functions are to being identical, yet Python offers no way to write a single function that works both ways: a function declared with plain `def` is not allowed to contain `await` or the other `async` constructs, and one declared with `async def` cannot simply be invoked by the caller as a normal function. So we need two parallel versions, and any change that fixes a bug or adds functionality in one will likely have to be made in the other as well.
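As a quick illustration of those two restrictions, here is a minimal sketch. The names `fetch` and `fetch_async` are made up for the example and have nothing to do with DBussy:

```python
import inspect

# 1) "await" inside a plain "def" is rejected when the source is compiled.
bad_source = "def fetch():\n    await something()\n"
try:
    compile(bad_source, "<demo>", "exec")
except SyntaxError as err:
    compile_error = err.msg
else:
    compile_error = None

# 2) Calling an "async def" function does not run its body;
#    it only hands back a coroutine object.
async def fetch_async():
    return 42

result = fetch_async()
got_coroutine = inspect.iscoroutine(result)
result.close()  # discard it cleanly; getting the 42 would need "await" or an event loop

print(compile_error)
print(got_coroutine)
```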

You could imagine using the C preprocessor to `#define` some common macro that could expand to both versions. But it is well known that text-level macro processing is fiddly and fragile, and prone to mysterious compile-time errors if you are not careful. Here is a simple example I found in `/usr/include/bits/select.h`:

    #define __FD_ZERO(s) \
      do {									      \
        unsigned int __i;							      \
        fd_set *__arr = (s);						      \
        for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i)	      \
          __FDS_BITS (__arr)[__i] = 0;					      \
      } while (0)

(In `/usr/include/sys/select.h`, the name `FD_ZERO` is defined as a synonym for `__FD_ZERO`.)

While the macro uses a `do`-`while` block to define its own scope, the expansion of the `s` argument brings the whole of the caller’s expression for that argument into that scope. Note the double-underscores in front of all the local variable names: you better hope those will not clash with variables used in the caller’s code!

You may think that the C preprocessor is not a good example of the power available with a more general macro-processing facility. So how about [`m4`](https://www.gnu.org/software/m4/), then? If you have ever worked on a project that uses GNU Autotools for its build system, then you will have made use of `m4`, whether you noticed it or not.

Only this is, if anything, even more fiddly and fragile, and prone to causing mysterious syntax errors in the target language if you are not careful. Let me offer one mildly non-trivial example of `m4` in use that I did some years ago, based on one I found in a book I read back in my teens (enclosed `mow.m4` file).

Those who have encountered LISP or Scheme or one of those languages will be familiar with their having lots of parentheses. The weird syntax basically means that the language representation is already essentially in AST (abstract syntax tree) form. This means, for example, that LISP-family languages can provide very powerful, yet non-fiddly and non-fragile, macro facilities that work directly at the syntax level.

So anyway, how do we go about implementing a syntax-level macro system for Python, using its [`ast`](https://docs.python.org/3/library/ast.html) library module? In the following, I will go over the basic steps that are needed just for a proof of concept, rather than present a finished product. Just to simplify things, I won’t directly tackle the example I gave above, but make up a simpler one instead.
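To get a feel for what the `ast` module gives us, here is a small sketch that parses a one-line statement (an arbitrary example of mine, unrelated to the code that follows) and pokes at the resulting tree:

```python
import ast

tree = ast.parse("x = f(y) + 1")
assign = tree.body[0]   # a Module node holds a list of statements

print(type(assign).__name__)             # the statement node
print(type(assign.value).__name__)       # its right-hand side
print(type(assign.value.left).__name__)  # left operand of the "+"
print(ast.dump(assign.value.left))       # full nested structure of that operand
```

Each piece of syntax becomes a node object with named fields, and it is these nodes that the macro expansion will rearrange.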

Here is a function that will sleep for the specified number of seconds. It is to be provided in a synchronous version that blocks the thread by calling `time.sleep()`, and an asynchronous version that only blocks the current `asyncio` task by calling `await asyncio.sleep()`. The two alternative code paths will be distinguished by testing a variable called `ASYNC`, which you will note is not defined anywhere:

In [None]:
class Sample :

    async def test_syncasync(interval) :
        print("begin")
        if ASYNC :
            print("async sleep for %.1fs" % interval)
            await asyncio.sleep(interval)
        else :
            print("sync sleep for %.1fs" % interval)
            time.sleep(interval)
        #end if
        print("end")
    #end test_syncasync

#end Sample


You can see that there is some code that will be common to both variants, while other code will be different for each variant.

We could have written the function source as a triple-quoted multiline Python string literal. I prefer writing the function as an ordinary part of the Python source code, so that it gets compiled along with the rest of the program: even though the generated code is effectively thrown away, any syntax errors will still get flushed out at this point.

This does mean we need to get back the source code from the function object. Luckily, we can use the `inspect` module to achieve this:

In [None]:
import inspect

funcsrc = inspect.getsource(Sample.test_syncasync)
funcsrc = "\n".join(l[4:] for l in funcsrc.split("\n"))
  # undo indentation
print(funcsrc)

Note the need to strip off the indentation that results from putting the function into its own separate namespace class.
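If you would rather not hard-code the four-column indent, `textwrap.dedent()` from the standard library can work out the common indentation for you. This is only a sketch: the string below is a stand-in for what `inspect.getsource()` returns for a nested function, and `demo` is a made-up name.

```python
import textwrap

# Stand-in for the indented source text of a function defined inside a class.
indented = (
    "    async def demo(interval) :\n"
    "        print(interval)\n"
    "    #end demo\n"
)

dedented = textwrap.dedent(indented)  # removes the whitespace common to all lines
print(dedented)
```

One caveat: `dedent()` only removes whitespace that every non-blank line shares, so a stray comment starting in column 0 would prevent any indentation from being removed.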

Consider the transformations we have to perform on the code: the `async def` needs to be changed to plain `def` for the synchronous version. We also need to scan the body for `if ASYNC ...` blocks, and replace them with one alternative for the async version, and the other for the sync version.

The `ast` module defines a handy `NodeTransformer` class which actually makes it quite easy to perform these transformations. Here is my `ConditionalExpander` class which does the necessary work:

In [None]:
import ast

class ConditionalExpander(ast.NodeTransformer) :
    "generates synchronous or asynchronous variant of a function from common code."

    def __init__(self, funcname, newfuncname, is_async) :
        self.funcname = funcname
        self.newfuncname = newfuncname
        self.is_async = is_async
    #end __init__

    def visit_AsyncFunctionDef(self, node) :
        assert node.name == self.funcname
          # only expect to deal with one function
        body = list(self.visit(b) for b in node.body)
        if self.is_async :
            result = node
            result.name = self.newfuncname
            result.body = body
        else :
            result = ast.FunctionDef \
              (
                name = self.newfuncname,
                args = node.args,
                body = body,
                decorator_list = node.decorator_list,
                returns = node.returns,
                type_comment = node.type_comment
              )
        #end if
        return \
            result
    #end visit_AsyncFunctionDef

    def visit_If(self, node) :
        result = None
        if isinstance(node.test, ast.Name) :
            if node.test.id == "ASYNC" and isinstance(node.test.ctx, ast.Load) :
                # keep only the branch for this variant, and expand any
                # “if ASYNC” nested inside it as well
                if self.is_async :
                    branch = node.body
                else :
                    branch = node.orelse
                #end if
                branch = list(self.visit(b) for b in branch)
                if len(branch) > 1 :
                    result = ast.If \
                      (
                        test = ast.Constant(True),
                        body = branch,
                        orelse = []
                      )
                elif len(branch) == 1 :
                    result = branch[0]
                else :
                    result = ast.Pass()
                #end if
            #end if
        #end if
        if result is None :
            result = ast.If \
              (
                test = node.test,
                body = list(self.visit(b) for b in node.body),
                orelse = list(self.visit(b) for b in node.orelse),
              )
        #end if
        return \
            result
    #end visit_If

#end ConditionalExpander


Notice how this subclass defines `visit_`_xxx_`()` methods which will be automatically invoked when the appropriate node types are encountered. The one for `AsyncFunctionDef` runs the transformer over the function body; for the asynchronous version it then does nothing more than change the function name, while for the synchronous version it replaces the whole node with a `FunctionDef` node that has the same components.

The one that substitutes `if`-statements checks for a condition that is nothing more or less than a reference to the variable `ASYNC`. There is some trickiness about how to remove the `if`-statement: since my methods call `self.visit()` on each body statement and collect the results themselves, each call has to return a single node (a returned list would end up nested inside the body instead of being spliced into it), and there is no Python syntax for “sequence of nodes”. If the branch I am substituting consists of a single statement, I can return that directly; if there is more than one statement, I substitute an `if`-statement that just says `if True: «body»`; and if the branch is empty, I return a simple `pass` statement.
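As an aside, `NodeTransformer.generic_visit()` will splice a list of statements returned from a visitor method into the enclosing body, provided the traversal goes through `generic_visit()` and not through hand-written `self.visit(b) for b in ...` loops. Here is a sketch of that alternative. `InlineAsyncBranch` is a hypothetical class written just for this illustration, and it handles only the `if ASYNC` part, not the `async def` rewriting:

```python
import ast

class InlineAsyncBranch(ast.NodeTransformer):
    "hypothetical variant: splices the chosen branch in place of “if ASYNC”."

    def __init__(self, is_async):
        self.is_async = is_async

    def visit_If(self, node):
        self.generic_visit(node)  # first expand anything nested inside
        if isinstance(node.test, ast.Name) and node.test.id == "ASYNC":
            chosen = node.body if self.is_async else node.orelse
            # A list return is spliced into the parent body by generic_visit();
            # an empty branch still needs a placeholder statement.
            return chosen or ast.Pass()
        return node

source = """
def f():
    if ASYNC:
        a = 1
        b = 2
    else:
        c = 3
        d = 4
"""

tree = InlineAsyncBranch(is_async=False).visit(ast.parse(source))
ast.fix_missing_locations(tree)
print(ast.unparse(tree))
```

With this approach there is no need for the `if True:` wrapper, at the cost of relying on `generic_visit()` to walk every statement list.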

So how do I use this? I parse the original source code twice to get two separate copies of the AST (since the transformation process modifies the AST it is given), and run each through the transformer:

In [None]:
syntax = ast.parse(funcsrc, filename = "<Sample>", mode = "exec")
sync_version = ConditionalExpander(funcname = "test_syncasync", newfuncname = "test_sync", is_async = False).visit(syntax)
syntax = ast.parse(funcsrc, filename = "<Sample>", mode = "exec")
async_version = ConditionalExpander(funcname = "test_syncasync", newfuncname = "test_async", is_async = True).visit(syntax)
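Parsing the source twice is the simplest way to get two independent trees. Another option, sketched here on a throwaway example rather than on `funcsrc`, is to parse once and duplicate the tree with `copy.deepcopy()`:

```python
import ast
import copy

original = ast.parse("async def f():\n    await g()\n")
working_copy = copy.deepcopy(original)   # independent tree, safe to modify

working_copy.body[0].name = "f_async"    # change only the copy
print(original.body[0].name, working_copy.body[0].name)
```

Either way, the point is that each run of the transformer gets a tree of its own to modify.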


Now, let us try turning the result back into source code, and see what we get. Note we need to call `fix_missing_locations` to fill in the source-location attributes that `ast.unparse()` needs (and that `compile()` will insist on later) for the nodes the transformer created:

In [None]:
ast.fix_missing_locations(sync_version)
ast.fix_missing_locations(async_version)
print(ast.unparse(sync_version))
print()
print(ast.unparse(async_version))


Notice that all traces of that `ASYNC` variable have disappeared.
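That can also be checked mechanically, without eyeballing the output, by walking the tree and looking for `Name` nodes. The helper `mentions_name` below is a hypothetical one of my own, shown on two small stand-in snippets; the same call should work on any tree produced by `ast.parse()` or by a transformer.

```python
import ast

def mentions_name(tree, name):
    "does any Name node anywhere in the tree refer to the given identifier?"
    return any(
        isinstance(node, ast.Name) and node.id == name
        for node in ast.walk(tree)
    )

before = ast.parse("if ASYNC:\n    x = 1\nelse:\n    x = 2\n")
after = ast.parse("x = 2\n")

print(mentions_name(before, "ASYNC"))
print(mentions_name(after, "ASYNC"))
```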

That looks reasonable, doesn’t it? Can we actually call these functions?

Yes, but first we need to actually generate function objects that we can call. Let’s do this inside a separate module object, just to contain things to their own namespace:

In [None]:
import types

namespace = types.ModuleType("namespace")

exec("import time", namespace.__dict__)
exec("import asyncio", namespace.__dict__)

exec(compile(sync_version, filename = "<Sample>", mode = "exec"), namespace.__dict__)
exec(compile(async_version, filename = "<Sample>", mode = "exec"), namespace.__dict__)

print(dir(namespace))

Now let us try actually calling the synchronous version:

In [None]:
namespace.test_sync(5)

And the asynchronous version:

In [None]:
# Inside a regular top-level script, we would use
#
#     asyncio.run(namespace.test_async(5))
#
# but inside Jupyter, which is already running an asyncio event loop, we do

await namespace.test_async(5)