# Torcharrow: Data Pipes

Typical torcharrow programs often wrangle data in an imperative fashion. This enables step-wise program development, thereby easing debugging and experimentation. However, imperative dataframe programs require introducing many intermediate results.

On the other hand, a purely functional version can often be constructed by composing one dataframe operator after another. In the pandas community, this style is also known as data pipes.

Luckily, both styles are available to anyone using the torcharrow library. To see the differences, let's analyze the US Department of Transportation's flight data. \[Remark: To keep this workbook self-contained, we don't open a DB connection or read a CSV file but use random data instead. The variable `flights` represents our database.\]


In [1]:
import random
import torcharrow as ta


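# Random delays in minutes (1..9) stand in for real flight records
dep_sample = [random.randrange(1, 10) for _ in range(70)]
arr_sample = [random.randrange(1, 10) for _ in range(70)]
N = 5  # a group must have more than N flights to be kept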

flights = ta.DataFrame({
    'dep_delay':ta.Column(dep_sample), 
    'arr_delay':ta.Column(arr_sample)})
flights.head(5)

  index    dep_delay    arr_delay
-------  -----------  -----------
      0            6            5
      1            1            7
      2            4            7
      3            8            6
      4            1            7
dtype: Struct([Field('dep_delay', int64), Field('arr_delay', int64)]), count: 5, null_count: 0


## Imperative code 
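Here is an example program that groups the flights by their departure delay (`dep_delay`, in minutes), computes the mean arrival delay and the number of flights per group, keeps the groups with more than `N` flights, and sorts them by departure delay in descending order.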

In [2]:


grouped_flights = flights.groupby(['dep_delay'])

tmp = ta.DataFrame()
tmp['dep_delay'] = grouped_flights['dep_delay']
tmp['arrival_delay'] = grouped_flights["arr_delay"].mean()
tmp['numflights'] =  grouped_flights["arr_delay"].count()

filtered = tmp[tmp['numflights']>N]
# Named sorted_flights so the built-in sorted() is not shadowed
sorted_flights = filtered.sort(by=['dep_delay'], ascending=False)
sorted_flights.head(5)

  index    dep_delay    arrival_delay    numflights
-------  -----------  ---------------  ------------
      0            9          5.57143             7
      1            8          5.55556             9
      2            7          4.33333             9
      3            6          4.75                8
      4            4          5.76923            13
dtype: Struct([Field('dep_delay', int64), Field('arrival_delay', float64), Field('numflights', int64)]), count: 5, null_count: 0

## Functional code aka data pipes

The corresponding data pipe looks like this:

In [3]:
from torcharrow import me

(flights
.groupby(['dep_delay'])
.select(
    arrival_delay = me['arr_delay'].mean(),
    numflights = me['arr_delay'].count())
.where(me['numflights']>N)
.sort(by=['dep_delay'], ascending=False)
.head(5)
)

  index    dep_delay    arrival_delay    numflights
-------  -----------  ---------------  ------------
      0            9          5.57143             7
      1            8          5.55556             9
      2            7          4.33333             9
      3            6          4.75                8
      4            4          5.76923            13
dtype: Struct([Field('dep_delay', int64), Field('arrival_delay', float64), Field('numflights', int64)]), count: 5, null_count: 0

Everything is composed in a fluent style. Because the temporaries are gone, however, expressions like `tmp['numflights']>N` had to be rewritten as `me['numflights']>N`. But where does `me` come from?

## Expression Trees and `me`

In torcharrow any reference to the global `me`, which is typed as a dataframe, constructs an expression tree. That tree is only evaluated when the hosting dataframe is being executed, in which case `me` is bound to `self`.

The signature of `where` makes the distinction between passing a function and passing an expression tree explicit. We have: 

```
    def DataFrame.where(self, condition: Union[Callable, Expression]): ...
```

The call of `where` with the expression tree

```
    df.where(me['numflights']>N)
```

is semantically the same as passing a lambda whose parameter `me` is bound to `self` at call time:

```
    df.where(lambda me: me['numflights']>N)
```

We see that the expression tree form is not only shorter; it also makes every attribute access and call explicit at runtime and thus easy to analyze. This stands in contrast to the lambda, whose code cannot easily be inspected at runtime.
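To make the idea concrete, here is a minimal sketch of how a `me`-like placeholder could record operations as a tree and be bound later. The classes `Expr`, `Me`, and `Call` and the function `evaluate` are hypothetical names invented for this illustration; they are not torcharrow's actual implementation, and a plain dict stands in for the dataframe.

```python
import operator


class Expr:
    """Node in a tiny expression tree (hypothetical, not torcharrow's classes)."""

    def __getitem__(self, key):
        # Record the indexing operation instead of performing it.
        return Call(operator.getitem, "getitem", self, key)

    def __gt__(self, other):
        # Record the comparison instead of performing it.
        return Call(operator.gt, ">", self, other)


class Me(Expr):
    """Placeholder that is bound to a concrete value at evaluation time."""

    def __repr__(self):
        return "me"


class Call(Expr):
    """A recorded operation together with its (possibly nested) arguments."""

    def __init__(self, fn, name, *args):
        self.fn, self.name, self.args = fn, name, args

    def __repr__(self):
        return f"{self.name}({', '.join(map(repr, self.args))})"


def evaluate(node, bound):
    """Evaluate a tree (or a plain constant) with the placeholder bound to `bound`."""
    if isinstance(node, Me):
        return bound
    if isinstance(node, Call):
        return node.fn(*(evaluate(arg, bound) for arg in node.args))
    return node  # constants such as 'numflights' or 5


me = Me()
N = 5
tree = me["numflights"] > N  # builds a tree, evaluates nothing yet
row = {"dep_delay": 9, "numflights": 7}  # dict standing in for a dataframe

print(repr(tree))                              # >(getitem(me, 'numflights'), 5)
print(evaluate(tree, row))                     # True
print((lambda me: me["numflights"] > N)(row))  # True
```

The tree and the lambda give the same answer once bound, but only the tree can be printed, walked, or rewritten before anything is executed.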

## Torcharrow, SQL, UPM, back and forth

It is easy to see that the imperative and functional versions compute the same result. It should also be clear that torcharrow programs that only use the column and dataframe API correspond to SQL, too. The program above could, for example, be written as:

```
    SELECT
        dep_delay,
        AVG(arr_delay) AS arrival_delay,
        COUNT(arr_delay) AS numflights
    FROM flights
    GROUP BY dep_delay
    HAVING COUNT(arr_delay) > 5
    ORDER BY dep_delay DESC
    LIMIT 5
```
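The relational reading can be tried locally with Python's built-in `sqlite3` module. The following is a sketch under stated assumptions: the table name `flights`, the fixed random seed, and the in-memory database are choices made for this illustration and are not part of torcharrow. Note that SQLite spells the mean as `AVG`, filters groups with `HAVING`, and sorts with `ORDER BY`.

```python
import random
import sqlite3

random.seed(0)  # fixed seed so the sketch is repeatable (an assumption)
N = 5
rows = [(random.randrange(1, 10), random.randrange(1, 10)) for _ in range(70)]

# In-memory database standing in for the real flight data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (dep_delay INTEGER, arr_delay INTEGER)")
conn.executemany("INSERT INTO flights VALUES (?, ?)", rows)

query = """
    SELECT dep_delay,
           AVG(arr_delay)   AS arrival_delay,
           COUNT(arr_delay) AS numflights
    FROM flights
    GROUP BY dep_delay
    HAVING COUNT(arr_delay) > ?
    ORDER BY dep_delay DESC
    LIMIT 5
"""
result = conn.execute(query, (N,)).fetchall()

# The same computation in plain Python, as a cross-check.
groups = {}
for dep, arr in rows:
    groups.setdefault(dep, []).append(arr)
expected = sorted(
    ((dep, sum(a) / len(a), len(a)) for dep, a in groups.items() if len(a) > N),
    reverse=True,
)[:5]

for dep_delay, arrival_delay, numflights in result:
    print(dep_delay, round(arrival_delay, 5), numflights)
```

Each returned tuple has the same three fields as the dataframe output above: the departure delay, the mean arrival delay, and the number of flights in the group.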

Torcharrow will eventually support all relational operators, e.g., where, select, join, group-by, sort, union, except, and limit.

Used in this way, torcharrow is similar to UPM. However, there are several big differences:
 * torcharrow is a lightweight API with a single-box runtime and dynamically checked types.
 * UPM is a precompiler with strong static typing targeting various platforms: Presto, HQL and XStream.
 * In torcharrow all embedded expressions have to be authored in Python proper.
 * In UPM these expressions are strings and have to follow Presto or HQL syntax.

The good news is that we can translate one into the other. In fact, we plan to translate torcharrow into the intermediate language for UPM so that torcharrow benefits from UPM's deep analysis and optimization stack. Conversely, one could consider translating UPM into torcharrow to execute it locally.

## Pipes: Compositional SQL 
The big benefit of Python over SQL is that we can easily build procedural abstractions. Suppose we define:

In [4]:
def quality_control(df):
    return df.where(me['numflights']>N)

We can use `quality_control` in the data pipe above. However, `quality_control` is not defined on the dataframe and thus doesn't compose in a fluent style. Luckily, we can use the pandas-style `pipe` operator:

```
    def DataFrame.pipe(self, func, *args, **kwargs): ...
```
        
When `pipe` executes, it simply calls the passed function like this: `func(self, *args, **kwargs)`.
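The forwarding behavior is small enough to sketch in plain Python. The `Table` class below is a hypothetical stand-in for a dataframe (a list of row dicts), and the `min_flights` parameter is added here only to show how extra arguments are passed through; neither is part of torcharrow.

```python
class Table:
    """Toy stand-in for a dataframe: a list of row dicts (hypothetical)."""

    def __init__(self, rows):
        self.rows = list(rows)

    def where(self, predicate):
        # Keep only the rows for which the predicate holds.
        return Table(r for r in self.rows if predicate(r))

    def pipe(self, func, *args, **kwargs):
        # Hand `self` to the function as its first argument.
        return func(self, *args, **kwargs)


def quality_control(table, min_flights=5):
    """Free function that is not a method of Table."""
    return table.where(lambda r: r["numflights"] > min_flights)


stats = Table([
    {"dep_delay": 9, "numflights": 7},
    {"dep_delay": 5, "numflights": 3},
    {"dep_delay": 4, "numflights": 13},
])

kept = stats.pipe(quality_control)                    # default threshold
strict = stats.pipe(quality_control, min_flights=10)  # extra keyword forwarded

print([r["dep_delay"] for r in kept.rows])    # [9, 4]
print([r["dep_delay"] for r in strict.rows])  # [4]
```

Because `pipe` returns whatever the function returns, the call chain can continue with further methods as long as the function returns a dataframe-like object.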

Applying the pipe operator, we can still write our pipeline as:

In [5]:
(flights
.groupby(['dep_delay'])
.select(
    arrival_delay = me['arr_delay'].mean(),
    numflights = me['arr_delay'].count())
.pipe(quality_control)
.sort(by=['dep_delay'], ascending=False)
.head(5)
)

  index    dep_delay    arrival_delay    numflights
-------  -----------  ---------------  ------------
      0            9          5.57143             7
      1            8          5.55556             9
      2            7          4.33333             9
      3            6          4.75                8
      4            4          5.76923            13
dtype: Struct([Field('dep_delay', int64), Field('arrival_delay', float64), Field('numflights', int64)]), count: 5, null_count: 0

## Summary

Torcharrow allows authoring dataframe programs in both an imperative and a functional style. Both are semantically the same! And by using the `pipe` operator, user-defined functions can be integrated into the fluent style, too.

By leveraging `me`, one can refer to the current dataframe inside expressions. This makes it possible to write lambda-like expressions succinctly and even makes them analyzable.

By combining expression trees with [tracing](./torcharrow_state.ipynb), we can translate most torcharrow programs to UPM and thus to SQL.
