In [None]:
# hide
# default_exp core
# all_slow

# fastcgi API

> API details for fastcgi

In [None]:
#export
from fastcore.foundation import *
from fastcore.utils import *
from fastcore.meta import *

import struct,socketserver
from enum import Enum
from io import BytesIO,TextIOWrapper

In [None]:
#hide
from nbdev.showdoc import *
from fastcore.test import *
import subprocess,time

This library follows the [FastCGI spec](http://www.mit.edu/~yandros/doc/specs/fcgi-spec.html). It only supports the *Responder* role, and does not support multiplexing (which is not supported by any servers, so is unlikely to be an issue).

## Enums

In [None]:
#export
Record = Enum('Record', 'BEGIN_REQUEST ABORT_REQUEST END_REQUEST PARAMS STDIN '
               'STDOUT STDERR DATA GET_VALUES GET_VALUES_RESULT')
Role = Enum('Role', 'RESPONDER AUTHORIZER FILTER')
Status = Enum('Status', 'REQUEST_COMPLETE CANT_MPX_CONN OVERLOADED UNKNOWN_ROLE')

These `enum`s are used throughout the library, and have the same meanings as the `FCGI_` constants `#define`d in the spec.

In [None]:
for o in Record,Role,Status: print(list(o))

[<Record.BEGIN_REQUEST: 1>, <Record.ABORT_REQUEST: 2>, <Record.END_REQUEST: 3>, <Record.PARAMS: 4>, <Record.STDIN: 5>, <Record.STDOUT: 6>, <Record.STDERR: 7>, <Record.DATA: 8>, <Record.GET_VALUES: 9>, <Record.GET_VALUES_RESULT: 10>]
[<Role.RESPONDER: 1>, <Role.AUTHORIZER: 2>, <Role.FILTER: 3>]
[<Status.REQUEST_COMPLETE: 1>, <Status.CANT_MPX_CONN: 2>, <Status.OVERLOADED: 3>, <Status.UNKNOWN_ROLE: 4>]


In [None]:
#export
def _S(fmt): return struct.Struct('!'+fmt) # use `struct` "network order"
_rec_struct = _S('BBHHbb')
_endreq_struct = _S('LBxxx')
_begreq_struct = _S('Hb5s')

_chk_typs = Record.STDIN,Record.DATA
_stream_typs = _chk_typs+(Record.PARAMS,Record.STDOUT,Record.STDERR)

In [None]:
#export
class _Parser:
    def __init__(self, c): self.c,self.i = c,0

    def unpack(self, fmt):
        if not isinstance(fmt,struct.Struct): fmt = _S(fmt)
        res = fmt.unpack_from(self.c, self.i)
        self.i += fmt.size
        return res[0] if len(res)==1 else res
    
    def loop(self, f):
        while self.i<len(self.c): yield f(self)

In [None]:
#export
def _record(self):
    _,typ,_,contentlen,padlen,_ = self.unpack(_rec_struct)
    return Record(typ),self.unpack(f'{contentlen}s{"x"*padlen}')

def _records(c): yield from _Parser(c).loop(_record)

In [None]:
#export
def _param_sz(self):
    # See http://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S3.4
    l = self.unpack('b')
    if not l>>7: return l
    self.i -= 1
    return self.unpack('L') & 0x7fffff

def _param(self):
    # See http://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S3.4
    lk,lv = _param_sz(self),_param_sz(self)
    return self.unpack(f'{lk}s{lv}s')

def _params(s):
    return {k.decode():v.decode() for k,v in _Parser(s.getvalue()).loop(_param)}

In [None]:
#export
def _unpack_content(typ, c):
    "Unpack the content section of a fastcgi binary record of `typ`"
    if typ==Record.BEGIN_REQUEST: return _begreq_struct.unpack(c)[:2]
    if typ==Record.ABORT_REQUEST: return ()
    raise Exception(typ)

def _pack_rec(typ, c=b''):
    "Create a fastcgi binary record containing optional content `c`"
    return _rec_struct.pack(1, typ.value, 1, len(c), 0, 0) + c

def _send_content(req, typ, *args):
    "Send the content section of a fastcgi binary record of `typ` to `req`"
    if typ==Record.END_REQUEST: c = _endreq_struct.pack(*args)
    else: c = args[0] if args else b''
    req.send(_pack_rec(typ, c))

In [None]:
#export
class _Wrapper(TextIOWrapper): close=TextIOWrapper.flush

def _print_bytes(s:str, stream):
    "Convert `s` to `bytes`, using `\r\n` for newlines"
    b = _Wrapper(stream, newline='\r\n', encoding='utf8')
    print(s, file=b)

In [None]:
#export
class _Stream(BytesIO):
    def __init__(self, typ:Record, req): self.typ,self.req=typ,req
    def _send(self, *args): _send_content(self.req, self.typ, *args)
    def send(self):
        for o in chunked(self.getvalue(), 2**15): self._send(bytes(o))
        self._send()

In [None]:
#export
class FcgiHandler(socketserver.BaseRequestHandler):
    def setup(self):
        while not self._recv(self.request.recv(2**17)): pass

    def finish(self):
        for o in self.stdout,self.stderr: o.send()
        _send_content(self.request, Record.END_REQUEST, 0, Status.REQUEST_COMPLETE.value)
        self.stdin.seek(0)
    
    def __getattr__(self,k):
        try: rec = Record[k.upper()]
        except KeyError: raise AttributeError(k)
        return self.streams[rec]

    def _recv(self,d):
        for typ,c in _records(d):
            if typ in (Record.PARAMS,*_chk_typs):
                self.streams[typ].write(c)
                if typ == Record.PARAMS and not c: self.environ = dict(_params(self.streams[typ]))
                elif typ in _chk_typs: self.sz += len(c)
            else: getattr(self,'_'+typ.name)(*_unpack_content(typ,c))
        if self.sz>=self.length: return True

    def _ABORT_REQUEST(self): self.sz=self.length
    def _BEGIN_REQUEST(self,role,keep_conn):
        self.streams = L(_stream_typs).map_dict(_Stream, req=self.request)
        self.sz,self.environ = 0,{}
        assert Role(role)==Role.RESPONDER, f"{role} not supported"
        assert not keep_conn, "FCGI_KEEP_CONN not supported"

    @property
    def content(self): return self.stdin.getvalue()
    @property
    def length(self): return int(self.environ.get('CONTENT_LENGTH', 1))

This is used in much the same way as [BaseRecordHandler](https://docs.python.org/3/library/socketserver.html#request-handler-objects), except that receiving the data is handled for you before your `handle` method is called. All headers are available in the `params` dictionary, and `stdin` contains the data sent to your handler.

Use the `stdout` and `stderr` streams to send data to the client.

Here's an example subclass:

In [None]:
class TestHandler(FcgiHandler):
    def handle(self):
        print('query:', self.environ['QUERY_STRING'])
        print('content type:', self.environ['HTTP_CONTENT_TYPE'])
        print('stdin:', self.content)
        self.stdout.write(b"Content-type: text/html\r\n\r\n<html>foobar</html>\r\n")

To test it, we'll use an http➡fcgi proxy. We can download `http2fcgi` and run it in the background as follows:

In [None]:
p = Path('test.sock').absolute()

run('./get_http2fcgi.sh')
proc = subprocess.Popen(f'./http2fcgi -fcgi unix:///{p} -ext py'.split())

We can now test the handler by running a server in the background...

In [None]:
@threaded
def _f():
    with socketserver.UnixStreamServer(str(p), TestHandler) as server: server.handle_request()

if p.exists(): p.unlink()
t = _f()
time.sleep(0.2) # wait for server to start

...and use `curl` to test it:

In [None]:
!curl 'http://localhost:6065/setup.py?a=1' -X POST -H "Content-Type: application/json" -d test

query: a=1
content type: application/json
stdin: b'test'
<html>foobar</html>


Finally, we kill the `http2fcgi` background process.

In [None]:
proc.terminate()

### Convenience methods

In [None]:
#export
@patch
def print(self:FcgiHandler,s=""):
    "Write a `str` to `self.stdout` as bytes, converting line endings to `\r\n`"
    _print_bytes(s, self.stdout)

Instead of `self.stdout.write(...)` (which requires byte strings and `\r\n` line endings, and does not append a line ending automatically) we can use `print`:

```python
self.print("Content-type: text/html")
self.print()
self.print("<html>foobar</html>")
```

In [None]:
#export
@patch
def err(self:FcgiHandler,s=""):
    "Write a `str` to `self.stderr` as bytes, converting line endings to `\r\n`"
    _print_bytes(s, self.stderr)

For errors, you can either `write` to `stderr`:

```python
self.stderr.write(b"Something went wrong\r\n")
```

or call `error`, which is like `print`, but for `stderr`:

```python
self.error("Something went wrong")
```

## Export -

In [None]:
#hide
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted index.ipynb.
