# Essential: Static file management with SourceLoader

Data pipelines usually interact with external systems such as SQL databases. Using relative paths to find such files is error-prone as the path to the file depends on the file loading it, on the other hand, absolute paths are to restrictive, the path will only work in your current environment but will break others. Combining `Env` with `SourceLoader` provides a clean approach for managing static files.

In [1]:
from pathlib import Path

import pandas as pd
from sklearn import datasets
from IPython.display import display, Markdown

from ploomber import DAG, SourceLoader, with_env
from ploomber.tasks import PythonCallable, NotebookRunner, SQLUpload, SQLScript
from ploomber.products import File, SQLiteRelation
from ploomber.clients import SQLAlchemyClient
from ploomber.executors import Serial

In [2]:
# initialize a temporary directory
import tempfile
import os
tmp_dir = Path(tempfile.mkdtemp())
tmp_dir_static = tmp_dir / 'static'
tmp_dir_static.mkdir()
os.chdir(str(tmp_dir))

In [3]:
report_py = """
# static/report.py

# +
# This file is in jupytext light format
import seaborn as sns
import pandas as pd
# -

# + tags=['parameters']
# papermill will add the parameters below this cell
upstream = None
product = None
# -

# +
path = upstream['raw']
df = pd.read_parquet(path)
# -

# ## HouseAge distribution

# +
_ = sns.distplot(df.HouseAge)
# -

# ## Price distribution

# +
_ = sns.distplot(df.price)
# -
"""

clean_table_sql = """
-- static/clean_table.sql

DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}}
AS SELECT * FROM {{upstream["raw_table"]}}
WHERE HouseAge < 100
"""

env_yaml = """
_module: '{{here}}'

path:
    data: '{{here}}/data/'
    static: '{{here}}/static/'
"""

(tmp_dir_static / 'report.py').write_text(report_py)
(tmp_dir_static / 'clean_table.sql').write_text(clean_table_sql)
(tmp_dir / 'env.yaml').write_text(env_yaml)

def display_file(file, syntax):
    s = """
```{}
{}
```
""".format(syntax, file)
    return display(Markdown(s))

Our working environment has an `env.yaml` file with a `static/` folder holding a SQL and a Python script.

In [4]:
! tree $tmp_dir

[01;34m/var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf8svp9hx[00m
├── env.yaml
└── [01;34mstatic[00m
    ├── clean_table.sql
    └── report.py

1 directory, 3 files


### Content of `env.yaml`

In [5]:
display_file(env_yaml, 'yaml')


```yaml

_module: '{{here}}'

path:
    data: '{{here}}/data/'
    static: '{{here}}/static/'

```


### Content of `static/report.py`

In [6]:
display_file(report_py, 'python')


```python

# static/report.py

# +
# This file is in jupytext light format
import seaborn as sns
import pandas as pd
# -

# + tags=['parameters']
# papermill will add the parameters below this cell
upstream = None
product = None
# -

# +
path = upstream['raw']
df = pd.read_parquet(path)
# -

# ## AGE distribution

# +
_ = sns.distplot(df.AGE)
# -

# ## Price distribution

# +
_ = sns.distplot(df.price)
# -

```


### Content of `static/create_table.sql`

In [7]:
display_file(clean_table_sql, 'sql')


```sql

-- static/clean_table.sql

DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}}
AS SELECT * FROM {{upstream["raw_table"]}}
WHERE AGE < 100

```


### Pipeline declaration

In [8]:
def _get_data(product):
    data = datasets.fetch_california_housing()
    df = pd.DataFrame(data.data)
    df.columns = data.feature_names
    df['price'] = data.target
    df.to_parquet(str(product))


@with_env
def make(env):
    # NOTE: passing the executor parameter is only required for testing purposes, can be removed
    dag = DAG(executor=Serial(build_in_subprocess=False))

    client = SQLAlchemyClient('sqlite:///my_db.db')
    dag.clients[SQLUpload] = client
    dag.clients[SQLiteRelation] = client
    dag.clients[SQLScript] = client

    # initialize SourceLoader in our static directory
    loader = SourceLoader(path=env.path.static)

    get_data = PythonCallable(_get_data,
                              product=File(tmp_dir / 'raw.parquet'),
                              dag=dag,
                              name='raw')

    # if we do not pass a name, the filename will be used as default
    report = NotebookRunner(loader['report.py'],
                            product=File(tmp_dir / 'report.html'),
                            dag=dag,
                            kernelspec_name='python3')

    raw_table = SQLUpload(source='{{upstream["raw"]}}',
                         product=SQLiteRelation(('raw', 'table')),
                         dag=dag,
                         name='raw_table')

    # same here, no need to pass a name
    clean_table = SQLScript(loader['clean_table.sql'],
                            product=SQLiteRelation(('clean', 'table')),
                            dag=dag)

    get_data >> report
    get_data >> raw_table >> clean_table
    
    return dag


dag = make()

### Pipeline status

In [9]:
# Using SourceLoader automatically adds 'Location' column which points to the the source code location
dag.status()

HBox(children=(FloatProgress(value=0.0, max=4.0), HTML(value='')))




name,Last updated,Outdated dependencies,Outdated code,Product,Doc (short),Location
raw,Has not been run,False,True,/var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf8svp9hx/raw.parquet,,:1
report.py,Has not been run,True,True,/var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf8svp9hx/report.html,,/private/var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf8svp9hx/static/report.py
raw_table,Has not been run,True,True,raw,,
clean_table.sql,Has not been run,True,True,clean,,/private/var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpf8svp9hx/static/clean_table.sql


In [10]:
dag.build()

HBox(children=(FloatProgress(value=0.0, max=4.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=4.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, description='Executing', max=10.0, style=ProgressStyle(description_wid…





name,Ran?,Elapsed (s),Percentage
raw,True,0.121997,3.90869
raw_table,True,0.036288,1.16264
clean_table.sql,True,0.003628,0.116238
report.py,True,2.95926,94.8124


## Advanced jinja2 features

`SourceLoader` initializes a proper jinja2 environment, so you can use features such as [macros](https://jinja.palletsprojects.com/en/2.11.x/templates/#macros), this is very useful to maximize SQL code reusability.

In [11]:
import shutil
shutil.rmtree(str(tmp_dir))