Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/security scanners #49

Merged
merged 12 commits into from
May 29, 2024
57 changes: 32 additions & 25 deletions core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import asyncio
import datetime
import logging
import re
from typing import TYPE_CHECKING, Any, Self

import aiohttp
Expand All @@ -31,16 +30,18 @@

from . import utils
from .models import FileModel, PasteModel
from .scanners import SecurityInfo, Services


if TYPE_CHECKING:
_Pool = asyncpg.Pool[asyncpg.Record]
from types_.config import Github
from types_.github import PostGist
from types_.scanner import ScannerSecret
else:
_Pool = asyncpg.Pool

DISCORD_TOKEN_REGEX: re.Pattern[str] = re.compile(r"[a-zA-Z0-9_-]{23,28}\.[a-zA-Z0-9_-]{6,7}\.[a-zA-Z0-9_-]{27,}")

LOGGER: logging.Logger = logging.getLogger(__name__)


Expand All @@ -53,7 +54,7 @@ def __init__(self, *, dsn: str, session: aiohttp.ClientSession | None = None, gi
self._handling_tokens = bool(self.session and github_config)

if self._handling_tokens:
LOGGER.info("Will handle compromised discord info.")
LOGGER.info("Setup to handle Discord Tokens.")
assert github_config # guarded by if here

self._gist_token = github_config["token"]
Expand Down Expand Up @@ -83,20 +84,15 @@ async def _token_task(self) -> None:

await asyncio.sleep(self._gist_timeout)

def _handle_discord_tokens(self, *bodies: dict[str, str], paste_id: str) -> None:
formatted_bodies = "\n".join(b["content"] for b in bodies)

tokens = list(DISCORD_TOKEN_REGEX.finditer(formatted_bodies))

def _handle_discord_tokens(self, tokens: list[str], paste_id: str) -> None:
if not tokens:
EvieePy marked this conversation as resolved.
Show resolved Hide resolved
return

LOGGER.info(
"Discord bot token located and added to token bucket. Current bucket size is: %s", len(self.__tokens_bucket)
)

tokens = "\n".join([m[0] for m in tokens])
self.__tokens_bucket[paste_id] = tokens
self.__tokens_bucket[paste_id] = "\n".join(tokens)

async def _post_gist_of_tokens(self) -> None:
assert self.session # guarded in caller
Expand Down Expand Up @@ -211,8 +207,8 @@ async def create_paste(self, *, data: dict[str, Any]) -> PasteModel:
"""

file_query: str = """
INSERT INTO files (parent_id, content, filename, loc, annotation)
VALUES ($1, $2, $3, $4, $5)
INSERT INTO files (parent_id, content, filename, loc, annotation, warning_positions)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
"""

Expand Down Expand Up @@ -246,28 +242,39 @@ async def create_paste(self, *, data: dict[str, Any]) -> PasteModel:
name: str = (file.get("filename") or f"file_{index}")[-CONFIG["PASTES"]["name_limit"] :]
name = "_".join(name.splitlines())

content: str = file["content"]
# Normalise newlines...
content: str = file["content"].replace("\r\n", "\n").replace("\r", "\n")
loc: int = file["content"].count("\n") + 1
annotation: str = ""

tokens = [t for t in utils.TOKEN_REGEX.findall(content) if utils.validate_discord_token(t)]
if tokens:
annotation = "Contains possibly sensitive information: Discord Token(s)"
if not password:
annotation += ", which have now been invalidated."
positions: list[int] = []
extra: str = ""

secrets: list[ScannerSecret] = SecurityInfo.scan_file(content)
for payload in secrets:
service: Services = payload["service"]

extra += f"{service.value}, "
positions += [t[0] for t in payload["tokens"]]

if not password and self._handling_tokens and service is Services.discord:
self._handle_discord_tokens(tokens=[t[1] for t in payload["tokens"]], paste_id=paste.id)

extra = extra.removesuffix(", ")
annotation = f"Contains possibly sensitive data from: {extra}" if extra else ""

row: asyncpg.Record | None = await connection.fetchrow(
file_query, paste.id, content, name, loc, annotation
file_query,
paste.id,
content,
name,
loc,
annotation,
sorted(positions),
)

if row:
paste.files.append(FileModel(row))

if not password:
# if the user didn't provide a password (a public paste)
# we check for discord tokens
self._handle_discord_tokens(*data["files"], paste_id=paste.id)

return paste

async def fetch_paste_security(self, *, token: str) -> PasteModel | None:
Expand Down
1 change: 1 addition & 0 deletions core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self, record: asyncpg.Record | dict[str, Any]) -> None:
self.charcount: int = record["charcount"]
self.index: int = record["file_index"]
self.annotation: str = record["annotation"]
self.warning_positions: list[int] = record["warning_positions"]


class PasteModel(BaseModel):
Expand Down
128 changes: 128 additions & 0 deletions core/scanners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""MystBin. Share code easily.

Copyright (C) 2020-Current PythonistaGuild

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

import base64
import binascii
import enum
import logging
import re
from typing import ClassVar

from types_.scanner import ScannerSecret
EvieePy marked this conversation as resolved.
Show resolved Hide resolved


logger: logging.Logger = logging.getLogger(__name__)
EvieePy marked this conversation as resolved.
Show resolved Hide resolved


class Services(enum.Enum):
    """Third-party services whose secrets can be scanned for.

    Each member's value is the human-readable service name.
    """

    discord = "Discord"
    pypi = "PyPi"
    github = "GitHub"


class BaseScanner:
REGEX: ClassVar[re.Pattern[str]]
SERVICE: Services
EvieePy marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def match(cls, content: str) -> ScannerSecret:
matches: list[tuple[int, str]] = [(m.start(0), m.group(0)) for m in cls.REGEX.finditer(content)]

payload: ScannerSecret = {
"service": cls.SERVICE,
"tokens": matches,
}

return payload


class DiscordScanner(BaseScanner):
    """Scanner for Discord tokens.

    A candidate is any run of three dot-separated segments matching ``REGEX``. It is only
    reported when :meth:`validate_discord_token` accepts it, which filters out look-alike strings.
    """

    REGEX = re.compile(r"[a-zA-Z0-9_-]{23,28}\.[a-zA-Z0-9_-]{6,7}\.[a-zA-Z0-9_-]{27,}")
    SERVICE = Services.discord

    @staticmethod
    def validate_discord_token(token: str) -> bool:
        """Return ``True`` when the first segment of ``token`` decodes to a user ID.

        ``token`` must consist of exactly three dot-separated segments. Only the first one
        is inspected: it has to be valid base64 whose decoded bytes parse as an integer.
        Any other shape or content yields ``False``.
        """
        try:
            # Just check if the first part validates as a user ID
            user_id, _, _ = token.split(".")

            # Pad up to the next multiple of four characters. Appending a fixed "==" is only
            # correct when the segment length is 2 mod 4; for other lengths the input ends up
            # over-padded, which strict validation (validate=True) can reject even though the
            # segment itself is well-formed.
            padded = user_id + "=" * (-len(user_id) % 4)
            int(base64.b64decode(padded, validate=True))
        except (ValueError, binascii.Error):
            # Wrong number of segments, invalid base64, or non-numeric payload.
            return False

        return True

    @classmethod
    def match(cls, content: str) -> ScannerSecret:
        """Collect every validated Discord token found in ``content``.

        Returns a ``ScannerSecret`` mapping with ``SERVICE`` under ``"service"`` and, under
        ``"tokens"``, the ``(start offset, matched text)`` pairs of the ``REGEX`` hits that
        pass :meth:`validate_discord_token`. The list is empty when nothing qualifies.
        """
        matches: list[tuple[int, str]] = [
            (m.start(0), m.group(0)) for m in cls.REGEX.finditer(content) if cls.validate_discord_token(m.group(0))
        ]

        payload: ScannerSecret = {
            "service": cls.SERVICE,
            "tokens": matches,
        }

        return payload


class PyPiScanner(BaseScanner):
    """Scanner for PyPI tokens: the fixed ``pypi-AgEIcHlwaS5vcmc`` prefix plus 70 or more token characters."""

    SERVICE = Services.pypi
    REGEX = re.compile(r"pypi-AgEIcHlwaS5vcmc[A-Za-z0-9-_]{70,}")


class GitHubScanner(BaseScanner):
    """Scanner for GitHub tokens: a ``ghp``/``gho``/``ghu``/``ghs``/``ghr`` prefix, ``_``, then 36 token characters."""

    SERVICE = Services.github
    REGEX = re.compile(r"((ghp|gho|ghu|ghs|ghr)_[A-Za-z0-9_]{36})")


class SecurityInfo:
    """Entry point for running the registered secret scanners over a file's content."""

    # Which scanner class handles each service.
    __SERVICE_MAPPING: ClassVar[dict[Services, type[BaseScanner]]] = {
        Services.discord: DiscordScanner,
        Services.pypi: PyPiScanner,
        Services.github: GitHubScanner,
    }

    @classmethod
    def scan_file(
        cls,
        file: str,
        /,
        *,
        allowed: list[Services] | None = None,
        disallowed: list[Services] | None = None,
    ) -> list[ScannerSecret]:
        """Scan for tokens in a given file's content.

        You may pass a list of allowed or disallowed Services.
        If both are empty or ``None`` (Default) all available services will be scanned.

        Parameters
        ----------
        file: str
            The content to scan.
        allowed: list[Services] | None
            Services to scan for. When empty or ``None`` every member of ``Services`` is used.
        disallowed: list[Services] | None
            Services to leave out, applied after ``allowed``.

        Returns
        -------
        list[ScannerSecret]
            One payload per scanned service that produced at least one token, in the order
            the services were scanned. Services with no hits are omitted.
        """
        candidates: list[Services] = allowed or list(Services)
        excluded: list[Services] = disallowed or []
        secrets: list[ScannerSecret] = []

        for service in candidates:
            if service in excluded:
                continue

            scanner: type[BaseScanner] | None = cls.__SERVICE_MAPPING.get(service)
            if scanner is None:
                # Report through this module's logger rather than the root logger so the
                # record is attributed to the scanner module.
                logger.warning("The provided service %r is not a supported or a valid service.", service)
                continue

            found: ScannerSecret = scanner.match(file)
            if found["tokens"]:
                secrets.append(found)

        return secrets
1 change: 1 addition & 0 deletions migration.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ALTER TABLE files ALTER COLUMN filename SET NOT NULL; -- always require filenam
ALTER TABLE files DROP COLUMN IF EXISTS attachment; -- we don't have these anymore
ALTER TABLE files ADD COLUMN IF NOT EXISTS annotation TEXT;
ALTER TABLE files RENAME COLUMN index TO file_index; -- bad column name
ALTER TABLE files ADD COLUMN IF NOT EXISTS warning_positions INTEGER[]; -- New line warning positions

SAVEPOINT drops;
DROP TABLE IF EXISTS bans CASCADE; -- no longer needed
Expand Down
1 change: 1 addition & 0 deletions schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ CREATE TABLE IF NOT EXISTS files (
charcount INTEGER GENERATED ALWAYS AS (LENGTH(content)) STORED,
file_index SERIAL NOT NULL,
annotation TEXT,
warning_positions INTEGER[],
PRIMARY KEY (parent_id, file_index)
);
30 changes: 30 additions & 0 deletions types_/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""MystBin. Share code easily.

Copyright (C) 2020-Current PythonistaGuild

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, TypedDict


if TYPE_CHECKING:
from core.scanners import Services


class ScannerSecret(TypedDict):
    """Result payload produced by a scanner for a single service."""

    # The service the tokens below were matched for.
    service: Services
    # One (int, str) pair per detected token.
    tokens: list[tuple[int, str]]
42 changes: 38 additions & 4 deletions views/htmx.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,46 @@ def highlight_code(self, *, files: list[dict[str, Any]]) -> str:

raw_url: str = f'/raw/{file["parent_id"]}'
annotation: str = file["annotation"]
positions: list[int] = file.get("warning_positions", [])
original: str = file["content"]

content = bleach.clean(
file["content"].replace("<!", "&lt;&#33;"), attributes=[], tags=[], strip_comments=False
parts: list[str] = annotation.split(":")
annotation = parts.pop(0)

extra: str = (
f"""<span class="annotationSecond" data-text="Discord tokens will be invalidated automatically">{parts[0]}"""
if parts
else ""
)
annotations: str = (
f'<small class="annotations">❌ {annotation}{": " + extra if extra else ""}</small>'
if annotation
else ""
)
annotations: str = f'<small class="annotations">❌ {annotation}</small>' if annotation else ""

position: int = 0
next_pos: int | None = positions.pop(0) if positions else None

numbers: list[str] = []
for n, line in enumerate(original.splitlines(), 1):
length: int = len(line)

if next_pos is not None and position <= next_pos <= position + length:
numbers.append(f"""<tr><td class="lineNumRow">{n}</td><td class="lineWarn"></td></tr>""")

try:
next_pos = positions.pop(0)
except IndexError:
next_pos = None

else:
numbers.append(f"""<tr><td class="lineNumRow">{n}</td></tr>""")

position += length + 1

content = bleach.clean(original.replace("<!", "&lt;&#33;"), attributes=[], tags=[], strip_comments=False)

lines: str = f"""<table class="lineNums"><tbody>\n{"".join(numbers)}\n</tbody></table>"""
html += f"""
<div id="__paste_a_{index}" class="pasteArea">
<div class="pasteHeader">
Expand All @@ -72,7 +106,7 @@ def highlight_code(self, *, files: list[dict[str, Any]]) -> str:
</div>
</div>
{annotations}
<pre id="__paste_c_{index}" class="fileContent" style="display: flex; flex-grow: 1;"><code>{content}</code></pre>
<pre id="__paste_c_{index}" class="fileContent" style="display: flex; flex-grow: 1;">{lines}<code>{content}</code></pre>
</div>"""

return html
Expand Down
2 changes: 1 addition & 1 deletion web/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<!-- STYLESHEETS -->
<!-- <link rel="preload" href="static/styles/global.css" as="style" /> -->
<!-- <link rel="preload" href="static/styles/highlights.css" as="style" /> -->
<link rel="stylesheet" type="text/css" href="/static/styles/global.css?v=5" />
<link rel="stylesheet" type="text/css" href="/static/styles/global.css?v=6" />

<!-- FONTS -->
<link rel="preconnect" href="https://fonts.googleapis.com">
Expand Down
2 changes: 1 addition & 1 deletion web/maint.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<!-- STYLESHEETS -->
<!-- <link rel="preload" href="static/styles/global.css" as="style" /> -->
<link rel="stylesheet" type="text/css" href="static/styles/global.css?v=5" />
<link rel="stylesheet" type="text/css" href="static/styles/global.css?v=6" />


<!-- FONTS -->
Expand Down
Loading
Loading