# 2.3.1. Simple Featurization Jobs and Datasets (tg.common.datasets.featurization.simple)


`FeaturizationJob` is a job that combines a `DataSource` with a `Selector` and addresses the production-level concerns of memory control and output of the results.

To demonstrate how it works, we will first create a data source and a selector, as in the previous demo but without the artificial distortion.

In [1]:
```python
from tg.common.datasets.access import MockDfDataSource
import pandas as pd

source = MockDfDataSource(pd.read_csv('titanic.csv'))
selector = lambda z: z
selector(source.get_data().first())
```

```
{'PassengerId': 1, 'Survived': 0, 'Pclass': 3, 'Name': 'Braund, Mr. Owen Harris', 'Sex': 'male', 'Age': 22.0, 'SibSp': 1, 'Parch': 0, 'Ticket': 'A/5 21171', 'Fare': 7.25, 'Cabin': nan, 'Embarked': 'S'}
```

`MockDfDataSource` is a class that converts a dataframe into a DOF of its rows, which makes it handy for, e.g., unit tests. Since the output of `MockDfDataSource` is already in the appropriate format, we don't need a complex selector and simply use the identity function.
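To make the notion of a data object flow more concrete, here is a minimal, stdlib-only sketch of a mock source over a list of row dictionaries. The class `ListDataSource` and the helper `first` are hypothetical illustrations, not the `tg.common` implementation, and `MockDfDataSource` itself may work differently.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


class ListDataSource:
    """Hypothetical mock source: every stored row becomes one data object."""

    def __init__(self, rows: List[Row]):
        self._rows = rows

    def get_data(self) -> Iterator[Row]:
        # Every call starts a fresh pass over the rows, like re-reading a real source
        return iter(self._rows)


def first(items: Iterator[Row]) -> Row:
    # Hypothetical stand-in for the `.first()` call used above
    return next(items)


demo_rows = [
    {'PassengerId': 1, 'Survived': 0, 'Age': 22.0},
    {'PassengerId': 2, 'Survived': 1, 'Age': 38.0},
]
demo_source = ListDataSource(demo_rows)
identity = lambda z: z  # rows are already flat dicts, so no transformation is needed
print(identity(first(demo_source.get_data())))  # {'PassengerId': 1, 'Survived': 0, 'Age': 22.0}
```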

Now we can create a data frame in the most primitive way:

In [2]:
```python
df = pd.DataFrame(selector(z) for z in source.get_data())
df.head()
```

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | A/5 21171 | 7.25 | NaN | S |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 2 | 3 | 1 | 3 | Heikkinen, Miss. Laina | female | 26.0 | 0 | 0 | STON/O2. 3101282 | 7.925 | NaN | S |
| 3 | 4 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 113803 | 53.1 | C123 | S |
| 4 | 5 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 373450 | 8.05 | NaN | S |


The featurization job that produces the same result can be created as follows:

In [3]:
```python
from tg.common.datasets.featurization import FeaturizationJob, DataframeFeaturizer
from tg.common import MemoryFileSyncer

mem = MemoryFileSyncer()

job = FeaturizationJob(
    name='job',
    version='v1',
    source=source,
    featurizers={
        'passengers': DataframeFeaturizer(row_selector=selector)
    },
    syncer=mem,
    location='./temp/featurization_job'
)

job.run()
```

```
2022-06-29 11:25:55.397205+00:00 INFO: Featurization Job job at version v1 has started
2022-06-29 11:25:55.400349+00:00 INFO: Fetching data
2022-06-29 11:25:55.511306+00:00 INFO: Data fetched, finalizing
2022-06-29 11:25:55.598581+00:00 INFO: Uploading data
2022-06-29 11:25:55.601867+00:00 INFO: Featurization job completed
```


Some notes:

* `DataframeFeaturizer`: when used in this way, it simply applies `row_selector` to each data object from `source` and collects the results into pandas dataframes.
* If no `location` is provided, the folder is created automatically inside the `Loc.temp_path` folder. Usually we don't care where the intermediate files are stored, as the syncer takes care of them.
* `MemoryFileSyncer`: the job creates files locally (in the `location` folder) and then uploads them to the remote destination. For demonstration purposes, we "upload" the data into memory. `tg.common` also contains `S3FileSyncer`, which syncs the files with S3, and interfaces for other storages may be written by deriving from `FileSyncer`. Essentially, a `FileSyncer` is a connection between a specific location on the local disk and a location somewhere else: calling its `upload` or `download` methods ensures that the given files/folders have the same content in both places.
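As an illustration of the `FileSyncer` idea, below is a minimal sketch of an in-memory syncer. It mirrors a local folder into a dictionary keyed by relative path, which is roughly what `mem.cache` exposes in this demo. The class `DictSyncer` and its methods are hypothetical; `MemoryFileSyncer` and `S3FileSyncer` may behave differently in detail.

```python
import tempfile
from pathlib import Path
from typing import Dict


class DictSyncer:
    """Hypothetical syncer whose 'remote' storage is a dict: relative path -> bytes."""

    def __init__(self, local_folder: Path):
        self.local_folder = Path(local_folder)
        self.cache: Dict[str, bytes] = {}

    def upload(self) -> None:
        # Copy every local file to the remote dict, keyed by its relative POSIX path
        for path in self.local_folder.rglob('*'):
            if path.is_file():
                key = path.relative_to(self.local_folder).as_posix()
                self.cache[key] = path.read_bytes()

    def download(self) -> None:
        # Recreate every remote file inside the local folder
        for key, content in self.cache.items():
            target = self.local_folder / key
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(content)


with tempfile.TemporaryDirectory() as tmp:
    partition = Path(tmp) / 'passengers' / 'part-0.parquet'
    partition.parent.mkdir()
    partition.write_bytes(b'demo')  # placeholder bytes, not a real parquet file
    demo_syncer = DictSyncer(Path(tmp))
    demo_syncer.upload()
    print(list(demo_syncer.cache))  # ['passengers/part-0.parquet']
```

For simplicity, this sketch only adds or overwrites files. A complete syncer would also have to remove files that exist on only one side for the contents to be truly identical.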


The resulting files can be viewed in the following way:

In [4]:
```python
list(mem.cache)
```

```
['passengers/d24a7be5-5871-497a-9558-28b4df7d7744.parquet']
```

The `get_parquet` method, intended for testing, reads the file with the given key as a parquet file. Instead of the file name, you may also pass the index of the name in the list of files, or a lambda that selects the name you want to read.

In [5]:
```python
mem.get_parquet(0).head()
```

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | A/5 21171 | 7.25 | NaN | S |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 2 | 3 | 1 | 3 | Heikkinen, Miss. Laina | female | 26.0 | 0 | 0 | STON/O2. 3101282 | 7.925 | NaN | S |
| 3 | 4 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 113803 | 53.1 | C123 | S |
| 4 | 5 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 373450 | 8.05 | NaN | S |
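The three kinds of keys accepted by `get_parquet` (a name, an index, or a predicate) can be resolved to a single file name as in the sketch below. `resolve_key` and the file names are hypothetical. Treating an empty or ambiguous predicate match as an error is a design choice of this sketch, not necessarily what `get_parquet` does.

```python
from typing import Callable, List, Union

# A key may be a file name, an index into the list of names, or a predicate
Key = Union[str, int, Callable[[str], bool]]


def resolve_key(names: List[str], key: Key) -> str:
    """Hypothetical helper: map a name, an index or a predicate to one file name."""
    if isinstance(key, str):
        if key not in names:
            raise KeyError(key)
        return key
    if isinstance(key, int):
        return names[key]
    matches = [name for name in names if key(name)]
    if len(matches) != 1:
        # Design choice of this sketch: empty or ambiguous matches are errors
        raise ValueError(f'expected exactly one match, found {len(matches)}')
    return matches[0]


names = ['cabins/part-0.parquet', 'passengers/part-0.parquet']
print(resolve_key(names, 0))  # cabins/part-0.parquet
print(resolve_key(names, lambda z: z.startswith('passengers')))  # passengers/part-0.parquet
```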


## Partitioning

What if the data is too big? Per se, this is not a problem: data sources do not normally keep all the data in memory at once, and selectors process the data one by one. But at the last step, when the data is assembled into a dataframe, we might run into trouble, since the whole dataframe has to fit in memory. `DataframeFeaturizer` arguments allow us to prevent this:

In [6]:
```python
mem = MemoryFileSyncer()

job = FeaturizationJob(
    name='job',
    version='v1',
    source=source,
    featurizers={
        'passengers': DataframeFeaturizer(buffer_size=250, row_selector=selector)
    },
    syncer=mem
)

job.run()
list(mem.cache)
```

```
2022-06-29 11:25:55.697649+00:00 INFO: Featurization Job job at version v1 has started
2022-06-29 11:25:55.700836+00:00 INFO: Fetching data
2022-06-29 11:25:55.774199+00:00 INFO: Data fetched, finalizing
2022-06-29 11:25:55.778471+00:00 INFO: Uploading data
2022-06-29 11:25:55.779497+00:00 INFO: Featurization job completed
```

```
['passengers/a13a9eca-2b73-4061-8a1f-e949b2a22a9e.parquet',
 'passengers/d3b69a43-71aa-40ca-8bee-8f04c0749aae.parquet',
 'passengers/0766c94b-d180-47ce-a33f-4e1f427e621e.parquet',
 'passengers/5a27b574-592a-4d3f-b7f7-8ed996cafd2d.parquet']
```

In [7]:
```python
len(mem.get_parquet(0))
```

```
250
```

Here we have limited the number of rows that can be put into one dataframe to 250. As a result, each dataframe in `mem.cache` has no more than 250 rows, and our dataset consists of several files.
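The partitioning logic can be illustrated with a small generator that groups an incoming stream of rows into chunks of at most `buffer_size` elements. In a real job, each chunk would become one dataframe and one parquet file. This is a hypothetical sketch of the idea, not the `DataframeFeaturizer` code. Assuming `titanic.csv` is the standard 891-row Kaggle training set, `buffer_size=250` yields four chunks, matching the four files above.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')


def buffered(rows: Iterable[T], buffer_size: int) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most `buffer_size` rows."""
    if buffer_size <= 0:
        raise ValueError('buffer_size must be positive')
    buffer: List[T] = []
    for row in rows:
        buffer.append(row)
        if len(buffer) == buffer_size:
            yield buffer  # in a real job: one dataframe -> one parquet file
            buffer = []
    if buffer:
        yield buffer  # the last chunk may be smaller


print([len(chunk) for chunk in buffered(range(891), 250)])  # [250, 250, 250, 141]
```

Only one chunk is held in memory at a time, so the peak memory no longer depends on the size of the whole dataset.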

## Filtering / expanding

What if our data is more complicated, and there is no one-to-one correspondence between data objects and rows? For example:
* We want to filter out some rows. In this case, one incoming data object corresponds to zero rows.
* We are processing data organized not as a flow of passengers, but as a flow of cabins, where each cabin is a list of passengers. In this case, one incoming data object corresponds to an arbitrary number of rows.
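Both cases fit one pattern: the featurization function returns a *list* of rows for each data object, and these lists are concatenated. The sketch below illustrates this in plain Python. The function names and the cabin object layout (`id`, `passengers`) are hypothetical.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

Row = Dict[str, Any]


def featurize_passenger(obj: Row) -> List[Row]:
    # Filtering: one data object -> zero or one row.
    # NaN < 30 is False, so a passenger with unknown (NaN) Age is kept.
    return [] if obj['Age'] < 30 else [obj]


def featurize_cabin(cabin: Dict[str, Any]) -> List[Row]:
    # Expanding: one cabin object -> one row per passenger in it
    return [dict(passenger, Cabin=cabin['id']) for passenger in cabin['passengers']]


def flatten(objects: Iterable[Any], featurize: Callable[[Any], List[Row]]) -> Iterator[Row]:
    # Concatenate the per-object lists into a single stream of rows
    for obj in objects:
        yield from featurize(obj)


passengers = [{'Name': 'A', 'Age': 22.0}, {'Name': 'B', 'Age': 38.0}]
cabins = [{'id': 'G6', 'passengers': [{'Name': 'C'}, {'Name': 'D'}]}]
print(list(flatten(passengers, featurize_passenger)))  # [{'Name': 'B', 'Age': 38.0}]
print(list(flatten(cabins, featurize_cabin)))
# [{'Name': 'C', 'Cabin': 'G6'}, {'Name': 'D', 'Cabin': 'G6'}]
```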

Let's implement the first option by subclassing `DataframeFeaturizer`, and also explore some additional features of this class:

In [8]:
```python
import numpy as np

class MyDataFrameFeaturizer(DataframeFeaturizer):
    def __init__(self):
        super().__init__()

    def _featurize(self, obj):
        # Return a list of rows: an empty list drops the passenger.
        # Note: NaN < 30 is False, so passengers with unknown Age are kept.
        if obj['Age'] < 30:
            return []
        else:
            return [obj]

    def _postprocess(self, df):
        # Additional operations on the assembled dataframe
        df.Cabin = np.where(df.Cabin.isnull(), 'NONE', df.Cabin)
        return df

mem = MemoryFileSyncer()

job = FeaturizationJob(
    name='job',
    version='v1',
    source=source,
    featurizers={
        'passengers': MyDataFrameFeaturizer()
    },
    syncer=mem
)

job.run()
mem.get_parquet(0).sort_values('Age').head()
```

```
2022-06-29 11:25:55.802560+00:00 INFO: Featurization Job job at version v1 has started
2022-06-29 11:25:55.804675+00:00 INFO: Fetching data
2022-06-29 11:25:55.859091+00:00 INFO: Data fetched, finalizing
2022-06-29 11:25:55.871411+00:00 INFO: Uploading data
2022-06-29 11:25:55.872501+00:00 INFO: Featurization job completed
```

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 349 | 607 | 0 | 3 | Karaic, Mr. Milan | male | 30.0 | 0 | 0 | 349246 | 7.8958 | NONE | S |
| 88 | 179 | 0 | 2 | Hale, Mr. Reginald | male | 30.0 | 0 | 0 | 250653 | 13.0 | NONE | S |
| 76 | 158 | 0 | 3 | Corn, Mr. Harry | male | 30.0 | 0 | 0 | SOTON/OQ 392090 | 8.05 | NONE | S |
| 287 | 521 | 1 | 1 | Perreault, Miss. Anne | female | 30.0 | 0 | 0 | 12749 | 93.5 | B73 | S |
| 156 | 287 | 1 | 3 | de Mulder, Mr. Theodore | male | 30.0 | 0 | 0 | 345774 | 9.5 | NONE | S |


Since we have created a class specifically for this dataset, we don't need to pass the `selector` to it.

In the `_featurize` method, we process the given data object in an arbitrary way and return a list of rows.

In `_postprocess`, we may perform additional operations on the dataframe. Here we have imputed the values of the `Cabin` field, but **do not do this** in real projects: imputation belongs to the machine learning part of the pipeline, not to data cleaning.

## Aggregating 

Sometimes we are not interested in the dataframe as such, but want to compute some aggregated statistics and use them as features. In our case, we may wish to compute the average fare and age for each cabin. (_Note that this would be an awful idea in reality, as it would leak data from test to train._)

In this case, we need to go one step up our inheritance hierarchy and use the `StreamFeaturizer` class.

In [9]:
```python
from tg.common.datasets.featurization import StreamFeaturizer

class CabinStatisticsFeaturizer(StreamFeaturizer):
    def start(self):
        # Initialize the per-cabin accumulators
        self.cabins = {}

    def observe_data_point(self, item):
        # Skip passengers with unknown cabin, fare or age;
        # pd.isnull catches both None and NaN (missing values arrive as NaN here)
        cabin = item['Cabin']
        if not isinstance(cabin, str) or pd.isnull(item['Fare']) or pd.isnull(item['Age']):
            return
        if cabin not in self.cabins:
            self.cabins[cabin] = dict(count=0, age=0, fare=0, id=cabin)
        self.cabins[cabin]['count'] += 1
        self.cabins[cabin]['age'] += item['Age']
        self.cabins[cabin]['fare'] += item['Fare']

    def finish(self):
        # Turn the accumulated sums into per-cabin averages
        df = pd.DataFrame(list(self.cabins.values()))
        df.age = df.age / df['count']
        df.fare = df.fare / df['count']
        return df.set_index('id')

dataset_buffer = MemoryFileSyncer()

job = FeaturizationJob(
    name='job',
    version='v1',
    source=source,
    featurizers={
        'passengers': MyDataFrameFeaturizer(),
        'cabins': CabinStatisticsFeaturizer()
    },
    syncer=dataset_buffer,
    location='./temp/test'
)

job.run()
list(dataset_buffer.cache)
```

```
2022-06-29 11:25:55.902147+00:00 INFO: Featurization Job job at version v1 has started
2022-06-29 11:25:55.905046+00:00 INFO: Fetching data
2022-06-29 11:25:55.959682+00:00 INFO: Data fetched, finalizing
2022-06-29 11:25:55.974348+00:00 INFO: Uploading data
2022-06-29 11:25:55.975421+00:00 INFO: Featurization job completed
```

```
['cabins/0d6136cb-7833-498e-ae3c-5775b4d8db5c.parquet',
 'passengers/5b02583b-258a-4a09-a77b-aa066c281180.parquet']
```

Note that the passengers' features are produced as well. Here are the features for cabins:

In [10]:
```python
dataset_buffer.get_parquet(lambda z: z.startswith('cabins')).head()
```

| id | count | age | fare |
|---|---|---|---|
| C85 | 1 | 38.0 | 71.2833 |
| C123 | 2 | 36.0 | 53.1 |
| E46 | 1 | 54.0 | 51.8625 |
| G6 | 4 | 14.75 | 13.58125 |
| C103 | 1 | 58.0 | 26.55 |


So it is possible, and often very useful, to build several datasets with a single run over the source. In our experience so far, retrieving data from the source has been the most time-consuming part of featurization, so this _really_ saves time.
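The single-pass idea can be sketched as a driver loop that iterates over the source only once. For every data object, it calls the `start` / `observe_data_point` / `finish` methods seen in `CabinStatisticsFeaturizer` on every featurizer. This is a simplified, hypothetical driver (no buffering, syncing or logging), not the actual `FeaturizationJob` code. `CountingSource`, `RowCounter` and `FareSum` are toy classes made up for the example.

```python
from typing import Any, Dict, Iterator


class CountingSource:
    """Hypothetical source that records how many passes were made over it."""

    def __init__(self, rows):
        self.rows = rows
        self.passes = 0

    def get_data(self) -> Iterator[Dict[str, Any]]:
        self.passes += 1
        return iter(self.rows)


class RowCounter:
    def start(self):
        self.count = 0

    def observe_data_point(self, item):
        self.count += 1

    def finish(self):
        return self.count


class FareSum:
    def start(self):
        self.total = 0.0

    def observe_data_point(self, item):
        self.total += item['Fare']

    def finish(self):
        return self.total


def run_single_pass(source, featurizers: Dict[str, Any]) -> Dict[str, Any]:
    for featurizer in featurizers.values():
        featurizer.start()
    for item in source.get_data():  # the expensive read happens only once
        for featurizer in featurizers.values():
            featurizer.observe_data_point(item)
    return {name: featurizer.finish() for name, featurizer in featurizers.items()}


demo = CountingSource([{'Fare': 7.25}, {'Fare': 71.2833}])
results = run_single_pass(demo, {'rows': RowCounter(), 'fares': FareSum()})
print(results['rows'], demo.passes)  # 2 1
```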

## Datasets

The result of the `FeaturizationJob` can be easily consumed with the `Dataset` class. The `Dataset` class downloads the data from the remote location with a `FileSyncer` and lets you read the files.

If the `FeaturizationJob` creates several folders, as in the example above, **each** of them is a separate `Dataset`. If you need access to both of them, create two instances of the `Dataset` class.

For demonstration, we will use the `dataset_buffer` created in the last demonstration of `FeaturizationJob`, which contains two datasets, `cabins` and `passengers`.

In [11]:
```python
from tg.common.datasets.featurization import Dataset

dataset = Dataset(
    './temp/dataset',
    dataset_buffer.change_remote_subfolder('passengers')
)
```

As we remember, a `FileSyncer` establishes a connection between the local drive and the remote storage. Let's dwell on some technicalities here. The `change_remote_subfolder` method:
* returns a new instance of `FileSyncer`, because this class is immutable by design;
* keeps **the same** folder on the local drive, **changing only the subfolder** on the remote storage.

In this particular case, the local folder does not matter, as it will be overridden in the `Dataset` constructor. In fact, it's not even set:

In [12]:
```python
dataset_buffer.get_local_folder() is None
```

```
True
```

To set the local folder, use the `change_local_folder` method. We use the word _folder_ for the local drive because it can be anywhere. We use _subfolder_ for the remote storage because the path can only point to a subfolder of the initially defined folder. If you use `S3FileSyncer` and set it to a specific bucket with a specific prefix, you won't be able to escape from this prefix, which makes operations safer.

If you want to change both simultaneously, spawning a `FileSyncer` instance for a subfolder on both the local drive and the remote storage, use the `cd` method.
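The immutability and the "no escape from the prefix" rule can be illustrated with a frozen dataclass. `SyncerLocation` and its methods are hypothetical. They only mirror the names `change_remote_subfolder`, `change_local_folder` and `cd` from the text, describe paths only, and do not transfer any files. The assumption that subfolders nest relative to the current subfolder is ours.

```python
from dataclasses import dataclass, replace
from pathlib import PurePosixPath
from typing import Optional


def _checked_subfolder(subfolder: str) -> str:
    # Refuse paths that could leave the remote root (absolute paths or '..')
    path = PurePosixPath(subfolder)
    if path.is_absolute() or '..' in path.parts:
        raise ValueError(f'{subfolder!r} would escape the remote root')
    return path.as_posix()


@dataclass(frozen=True)
class SyncerLocation:
    """Hypothetical immutable pair: a local folder plus a subfolder under a fixed remote root."""
    remote_root: str
    remote_subfolder: str = ''
    local_folder: Optional[str] = None

    @property
    def remote_path(self) -> str:
        return PurePosixPath(self.remote_root, self.remote_subfolder).as_posix()

    def change_remote_subfolder(self, subfolder: str) -> 'SyncerLocation':
        # New instance; the local folder is left untouched
        new_sub = PurePosixPath(self.remote_subfolder, _checked_subfolder(subfolder)).as_posix()
        return replace(self, remote_subfolder=new_sub)

    def change_local_folder(self, folder: str) -> 'SyncerLocation':
        # New instance; the remote side is left untouched
        return replace(self, local_folder=folder)

    def cd(self, subfolder: str) -> 'SyncerLocation':
        # Descend into the same subfolder on both sides
        moved = self.change_remote_subfolder(subfolder)
        if self.local_folder is None:
            return moved
        return moved.change_local_folder(PurePosixPath(self.local_folder, subfolder).as_posix())


base = SyncerLocation('my-bucket/datasets')
passengers_loc = base.change_remote_subfolder('passengers')
cabins_loc = base.change_local_folder('/tmp/data').cd('cabins')
print(passengers_loc.remote_path, passengers_loc.local_folder)  # my-bucket/datasets/passengers None
print(cabins_loc.remote_path, cabins_loc.local_folder)  # my-bucket/datasets/cabins /tmp/data/cabins
```

Because every method returns a new instance, `base` stays unchanged and can safely be shared between several datasets.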

Back to the datasets. First, you need to download the dataset:

In [13]:
```python
dataset.download()
dataset.read().head(3)
```

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 1 | 4 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 113803 | 53.1 | C123 | S |
| 2 | 5 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 373450 | 8.05 | NONE | S |


Datasets can be very large. Partitioning avoids this problem at the creation stage, but it may reappear if you try to read the entire dataset into memory. The following procedure for exploring a dataset is recommended.

First, read a few rows from the dataset. For a partitioned dataset, only some of the partitions will be opened:

In [14]:
```python
dataset.read(count=3)
```

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 1 | 4 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 113803 | 53.1 | C123 | S |
| 2 | 5 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 373450 | 8.05 | NONE | S |
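Reading with `count` can be sketched as opening partitions lazily, one by one, until enough rows have been collected. Each partition is represented by a loader function that reads it only when called. `read_first_rows` is a hypothetical helper for illustration, not the `Dataset.read` implementation. The partition sizes below mirror the four files produced by the `buffer_size=250` job.

```python
from typing import Any, Callable, Iterable, List, Tuple

# Each partition is represented by a loader that reads it only when called
Loader = Callable[[], List[Any]]


def read_first_rows(partitions: Iterable[Loader], count: int) -> Tuple[List[Any], int]:
    """Hypothetical sketch: open partitions one by one until `count` rows are collected."""
    rows: List[Any] = []
    opened = 0
    for load in partitions:
        if len(rows) >= count:
            break  # enough rows: the remaining partitions are never read
        rows.extend(load())
        opened += 1
    return rows[:count], opened


sizes = [250, 250, 250, 141]
loaders = [lambda n=n: list(range(n)) for n in sizes]
head, opened = read_first_rows(loaders, count=3)
print(head, opened)  # [0, 1, 2] 1
```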


Often you only need specific columns from the dataset. You may explore the column names by reading a few rows and then specify the columns you need:

In [15]:
```python
dataset.read(columns=['Age','Sex','Survived']).head()
```

|   | Age | Sex | Survived |
|---|---|---|---|
| 0 | 38.0 | female | 1 |
| 1 | 35.0 | female | 1 |
| 2 | 35.0 | male | 0 |
| 3 | NaN | male | 0 |
| 4 | 54.0 | male | 0 |


If you are only interested in some rows of the dataset, use the `selector` argument, which lets you apply `loc` to the dataframe to keep only the records you need. Again, this is done per partition, so memory won't overflow.

In [16]:
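```python
dataset.read(selector=lambda z: z.loc[z.Age==80])
```

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 361 | 631 | 1 | 1 | Barkworth, Mr. Algernon Henry Wilson | male | 80.0 | 0 | 0 | 27042 | 30.0 | A23 | S |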
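The per-partition filtering can be sketched in plain Python, with lists of dicts standing in for dataframes and a list-filtering function standing in for the `loc`-based selector. `read_filtered` is a hypothetical helper, not `Dataset.read`. In particular, applying the selector before projecting the columns is our own choice, and the real method may order these steps differently.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Row = Dict[str, Any]
Loader = Callable[[], List[Row]]


def read_filtered(
    partitions: Iterable[Loader],
    selector: Optional[Callable[[List[Row]], List[Row]]] = None,
    columns: Optional[List[str]] = None,
) -> List[Row]:
    """Hypothetical sketch: filter and project each partition before keeping it."""
    result: List[Row] = []
    for load in partitions:
        part = load()  # only one full partition is held in memory at a time
        if selector is not None:
            part = selector(part)
        if columns is not None:
            part = [{c: row[c] for c in columns} for row in part]
        result.extend(part)  # only the selected rows and columns accumulate
    return result


partitions = [
    lambda: [{'Age': 22.0, 'Sex': 'male'}, {'Age': 80.0, 'Sex': 'male'}],
    lambda: [{'Age': 38.0, 'Sex': 'female'}],
]
oldest = read_filtered(
    partitions,
    selector=lambda rows: [r for r in rows if r['Age'] == 80],
    columns=['Age'],
)
print(oldest)  # [{'Age': 80.0}]
```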


Datasets are fine for analytics and machine learning, but in production it is more convenient to have a straightforward interface that just returns the required dataframe without all these details. This interface is `DataFrameSource`, and a `Dataset` can be converted to it:

In [17]:
```python
df_source = dataset.as_data_frame_source(columns=['Age','Sex','Survived'], count=3)
df_source.get_df()
```

|   | Age | Sex | Survived |
|---|---|---|---|
| 0 | 38.0 | female | 1 |
| 1 | 35.0 | female | 1 |
| 2 | 35.0 | male | 0 |
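The adapter idea behind `as_data_frame_source` can be sketched as an object that binds the read arguments once and exposes a single `get_df` method, so that production code depends only on that method. `DataFrameSourceLike`, `BoundReadSource`, `row_count` and `fake_read` are hypothetical names. Lists of dicts stand in for dataframes.

```python
from typing import Any, Callable, Protocol


class DataFrameSourceLike(Protocol):
    """Hypothetical minimal interface: something that can produce a dataframe."""

    def get_df(self) -> Any: ...


class BoundReadSource:
    """Hypothetical adapter: stores the read arguments once and exposes only get_df()."""

    def __init__(self, read: Callable[..., Any], **read_kwargs: Any):
        self._read = read
        self._read_kwargs = dict(read_kwargs)

    def get_df(self) -> Any:
        return self._read(**self._read_kwargs)


def row_count(source: DataFrameSourceLike) -> int:
    # Production code depends only on get_df(), not on how the data is stored
    return len(source.get_df())


def fake_read(columns=None, count=None):
    # Stand-in for a reading function such as Dataset.read
    rows = [
        {'Age': 38.0, 'Sex': 'female'},
        {'Age': 35.0, 'Sex': 'female'},
        {'Age': 35.0, 'Sex': 'male'},
        {'Age': 54.0, 'Sex': 'male'},
    ]
    if columns is not None:
        rows = [{c: r[c] for c in columns} for r in rows]
    return rows if count is None else rows[:count]


bound = BoundReadSource(fake_read, columns=['Age'], count=3)
print(row_count(bound))  # 3
```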


## Summary

This section covered big, real-data datasets:
* Creating a production-ready `FeaturizationJob` that builds datasets
* Accessing the created dataset with the `Dataset` class
* `FileSyncer` as the primary interface to synchronize local data with the remote host