feat(persistence): make launch_app runnable on tmp directory (#2851)
* feat(persistence): make launch_app runnable on tmp directory

* fix run in process

* feat(persistence): default to running via a tmp directory

* fix type

* WIP

* fix lint

* session warning fix
mikeldking committed Apr 19, 2024
1 parent 7c01420 · commit f41e922
Showing 5 changed files with 63 additions and 22 deletions.
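
What this means in practice: launch_app now defaults to backing the app with a SQLite database inside a per-session temporary directory, so a notebook session leaves nothing behind unless you opt into persistence. A minimal usage sketch (not part of the commit; it assumes the public phoenix package exposes launch_app and close_app as usual):

# Usage sketch only; launch_app/close_app come from the public phoenix API.
import phoenix as px

# Default after this change: data goes to a temporary SQLite database that
# disappears when the session's temp directory is cleaned up.
session = px.launch_app()

# Opt out of the temp directory to keep writing to the persistent working
# directory (PHOENIX_WORKING_DIR) as before:
# session = px.launch_app(use_temp_dir=False)

px.close_app()
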
4 changes: 2 additions & 2 deletions src/phoenix/server/app.py
@@ -220,7 +220,7 @@ async def openapi_schema(request: Request) -> Response:


def create_app(
database: str,
database_url: str,
export_path: Path,
model: Model,
umap_params: UMAPParameters,
@@ -241,7 +241,7 @@ def create_app(
)
)
initial_batch_of_evaluations = () if initial_evaluations is None else initial_evaluations
engine = create_engine(database)
engine = create_engine(database_url)
db = _db(engine)
graphql = GraphQLWithContext(
db=db,
7 changes: 5 additions & 2 deletions src/phoenix/server/main.py
@@ -126,6 +126,7 @@ def _load_items(
atexit.register(_remove_pid_file)

parser = ArgumentParser()
parser.add_argument("--database-url", required=False)
parser.add_argument("--export_path")
parser.add_argument("--host", type=str, required=False)
parser.add_argument("--port", type=int, required=False)
@@ -156,6 +157,9 @@ def _load_items(
)
demo_parser.add_argument("--simulate-streaming", action="store_true")
args = parser.parse_args()
db_connection_str = (
args.database_url if args.database_url else get_env_database_connection_str()
)
export_path = Path(args.export_path) if args.export_path else EXPORT_DIR
if args.command == "datasets":
primary_dataset_name = args.primary
@@ -237,9 +241,8 @@ def _load_items(
start_prometheus()

working_dir = get_working_dir().resolve()
db_connection_str = get_env_database_connection_str()
app = create_app(
database=db_connection_str,
database_url=db_connection_str,
export_path=export_path,
model=model,
umap_params=umap_params,
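
The server entry point now prefers an explicit --database-url flag and only falls back to the environment-derived connection string when the flag is omitted. A standalone sketch of that resolution order (the PHOENIX_SQL_DATABASE_URL variable name and the sqlite default below are assumptions for illustration; the commit's actual fallback goes through get_env_database_connection_str):

# Illustrative sketch of flag-over-environment precedence; the env var name
# and default URL are assumptions, not taken from the commit.
import os
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--database-url", required=False)
args, _ = parser.parse_known_args()

db_connection_str = (
    args.database_url
    if args.database_url
    else os.environ.get("PHOENIX_SQL_DATABASE_URL", "sqlite:///phoenix.db")
)
print(f"Connecting to {db_connection_str}")
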
4 changes: 4 additions & 0 deletions src/phoenix/services.py
@@ -107,6 +107,7 @@ class AppService(Service):

def __init__(
self,
database_url: str,
export_path: Path,
host: str,
port: int,
@@ -117,6 +118,7 @@ def __init__(
corpus_dataset_name: Optional[str],
trace_dataset_name: Optional[str],
):
self.database_url = database_url
self.export_path = export_path
self.host = host
self.port = port
@@ -133,6 +135,8 @@ def command(self) -> List[str]:
command = [
sys.executable,
"main.py",
"--database-url",
self.database_url,
"--export_path",
str(self.export_path),
"--host",
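
AppService now threads the database URL through to the server subprocess it supervises via the new --database-url flag. A rough sketch of assembling and spawning such a command (flag names mirror the diff; the paths, port, and URL values are made up for illustration):

# Sketch of building a child-server command like AppService.command() does.
# Values below are illustrative; AppService fills them from its constructor args.
import subprocess
import sys
from pathlib import Path

database_url = "sqlite:////tmp/phoenix-demo/phoenix.db"
export_path = Path("/tmp/phoenix-demo/exports")

command = [
    sys.executable,
    "main.py",  # AppService launches Phoenix's server module this way
    "--database-url", database_url,
    "--export_path", str(export_path),
    "--host", "127.0.0.1",
    "--port", "6006",
]
# AppService keeps this child process alive for the lifetime of the session.
# (In this standalone sketch the child exits immediately, since main.py is
# not on the current path.)
server = subprocess.Popen(command)
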
4 changes: 3 additions & 1 deletion src/phoenix/session/client.py
@@ -28,6 +28,7 @@ def __init__(
self,
*,
endpoint: Optional[str] = None,
warn_if_server_not_running: bool = True,
**kwargs: Any, # for backward-compatibility
):
"""
@@ -50,7 +51,8 @@ def __init__(
)
self._session = Session()
weakref.finalize(self, self._session.close)
self._warn_if_phoenix_is_not_running()
if warn_if_server_not_running:
self._warn_if_phoenix_is_not_running()

def query_spans(
self,
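
The Client gains a warn_if_server_not_running switch so the in-process Session can construct a client without triggering the "Phoenix is not running" warning before the server is up. A small example of opting out when scripting (the endpoint value is illustrative):

# Build a Phoenix client without the startup warning; endpoint is illustrative.
from phoenix.session.client import Client

client = Client(
    endpoint="http://127.0.0.1:6006",
    warn_if_server_not_running=False,  # skip the liveness warning added in this commit
)
# spans_df = client.query_spans()  # query once the server is actually running
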
66 changes: 49 additions & 17 deletions src/phoenix/session/session.py
@@ -65,6 +65,10 @@
else:
_BaseList = UserList

# Temporary directory for the duration of the session
global _session_working_dir
_session_working_dir: Optional["TemporaryDirectory[str]"] = None


class NotebookEnvironment(Enum):
COLAB = "colab"
@@ -105,6 +109,7 @@ def __dir__(self) -> List[str]:

def __init__(
self,
database_url: str,
primary_dataset: Inferences,
reference_dataset: Optional[Inferences] = None,
corpus_dataset: Optional[Inferences] = None,
@@ -114,6 +119,7 @@ def __init__(
port: Optional[int] = None,
notebook_env: Optional[NotebookEnvironment] = None,
):
self._database_url = database_url
self.primary_dataset = primary_dataset
self.reference_dataset = reference_dataset
self.corpus_dataset = corpus_dataset
@@ -129,8 +135,7 @@ def __init__(
self.root_path = _get_root_path(self.notebook_env, self.port)
host = "127.0.0.1" if self.host == "0.0.0.0" else self.host
self._client = Client(
endpoint=f"http://{host}:{self.port}",
use_active_session_if_available=False,
endpoint=f"http://{host}:{self.port}", warn_if_server_not_running=False
)

def query_spans(
@@ -238,13 +243,18 @@ def url(self) -> str:
"""Returns the url for the phoenix app"""
return _get_url(self.host, self.port, self.notebook_env)

@property
def database_url(self) -> str:
return self._database_url


_session: Optional[Session] = None


class ProcessSession(Session):
def __init__(
self,
database_url: str,
primary_dataset: Inferences,
reference_dataset: Optional[Inferences] = None,
corpus_dataset: Optional[Inferences] = None,
@@ -256,6 +266,7 @@ def __init__(
notebook_env: Optional[NotebookEnvironment] = None,
) -> None:
super().__init__(
database_url=database_url,
primary_dataset=primary_dataset,
reference_dataset=reference_dataset,
corpus_dataset=corpus_dataset,
@@ -279,12 +290,13 @@ def __init__(
)
# Initialize an app service that keeps the server running
self.app_service = AppService(
self.export_path,
self.host,
self.port,
self.root_path,
self.primary_dataset.name,
umap_params_str,
database_url=database_url,
export_path=self.export_path,
host=self.host,
port=self.port,
root_path=self.root_path,
primary_dataset_name=self.primary_dataset.name,
umap_params=umap_params_str,
reference_dataset_name=(
self.reference_dataset.name if self.reference_dataset is not None else None
),
@@ -308,7 +320,7 @@ def end(self) -> None:
class ThreadSession(Session):
def __init__(
self,
database: str,
database_url: str,
primary_dataset: Inferences,
reference_dataset: Optional[Inferences] = None,
corpus_dataset: Optional[Inferences] = None,
@@ -320,6 +332,7 @@ def __init__(
notebook_env: Optional[NotebookEnvironment] = None,
):
super().__init__(
database_url=database_url,
primary_dataset=primary_dataset,
reference_dataset=reference_dataset,
corpus_dataset=corpus_dataset,
@@ -349,7 +362,7 @@ def __init__(
self.traces.put(pb_evaluation)
# Initialize an app service that keeps the server running
self.app = create_app(
database=database,
database_url=database_url,
export_path=self.export_path,
model=self.model,
corpus=self.corpus,
@@ -385,16 +398,23 @@ def delete_all(prompt_before_delete: Optional[bool] = True) -> None:
Deletes the entire contents of the working directory. This will delete, traces, evaluations,
and any other data stored in the working directory.
"""
global _session_working_dir
working_dir = get_working_dir()

# See if the working directory exists
directories_to_delete = []
if working_dir.exists():
directories_to_delete.append(working_dir)
if _session_working_dir is not None:
directories_to_delete.append(Path(_session_working_dir.name))

# Loop through directories to delete
for directory in directories_to_delete:
if prompt_before_delete:
input(
f"You have data at {working_dir}. Are you sure you want to delete?"
f"You have data at {directory}. Are you sure you want to delete?"
+ " This cannot be undone. Press Enter to delete, Escape to cancel."
)
shutil.rmtree(working_dir)
shutil.rmtree(directory)
_session_working_dir = None


def launch_app(
@@ -407,6 +427,7 @@ def launch_app(
port: Optional[int] = None,
run_in_thread: bool = True,
notebook_environment: Optional[Union[NotebookEnvironment, str]] = None,
use_temp_dir: bool = True,
) -> Optional[Session]:
"""
Launches the phoenix application and returns a session to interact with.
@@ -438,6 +459,10 @@ def launch_app(
The environment the notebook is running in. This is either 'local', 'colab', or 'sagemaker'.
If not provided, phoenix will try to infer the environment. This is only needed if
there is a failure to infer the environment.
use_temp_dir: bool, optional, default=True
Whether to use a temporary directory to store the data. If set to False, the data will be
stored in the directory specified by PHOENIX_WORKING_DIR environment variable via SQLite.
Returns
-------
@@ -511,11 +536,16 @@ def launch_app(

host = host or get_env_host()
port = port or get_env_port()
database = get_env_database_connection_str()
if use_temp_dir:
global _session_working_dir
_session_working_dir = _session_working_dir or TemporaryDirectory()
database_url = f"sqlite:///{_session_working_dir.name}/phoenix.db"
else:
database_url = get_env_database_connection_str()

if run_in_thread:
_session = ThreadSession(
database,
database_url,
primary,
reference,
corpus,
@@ -528,6 +558,7 @@
# TODO: catch exceptions from thread
else:
_session = ProcessSession(
database_url,
primary,
reference,
corpus,
@@ -548,7 +579,8 @@ def launch_app(
return None

print(f"🌍 To view the Phoenix app in your browser, visit {_session.url}")
print(f"💽 Your data is being persisted to {database}")
if not use_temp_dir:
print(f"💽 Your data is being persisted to {database_url}")
print("📖 For more information on how to use Phoenix, check out https://docs.arize.com/phoenix")
return _session

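
The heart of the session change is a module-level TemporaryDirectory whose path is interpolated into a sqlite:/// URL; once that directory is cleaned up (or delete_all runs), the database file goes with it. A standalone sketch of the same pattern using SQLAlchemy directly (table name and cleanup timing are illustrative):

# Sketch of the temp-directory-backed SQLite pattern launch_app now uses by default.
from tempfile import TemporaryDirectory

from sqlalchemy import create_engine, text

session_working_dir = TemporaryDirectory()
database_url = f"sqlite:///{session_working_dir.name}/phoenix.db"

engine = create_engine(database_url)
with engine.begin() as conn:  # engine.begin() commits on exit
    conn.execute(text("CREATE TABLE IF NOT EXISTS demo (id INTEGER PRIMARY KEY)"))

# Disposing the engine and cleaning up the directory removes the .db file,
# which is why the "data is being persisted" message is only printed when
# use_temp_dir is False.
engine.dispose()
session_working_dir.cleanup()
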
