Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ build/
tags
__pycache__/
*.pyc
*.profile
150 changes: 150 additions & 0 deletions examples/poor_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""PoorSession example.

Demonstrates ``PoorSession`` — a self-contained encrypted and authenticated
session cookie. User data is stored as a dictionary directly in the cookie;
no server-side session store is needed. No external dependencies are
required.

Run::

python examples/poor_session.py

Then open http://127.0.0.1:8080 in your browser.
Login with any non-empty username and password ``secret``.
"""
import logging
from functools import wraps
from os import path, urandom
from sys import path as python_path

python_path.insert(
0, path.abspath(path.join(path.dirname(__file__), path.pardir)))

# pylint: disable=wrong-import-position
from poorwsgi import Application, state # noqa
from poorwsgi.response import HTTPException, RedirectResponse, redirect # noqa
from poorwsgi.session import PoorSession, SessionError # noqa

# pylint: disable=unused-argument

logging.getLogger().setLevel("DEBUG")

app = application = Application(__name__) # pylint: disable=invalid-name
app.debug = True
app.secret_key = urandom(32)

PASSWORD = "secret" # nosec # noqa: S105


# --- helpers -----------------------------------------------------------------

def get_header(title):
"""Returns HTML page header lines."""
return (
"<html>", "<head>",
'<meta http-equiv="content-type" content="text/html; charset=utf-8"/>',
f"<title>{title}</title>", "</head>", "<body>",
f"<h1>{title}</h1>")


def get_footer():
"""Returns HTML page footer lines."""
return ("<hr>",
"<small>PoorSession example — PoorWSGI</small>",
"</body>", "</html>")


def check_login(fn):
"""Decorator that reads and verifies the PoorSession cookie.

Sets ``req.login`` to the stored username on success, otherwise
redirects to ``/login``.
"""
@wraps(fn)
def handler(req):
session = PoorSession(app.secret_key)
try:
session.load(req.cookies)
except SessionError:
redirect("/login", message=b"Invalid session")
username = session.data.get("user")
if not username:
redirect("/login", message=b"Login required")
req.login = username
return fn(req)
return handler


# --- routes ------------------------------------------------------------------

@app.route('/')
def root(_req):
"""Index page with navigation."""
body = ('<ul>',
'<li><a href="/login">/login</a> — log in</li>',
'<li><a href="/private">/private</a> — protected page</li>',
'<li><a href="/logout">/logout</a> — log out</li>',
'</ul>')
for line in get_header("PoorSession demo") + body + get_footer():
yield line.encode() + b'\n'


@app.route('/login')
def login_form(_req):
"""GET: show login form."""
form = ('<form method="post">',
'<label>Username: '
'<input type="text" name="username"/></label><br/>',
'<label>Password: <input type="password" name="password"/>'
'</label><br/>',
'<button type="submit">Login</button>', '</form>',
'<p><em>Password is: secret</em></p>')
for line in get_header("Login") + form + get_footer():
yield line.encode() + b'\n'


@app.route('/login', method=state.METHOD_POST)
def login_post(req):
"""POST: validate credentials and store username in encrypted cookie."""
username = req.form.getfirst('username', func=str) or ''
password = req.form.getfirst('password', func=str) or ''

if not username or password != PASSWORD:
redirect('/login')

response = RedirectResponse("/private")
session = PoorSession(app.secret_key, secure=False, same_site="Lax")
session.data["user"] = username
session.write()
session.header(response)
raise HTTPException(response)


@app.route('/private')
@check_login
def private(req):
"""A protected page — only accessible after login."""
body = (f'<p>Hello, <strong>{req.login}</strong>!</p>',
'<p>Your session is stored in an encrypted cookie '
'(no external dependencies).</p>',
'<p><a href="/logout">Log out</a></p>')
for line in get_header("Private page") + body + get_footer():
yield line.encode() + b'\n'


@app.route('/logout')
def logout(_req):
"""Clears the session cookie."""
response = RedirectResponse("/login")
session = PoorSession(app.secret_key)
session.destroy()
session.header(response)
raise HTTPException(response)


if __name__ == '__main__':
from wsgiref.simple_server import make_server # noqa: E402
httpd = make_server( # pylint: disable=invalid-name
'127.0.0.1', 8080, app)
logging.info("Starting to serve on http://127.0.0.1:8080")
httpd.serve_forever()
58 changes: 4 additions & 54 deletions examples/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import os
from base64 import decodebytes, encodebytes, urlsafe_b64encode
from collections import OrderedDict
from functools import wraps
from hashlib import md5
from io import BytesIO
from io import FileIO as file
Expand All @@ -23,16 +22,15 @@
0, os.path.abspath(os.path.join(EXAMPLES_PATH, os.path.pardir)))

# pylint: disable=import-error, wrong-import-position
from poorwsgi import Application, redirect, state # noqa
from poorwsgi import Application, state # noqa
from poorwsgi.fieldstorage import FieldStorageParser # noqa
from poorwsgi.headers import http_to_time, parse_range, time_to_http # noqa
from poorwsgi.response import FileResponse # noqa
from poorwsgi.response import HTTPException # noqa
from poorwsgi.response import (FileObjResponse, GeneratorResponse, # noqa
NoContentResponse, NotModifiedResponse,
PartialResponse, RedirectResponse, Response)
PartialResponse, Response)
from poorwsgi.results import html_escape, not_modified # noqa
from poorwsgi.session import PoorSession, SessionError # noqa

try:
import uwsgi # type: ignore
Expand All @@ -46,7 +44,6 @@
app.debug = True
app.document_root = '.'
app.document_index = True
app.secret_key = os.urandom(32) # random key each run


class MyValueError(ValueError):
Expand Down Expand Up @@ -159,27 +156,6 @@ def get_variables(req):
app.set_filter('email', r'[\w\.\-]+@[\w\.\-]+')


def check_login(fun):
"""Checks the session cookie."""

@wraps(fun)
def handler(req):
session = PoorSession(app.secret_key)
try:
session.load(req.cookies)
except SessionError:
pass
if 'login' not in session.data:
log.info('Login cookie not found.')
redirect(
"/",
message="Login required",
)
return fun(req)

return handler


@app.route('/')
def root(req):
"""Returns the root index page."""
Expand All @@ -204,12 +180,10 @@ def root(req):
' - Testing variable args</li>',
'<li><a href="/test/headers">/test/headers</a> - Testing Headers'
'</li>',
'<li><a href="/login">/login</a> - Create login session</li>',
'<li><a href="/logout">/logout</a> - Destroy login session</li>',
'<li><a href="/test/form">/test/form</a>'
' - Testing http form (only if you have login cookie / session)</li>',
' - Testing http form</li>',
'<li><a href="/test/upload">/test/upload</a> - '
'Testing file upload (only if you have login cookie / session)</li>',
'Testing file upload</li>',
'<li><a href="/debug-info">/debug-info</a>'
' - Debug Page (only if poor_Debug is set)</li>',
'<li><a href="/no-page">/no-page</a> - No Exist Page</li>',
Expand Down Expand Up @@ -346,30 +320,7 @@ def test_varargs(req, *args):
return response


@app.route('/login')
def login(req):
"""Creates a login session cookie."""
log.debug("Input cookies: %s", repr(req.cookies))
cookie = PoorSession(app.secret_key)
cookie.data['login'] = True
response = RedirectResponse('/')
cookie.header(response)
return response


@app.route('/logout')
def logout(req):
"""Destroys the login session cookie."""
log.debug("Input cookies: %s", repr(req.cookies))
cookie = PoorSession(app.secret_key)
cookie.destroy()
response = RedirectResponse('/')
cookie.header(response)
return response


@app.route('/test/form', method=state.METHOD_GET_POST)
@check_login
def test_form(req):
"""A form example."""
# pylint: disable=consider-using-f-string
Expand Down Expand Up @@ -448,7 +399,6 @@ def test_form(req):


@app.route('/test/upload', method=state.METHOD_GET_POST)
@check_login
def test_upload(req):
"""A file upload example."""
var_info = OrderedDict((
Expand Down
8 changes: 4 additions & 4 deletions poorwsgi/fieldstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from io import BytesIO, StringIO, TextIOWrapper
from typing import Any, Callable, Optional, Union

from poorwsgi.headers import parse_header
from poorwsgi.headers import parse_header, Headers

_RE_STR_BOUNDARY = re.compile("^[ -~]{0,200}[!-~]$")
_RE_BIN_BOUNDARY = re.compile(b"^[ -~]{0,200}[!-~]$")
Expand Down Expand Up @@ -425,7 +425,7 @@ class FieldStorageParser:
"""
BUFSIZE = 8*1024 # buffering size for copy to file and storing StringIO

def __init__(self, input_=None, headers=None, outerboundary=b'',
def __init__(self, input_=None, headers=None | Headers, outerboundary=b'',
keep_blank_values=0, strict_parsing=0,
limit=None, encoding='utf-8', errors='replace',
max_num_fields=None, separator='&', file_callback=None):
Expand Down Expand Up @@ -477,7 +477,7 @@ def __init__(self, input_=None, headers=None, outerboundary=b'',
allows you to write a file from the request directly to its
destination without temporary files.
"""
self.headers = headers
self.headers = headers or Headers()
self.outerboundary = outerboundary
self.keep_blank_values = keep_blank_values
self.strict_parsing = strict_parsing
Expand Down Expand Up @@ -551,7 +551,7 @@ def parse(self) -> FieldStorage:
try:
clen = int(self.headers['content-length'])
except ValueError:
pass
pass # clen will be still -1 if value is not integer
field.length = self.length = clen
if self.limit is None and clen >= 0:
self.limit = clen
Expand Down
16 changes: 8 additions & 8 deletions poorwsgi/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
:Classes: NoCompress, Session, PoorSession
:Functions: hidden, encrypt, decrypt, get_token, check_token

:class:`Session` is a plain cookie wrapper suitable for storing a server-side
``Session`` is a plain cookie wrapper suitable for storing a server-side
session ID or a JWT. No encryption is applied; the value is stored verbatim.

:class:`PoorSession` is a self-contained encrypted session cookie.
``PoorSession`` is a self-contained encrypted session cookie.

Cookie format for PoorSession: ``base64(ciphertext).base64(hmac-sha256)``

Expand Down Expand Up @@ -158,7 +158,7 @@ class Session:
The cookie is always set with ``HttpOnly=True``. Use ``secure=True``
when serving over HTTPS.

This class is also the base class for :class:`PoorSession`.
This class is also the base class for ``PoorSession``.

.. code:: python

Expand Down Expand Up @@ -213,7 +213,7 @@ def __init__(self, expires: int = 0, max_age: Optional[int] = None,
def _apply_cookie_attrs(self):
"""Apply security and configuration attributes to the session cookie.

Called by :meth:`write` and subclass overrides of :meth:`write`.
Called by ``write`` and subclass overrides of ``write``.
Sets ``HttpOnly``, ``Domain``, ``Path``, ``Secure``, ``SameSite``,
``Expires``, and ``Max-Age`` as configured.
"""
Expand All @@ -234,17 +234,17 @@ def _apply_cookie_attrs(self):
def load(self, cookies: Optional[SimpleCookie]):
"""Load the session value from the request's cookies.

Sets :attr:`data` to the raw cookie string, or leaves it as ``""``
Sets ``data`` to the raw cookie string, or leaves it as ``""``
if the cookie is absent or empty.
"""
if not isinstance(cookies, SimpleCookie) or self._sid not in cookies:
return
self.data = cookies[self._sid].value

def write(self) -> str:
"""Store :attr:`data` to the cookie value.
"""Store ``data`` to the cookie value.

This method is called automatically by :meth:`header`.
This method is called automatically by ``header``.
Returns the raw string written to the cookie.
"""
raw = self.data if isinstance(self.data, str) else str(self.data)
Expand Down Expand Up @@ -458,7 +458,7 @@ def load(self, cookies: Optional[SimpleCookie]):
def write(self) -> str:
"""Encrypt and sign the session data, write to cookie.

This method is called automatically by :meth:`header`.
This method is called automatically by ``header``.
"""
payload = self.__cps.compress(
encrypt(hidden(dumps(self.data), self.__secret_hash),
Expand Down
5 changes: 1 addition & 4 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
"""Unit tests for Session and PoorSession classes."""
from os import urandom
from sys import version_info
from http.cookies import SimpleCookie, Morsel
from typing import Any

from pytest import fixture, raises, mark
from pytest import fixture, raises

from poorwsgi.session import Session, PoorSession, SessionError

Expand Down Expand Up @@ -122,8 +121,6 @@ def test_load_missing_sid(self):
assert session.data == {}


@mark.skipif(version_info.minor < 8,
reason="SameSite is supported from Python 3.8")
class TestSameSite:
"""Tests for the PoorSession same_site option."""

Expand Down
Loading
Loading