-
Notifications
You must be signed in to change notification settings - Fork 112
/
utils.py
75 lines (59 loc) · 2.3 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import argparse
import os
from pathlib import Path
from typing import Optional, Callable, Tuple
import dlt
import pandas as pd
import streamlit as st
from dlt.cli import echo as fmt
from dlt.pipeline.exceptions import SqlClientNotAvailable
# Absolute directory containing this module (not referenced in this chunk —
# presumably used elsewhere to resolve app resources; verify against callers).
HERE = Path(__file__).absolute().parent
def parse_args() -> Tuple[str, str]:
    """Read the pipeline name and optional working directory from argv.

    Returns:
        A ``(pipeline_name, pipelines_dir)`` tuple; ``pipelines_dir`` is
        ``None`` when ``--pipelines-dir`` was not supplied. Unknown
        arguments are tolerated (streamlit passes its own flags).
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("pipeline_name", nargs=1)
    arg_parser.add_argument(
        "--pipelines-dir",
        default=None,
        help="Pipelines working directory",
    )
    options, _ = arg_parser.parse_known_args()
    (name,) = options.pipeline_name
    return name, options.pipelines_dir
def render_with_pipeline(render_func: Callable[..., None]) -> None:
    """Attach to the pipeline named on the command line and pass it to *render_func*.

    The ``DLT_TEST_PIPELINE_NAME`` environment variable, when set, overrides
    the CLI-provided pipeline name (used by the test suite). The resolved
    name is stored in ``st.session_state["pipeline_name"]`` before rendering.
    """
    pipeline_name, pipelines_dir = parse_args()
    test_pipeline_name = os.getenv("DLT_TEST_PIPELINE_NAME")
    if test_pipeline_name:
        fmt.echo(f"RUNNING TEST PIPELINE: {test_pipeline_name}")
        pipeline_name = test_pipeline_name
    st.session_state["pipeline_name"] = pipeline_name
    # use pipelines dir from env var or try to resolve it using get_dlt_pipelines_dir
    pipeline = dlt.attach(pipeline_name, pipelines_dir=pipelines_dir)
    render_func(pipeline)
def query_using_cache(pipeline: dlt.Pipeline, ttl: int) -> Callable[..., Optional[pd.DataFrame]]:
    """Build a streamlit-cached query function bound to *pipeline*.

    Args:
        pipeline: Attached dlt pipeline whose ``sql_client`` executes queries.
        ttl: Cache time-to-live in seconds passed to ``st.cache_data``.

    Returns:
        A callable ``(query, schema_name=None, chunk_size=None)`` that returns
        the query result as a DataFrame, or ``None`` when the destination has
        no SQL client (an error banner is shown in that case).
    """

    @st.cache_data(ttl=ttl)
    def do_query(
        query: str,
        # fixed: was `schema_name: str = None` — implicit Optional is
        # disallowed by PEP 484
        schema_name: Optional[str] = None,
        chunk_size: Optional[int] = None,
    ) -> Optional[pd.DataFrame]:
        try:
            with pipeline.sql_client(schema_name) as client:
                with client.execute_query(query) as curr:
                    return curr.df(chunk_size=chunk_size)
        except SqlClientNotAvailable:
            st.error("🚨 Cannot load data - SqlClient not available")
            # explicit None instead of falling off the end (removes the need
            # for the `# type: ignore[return]` on the signature)
            return None

    return do_query  # type: ignore
def query_data(
    pipeline: dlt.Pipeline,
    query: str,
    # fixed: was `schema_name: str = None` — implicit Optional (PEP 484)
    schema_name: Optional[str] = None,
    chunk_size: Optional[int] = None,
) -> pd.DataFrame:
    """Run *query* against *pipeline* with a 60-second result cache.

    Args:
        pipeline: Attached dlt pipeline to query.
        query: SQL text to execute.
        schema_name: Optional schema to select on the sql client.
        chunk_size: Optional row-chunk size forwarded to ``df()``.

    Returns:
        The result DataFrame. NOTE(review): the underlying cached query
        yields ``None`` when the SQL client is unavailable, so callers may
        receive ``None`` despite the declared return type.
    """
    query_maker = query_using_cache(pipeline, ttl=60)
    return query_maker(query, schema_name, chunk_size=chunk_size)
def query_data_live(
    pipeline: dlt.Pipeline,
    query: str,
    # fixed: was `schema_name: str = None` — implicit Optional (PEP 484)
    schema_name: Optional[str] = None,
    chunk_size: Optional[int] = None,
) -> pd.DataFrame:
    """Run *query* against *pipeline* with a short 5-second result cache.

    Same contract as ``query_data`` but with a much shorter TTL, for
    near-live views of the data.

    Args:
        pipeline: Attached dlt pipeline to query.
        query: SQL text to execute.
        schema_name: Optional schema to select on the sql client.
        chunk_size: Optional row-chunk size forwarded to ``df()``.

    Returns:
        The result DataFrame. NOTE(review): may be ``None`` when the SQL
        client is unavailable, despite the declared return type.
    """
    query_maker = query_using_cache(pipeline, ttl=5)
    return query_maker(query, schema_name, chunk_size=chunk_size)