*Colab* ships an old version of IPython (`5.5.0`, from 2017) that is not compatible with LineaPy. We therefore upgrade IPython first, restart the current Colab session, and then install LineaPy.


In [None]:
%%capture
!pip install --upgrade ipython

In [None]:
exit()

In [None]:
%%capture
!pip install lineapy

Load LineaPy as a notebook extension and import the library.

In [1]:
%load_ext lineapy

In [2]:
import lineapy

# Use LineaPy to create a simple ML/data pipeline

## Scenario

As notebooks mature, they often end up being used like pipelines. For instance, your notebook might

* process the latest data and update dashboards
* preprocess data and dump it to the filesystem for downstream model development

and you expect to re-execute it on a regular or ad hoc basis.

However, you may not have the engineering support to set up these pipelines, or the pipelines may not be meant for production (for example, data preparation for an experiment).

## What might happen next?
 
* You spend a lot of time copying, pasting, and cleaning up code to make your pipeline work in an orchestration system or job scheduler (cron, Apache Airflow, Prefect, ...).
* Getting the pipeline to work takes so long that you end up running it manually whenever you need it.
* Your runtime environment changes because of other experiments you are running, and your pipeline stops working.
* You change the pipeline and find that you have to go through the steps above all over again.

As more notebooks and pipelines need to be maintained, data scientists spend more and more time on operations just to make sure every notebook runs successfully at the right time.
This operational burden takes time away from data scientists and reduces their productivity and capacity to innovate.

## What problem is LineaPy trying to solve here?

LineaPy helps you set up and maintain pipelines with minimal effort.

```
import lineapy

........................
.
. your original notebook
. 
........................

lineapy.save(object, 'artifact name')
lineapy.to_pipeline(['artifact name'], )

```

 
With just three lines of code, LineaPy lets data scientists produce runnable pipelines.
For some orchestration systems, such as Apache Airflow, LineaPy can even upload the runnable pipeline without any manual intervention.
 
## What will we learn in the rest of the notebook?
 
In this demo, we load the four iris features into a data frame and aggregate them, mimicking a dashboard update or a data processing job.
Along the way, we use a minimal example to create a LineaPy artifact and then create a pipeline that produces that artifact.

At the end, we demonstrate how to

* use `to_pipeline()` to
  * create a native pipeline as a Python script and schedule it with crontab
  * create a native Airflow DAG that, once written to the Airflow DAG folder, is picked up by the Airflow scheduler
* inspect `requirements.txt` to confirm that LineaPy records the pipeline's module dependencies.

This shows how LineaPy's pipeline creation feature helps data scientists spend less time on pipeline management and more on generating insights.
 
 
 




## Minimal example to mimic a data processing pipeline

* Load the data
* Do some aggregation
* Save the artifact

In [11]:
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import pandas as pd

iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(target=[iris.target_names[i] for i in iris.target])
df.head()


| | sepal length (cm) | sepal width (cm) | petal length (cm) | petal width (cm) | target |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |


In [12]:

iris_agg = df.groupby('target').describe()
iris_agg

| | sepal length (cm) | sepal length (cm) | sepal length (cm) | sepal length (cm) | sepal length (cm) | sepal length (cm) | sepal length (cm) | sepal length (cm) | sepal width (cm) | sepal width (cm) | ... | petal length (cm) | petal length (cm) | petal width (cm) | petal width (cm) | petal width (cm) | petal width (cm) | petal width (cm) | petal width (cm) | petal width (cm) | petal width (cm) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| target | count | mean | std | min | 25% | 50% | 75% | max | count | mean | ... | 75% | max | count | mean | std | min | 25% | 50% | 75% | max |
| setosa | 50.0 | 5.006 | 0.35249 | 4.3 | 4.8 | 5.0 | 5.2 | 5.8 | 50.0 | 3.428 | ... | 1.575 | 1.9 | 50.0 | 0.246 | 0.105386 | 0.1 | 0.2 | 0.2 | 0.3 | 0.6 |
| versicolor | 50.0 | 5.936 | 0.516171 | 4.9 | 5.6 | 5.9 | 6.3 | 7.0 | 50.0 | 2.77 | ... | 4.6 | 5.1 | 50.0 | 1.326 | 0.197753 | 1.0 | 1.2 | 1.3 | 1.5 | 1.8 |
| virginica | 50.0 | 6.588 | 0.63588 | 4.9 | 6.225 | 6.5 | 6.9 | 7.9 | 50.0 | 2.974 | ... | 5.875 | 6.9 | 50.0 | 2.026 | 0.27465 | 1.4 | 1.8 | 2.0 | 2.3 | 2.5 |
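
The aggregated frame returned by `groupby(...).describe()` has two-level column labels: a feature name and a statistic. The sketch below shows a few ways to read such a frame. It uses a tiny hand-made data frame (`toy`, with illustrative values only) instead of the iris data so that it runs without scikit-learn; the variable names are assumptions made for this example.

```python
import pandas as pd

# Small hand-made stand-in for the iris frame (values are illustrative only).
toy = pd.DataFrame(
    {
        "sepal length (cm)": [5.0, 5.2, 6.0, 6.4],
        "petal width (cm)": [0.2, 0.4, 1.3, 1.5],
        "target": ["setosa", "setosa", "versicolor", "versicolor"],
    }
)

# Same aggregation as in the notebook: one row per target value.
toy_agg = toy.groupby("target").describe()

# Each column label is a (feature, statistic) pair.
mean_sepal_length = toy_agg[("sepal length (cm)", "mean")]

# Selecting by feature alone returns all statistics for that feature.
petal_width_stats = toy_agg["petal width (cm)"]

# Flatten the two-level labels if a downstream tool expects plain column names.
flat = toy_agg.copy()
flat.columns = [f"{feature} {stat}" for feature, stat in flat.columns]

print(mean_sepal_length)
print(list(flat.columns)[:3])
```

Flattening the labels is optional; it is mainly useful when the aggregate is written to a format that does not support multi-level headers.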


In [13]:
# Saving the artifact

artifact = lineapy.save(iris_agg, 'iris agg')

## Create a pipeline from an artifact





In [14]:
lineapy.to_pipeline(artifacts=['iris agg'], framework='SCRIPT', pipeline_name='iris_aggregation_script_pipeline', output_dir='python_script')

PosixPath('python_script')

## Validate the `to_pipeline` output

In [15]:
%%sh
echo 'validate lineapy has created all pipeline required files'
echo '--------------------------------------------------------------------'
ls -ltrh python_script
echo '\n'
echo '--------------------------------------------------------------------'
echo 'python module: iris_aggregation_script_pipeline.py'
echo '--------------------------------------------------------------------'
cat python_script/iris_aggregation_script_pipeline.py
echo '\n'
echo '--------------------------------------------------------------------'
echo 'pipeline script: iris_aggregation_script_pipeline_script_dag.py'
echo '--------------------------------------------------------------------'
cat python_script/iris_aggregation_script_pipeline_script_dag.py
echo '--------------------------------------------------------------------'
echo 'module requirement: iris_aggregation_script_pipeline_requirements.txt'
echo '--------------------------------------------------------------------'
cat python_script/iris_aggregation_script_pipeline_requirements.txt
echo '--------------------------------------------------------------------'


validate lineapy has created all pipeline required files
--------------------------------------------------------------------
total 0
-rwxrwxrwx 1 mlee mlee 281 May 12 11:56 iris_aggregation_script_pipeline.py
-rwxrwxrwx 1 mlee mlee 368 May 12 11:56 iris_aggregation_script_pipeline_Dockerfile
-rwxrwxrwx 1 mlee mlee  89 May 12 11:56 iris_aggregation_script_pipeline_requirements.txt
-rwxrwxrwx 1 mlee mlee 128 May 12 11:56 iris_aggregation_script_pipeline_script_dag.py


--------------------------------------------------------------------
python module: iris_aggregation_script_pipeline.py
--------------------------------------------------------------------
import pandas as pd
from sklearn.datasets import load_iris


def iris_agg():
    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
        target=[iris.target_names[i] for i in iris.target]
    )
    iris_agg = df.groupby("target").describe()


-------------------------------------------------------
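
The same check can be done from Python instead of the shell. The sketch below is a hypothetical helper (`missing_pipeline_files` is not part of LineaPy) that reports which of the expected files are absent. The file-name pattern is inferred from the directory listing above and is an assumption, not a documented LineaPy contract.

```python
from pathlib import Path
import tempfile


def missing_pipeline_files(output_dir, pipeline_name, dag_suffix="_script_dag.py"):
    """Return the expected pipeline files that are absent from output_dir.

    The naming pattern mirrors the listing shown in this notebook and may
    differ between LineaPy versions.
    """
    expected = [
        f"{pipeline_name}.py",
        f"{pipeline_name}_Dockerfile",
        f"{pipeline_name}_requirements.txt",
        f"{pipeline_name}{dag_suffix}",
    ]
    root = Path(output_dir)
    return [name for name in expected if not (root / name).is_file()]


# Demo on a throwaway directory so the sketch runs without LineaPy installed:
# three of the four files are created, so the DAG script is reported missing.
with tempfile.TemporaryDirectory() as tmp:
    name = "iris_aggregation_script_pipeline"
    for suffix in (".py", "_Dockerfile", "_requirements.txt"):
        (Path(tmp) / f"{name}{suffix}").touch()
    demo_missing = missing_pipeline_files(tmp, name)

print(demo_missing)
```

In the notebook itself, the call would be `missing_pipeline_files("python_script", "iris_aggregation_script_pipeline")`; an empty list means every expected file is present.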

## Execute the pipeline from the command line
 
We can re-execute the pipeline directly from the command line with Python.
 
```bash
python python_script/iris_aggregation_script_pipeline_script_dag.py
```
 
If we need to restore the Python environment, we can reinstall the recorded dependencies with

```bash
pip install -r python_script/iris_aggregation_script_pipeline_requirements.txt
```
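
Before reinstalling, it can help to see which pinned packages differ from what is currently installed. The following is a minimal sketch using only the standard library; `check_requirements` is a hypothetical helper, it handles only exact `name==version` pins, and the sample lines are made up for illustration rather than taken from the generated file.

```python
from importlib import metadata


def check_requirements(lines):
    """Compare pinned `name==version` lines against installed packages.

    Returns {name: (required, installed)} for every mismatch; `installed`
    is None when the package is not installed. Lines without an exact
    `==` pin are skipped in this sketch.
    """
    mismatches = {}
    for raw in lines:
        line = raw.split("#", 1)[0].strip()  # drop comments and whitespace
        if "==" not in line:
            continue
        name, required = (part.strip() for part in line.split("==", 1))
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            installed = None
        if installed != required:
            mismatches[name] = (required, installed)
    return mismatches


# Hypothetical content, for illustration only. For the real file, pass
# Path(<requirements file>).read_text().splitlines() instead.
sample = ["# generated requirements", "surely-not-a-real-package-xyz==1.0.0", ""]
print(check_requirements(sample))
```

An empty result means every pinned package is installed at the recorded version.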
 
### Set up a cron job

We can use crontab to schedule the pipeline. The following command registers a job that runs every hour:
 
```bash
(crontab -l 2>/dev/null; echo "0 * * * * python python_script/iris_aggregation_script_pipeline_script_dag.py") | crontab -
```
 
Note that you must use the correct Python environment and replace the path with the absolute path to the DAG file.
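
One way to get both absolute paths right is to build the crontab entry from Python, run inside the environment that should execute the pipeline. This is a small sketch, not a LineaPy feature; `cron_line` is a hypothetical helper, and it assumes `sys.executable` points at the interpreter you want cron to use.

```python
import shlex
import sys
from pathlib import Path


def cron_line(script_path, schedule="0 * * * *", python=sys.executable):
    """Build a crontab entry with absolute, shell-quoted interpreter and script paths."""
    script = Path(script_path).resolve()  # absolute path, relative to the current directory
    return f"{schedule} {shlex.quote(str(python))} {shlex.quote(str(script))}"


entry = cron_line("python_script/iris_aggregation_script_pipeline_script_dag.py")
print(entry)
```

The printed line can then be added to the crontab by hand (for example with `crontab -e`).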
 



## Create the Airflow pipeline

Note that if you set `output_dir` to your Airflow DAG folder, Airflow picks up the DAG automatically.
If you first want to check that the DAG generated by LineaPy is reasonable, or to add something (such as logging) to it, write it to a separate directory, make your changes there, and then move the DAG to your DAG folder manually.
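
As an illustration of the kind of change you might make before moving the DAG, the sketch below wraps a task function so that each run logs its start, duration, and any failure. It is a generic Python pattern, not something LineaPy generates; `logged` and `example_task` are hypothetical names, and `example_task` merely stands in for a generated function such as `iris_agg()`.

```python
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")


def logged(task):
    """Wrap a task function so each run logs its start, duration, and failures."""

    @functools.wraps(task)
    def wrapper(*args, **kwargs):
        logger.info("starting %s", task.__name__)
        started = time.perf_counter()
        try:
            result = task(*args, **kwargs)
        except Exception:
            # Log the traceback, then re-raise so the scheduler still sees the failure.
            logger.exception("%s failed", task.__name__)
            raise
        elapsed = time.perf_counter() - started
        logger.info("finished %s in %.3fs", task.__name__, elapsed)
        return result

    return wrapper


@logged
def example_task():
    # Hypothetical stand-in for a generated task such as iris_agg().
    return sum(range(10))


value = example_task()
```

Because the wrapper re-raises exceptions, the task still fails visibly in the scheduler; the decorator only adds log lines around it.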

In [16]:
lineapy.to_pipeline(artifacts=['iris agg'], framework='AIRFLOW', pipeline_name='iris_aggregation_airflow_pipeline', output_dir='iris_airflow')

PosixPath('iris_airflow')

In [17]:
%%sh
echo 'validate lineapy has created all pipeline required files'
echo '--------------------------------------------------------------------'
ls -ltrh iris_airflow
echo '\n'
echo '--------------------------------------------------------------------'
echo 'python module: iris_aggregation_airflow_pipeline.py'
echo '--------------------------------------------------------------------'
cat iris_airflow/iris_aggregation_airflow_pipeline.py
echo '\n'
echo '--------------------------------------------------------------------'
echo 'pipeline script: iris_aggregation_airflow_pipeline_dag.py'
echo '--------------------------------------------------------------------'
cat iris_airflow/iris_aggregation_airflow_pipeline_dag.py
echo '--------------------------------------------------------------------'


validate lineapy has created all pipeline required files
--------------------------------------------------------------------
total 4.0K
-rwxrwxrwx 1 mlee mlee 281 May 12 11:56 iris_aggregation_airflow_pipeline.py
-rwxrwxrwx 1 mlee mlee 370 May 12 11:56 iris_aggregation_airflow_pipeline_Dockerfile
-rwxrwxrwx 1 mlee mlee  89 May 12 11:56 iris_aggregation_airflow_pipeline_requirements.txt
-rwxrwxrwx 1 mlee mlee 578 May 12 11:56 iris_aggregation_airflow_pipeline_dag.py


--------------------------------------------------------------------
python module: iris_aggregation_airflow_pipeline.py
--------------------------------------------------------------------
import pandas as pd
from sklearn.datasets import load_iris


def iris_agg():
    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(
        target=[iris.target_names[i] for i in iris.target]
    )
    iris_agg = df.groupby("target").describe()


----------------------------------------------------------

We've demonstrated that LineaPy makes it easy to create and update a pipeline and to keep it reproducible.
This should save data scientists a lot of time on pipeline operations.

To learn more, see the [LineaPy API documentation](https://docs.lineapy.org/).