In [None]:
#hide
!pip install -qq nbdev geopandas matplotlib numpy pandas pygeos pyproj Shapely

In [None]:
#hide
from pypipe.pipes import *
import warnings
warnings.filterwarnings('ignore')

# PyPipe

> A framework for building and running simple data engineering pipelines in Python.

In data science and data engineering you constantly hear the term “data pipeline”. The term has many meanings, though, and people often mean very specific tools or packages depending on their own background and needs. There are pipelines for pretty much everything; in Python alone I can think of [Luigi](https://luigi.readthedocs.io/en/stable/), [Airflow](https://airflow.apache.org/), [scikit-learn pipelines](https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html), and [Pandas pipes](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pipe.html) just off the top of my head. [This article](https://towardsdatascience.com/data-pipelines-what-why-and-which-ones-1f674ba49946) does a good job of helping you understand what is out there.

It can be quite confusing, especially if you want a simple, agnostic pipeline that you can customize for your specific needs, with no bells and whistles and no lock-in to particular libraries. That is where PyPipe comes in. It is for the simple data engineer who just wants to get stuff done in an ordered and repeatable way.

PyPipe is a simple data pipeline that automates a chain of transformations performed on some data.

PyPipe data pipelines are a great way of introducing automation, reproducibility, structure, and flow to your data engineering projects.

---

PyPipe was made by [Robert Johnson](https://www.robtheoceanographer.com/), [Alexander Kozlov](https://alexkozlov.com/), and [Mohammadreza Khanarmuei](https://www.linkedin.com/in/mohammadreza-khanarmuei-437a3163).

---

## What is it?

PyPipe transformation pipelines link user-defined transformation functions together into a TransformationPipe. The key feature of PyPipe is that the data source passed in can be almost anything you like, e.g. a pandas DataFrame, a geopandas GeoDataFrame, an Iris data cube, or a NumPy array. So long as your transformation steps read and write the same kind of object, PyPipe will work for you.

![pypipe arch](images/pypipe.jpeg)
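
The "read and write the same object" contract can be made explicit with a small check. The sketch below is only an illustration in plain Python: `keeps_type`, `drop_negatives`, and `total` are hypothetical names that are not part of PyPipe, and a plain list stands in for the DataObject.

```python
from functools import wraps


def keeps_type(step):
    """Hypothetical helper (not part of PyPipe): raise if a step returns
    an object of a different type than the one it received."""
    @wraps(step)
    def checked(data, **kwargs):
        result = step(data, **kwargs)
        if not isinstance(result, type(data)):
            raise TypeError(
                f"{step.__name__} received {type(data).__name__} "
                f"but returned {type(result).__name__}"
            )
        return result
    return checked


@keeps_type
def drop_negatives(values, **kwargs):
    # Reads a list and writes a list, so it satisfies the contract.
    return [v for v in values if v >= 0]


@keeps_type
def total(values, **kwargs):
    # Reads a list but returns a number, so it breaks the contract.
    return sum(values)


print(drop_negatives([3, -1, 2]))  # [3, 2]
```

Calling `total([1, 2])` raises a `TypeError` here, which is the kind of mismatch that would stop a later step from receiving the object it expects.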

## Install

`pip install pypipe`

## How to use

The TransformationPipe class accepts a list of transformation functions, 'steps', to be applied sequentially. Each step contains a name and a function; the function is applied to the input DataObject and returns a transformed DataObject. A step can also carry an optional third element, a dictionary of parameters that is passed to the step's transformation function.
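
To make the idea of sequential steps concrete, here is a minimal stand-in written in plain Python. It is a sketch of the concept only and makes no claim about how PyPipe implements `TransformationPipe` internally; `apply_steps`, `scale`, and `keep_above` are hypothetical names, and treating the parameter dictionary as optional is an assumption based on the description above.

```python
def apply_steps(data, steps):
    """Apply (name, function[, params]) steps in order.

    A conceptual stand-in, not PyPipe's actual code.
    """
    for step in steps:
        # The name is only a label; the function does the work.
        name, func, *rest = step
        params = rest[0] if rest else {}
        # The output of one step becomes the input of the next.
        data = func(data, **params)
    return data


def scale(values, **kwargs):
    factor = kwargs.get("factor", 1)
    return [v * factor for v in values]


def keep_above(values, **kwargs):
    threshold = kwargs.get("threshold", 0)
    return [v for v in values if v > threshold]


steps = [
    ("scale", scale, {"factor": 10}),
    ("filter", keep_above, {"threshold": 15}),
]
result = apply_steps([1, 2, 3], steps)
print(result)  # [20, 30]
```

Because each function receives the previous function's output, the order of the list is the order of the transformations.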


In order to use PyPipe you need two things: a DataObject and a set of transformation steps.

### DataObject

In this very simplified example we will use a [geopandas.GeoDataFrame](https://geopandas.org/en/stable/index.html) as our input DataObject. To do this we will load an example data set from [Kaggle](https://www.kaggle.com/) on the global distribution of volcano eruptions (https://www.kaggle.com/datasets/texasdave/volcano-eruptions), which we have stored in the repo for this package as 'volcano_data_2010.csv'.

In [None]:
import pandas
import geopandas

Load the data and put it into a geopandas dataframe:

In [None]:
df1 = pandas.read_csv('../test_data/volcano_data_2010.csv')
# Keep only relevant columns (a list of names selects several columns at once)
df = df1[["Year", "Name", "Country", "Latitude", "Longitude", "Type"]]
# Create point geometries from the longitude/latitude columns
geometry = geopandas.points_from_xy(df.Longitude, df.Latitude)
geo_df = geopandas.GeoDataFrame(df, geometry=geometry)
geo_df.head()

Unnamed: 0,Year,Name,Country,Latitude,Longitude,Type,geometry
0,2010,Tungurahua,Ecuador,-1.467,-78.442,Stratovolcano,POINT (-78.44200 -1.46700)
1,2010,Eyjafjallajokull,Iceland,63.63,-19.62,Stratovolcano,POINT (-19.62000 63.63000)
2,2010,Pacaya,Guatemala,14.381,-90.601,Complex volcano,POINT (-90.60100 14.38100)
3,2010,Sarigan,United States,16.708,145.78,Stratovolcano,POINT (145.78000 16.70800)
4,2010,Karangetang [Api Siau],Indonesia,2.78,125.48,Stratovolcano,POINT (125.48000 2.78000)


### Steps

Just as an example of something to do, we will define a single transformation step that spatially subsets the data to the Australian region. Yes, I know that this is an unrealistic example, but it is just here to show you how to implement pipelines.

We must now write our transformation function. Keep in mind that the function must take our DataObject as input and return a transformed DataObject of the same type; in this example that is a geopandas.GeoDataFrame.

In [None]:
from pyproj import crs
from shapely.geometry import Polygon, MultiPolygon, box, Point

In [None]:
def spatialCrop(gdf: geopandas.GeoDataFrame, **kwargs):
    """
    Apply a spatial limit to a GeoDataFrame based on user-defined limits.
    ----------
    parameters:
        gdf (geopandas.GeoDataFrame): an input GeoDataFrame
        kwargs (dict): parameters,
            - boundingBox (list): an iterable (lon_min, lat_min, lon_max, lat_max) of the specified region.
    Output:
        transformed_gdf (geopandas.GeoDataFrame): GeoDataFrame that is spatially limited to the boundingBox.
    """
    if "boundingBox" not in kwargs:
        return gdf

    boundingBox = kwargs["boundingBox"]
    # just an example so we are doing naughty things with the CRS... look away here...
    coord_system = crs.crs.CRS('WGS 84')

    bounding = geopandas.GeoDataFrame(
        {
            'limit': ['bounding box'],
            'geometry': [
                box(boundingBox[0], boundingBox[1], boundingBox[2],
                    boundingBox[3])
            ]
        },
        crs=coord_system)
    # Left-join each point to the bounding box; rows outside the box get NaN in 'limit'
    limited_gdf = geopandas.tools.sjoin(gdf,
                                        bounding,
                                        predicate='intersects',
                                        how='left')
    limited_gdf = limited_gdf[limited_gdf['limit'] == 'bounding box']
    limited_gdf = limited_gdf.drop(columns=['index_right', 'limit'])

    return limited_gdf
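
For intuition about what the spatial join above selects, the same bounding-box test can be written in plain Python without geopandas. This is a simplified stand-in, not the method `spatialCrop` uses: `in_bounding_box` is a hypothetical helper, and the coordinates are copied from the volcano table in this notebook.

```python
def in_bounding_box(lon, lat, bounding_box):
    """Return True if the point lies inside or on the edge of the box.

    bounding_box follows the (lon_min, lat_min, lon_max, lat_max) order
    used by spatialCrop.
    """
    lon_min, lat_min, lon_max, lat_max = bounding_box
    return lon_min <= lon <= lon_max and lat_min <= lat <= lat_max


# (name, longitude, latitude) rows taken from the example data
volcanoes = [
    ("Tungurahua", -78.442, -1.467),
    ("Merapi", 110.442, -7.542),
    ("Okataina", 176.5, -38.12),
    ("Sarigan", 145.78, 16.708),
]

australian_region = [80, -50, 180, 0]
kept = [
    name
    for name, lon, lat in volcanoes
    if in_bounding_box(lon, lat, australian_region)
]
print(kept)  # ['Merapi', 'Okataina']
```

Sarigan falls inside the longitude range but north of the equator, so it is dropped along with Tungurahua.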

### Define a PyPipe
Now that we have a step function and some data, we can define our transformation pipeline:

In [None]:
pipe = TransformationPipe(steps=[
    ('refine region', spatialCrop, {"boundingBox": [80, -50, 180, 0]})
])

### Evaluate your PyPipe
This is where things get interesting. We can now call `evaluate` on our pipe and watch the magic happen:

#### Input data:

In [None]:
geo_df

Unnamed: 0,Year,Name,Country,Latitude,Longitude,Type,geometry
0,2010,Tungurahua,Ecuador,-1.467,-78.442,Stratovolcano,POINT (-78.44200 -1.46700)
1,2010,Eyjafjallajokull,Iceland,63.630,-19.620,Stratovolcano,POINT (-19.62000 63.63000)
2,2010,Pacaya,Guatemala,14.381,-90.601,Complex volcano,POINT (-90.60100 14.38100)
3,2010,Sarigan,United States,16.708,145.780,Stratovolcano,POINT (145.78000 16.70800)
4,2010,Karangetang [Api Siau],Indonesia,2.780,125.480,Stratovolcano,POINT (125.48000 2.78000)
...,...,...,...,...,...,...,...
58,2018,Kilauea,United States,19.425,-155.292,Shield volcano,POINT (-155.29200 19.42500)
59,2018,Kadovar,Papua New Guinea,-3.620,144.620,Stratovolcano,POINT (144.62000 -3.62000)
60,2018,Ijen,Indonesia,-8.058,114.242,Stratovolcano,POINT (114.24200 -8.05800)
61,2018,Kilauea,United States,19.425,-155.292,Shield volcano,POINT (-155.29200 19.42500)


#### Evaluation:

In [None]:
transformed_geo_df = pipe.evaluate(geo_df)

#### Transformed data:

In [None]:
transformed_geo_df

Unnamed: 0,Year,Name,Country,Latitude,Longitude,Type,geometry
6,2010,Merapi,Indonesia,-7.542,110.442,Stratovolcano,POINT (110.44200 -7.54200)
8,2010,Tengger Caldera,Indonesia,-7.942,112.95,Stratovolcano,POINT (112.95000 -7.94200)
9,2011,Merapi,Indonesia,-7.542,110.442,Stratovolcano,POINT (110.44200 -7.54200)
22,2013,Merapi,Indonesia,-7.542,110.442,Stratovolcano,POINT (110.44200 -7.54200)
23,2013,Paluweh,Indonesia,-8.32,121.708,Stratovolcano,POINT (121.70800 -8.32000)
25,2013,Paluweh,Indonesia,-8.32,121.708,Stratovolcano,POINT (121.70800 -8.32000)
29,2013,Okataina,New Zealand,-38.12,176.5,Lava dome,POINT (176.50000 -38.12000)
31,2014,Kelut,Indonesia,-7.93,112.308,Stratovolcano,POINT (112.30800 -7.93000)
39,2015,Manam,Papua New Guinea,-4.1,145.061,Stratovolcano,POINT (145.06100 -4.10000)
41,2015,Okataina,New Zealand,-38.12,176.5,Lava dome,POINT (176.50000 -38.12000)


The power of this work is in its reproducibility and scalability.

## Credits

- Logo art from "Vecteezy.com"
- Demo data from "Kaggle.com"