Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
43d05aa
Sketch out guard log.
JosephCatrambone Jun 24, 2024
35df11c
Add some experimental code sketches.
JosephCatrambone Jun 25, 2024
9598ab3
Add docstrings. Small improvements.
JosephCatrambone Jun 25, 2024
dd02436
Update: this is no longer the approach we're going to use. We're swit…
JosephCatrambone Jun 25, 2024
2689ddc
Merge branch '0.5.0-dev' of github.com:guardrails-ai/guardrails into …
JosephCatrambone Jun 26, 2024
fda35ad
Start integration with spans.
JosephCatrambone Jun 26, 2024
fdf89ec
Integrate logging with trace.
JosephCatrambone Jun 26, 2024
2805426
Merge branch '0.5.0-dev' of github.com:guardrails-ai/guardrails into …
JosephCatrambone Jun 26, 2024
fceb95c
Output as table.
JosephCatrambone Jun 26, 2024
6c9348e
Output as table.
JosephCatrambone Jun 26, 2024
c44b573
Make sure logging works across async, multiple threads, and multiple …
JosephCatrambone Jun 27, 2024
c44799f
Move test for guard logger to the right place.
JosephCatrambone Jun 27, 2024
c7f8f79
Write to tempfile rather than local directory. Add method to truncat…
JosephCatrambone Jun 27, 2024
cd5f7c4
Reformat.
JosephCatrambone Jun 27, 2024
f49e8fc
Default to follow (by request). Remove unused log level.
JosephCatrambone Jun 27, 2024
9b1b512
Fix doctest. Update docstring.
JosephCatrambone Jun 27, 2024
f10a6d5
Format.
JosephCatrambone Jun 27, 2024
3d12e66
Relint.
JosephCatrambone Jun 27, 2024
6f1a379
Accidentally only returned Noop handler.
JosephCatrambone Jun 27, 2024
6b66bda
Remove error level and reformat.
JosephCatrambone Jun 27, 2024
5da2e5d
Linting.
JosephCatrambone Jun 27, 2024
8e2ac75
Fix lint and pyrite issues.
JosephCatrambone Jun 27, 2024
6e38053
PR Feedback: Move guard_call_logging to tracing.
JosephCatrambone Jun 27, 2024
46bc48a
PR Feeback: Move things to individual namespaced files.
JosephCatrambone Jun 27, 2024
62ae667
Move around and clean up based on PR feedback.
JosephCatrambone Jun 27, 2024
bf5c8d8
Remove the macarena from unit tests.
JosephCatrambone Jun 27, 2024
36f5177
Linting.
JosephCatrambone Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions guardrails/call_tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
For tracing (logging) and reporting the timing of Guard and Validator calls.

sqlite_trace_handler defines most of the actual implementation methods.
trace_handler provides the singleton that's used for fast global access across threads.
tracer_mixin defines the interface and can act as a noop.
trace_entry is just a helpful dataclass.
"""

from guardrails.call_tracing.trace_entry import GuardTraceEntry
from guardrails.call_tracing.trace_handler import TraceHandler

# Names re-exported as the public surface of the call_tracing package.
__all__ = ["GuardTraceEntry", "TraceHandler"]
231 changes: 231 additions & 0 deletions guardrails/call_tracing/sqlite_trace_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""
sqlite_trace_handler.py

This is the metaphorical bread and butter of our tracing implementation, or at least the
butter. It wraps a SQLite database and configures it to be 'agreeable' in multithreaded
situations. Normally, when sharing across threads and instances one should consider
using a larger database solution like Postgres, but in this case we only care about
_supporting_ writing from multiple places. We don't expect it will be the norm.
We care about (1) not negatively impacting performance, (2) not crashing when used in
unusual ways, and (3) not losing data when possible.

The happy path should be reasonably performant. The unhappy path should not crash.

The other part of the multithreaded support comes from the public trace_handler, which
uses a singleton pattern so that only a single writable handler (and therefore a single
write connection) exists per process. That handler may be shared across threads, in
which case the journaling settings and write-ahead logging should protect us from odd
behavior.
"""

import datetime
import os
import sqlite3
import time
from dataclasses import asdict
from typing import Iterator

from guardrails.call_tracing.trace_entry import GuardTraceEntry
from guardrails.call_tracing.tracer_mixin import TracerMixin
from guardrails.classes.validation.validator_logs import ValidatorLogs
from guardrails.utils.casting_utils import to_string


# Default number of most-recent log rows to retain when old rows are truncated
# (used as the default `keep_n` of SQLiteTraceHandler._truncate).
LOG_RETENTION_LIMIT = 100000
# Minimum time that must pass between two non-forced truncation passes.
TIME_BETWEEN_CLEANUPS = 10.0 # Seconds


# These adapters make it more convenient to add data into our log DB:
# Handle timestamp -> sqlite map:
def adapt_datetime(val):
    """Adapt a ``datetime.datetime`` to an integer Unix timestamp.

    The fractional part of the timestamp is dropped by the ``int`` conversion.
    An ISO-8601 string (``val.isoformat()``) would be the alternative
    representation if sub-second detail were ever wanted.
    """
    epoch_seconds = val.timestamp()
    return int(epoch_seconds)


# Module-level side effect: from import time on, any datetime.datetime bound as a
# SQL parameter is passed through adapt_datetime before being handed to SQLite.
sqlite3.register_adapter(datetime.datetime, adapt_datetime)


def convert_timestamp(val):
    """Convert a stored Unix epoch value into a ``datetime.datetime``.

    ``val`` is whatever ``int()`` can parse (sqlite3 hands converters the raw
    column bytes). The result is a naive datetime in local time, as produced by
    ``datetime.datetime.fromtimestamp``. If values were stored in ISO format
    instead, ``datetime.datetime.fromisoformat(val.decode())`` would be the
    matching conversion.
    """
    epoch_seconds = int(val)
    return datetime.datetime.fromtimestamp(epoch_seconds)


# Module-level side effect: registers convert_timestamp for columns whose declared
# type is "timestamp".
# NOTE(review): sqlite3 only applies converters on connections opened with
# detect_types set, and neither connection factory in this module passes it, so this
# converter looks unused here -- confirm before relying on it.
sqlite3.register_converter("timestamp", convert_timestamp)


# This structured handler shouldn't be used directly, since it's touching a SQLite db.
# Instead, use the singleton or the async singleton.
class SQLiteTraceHandler(TracerMixin):
    """Tracer that stores guard/validator call records in a SQLite database.

    A handler is opened either for writing (creates the ``guard_logs`` table if
    needed and enables WAL journaling) or read-only. Writing methods assert that
    the handler is not read-only; ``tail_logs`` works in either mode.
    """

    CREATE_COMMAND = """
        CREATE TABLE IF NOT EXISTS guard_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            guard_name TEXT,
            start_time REAL,
            end_time REAL,
            prevalidate_text TEXT,
            postvalidate_text TEXT,
            exception_message TEXT
        );
    """
    INSERT_COMMAND = """
        INSERT INTO guard_logs (
            guard_name, start_time, end_time, prevalidate_text, postvalidate_text,
            exception_message
        ) VALUES (
            :guard_name, :start_time, :end_time, :prevalidate_text, :postvalidate_text,
            :exception_message
        );
    """
    # Seconds to pause between polls once tail_logs(follow=True) has caught up, so
    # that following an idle log does not spin a CPU core re-running the query.
    FOLLOW_POLL_INTERVAL = 0.1

    def __init__(self, log_path: os.PathLike, read_mode: bool):
        """Open the database at ``log_path``.

        @param log_path : Location of the SQLite file.
        @param read_mode : If True, open read-only; otherwise open for writing and
        create the log table when it is missing.
        """
        self._log_path = log_path  # Exposed read-only through the log_path property.
        self.last_cleanup = time.time()
        self.readonly = read_mode
        if read_mode:
            self.db = SQLiteTraceHandler._get_read_connection(log_path)
        else:
            self.db = SQLiteTraceHandler._get_write_connection(log_path)

    @property
    def log_path(self):
        """The path this handler was opened with."""
        return self._log_path

    @classmethod
    def _get_write_connection(cls, log_path: os.PathLike) -> sqlite3.Connection:
        """Open a writable connection and make sure the log table exists.

        Raises whatever sqlite3 raises (e.g. sqlite3.OperationalError) when the
        database cannot be opened or configured; the half-configured connection is
        closed first so it is not leaked.
        """
        db = sqlite3.connect(
            log_path,
            isolation_level=None,
            check_same_thread=False,
        )
        try:
            # isolation_level = None and pragma WAL means we can READ from the DB
            # while threads using it are writing. Synchronous off trades durability
            # for speed: log messages may be lost if the process or machine crashes.
            db.execute("PRAGMA journal_mode = wal")
            db.execute("PRAGMA synchronous = OFF")
            with db:
                db.execute(cls.CREATE_COMMAND)
        except sqlite3.Error:
            db.close()
            raise
        return db

    @classmethod
    def _get_read_connection(cls, log_path: os.PathLike) -> sqlite3.Connection:
        """Open a read-only connection whose rows can be addressed by column name."""
        # A URI with mode=ro is how sqlite3 is asked for a read-only connection.
        db = sqlite3.connect(
            "file:" + str(log_path) + "?mode=ro", isolation_level=None, uri=True
        )
        db.row_factory = sqlite3.Row
        return db

    def _truncate(self, force: bool = False, keep_n: int = LOG_RETENTION_LIMIT):
        """Delete all but the newest ``keep_n`` rows.

        Runs at most once every TIME_BETWEEN_CLEANUPS seconds unless ``force`` is
        True. Does nothing when the table holds ``keep_n`` rows or fewer.
        """
        assert not self.readonly
        now = time.time()
        if not force and (now - self.last_cleanup <= TIME_BETWEEN_CLEANUPS):
            return
        self.last_cleanup = now
        # OFFSET keep_n (ids descending) selects the newest row that falls OUTSIDE
        # the retained window, so the comparison must be inclusive; a strict `<`
        # would keep keep_n + 1 rows. With too few rows the subquery is NULL and the
        # comparison matches nothing.
        self.db.execute(
            """
            DELETE FROM guard_logs
            WHERE id <= (
                SELECT id FROM guard_logs ORDER BY id DESC LIMIT 1 OFFSET ?
            );
            """,
            (max(keep_n, 0),),
        )

    def _insert(self, values: dict) -> None:
        """Insert one row built from ``values``, then run the periodic cleanup."""
        with self.db:
            self.db.execute(SQLiteTraceHandler.INSERT_COMMAND, values)
        self._truncate()

    def log(
        self,
        guard_name: str,
        start_time: float,
        end_time: float,
        prevalidate_text: str,
        postvalidate_text: str,
        exception_text: str,
    ):
        """Record one call. ``exception_text`` is stored as ``exception_message``."""
        assert not self.readonly
        self._insert(
            dict(
                guard_name=guard_name,
                start_time=start_time,
                end_time=end_time,
                prevalidate_text=prevalidate_text,
                postvalidate_text=postvalidate_text,
                exception_message=exception_text,
            )
        )

    def log_entry(self, guard_log_entry: GuardTraceEntry):
        """Record a prepared entry.

        The entry's ``id`` is not part of the INSERT statement; the database
        assigns the row id.
        """
        assert not self.readonly
        self._insert(asdict(guard_log_entry))

    def log_validator(self, vlog: ValidatorLogs):
        """Record a validator run, using the validator name as ``guard_name``.

        The validation outcome (stringified) is stored in ``exception_message``,
        or an empty string when there is no result or it has no ``outcome``.
        """
        assert not self.readonly
        result = vlog.validation_result
        if result is not None and hasattr(result, "outcome"):
            maybe_outcome = str(result.outcome)
        else:
            maybe_outcome = ""
        self._insert(
            dict(
                guard_name=vlog.validator_name,
                # NOTE(review): a missing start_time is stored as NULL while a
                # missing end_time is stored as 0.0; readers computing a duration
                # from such a row will hit None -- confirm this is intended.
                start_time=vlog.start_time if vlog.start_time else None,
                end_time=vlog.end_time if vlog.end_time else 0.0,
                prevalidate_text=to_string(vlog.value_before_validation),
                postvalidate_text=to_string(vlog.value_after_validation),
                exception_message=maybe_outcome,
            )
        )

    def tail_logs(
        self, start_offset_idx: int = 0, follow: bool = False
    ) -> Iterator[GuardTraceEntry]:
        """Returns an iterator to generate GuardLogEntries.
        @param start_offset_idx : Start yielding entries after this IDX. If
        negative, this will instead start with the LAST -start_offset_idx entries
        (or all entries when the table holds fewer than that).
        @param follow : If follow is True, will re-check the database for new entries
        after the first batch is complete, pausing FOLLOW_POLL_INTERVAL seconds
        between checks. If False (default), will return when entries are exhausted.
        Each batch is ordered by start_time.
        """
        last_idx = start_offset_idx
        cursor = self.db.cursor()
        # Only the read connection installs a row factory; set it on this cursor so
        # rows are addressable by column name on write-mode handlers as well.
        cursor.row_factory = sqlite3.Row
        if last_idx < 0:
            # We're indexing from the end: find the id just before the window.
            cursor.execute(
                "SELECT id FROM guard_logs ORDER BY id DESC LIMIT 1 OFFSET ?;",
                (-last_idx,),
            )
            for row in cursor:
                last_idx = row["id"]
        sql = """
            SELECT
                id, guard_name, start_time, end_time, prevalidate_text,
                postvalidate_text, exception_message
            FROM guard_logs
            WHERE id > ?
            ORDER BY start_time;
        """
        cursor.execute(sql, (last_idx,))
        while True:
            for row in cursor:
                entry = GuardTraceEntry(**row)
                # Rows arrive ordered by start_time, not id, so track the highest
                # id seen; resuming from the last row's id could re-yield rows.
                last_idx = max(last_idx, entry.id)
                yield entry
            if not follow:
                return
            # If we're here we've run out of entries to tail. Wait, then fetch more:
            time.sleep(self.FOLLOW_POLL_INTERVAL)
            cursor.execute(sql, (last_idx,))
25 changes: 25 additions & 0 deletions guardrails/call_tracing/trace_entry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""
trace_entry.py

GuardTraceEntry is a dataclass which doesn't explicitly define the schema of our logs,
but serves as a nice, easy-to-use dataclass for when we want to manipulate things
programmatically. If performance and filtering is a concern, it's probably worth
writing the SQL directly instead of filtering these in a for-loop.
"""

from dataclasses import dataclass


@dataclass
class GuardTraceEntry:
    """A single trace record for one guard (or validator) call.

    Every field has a default, so an entry can be built from any subset of values.
    """

    # Row identifier; -1 until a real id is supplied.
    id: int = -1
    # Name of the guard the record belongs to.
    guard_name: str = ""
    # Start and end of the call, as floats.
    start_time: float = 0.0
    end_time: float = 0.0
    # Text before and after validation.
    prevalidate_text: str = ""
    postvalidate_text: str = ""
    # Message describing a failure, if any.
    exception_message: str = ""

    @property
    def timedelta(self):
        """Duration of the call: ``end_time`` minus ``start_time``."""
        began, finished = self.start_time, self.end_time
        return finished - began
69 changes: 69 additions & 0 deletions guardrails/call_tracing/trace_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
trace_handler.py

A set of tools to track the behavior of guards, specifically with the intent of
collating the pre/post validation text and timing of guard calls. Uses a singleton to
share write access to a SQLite database across threads.

By default, logs will be created in a temporary directory. This can be overridden by
setting GUARDRAILS_LOG_FILE_PATH in the environment. tracehandler.log_path will give
the full path of the current log file.

# Reading logs (basic):
>>> reader = TraceHandler.get_reader()
>>> for t in reader.tail_logs():
>>> print(t)

# Reading logs (advanced):
>>> reader = TraceHandler.get_reader()
>>> reader.db.execute("SELECT * FROM guard_logs;") # Arbitrary SQL support.

# Saving logs
>>> writer = TraceHandler()
>>> writer.log(
>>> "my_guard_name", 0.0, 1.0, "Raw LLM Output Text", "Sanitized", "exception?"
>>> )
"""

import os
import tempfile
import threading

from guardrails.call_tracing.sqlite_trace_handler import SQLiteTraceHandler
from guardrails.call_tracing.tracer_mixin import TracerMixin

# TODO: We should read this from guardrailsrc.
# File name used for the call log when no explicit path is configured.
LOG_FILENAME = "guardrails_calls.db"
# Resolved once at import time: the GUARDRAILS_LOG_FILE_PATH environment variable
# wins; otherwise the log lives in the system temporary directory.
LOGFILE_PATH = os.environ.get(
    "GUARDRAILS_LOG_FILE_PATH", # Document this environment variable.
    os.path.join(tempfile.gettempdir(), LOG_FILENAME),
)


class TraceHandler(TracerMixin):
    """Process-wide entry point to the writable trace handler.

    Calling ``TraceHandler()`` does not build a new object each time: ``__new__``
    returns the one shared handler produced by ``_create`` (a SQLiteTraceHandler
    opened for writing), building it on first use under a lock. Together with the
    write-ahead journaling configured by SQLiteTraceHandler this gives a
    faux-multi-write, multi-read interface on top of SQLite. Use ``get_reader`` for
    a separate read-only handler.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Fast path: once the handler exists there is no need to touch the lock.
        if cls._instance is not None:
            return cls._instance
        with cls._lock:
            # Check again while holding the lock; another thread may have created
            # the handler between the first check and lock acquisition.
            if cls._instance is None:
                cls._instance = cls._create()
        return cls._instance

    @classmethod
    def _create(cls, path: os.PathLike = LOGFILE_PATH) -> TracerMixin:  # type: ignore
        """Build the writable handler stored as the singleton."""
        # To disable logging, return a plain TracerMixin (a no-op tracer) instead.
        writer = SQLiteTraceHandler(path, read_mode=False)
        return writer

    @classmethod
    def get_reader(cls, path: os.PathLike = LOGFILE_PATH) -> TracerMixin:  # type: ignore
        """Return a new read-only handler for the log at ``path``."""
        reader = SQLiteTraceHandler(path, read_mode=True)
        return reader
36 changes: 36 additions & 0 deletions guardrails/call_tracing/tracer_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
tracer_mixin.py

This file defines our preferred tracer interface.
It has a side effect of acting as a 'noop' when we want to benchmark performance of a
tracer.
"""

import os
from typing import Iterator

from guardrails.call_tracing.trace_entry import GuardTraceEntry
from guardrails.classes.validation.validator_logs import ValidatorLogs


class TracerMixin:
    """Tracer interface whose methods are all no-ops.

    This pads out the methods a tracer is expected to provide, so it can also stand
    in as a do-nothing tracer.
    """

    def __init__(self, log_path: os.PathLike, read_mode: bool):
        # Both arguments are accepted to match the interface and are not used.
        self.db = None

    def log(self, *args, **kwargs):
        """Accept any arguments and record nothing."""
        return None

    def log_entry(self, guard_log_entry: GuardTraceEntry):
        """Accept an entry and record nothing."""
        return None

    def log_validator(self, vlog: ValidatorLogs):
        """Accept validator logs and record nothing."""
        return None

    def tail_logs(
        self,
        start_offset_idx: int = 0,
        follow: bool = False,
    ) -> Iterator[GuardTraceEntry]:
        """Return an iterator that yields no entries."""
        yield from ()
2 changes: 2 additions & 0 deletions guardrails/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import guardrails.cli.validate # noqa
from guardrails.cli.guardrails import guardrails as cli
from guardrails.cli.hub import hub_command
from guardrails.cli.watch import watch_command # noqa: F401


cli.add_typer(
hub_command, name="hub", help="Manage validators installed from the Guardrails Hub."
Expand Down
Loading