# CoTransformer

`CoTransformer` represents a logic unit that executes on an arbitrary machine, on a collection of partitions that share the same partition keys across the input dataframes. The partitioning logic is not a concern of `CoTransformer`; it must be specified by `zip` in the previous step. Before reading on, you should understand [partition](partition.ipynb) and [zip](./execution_engine.ipynb#Zip-&-Comap).

**Input can be a single** `DataFrames`

**Alternatively it accepts input DataFrame types**: `LocalDataFrame`, `pd.DataFrame`, `List[List[Any]]`, `Iterable[List[Any]]`, `EmptyAwareIterable[List[Any]]`, `List[Dict[str, Any]]`, `Iterable[Dict[str, Any]]`, `EmptyAwareIterable[Dict[str, Any]]`

**Output DataFrame types can be**: `LocalDataFrame`, `pd.DataFrame`, `List[List[Any]]`, `Iterable[List[Any]]`, `EmptyAwareIterable[List[Any]]`, `List[Dict[str, Any]]`, `Iterable[Dict[str, Any]]`, `EmptyAwareIterable[Dict[str, Any]]`

Notice that `ArrayDataFrame` and other local dataframes can't be used as annotations; you must use `LocalDataFrame`.

`CoTransformer` requires users to be explicit about the output schema. Unlike with `Transformer`, the `*` wildcard is not allowed.

## Why Explicit on Output Schema?

Computing frameworks can normally infer the output schema, but doing so is neither reliable nor efficient. To infer the schema, a framework has to go through at least one partition of data and work out the possible schema. What if a transformer produces inconsistent schemas on different data partitions? What if that partition takes a long time or fails? To avoid these potential correctness and performance issues, Fugue requires output schemas for both `Transformer` and `CoTransformer`.
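
To make the reliability concern concrete, here is a plain-Python sketch that does not use Fugue. The `infer_schema` helper is hypothetical and only stands in for a framework that samples one partition; `ratio` is an invented function whose output type depends on the data.

```python
from typing import Any, List


def infer_schema(rows: List[List[Any]]) -> List[str]:
    """Hypothetical helper: guess column types from the first output row only."""
    return [type(value).__name__ for value in rows[0]]


def ratio(rows: List[List[Any]]) -> List[List[Any]]:
    """Return an int when the division is exact, a float otherwise."""
    out = []
    for a, b in rows:
        out.append([a // b if a % b == 0 else a / b])
    return out


partition_1 = [[4, 2], [6, 3]]  # every division is exact
partition_2 = [[1, 2], [3, 4]]  # no division is exact

schema_1 = infer_schema(ratio(partition_1))
schema_2 = infer_schema(ratio(partition_2))
print(schema_1, schema_2)
```

Whichever partition happens to be sampled decides the inferred type, so the two partitions disagree. Declaring the schema up front removes that dependence on the data.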

## Native Approach

This is the simplest way, with no dependency on Fugue. You just need acceptable annotations on the input dataframes and the output. In the native approach, you must specify the schema in the Fugue code.

In [None]:
from typing import Iterable, Dict, Any, List
import pandas as pd

def to_str(df1:List[List[Any]], df2:List[Dict[str,Any]], n=1) -> List[List[Any]]:
    return [[df1.__repr__(),df2.__repr__()]]

In [None]:
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df1 = dag.df([[0,1],[1,3]],"a:int,b:int")
    df2 = dag.df([[0,4],[1,2]],"a:int,c:int")
    df3 = dag.df([[0,2],[1,1],[1,5]],"a:int,b:int")
    # without a schema hint you have to specify the schema in Fugue code
    # must have zip, by default, zip inner joins them by their common keys
    df1.zip(df2).transform(to_str, schema="df1:str,df2:str").show()
    # if you don't want Fugue to infer the join keys, you can specify
    df1.zip(df3, partition={"by":"a"}).transform(to_str, schema="df1:str,df2:str").show()
    # you can also presort partitions
    df1.zip(df3, partition={"by":"a", "presort":"b DESC"}).transform(to_str, schema="df1:str,df2:str").show()
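
As a mental model only, the effect of `zip` followed by `transform` can be sketched in plain Python. This is not how Fugue implements it: `group_by_key`, `zip_and_apply` and `pair` are hypothetical names, both inputs are plain lists of rows, and the sketch assumes the presort is applied to every input.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional

Row = List[Any]


def group_by_key(rows: List[Row], key_index: int) -> Dict[Any, List[Row]]:
    """Split rows into logical partitions keyed by one column."""
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[key_index]].append(row)
    return groups


def zip_and_apply(
    left: List[Row],
    right: List[Row],
    func: Callable[[List[Row], List[Row]], Any],
    key_index: int = 0,
    presort_index: Optional[int] = None,
    descending: bool = False,
) -> List[Any]:
    """Inner join the partitions on their key, then call func once per key."""
    l_groups = group_by_key(left, key_index)
    r_groups = group_by_key(right, key_index)
    results = []
    # only keys present on both sides survive (inner join)
    for key in sorted(l_groups.keys() & r_groups.keys()):
        l_part, r_part = l_groups[key], r_groups[key]
        if presort_index is not None:
            l_part = sorted(l_part, key=lambda r: r[presort_index], reverse=descending)
            r_part = sorted(r_part, key=lambda r: r[presort_index], reverse=descending)
        results.append(func(l_part, r_part))
    return results


def pair(a: List[Row], b: List[Row]) -> Any:
    return (a, b)


df1 = [[0, 1], [1, 3]]
df3 = [[0, 2], [1, 1], [1, 5]]
# mirrors partition by a with presort b DESC
out = zip_and_apply(df1, df3, pair, presort_index=1, descending=True)
print(out)
```

The function is called once per key value, and each call sees only the rows of that key from each input.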

## With Schema Hint

When you need to reuse a cotransformer multiple times, it's tedious to specify the schema in Fugue code every time. Instead, you can write a schema hint as a comment on top of the function; this does not require a Fugue dependency. The following code does the same thing as above, but notice how much shorter it is.

In [None]:
from typing import Iterable, Dict, Any, List
import pandas as pd

#schema: df1:str,df2:str
def to_str(df1:List[List[Any]], df2:List[Dict[str,Any]], n=1) -> List[List[Any]]:
    return [[df1.__repr__(),df2.__repr__()]]

In [None]:
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df1 = dag.df([[0,1],[1,3]],"a:int,b:int")
    df2 = dag.df([[0,4],[1,2]],"a:int,c:int")
    df3 = dag.df([[0,2],[1,1],[1,5]],"a:int,b:int")
    df1.zip(df2).transform(to_str).show()
    df1.zip(df3, partition={"by":"a"}).transform(to_str).show()
    df1.zip(df3, partition={"by":"a", "presort":"b DESC"}).transform(to_str).show()

## Using DataFrames

Instead of annotating each dataframe as a separate input, you can use a single `DataFrames` to accept an arbitrary number of inputs.

In [None]:
from typing import Iterable, Dict, Any, List
import pandas as pd
from fugue import DataFrames, FugueWorkflow

#schema: res:[str]
def to_str(dfs:DataFrames) -> List[List[Any]]:
    return [[[x.as_array().__repr__() for x in dfs.values()]]]

#schema: res:[str]
def to_str_with_key(dfs:DataFrames) -> List[List[Any]]:
    return [[[k+" "+x.as_array().__repr__() for k,x in dfs.items()]]]

with FugueWorkflow() as dag:
    df1 = dag.df([[0,1],[1,3]],"a:int,b:int")
    df2 = dag.df([[0,4],[1,2]],"a:int,c:int")
    df3 = dag.df([[0,2],[1,1],[1,5]],"a:int,d:int")
    dag.zip(df1,df2,df3).transform(to_str).show()
    dag.zip(df1,df2,df3).transform(to_str_with_key).show()
    dag.zip(dict(a=df1,b=df2,c=df3)).transform(to_str_with_key).show()


## Decorator Approach

The decorator approach can do everything the schema hint can do. In addition, it can take a function that generates the schema.

In [None]:
from fugue import FugueWorkflow, Schema, cotransformer
from typing import Iterable, Dict, Any, List
import pandas as pd

# dfs is the zipped DataFrames, **params is the parameters passed in from fugue
def schema_from_dfs(dfs, **params):
    return Schema([("_".join(df.schema.names),str) for df in dfs.values()])
    
@cotransformer(schema_from_dfs)
def to_str(df1:List[List[Any]], df2:List[Dict[str,Any]], n=1) -> List[List[Any]]:
    return [[df1.__repr__(),df2.__repr__()]]

with FugueWorkflow() as dag:
    df1 = dag.df([[0,1],[1,3]],"a:int,b:int")
    df2 = dag.df([[0,4],[1,2]],"a:int,c:int")
    df1.zip(df2).transform(to_str).show() # see the output schema
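
The naming logic inside `schema_from_dfs` can be checked without running a workflow. The sketch below replaces the zipped `DataFrames` with a plain dict that maps a placeholder key to each input's column names. The keys `first` and `second` and the helper `names_from_inputs` are made up for illustration.

```python
from typing import Dict, List, Tuple


def names_from_inputs(input_columns: Dict[str, List[str]]) -> List[Tuple[str, type]]:
    """One output column per input, named by joining that input's column names."""
    return [("_".join(columns), str) for columns in input_columns.values()]


# column names of df1 and df2 from the example above
inputs = {"first": ["a", "b"], "second": ["a", "c"]}
generated = names_from_inputs(inputs)
print(generated)
```

Under this logic the two inputs above would yield string columns named `a_b` and `a_c`, which is presumably what the displayed output schema shows.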

## Interface Approach

All the previous methods are just wrappers around the interface approach. They cover most use cases and simplify usage. In certain cases, however, you should implement the interface, for example:

* You need partition information, such as the partition keys, the schema, and the current values of the keys
* You have an expensive but common initialization step for processing each logical partition; this should happen when initializing the physical partition

The biggest advantage of the interface approach is that you can customize initialization at the physical partition level, and you have all the up-to-date context variables to use.

In the interface approach, type annotations are not necessary, but again, it's good practice to have them.

In [None]:
#from fugue import Transformer, FugueWorkflow, DataFrame, LocalDataFrame, PandasDataFrame
from fugue import CoTransformer, FugueWorkflow, PandasDataFrame, DataFrame, ArrayDataFrame
from triad.collections import Schema
from time import sleep
import pandas as pd
import numpy as np

def expensive_init(sec=5):
    sleep(sec)

def helper(ct=20) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 2)), columns=list('ab'))

class Median(CoTransformer):
    # this is invoked on driver side
    def get_output_schema(self, dfs):
        return self.key_schema + [k+":double" for k in dfs.keys()]
    
    # on initialization of the physical partition; for a CoTransformer
    # the argument is the zipped collection of dataframes, not a single one
    def on_init(self, dfs) -> None:
        expensive_init(self.params.get("sec",0))
        
    def transform(self, dfs):
        result = self.cursor.key_value_array
        for k, v in dfs.items():
            m = v.as_pandas()["b"].median()
            result.append(m)
        return ArrayDataFrame([result], self.output_schema)
        

with FugueWorkflow() as dag:
    # a, b are identical because of the seed
    a=dag.create(helper)
    b=dag.create(helper)
    dag.zip(dict(x=a,y=b), partition={"by":["a"]}).transform(Median, params={"sec": 1}).show(rows=100)

Notice a few things here:

* How we access the key schema (`self.key_schema`) and the current logical partition's keys as an array (`self.cursor.key_value_array`)
* Although `DataFrames` is a dict, it is an ordered dict that follows the input order, so you can iterate over it in this way
* `expensive_init` is a common initialization step for different logical partitions. We move it to `on_init` so that it runs once for each physical partition.
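
The split between physical and logical partitions can be simulated in plain Python. `CountingTransformer` and `run_physical_partition` are hypothetical stand-ins, not Fugue classes. They only count how often each hook runs when one physical partition holds several logical partitions.

```python
from typing import Any, Dict, List


class CountingTransformer:
    """Hypothetical stand-in for an interface-style transformer."""

    def __init__(self) -> None:
        self.init_calls = 0
        self.transform_calls = 0

    def on_init(self) -> None:
        # expensive, shared setup would go here
        self.init_calls += 1

    def transform(self, key: Any, rows: List[List[Any]]) -> List[Any]:
        self.transform_calls += 1
        return [key, len(rows)]


def run_physical_partition(
    transformer: CountingTransformer,
    logical_partitions: Dict[Any, List[List[Any]]],
) -> List[List[Any]]:
    """Initialize once, then process every logical partition in this physical one."""
    transformer.on_init()
    return [transformer.transform(k, rows) for k, rows in logical_partitions.items()]


# one physical partition holding three logical partitions (keys 0, 1, 2)
physical = {0: [[0, 1], [0, 2]], 1: [[1, 3]], 2: [[2, 5]]}
t = CountingTransformer()
result = run_physical_partition(t, physical)
print(t.init_calls, t.transform_calls, result)
```

With three logical partitions the setup cost is paid once instead of three times, which is the saving the last bullet describes.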


# Output CoTransformer

`OutputCoTransformer` is in general similar to `CoTransformer`, and any `CoTransformer` can be used as an `OutputCoTransformer`. It is important to understand the difference between the operations `transform` and `out_transform`:

* `transform` is lazy: Fugue does not guarantee that the compute happens immediately. For example, with `SparkExecutionEngine`, the real compute of `transform` happens only when an action, such as `save`, is reached.
* `out_transform` is an action: Fugue ensures the compute happens immediately, regardless of which execution engine is used.
* `transform` outputs a transformed dataframe for the following steps to use.
* `out_transform` is the last compute of a branch in the DAG; it outputs nothing.

You may find that `transform().persist()` can be an alternative to `out_transform`. That is generally fine, but note that the output dataframe of a transformation can be very large; if you persist or checkpoint it, it can take up a large portion of memory or disk space. In contrast, `out_transform` does not take any space. It is also a more explicit way to show what you want to do.
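
The lazy versus action distinction is loosely analogous to Python generators versus plain loops. The sketch below is only that analogy, not Fugue behavior: `lazy_transform` and `eager_out_transform` are invented names, and `log` records when work actually happens.

```python
from typing import Iterable, Iterator, List

log: List[str] = []


def lazy_transform(rows: Iterable[int]) -> Iterator[int]:
    """Generator: nothing runs until something consumes the result."""
    for r in rows:
        log.append(f"processed {r}")
        yield r * 2


def eager_out_transform(rows: Iterable[int]) -> None:
    """Plain loop: runs right away and returns nothing."""
    for r in rows:
        log.append(f"checked {r}")


pending = lazy_transform([1, 2])
log_after_define = list(log)   # still empty, no work done yet

eager_out_transform([1, 2])
log_after_action = list(log)   # the eager call has already run

consumed = list(pending)       # consuming the generator triggers the lazy work
print(log_after_define, log_after_action, consumed, log)
```

The eager function also leaves no result behind to store, which mirrors why `out_transform` avoids the storage cost of persisting a large output.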

A typical use case is to compare two dataframes partition by partition in a distributed way.


## Native Approach

In [None]:
from typing import List, Any

def assert_eq(df1:List[List[Any]], df2:List[List[Any]]) -> None:
    assert df1 == df2
    print(df1,"==",df2)

# schema: a:int
def assert_eq_2(df1:List[List[Any]], df2:List[List[Any]]) -> List[List[Any]]:
    assert df1 == df2
    print(df1,"==",df2)
    return [[0]]

In [None]:
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df1 = dag.df([[0,1],[0,2],[1,3]], "a:int,b:int")
    df2 = dag.df([[1,3],[0,2],[0,1]], "a:int,b:int")
    z = df1.zip(df2, partition=dict(by=["a"],presort=["b"]))
    z.out_transform(assert_eq)
    z.out_transform(assert_eq_2) # all CoTransformer-like functions/classes can be used directly
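
The `presort` in the example above matters because `df1` and `df2` list the same rows in a different order. A plain-Python sketch shows the effect; it is not Fugue code. `partition_by` is a hypothetical helper, and it assumes rows keep their input order inside a partition when no presort is given.

```python
from typing import Any, Dict, List, Optional

Row = List[Any]


def partition_by(
    rows: List[Row], key_index: int = 0, presort_index: Optional[int] = None
) -> Dict[Any, List[Row]]:
    """Group rows by key; optionally sort each group by one column (ascending)."""
    parts: Dict[Any, List[Row]] = {}
    for row in rows:
        parts.setdefault(row[key_index], []).append(row)
    if presort_index is not None:
        for key in parts:
            parts[key].sort(key=lambda r: r[presort_index])
    return parts


df1 = [[0, 1], [0, 2], [1, 3]]
df2 = [[1, 3], [0, 2], [0, 1]]

# without presort, partition a=0 holds the same rows in a different order
equal_without_presort = partition_by(df1) == partition_by(df2)
# with presort on column b, the partitions line up row by row
equal_with_presort = partition_by(df1, presort_index=1) == partition_by(df2, presort_index=1)
print(equal_without_presort, equal_with_presort)
```

A list comparison such as `assert df1 == df2` is order sensitive, so sorting each partition first is what makes the per-partition check meaningful.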

## Decorator Approach

There is no obvious advantage to using the decorator for `OutputCoTransformer`.

In [None]:
from typing import List, Any
from fugue.extensions import output_cotransformer
from fugue import FugueWorkflow

@output_cotransformer()
def assert_eq(df1:List[List[Any]], df2:List[List[Any]]) -> None:
    assert df1 == df2
    print(df1,"==",df2)
    
with FugueWorkflow() as dag:
    df1 = dag.df([[0,1],[0,2],[1,3]], "a:int,b:int")
    df2 = dag.df([[1,3],[0,2],[0,1]], "a:int,b:int")
    z = df1.zip(df2, partition=dict(by=["a"],presort=["b"]))
    z.out_transform(assert_eq)

## Interface Approach

Just like the interface approach of `CoTransformer`, you get full flexibility and control over your transformation.

In [None]:
from typing import List, Any
from fugue.extensions import OutputCoTransformer
from fugue import FugueWorkflow

class AssertEQ(OutputCoTransformer):
    # notice the interface is different from CoTransformer
    def process(self, dfs):
        df1, df2 = dfs[0].as_array(), dfs[1].as_array()
        assert df1 == df2
        print(df1,"==",df2)

with FugueWorkflow() as dag:
    df1 = dag.df([[0,1],[0,2],[1,3]], "a:int,b:int")
    df2 = dag.df([[1,3],[0,2],[0,1]], "a:int,b:int")
    z = df1.zip(df2, partition=dict(by=["a"],presort=["b"]))
    z.out_transform(AssertEQ)