Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Database Component #14995

Merged
merged 45 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
2b2f33d
update
tchaton Oct 4, 2022
4217fa9
Merge branch 'master' into upstream_hpo
tchaton Oct 4, 2022
895c88e
update
tchaton Oct 5, 2022
2329e09
update
tchaton Oct 5, 2022
d46a93a
Merge branch 'upstream_hpo' of https://github.com/Lightning-AI/lightn…
tchaton Oct 5, 2022
d474a48
Merge branch 'master' into upstream_hpo
tchaton Oct 5, 2022
3ce549a
update
tchaton Oct 5, 2022
b2b0810
Merge branch 'upstream_hpo' of https://github.com/Lightning-AI/lightn…
tchaton Oct 5, 2022
a74b7be
Merge branch 'master' into upstream_hpo
tchaton Oct 5, 2022
c9975fe
update
tchaton Oct 5, 2022
bce25f4
Merge branch 'upstream_hpo' of https://github.com/Lightning-AI/lightn…
tchaton Oct 5, 2022
5184e57
Merge branch 'master' into upstream_hpo
tchaton Oct 7, 2022
fb3f4fb
update
tchaton Oct 12, 2022
b2c6313
update
tchaton Oct 12, 2022
49deba1
update
tchaton Oct 12, 2022
bc66784
Merge branch 'master' into upstream_hpo
tchaton Oct 12, 2022
39dab97
update
tchaton Oct 13, 2022
dd240d9
update
tchaton Oct 13, 2022
c437b5c
Merge branch 'master' into upstream_hpo
tchaton Oct 13, 2022
2c6bf85
Merge branch 'upstream_hpo' of https://github.com/Lightning-AI/lightn…
tchaton Oct 13, 2022
44ead03
Merge branch 'master' into upstream_hpo
tchaton Oct 13, 2022
0e3d6e7
update
tchaton Oct 13, 2022
4592b98
Merge branch 'upstream_hpo' of https://github.com/Lightning-AI/lightn…
tchaton Oct 13, 2022
e621cf7
Merge branch 'master' into upstream_hpo
tchaton Oct 13, 2022
fbfaa0b
Merge branch 'master' into upstream_hpo
tchaton Oct 13, 2022
2ba8d0d
Merge branch 'master' into upstream_hpo
tchaton Oct 13, 2022
1fc5e0e
Merge branch 'master' into upstream_hpo
tchaton Oct 18, 2022
a644519
Merge branch 'master' into upstream_hpo
tchaton Oct 18, 2022
34e50c7
update
tchaton Oct 18, 2022
e4ba042
Merge branch 'upstream_hpo' of https://github.com/Lightning-AI/lightn…
tchaton Oct 18, 2022
e3b48d4
update
tchaton Oct 18, 2022
74dee33
update
tchaton Oct 18, 2022
be343d6
update
tchaton Oct 18, 2022
5c3d4c4
update
tchaton Oct 18, 2022
bc599ae
Merge branch 'master' into upstream_hpo
tchaton Oct 18, 2022
7864ba4
update
tchaton Oct 19, 2022
1ee170e
Merge branch 'upstream_hpo' of https://github.com/Lightning-AI/lightn…
tchaton Oct 19, 2022
a297fe6
Merge branch 'master' into upstream_hpo
tchaton Oct 19, 2022
ed72450
Merge branch 'master' into upstream_hpo
tchaton Oct 19, 2022
1b6d17e
Merge branch 'master' into upstream_hpo
tchaton Oct 19, 2022
15ebc05
update
tchaton Oct 19, 2022
cc1328a
Merge branch 'upstream_hpo' of https://github.com/Lightning-AI/lightn…
tchaton Oct 19, 2022
7eb1c02
update
tchaton Oct 19, 2022
f3ad290
Merge branch 'master' into upstream_hpo
tchaton Oct 19, 2022
b729421
Merge branch 'master' into upstream_hpo
tchaton Oct 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/app_components/python/component_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def on_train_start(self, trainer, pl_module) -> None:
print("Even the Lightning Work is available and state transfer works !")
print(self.lightning_work)

def on_batch_end(self, trainer, *_) -> None:
def on_train_batch_end(self, trainer, *_) -> None:
# On every batch end, collects some information.
# This is communicated automatically to the rest of the app,
# so you can track your training in real time in the Lightning App UI.
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ exclude = [
"src/lightning_app/cli/pl-app-template",
"src/lightning_app/cli/react-ui-template",
"src/lightning_app/cli/app-template",
"src/lightning_app/components/database",
]
install_types = "True"
non_interactive = "True"
Expand Down
1 change: 1 addition & 0 deletions requirements/app/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ trio<0.22.0
pympler
psutil
setuptools<=59.5.0
sqlmodel
requests-mock
4 changes: 4 additions & 0 deletions src/lightning_app/components/database/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from lightning_app.components.database.client import DatabaseClient
from lightning_app.components.database.server import Database

__all__ = ["Database", "DatabaseClient"]
76 changes: 76 additions & 0 deletions src/lightning_app/components/database/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from typing import Any, Dict, List, Optional, Type, TypeVar

import requests
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from lightning_app.components.database.utilities import GeneralModel

_CONNECTION_RETRY_TOTAL = 5
_CONNECTION_RETRY_BACKOFF_FACTOR = 1


def _configure_session() -> Session:
"""Configures the session for GET and POST requests.

It enables a generous retrial strategy that waits for the application server to connect.
"""
retry_strategy = Retry(
# wait time between retries increases exponentially according to: backoff_factor * (2 ** (retry - 1))
total=_CONNECTION_RETRY_TOTAL,
backoff_factor=_CONNECTION_RETRY_BACKOFF_FACTOR,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)
return http


T = TypeVar("T")


class DatabaseClient:
def __init__(self, db_url: str, token: Optional[str] = None, model: Optional[T] = None) -> None:
self.db_url = db_url
self.model = model
self.token = token or ""
self._session = None

def select_all(self, model: Optional[Type[T]] = None) -> List[T]:
cls = model if model else self.model
resp = self.session.post(self.db_url + "/select_all/", data=GeneralModel.from_cls(cls, token=self.token).json())
assert resp.status_code == 200
return [cls(**data) for data in resp.json()]

def insert(self, model: T) -> None:
resp = self.session.post(
self.db_url + "/insert/",
data=GeneralModel.from_obj(model, token=self.token).json(),
)
assert resp.status_code == 200

def update(self, model: T) -> None:
resp = self.session.post(
self.db_url + "/update/",
data=GeneralModel.from_obj(model, token=self.token).json(),
)
assert resp.status_code == 200

def delete(self, model: T) -> None:
resp = self.session.post(
self.db_url + "/delete/",
data=GeneralModel.from_obj(model, token=self.token).json(),
)
assert resp.status_code == 200

@property
def session(self):
if self._session is None:
self._session = _configure_session()
return self._session

def to_dict(self) -> Dict[str, Any]:
return {"db_url": self.db_url, "model": self.model.__name__ if self.model else None}
157 changes: 157 additions & 0 deletions src/lightning_app/components/database/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import asyncio
import os
import sys
from typing import List, Optional, Type, Union

import uvicorn
from fastapi import FastAPI
from uvicorn import run

from lightning import BuildConfig, LightningWork
from lightning_app.components.database.utilities import create_database, Delete, Insert, SelectAll, Update
from lightning_app.storage import Drive
from lightning_app.utilities.app_helpers import Logger
from lightning_app.utilities.imports import _is_sqlmodel_available

if _is_sqlmodel_available():
from sqlmodel import SQLModel


logger = Logger(__name__)
engine = None
tchaton marked this conversation as resolved.
Show resolved Hide resolved


# Required to avoid Uvicorn Server overriding Lightning App signal handlers.
# Discussions: https://github.com/encode/uvicorn/discussions/1708
class DatabaseUvicornServer(uvicorn.Server):

has_started_queue = None

def run(self, sockets=None):
self.config.setup_event_loop()
loop = asyncio.get_event_loop()
asyncio.ensure_future(self.serve(sockets=sockets))
loop.run_forever()

def install_signal_handlers(self):
"""Ignore Uvicorn Signal Handlers."""


class Database(LightningWork):
def __init__(
self,
models: Union[Type["SQLModel"], List[Type["SQLModel"]]],
db_filename: str = "database.db",
debug: bool = False,
) -> None:
"""The Database Component enables to interact with an SQLite database to store some structured information
about your application.

The provided models are SQLModel tables

Arguments:
models: A SQLModel or a list of SQLModels table to be added to the database.
db_filename: The name of the SQLite database.
debug: Whether to run the database in debug mode.
mode: Whether the database should be running within the flow or dedicated work.
token: Token used to protect the database access. Ensure you don't expose it through the App State.
tchaton marked this conversation as resolved.
Show resolved Hide resolved

Example:

from sqlmodel import SQLModel, Field
from lightning import LightningFlow, LightningApp
from lightning_app.components.database import Database, DatabaseClient

class CounterModel(SQLModel, table=True):
__table_args__ = {"extend_existing": True}

id: int = Field(default=None, primary_key=True)
count: int


class Flow(LightningFlow):

def __init__(self):
super().__init__()
self.db = Database(models=[CounterModel])
self._client = None
self.counter = 0

def run(self):
self.db.run()

if not self.db.alive():
return

if self.counter == 0:
self._client = DatabaseClient(model=CounterModel, db_url=self.db.url)
self._client.reset_database()

assert self._client

rows = self._client.select_all()

print(f"{self.counter}: {rows}")

if not rows:
self._client.insert(CounterModel(count=0))
else:
row: CounterModel = rows[0]
row.count += 1
self._client.update(row)

if self.counter >= 100:
row: CounterModel = rows[0]
self._client.delete(row)
self._client.delete_database()
self._exit()

self.counter += 1

app = LightningApp(Flow())
"""
super().__init__(parallel=True, cloud_build_config=BuildConfig(["sqlmodel"]))
self.db_filename = db_filename
self.debug = debug
self._models = models if isinstance(models, list) else [models]
self.drive = None

def run(self, token: Optional[str] = None) -> None:
"""
Arguments:
token: Token used to protect the database access. Ensure you don't expose it through the App State.
"""
self.drive = Drive("lit://database")
if self.drive.list(component_name=self.name):
self.drive.get(self.db_filename)
print("Retrieved the database from Drive.")

app = FastAPI()

create_database(self.db_filename, self._models, self.debug)
models = {m.__name__: m for m in self._models}
app.post("/select_all/")(SelectAll(models, token))
app.post("/insert/")(Insert(models, token))
app.post("/update/")(Update(models, token))
app.post("/delete/")(Delete(models, token))

sys.modules["uvicorn.main"].Server = DatabaseUvicornServer

run(app, host=self.host, port=self.port, log_level="error")

def alive(self) -> bool:
"""Hack: Returns whether the server is alive."""
return self.db_url != ""

@property
def db_url(self) -> Optional[str]:
use_localhost = "LIGHTNING_APP_STATE_URL" not in os.environ
if use_localhost:
return self.url
if self.internal_ip != "":
return f"http://{self.internal_ip}:{self.port}"
return self.internal_ip

def on_exit(self):
self.drive.put(self.db_filename)
print("Stored the database to the Drive.")
tchaton marked this conversation as resolved.
Show resolved Hide resolved