In [1]:
# default_exp core

# core

> A GitHub webhook server that tweets about new releases, plus a `systemd` service installer.

In [2]:
#export
import json,tweepy,hmac,hashlib,traceback,shutil,time

from fastcore.imports import *
from fastcore.foundation import *
from fastcore.utils import *
from fastcore.script import *
from fastcore.meta import *
from fastcore.test import *
from configparser import ConfigParser
from ipaddress import ip_address,ip_network
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from textwrap import dedent

## UnbufferedServer -

In [3]:
#export
class _Unbuffered(GetAttr):
    "Wrap `stream` so that every `write`/`writelines` is followed by a `flush`"
    def __init__(self, stream):
        # fastcore's `GetAttr` is expected to forward other attribute lookups to `default`
        self.default = stream

    def write(self, data):
        "Write `data` to the wrapped stream, then flush it"
        out = self.default
        out.write(data)
        out.flush()

    def writelines(self, data):
        "Write the lines in `data` to the wrapped stream, then flush it"
        out = self.default
        out.writelines(data)
        out.flush()

In [4]:
#export
@delegates()
class UnbufferedServer(ThreadingHTTPServer):
    "A `ThreadingHTTPServer` that disables output buffering, for more convenient logging"
    def __init__(self, server_address, RequestHandlerClass, **kwargs):
        "Bind the server, then make `sys.stdout`/`sys.stderr` flush eagerly until `server_close`"
        super().__init__(server_address, RequestHandlerClass, **kwargs)
        # Saved so `server_close` can restore it; `None` means the variable was not set at all
        self.oldunbuf = os.environ.get('PYTHONUNBUFFERED')
        os.environ['PYTHONUNBUFFERED'] = '1'
        self.oldout,self.olderr = sys.stdout,sys.stderr
        if hasattr(sys.stdout, 'reconfigure'): #py37+
            sys.stdout.reconfigure(line_buffering=True)
            sys.stderr.reconfigure(line_buffering=True)
        else:
            sys.stdout = _Unbuffered(sys.stdout)
            sys.stderr = _Unbuffered(sys.stderr)
        print(f"Listening on {self.server_address}")

    def server_close(self):
        "Restore the saved streams and environment, then close the listening socket"
        # `TCPServer.__init__` calls `server_close` itself when binding fails, i.e. before
        # any of our state has been saved; in that case there is nothing to restore.
        if hasattr(self, 'oldout'):
            time.sleep(0.3) # wait for last request to finish
            # NOTE: this swaps back the stream objects; line buffering enabled through
            # `reconfigure` above is left switched on.
            sys.stdout,sys.stderr = self.oldout,self.olderr
            if self.oldunbuf is None: os.environ.pop('PYTHONUNBUFFERED', None)
            else: os.environ['PYTHONUNBUFFERED'] = self.oldunbuf
        # The base implementation is what actually closes the socket
        super().server_close()

Python's `HTTPServer` (and its subclasses) dispatch each request to a method named `do_{HTTPMETHOD}` (e.g. `do_GET`, `do_POST`) on the `BaseHTTPRequestHandler` subclass that you pass to the server's constructor. For instance:

In [5]:
class _TestHandler(BaseHTTPRequestHandler):
    "Minimal example handler: answers every GET with status 200 and the body `ok`"
    def do_GET(self):
        "Send the 200 status line, finish the headers, then write the body"
        status, body = 200, b'ok'
        self.send_response(status)
        self.end_headers()
        self.wfile.write(body)

@threaded
def _test_server():
    "Serve exactly one request on localhost:8000 with `_TestHandler`, then shut down"
    httpd = UnbufferedServer(('localhost',8000), _TestHandler)
    # Leaving the `with` block closes the server
    with httpd:
        httpd.handle_request()

In [6]:
# Start the one-shot test server, then check that a GET returns the handler's body
_test_server()
time.sleep(0.1) # wait for server to start
test_eq(urlread("http://localhost:8000"), b'ok')

Listening on ('127.0.0.1', 8000)


127.0.0.1 - - [11/Oct/2020 13:58:17] "GET / HTTP/1.1" 200 -


## fastwebhook server

In [7]:
#export
def tweet_text(payload):
    """Build the text of a tweet announcing the release described by `payload`.

    `payload` is a dict with a `release` entry holding `url`, `tag_name`, `html_url`
    and, optionally, `body`. Returns a string of at most 280 characters; longer text
    is cut to 279 characters plus an ellipsis. Raises `KeyError` for a missing
    required key and `IndexError` if `url` is not a GitHub API repo url.
    """
    rel_json = payload['release']
    # The repo name is the path component after the owner in the API url
    repo = re.findall(r'https://api.github.com/repos/[^/]+/([^/]+)/', rel_json['url'])[0]
    # A release without notes can have a null/missing `body`; don't tweet the string "None"
    body = rel_json.get('body') or ''
    res = f"New #{repo} release: v{rel_json['tag_name']}. {rel_json['html_url']}\n\n{body}"
    if len(res)<=280: return res
    return res[:279] + "…"

In [8]:
#export
def check_sig(content, headers, secret):
    """Verify that `headers` carries a valid HMAC-SHA1 signature of `content`.

    `content` and `secret` are bytes; `headers` is any mapping with a `get` method.
    The `X-Hub-Signature` header must equal `sha1=<hexdigest>`. Returns `None` on
    success and raises `AssertionError` when the header is missing or does not match.
    """
    digest = hmac.new(secret, content, hashlib.sha1).hexdigest()
    expected = f'sha1={digest}'.encode()
    # A missing header is treated as an empty (and therefore wrong) signature
    received = (headers.get('X-Hub-Signature') or '').encode()
    # Explicit raise instead of `assert`, which `python -O` strips; the comparison is
    # constant-time so the expected digest is not leaked through timing.
    if not hmac.compare_digest(expected, received):
        raise AssertionError('X-Hub-Signature does not match the payload')

In [9]:
#export
class _RequestHandler(BaseHTTPRequestHandler):
    "Handle webhook POSTs; reads `check_ip`, `whitelist`, `debug`, `gh_secret` and `api` from `self.server`"
    def _ip_allowed(self):
        "Whether the client address lies inside one of the server's whitelisted networks"
        src_ip = ip_address(self.client_address[0])
        return any((src_ip in wl) for wl in self.server.whitelist)

    def _post(self):
        "Process one POST: optional IP check, then tweet when the payload's `action` is `released`"
        # Explicit check instead of `assert` (which `python -O` strips); the client
        # gets a 403 where the connection was previously dropped without a response.
        if self.server.check_ip and not self._ip_allowed():
            self.send_error(403, "Source IP not allowed")
            return
        # NOTE(review): the 200 status is sent before the signature is verified below, so a
        # request with a bad signature is acknowledged (but never acted on) -- confirm intended.
        self.send_response(200)
        self.end_headers()
        length = self.headers.get('content-length')
        if not length: return
        content = self.rfile.read(int(length))
        if self.server.debug:
            # Debug mode only echoes the request; nothing is verified or tweeted
            print(self.headers, content)
            return
        payload = json.loads(content.decode())
        if payload.get('action',None)=='released':
            check_sig(content, self.headers, self.server.gh_secret)
            tweet = tweet_text(payload)
            stat = self.server.api.update_status(tweet)
            print(stat.id)
        self.wfile.write(b'ok')

    def do_POST(self):
        "Run `_post`, writing the traceback of any failure to stderr instead of propagating it"
        try: self._post()
        except Exception: sys.stderr.write(traceback.format_exc())

    def log_message(self, fmt, *args):
        "Write the formatted message to stderr as a single line"
        # The trailing newline keeps consecutive log records from running together
        sys.stderr.write(fmt%args + "\n")

In [10]:
#export
@call_parse
def run_server(hostname: Param("Host name or IP", str)='localhost',
               port:     Param("Port to listen on", int)=8000,
               debug:    Param("If True, do not trigger actions, just print", bool_arg)=False,
               inifile:  Param("Path to settings ini file", str)='twitter.ini',
               check_ip: Param("Check source IP against GitHub list", bool_arg)=True,
               single_request=False):
    "Run a GitHub webhook server that tweets about new releases"
    assert os.path.exists(inifile), f"{inifile} not found"
    # Settings are read from the ini file's DEFAULT section: `consumer_key`, `consumer_secret`,
    # `access_token`, `access_token_secret` and `gh_secret`. Interpolation is off so `%` stays literal.
    cfg = ConfigParser(interpolation=None)
    cfg.read([inifile])
    cfg = cfg['DEFAULT']
    auth = tweepy.OAuthHandler(cfg['consumer_key'], cfg['consumer_secret'])
    auth.set_access_token(cfg['access_token'], cfg['access_token_secret'])
    with UnbufferedServer((hostname, port), _RequestHandler) as httpd:
        # The handler reads these attributes from `self.server`
        httpd.gh_secret = bytes(cfg['gh_secret'], 'utf-8')
        httpd.api = tweepy.API(auth)
        # The handler only consults the whitelist when `check_ip` is set, so skip the
        # network round-trip (and its failure modes) when the check is disabled.
        hooks = urljson('https://api.github.com/meta')['hooks'] if check_ip else []
        httpd.whitelist = L(hooks).map(ip_network)
        httpd.check_ip,httpd.debug = check_ip,debug
        # `single_request` serves one request and returns
        if single_request: httpd.handle_request()
        else: httpd.serve_forever()

In [11]:
time.sleep(0.5) # wait for previous server to stop
# Run the webhook server for a single request, in debug mode and without the source-IP check
threaded(partial(run_server, check_ip=False, debug=True, single_request=True))()

<Thread(Thread-6, started 140112752244480)>

Listening on ('127.0.0.1', 8000)


In [12]:
time.sleep(0.1) # give the server started above a moment to begin listening
# The keyword argument is sent as form data in a POST (see the request echoed in the output)
urlread("http://localhost:8000", spam=1)

"POST / HTTP/1.1" 200 -

Accept-Encoding: identity
Content-Type: application/x-www-form-urlencoded
Content-Length: 6
Host: localhost:8000
User-Agent: Python-urllib/3.8
Connection: close

 b'spam=1'


b''

## Installer

In [14]:
#export
@call_parse
def fastwebhook_install_service(hostname: Param("Host name or IP", str)='0.0.0.0',
                    port:     Param("Port to listen on", int)=8000,
                    inifile:  Param("Path to settings ini file", str)='twitter.ini',
                    check_ip: Param("Check source IP against GitHub list", bool_arg)=True,
                    service_path: Param("Directory to write service file to", str)="/etc/systemd/system/"):
    "Install fastwebhook as a service"
    script_loc = shutil.which('fastwebhook')
    # `shutil.which` returns None when nothing is found; fail here rather than
    # writing `ExecStart=None ...` into the unit file.
    if script_loc is None:
        raise FileNotFoundError("`fastwebhook` executable not found on PATH")
    # The unit file needs an absolute path for the ini file
    inifile = Path(inifile).absolute()
    _unitfile = dedent(f"""
    [Unit]
    Description=fastwebhook
    Wants=network-online.target
    After=network-online.target

    [Service]
    ExecStart={script_loc} --inifile {inifile} --check_ip {check_ip} --hostname {hostname} --port {port}
    Restart=always

    [Install]
    WantedBy=multi-user.target""")
    # Written to the current directory first, then copied into place with sudo
    Path("fastwebhook.service").write_text(_unitfile)
    run(f"sudo cp fastwebhook.service {service_path}")

This `fastcore.script` CLI installs `fastwebhook` as a `systemd` service. Run `fastwebhook_install_service --help` in your terminal for options.

## Export -

In [15]:
#hide
# Convert the project's notebooks to library modules with nbdev
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted index.ipynb.
