<a href="https://colab.research.google.com/github/clarkmaio/linkedin_posts/blob/main/demo_essentials.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install git+https://github.com/clarkmaio/clarkpy_essentials.git --quiet --force-reinstall

  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
  Building wheel for clarkpy-essentials (pyproject.toml) ... done


In [2]:
from clarkpy_essentials import Node, Pipeline, DataCatalog, Context
import clarkpy_essentials.decorator as deco

import pandas as pd
import polars as pl
import numpy as np
import os
import yaml


In [3]:
# Create dummy datasets (make sure the target folder exists first)
os.makedirs('data', exist_ok=True)
pd.DataFrame(np.random.randn(10, 5)).to_csv('data/csv_test.csv')
pd.DataFrame(np.random.randn(10, 5)).to_excel('data/excel_test.xlsx')
pd.DataFrame(np.random.randn(10, 5)).to_parquet('data/parquet_test.parquet')
with open('data/yaml_test.yaml', 'w') as f:
  yaml.dump({'a': 1, 'b': 2}, f)

# **Demo decorators**

In [4]:
@deco.force_kwargs
def func1(x, y):
  return x + y

In [5]:
# The function raises an error if arguments are not passed by keyword
try:
  func1(1, y=2)
except Exception as e:
  print('Error:', e)

Error: Arguments must be passed as keyword arguments.


In [6]:
func1(x=1, y=2)

3
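
To make the behaviour above concrete, here is a minimal sketch of how a decorator like `force_kwargs` could be written. This is not the `clarkpy_essentials` implementation: the use of `TypeError` and the wrapper structure are assumptions, and only the error message is taken from the output shown above.

```python
import functools


def force_kwargs(func):
    """Hypothetical re-implementation: reject any positional argument."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if args:
            # Assumption: the real decorator may raise a different exception type
            raise TypeError("Arguments must be passed as keyword arguments.")
        return func(**kwargs)
    return wrapper


@force_kwargs
def add(x, y):
    return x + y


result = add(x=1, y=2)

try:
    add(1, y=2)
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```

`functools.wraps` keeps the wrapped function's name and docstring, so the decorated function still looks like the original in tracebacks and help output.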

# **Demo DataCatalog**
Access your datasets by simply defining the relative or absolute path of each dataset and the type of import you want to apply (pandas.csv, pandas.excel, yaml, polars.csv, polars.parquet, ...).

Once the DataCatalog is initialized, load a dataset using the corresponding key defined in `catalog.yml`.

***This class is nothing but a big if/else to easily deal with different data types.***
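
The "big if/else" idea can be sketched with the standard library alone. Everything in this block is hypothetical: `MiniCatalog`, the `path`/`type` entry keys, and the `csv`/`json` type names are illustrative stand-ins, not the real `DataCatalog` API or the actual `catalog.yml` schema.

```python
import csv
import json
import os
import tempfile


class MiniCatalog:
    """Hypothetical stand-in for DataCatalog: key -> (path, loader type)."""

    def __init__(self, entries, source_path):
        self.entries = entries          # {key: {"path": ..., "type": ...}}
        self.source_path = source_path  # base folder for relative paths

    def __call__(self, key):
        entry = self.entries[key]
        path = entry["path"]
        if not os.path.isabs(path):
            path = os.path.join(self.source_path, path)
        kind = entry["type"]
        # The dispatch itself: one branch per supported data type
        if kind == "csv":
            with open(path, newline="") as fh:
                return list(csv.DictReader(fh))
        elif kind == "json":
            with open(path) as fh:
                return json.load(fh)
        else:
            raise ValueError(f"Unsupported type: {kind!r}")


# Build two small files in a temporary folder and load them by key
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, "table.csv"), "w", newline="") as fh:
    fh.write("a,b\n1,2\n")
with open(os.path.join(workdir, "params.json"), "w") as fh:
    json.dump({"a": 1, "b": 2}, fh)

mini_catalog = MiniCatalog(
    entries={
        "table": {"path": "table.csv", "type": "csv"},
        "params": {"path": "params.json", "type": "json"},
    },
    source_path=workdir,
)
table = mini_catalog("table")
params = mini_catalog("params")
```

Calling the catalog object with a key mirrors the `catalog('excel_test')` style used in this notebook; the branches would call pandas or polars readers in a fuller version.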

In [7]:
CATALOG_PATH = os.path.join(os.getcwd(), 'catalog.yml')
catalog = DataCatalog(catalog=CATALOG_PATH, source_path=os.getcwd())

## Explore data mapped in `catalog.yml`

In [8]:
excel_data = catalog('excel_test')
print(type(excel_data))
excel_data.head(2)

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0.1,Unnamed: 0,0,1,2,3,4
0,0,0.591221,0.919757,-0.991286,-0.249461,1.722701
1,1,0.360208,0.855848,0.721331,0.402465,-0.777587


In [9]:
yaml_data = catalog('yaml_test')
print(type(yaml_data))
yaml_data

<class 'dict'>


{'a': 1, 'b': 2}

In [10]:
parquet_data_polars = catalog('parquet_test_polars')
print(type(parquet_data_polars))
parquet_data_polars.head(2)

<class 'polars.dataframe.frame.DataFrame'>


0,1,2,3,4
f64,f64,f64,f64,f64
0.778878,-0.612608,-1.23481,0.571337,0.312162
-0.026435,0.709216,-0.460779,1.387805,-0.781104


# **Demo Context**
`Context` is just a class that collects the variables you want to expose to your functions.

The class simply turns kwargs into properties.
The only constraint concerns the `catalog` kwarg, which must be a `DataCatalog` instance to be handled properly by `Pipeline`.
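
As a rough illustration of "kwargs become properties", the sketch below stores each keyword argument as an attribute. `MiniContext` is a hypothetical name, and the sketch deliberately omits the `DataCatalog` type check that the real `Context` may perform on `catalog`.

```python
class MiniContext:
    """Hypothetical stand-in for Context: every kwarg becomes an attribute."""

    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)


# A plain function stands in for a DataCatalog instance (assumption)
def fake_catalog(key):
    return {"yaml_test": {"a": 1, "b": 2}}[key]


ctx = MiniContext(global_variables={"var1": 1, "var2": 100}, catalog=fake_catalog)
var2 = ctx.global_variables["var2"]
yaml_like = ctx.catalog("yaml_test")
```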

In [19]:
GLOBAL_VARIABLES = {'var1': 1, 'var2': 100}
DATA_CATALOG = DataCatalog(catalog=CATALOG_PATH, source_path=os.getcwd())
context = Context(global_variables=GLOBAL_VARIABLES, catalog=DATA_CATALOG)

In [20]:
context.global_variables

{'var1': 1, 'var2': 100}

In [21]:
context.catalog('yaml_test')

{'a': 1, 'b': 2}

# **Demo Pipeline**
You can organize your workflow using the `Pipeline` and `Node` classes, as in Kedro.
Use `Context` and `DataCatalog` to easily access global variables and datasets.

In [11]:
# Node functions
def f1(x: float, y: float) -> float:
  return x*y


def f2(df: pd.DataFrame, z: float) -> pd.DataFrame:
  new_df = (df*z).T
  return new_df

In [17]:
# ------------ Create Context --------------
GLOBAL_VARIABLES = {'var1': 1, 'var2': 100}
DATA_CATALOG = DataCatalog(catalog=CATALOG_PATH, source_path=os.getcwd())
context = Context(global_variables=GLOBAL_VARIABLES, catalog=DATA_CATALOG)

# ------------ Initialize Pipeline ---------
pipeline = Pipeline([
    Node(func=f1,
         inputs=['context.global_variables.var1', 'context.global_variables.var2'],
         outputs='outpout_f1'),
    Node(func=f2,
         inputs=['context.catalog.csv_test', 'outpout_f1'],
         outputs='outpout_f2')
])

# ------------ Run Pipeline ----------------
pipeline_results = pipeline.run(context=context)
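
One way to picture how string inputs such as `'context.global_variables.var1'` might be turned into values is the sketch below. It is an assumption about the mechanism, not the library's code: `resolve`, `run`, the tuple-based node format, and the `numbers` dataset are all hypothetical.

```python
from types import SimpleNamespace


def resolve(name, context, results):
    """Names starting with 'context.' walk the context; others read earlier outputs."""
    if not name.startswith("context."):
        return results[name]
    obj = context
    for part in name.split(".")[1:]:
        if isinstance(obj, dict):
            obj = obj[part]           # e.g. global_variables['var1']
        elif callable(obj):
            obj = obj(part)           # e.g. catalog('numbers')
        else:
            obj = getattr(obj, part)  # e.g. context.catalog
    return obj


def run(nodes, context):
    """Run (func, inputs, output) tuples in order, collecting named outputs."""
    results = {}
    for func, inputs, output in nodes:
        args = [resolve(name, context, results) for name in inputs]
        results[output] = func(*args)
    return results


def f1(x, y):
    return x * y


def f2(values, z):
    return [v * z for v in values]


context = SimpleNamespace(
    global_variables={"var1": 1, "var2": 100},
    catalog=lambda key: {"numbers": [1, 2, 3]}[key],  # stand-in for a DataCatalog
)
results = run(
    [
        (f1, ["context.global_variables.var1", "context.global_variables.var2"], "out_f1"),
        (f2, ["context.catalog.numbers", "out_f1"], "out_f2"),
    ],
    context,
)
```

In this sketch, nodes run strictly in list order, so a node can only consume outputs produced by nodes listed before it.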


In [16]:
print(pipeline_results)

{'outpout_f1': 100, 'outpout_f2':                     0           1           2           3           4  \
Unnamed: 0   0.000000  100.000000  200.000000  300.000000  400.000000   
0          -67.135774  108.593824   74.271549  113.166057    2.203076   
1          -25.797278  123.009467   49.157157  -39.154692   27.889686   
2           30.081634  -64.060859   61.533461  -18.624164  -85.415716   
3           31.091464  138.120942   16.174611   64.662701   78.614592   
4            1.138311 -102.975769   86.503607    4.251761 -122.422479   

                     5           6           7           8           9  
Unnamed: 0  500.000000  600.000000  700.000000  800.000000  900.000000  
0           -71.295185  -19.681172  -20.197128 -122.130013  -16.481358  
1            33.610936 -124.613468   29.350761  -35.577872   16.463038  
2          -157.676062 -118.028386   30.539302   20.352886  -99.126650  
3           -19.734175  -15.846617  -19.791488   82.308112  125.642325  
4          -232.