![*INTERTECHNICA - SOLON EDUCATIONAL PROGRAMS - TECHNOLOGY LINE*](https://solon.intertechnica.com/assets/IntertechnicaSolonEducationalPrograms-TechnologyLine.png)

# Data Manipulation with Python - Advanced Data Manipulation - Data Pipelines

*Basic initialization of the workspace.*

In [None]:
!python -m pip install numpy
import numpy as np
print ("NumPy installed at version: {}".format(np.__version__))

NumPy installed at version: 1.19.5


In [None]:
!python -m pip install pandas
import pandas as pd
print ("Pandas installed at version: {}".format(pd.__version__))

#adjust pandas DataFrame display for a wider target 
pd.set_option('display.expand_frame_repr', False)

# disable warnings for chained assignment
pd.set_option('mode.chained_assignment', None)

Pandas installed at version: 1.3.5


In [None]:
!python -m pip install scikit-learn
import sklearn as skl
import sklearn.base as sklb
import sklearn.preprocessing as sklpre
import sklearn.pipeline as sklpipe
import sklearn.compose as sklcompose

print ("Sklearn installed at version: {}".format(skl.__version__))

Sklearn installed at version: 1.0.2


In [None]:
import warnings

# suppress all warnings (including RuntimeWarnings) that are not relevant here
warnings.filterwarnings("ignore")

### 1 Loading Data

We will process a dataset of immigration data. It contains the number of foreign-born citizens living in different countries, broken down by gender and year.

In this session, the dataset values will be pre-processed so that numerical features are scaled and categorical values are one-hot encoded, producing a dataset that can be used for imputing the missing immigrant stock values.


#### 1.1 Loading and exploring data

First, we load the data and explore the basic structure of the dataset.

In [None]:
# load data for processing
loaded_data = pd.read_parquet(
    "https://github.com/INTERTECHNICA-BUSINESS-SOLUTIONS-SRL/CourseDataManipulationWithPython/raw/main/Module%204%20-%20Advanced%20Data%20Manipulation/Session%202%20-%20Advanced%20Data%20Manipulation/data/migration_dataset_extended.parquet"
)

print(
    "A sample of loaded data is \n {}".format(
      loaded_data[0:10]
    )
)

A sample of loaded data is 
      Year COU_ORIG Origin Country Gender COU_DEST Destination Country  Immigrant Stock  Origin Country Population  Destination Country Population
0  2000.0      AFG    Afghanistan    MEN      AUS           Australia           6500.0                 20779957.0                      19153000.0
1  2001.0      AFG    Afghanistan    MEN      AUS           Australia           7410.0                 21606992.0                      19413000.0
2  2002.0      AFG    Afghanistan    MEN      AUS           Australia           8710.0                 22600774.0                      19651400.0
3  2003.0      AFG    Afghanistan    MEN      AUS           Australia           9260.0                 23680871.0                      19895400.0
4  2004.0      AFG    Afghanistan    MEN      AUS           Australia           9810.0                 24726689.0                      20127400.0
5  2005.0      AFG    Afghanistan    MEN      AUS           Australia          10600.0      

We can observe that the dataset has several features:

*  **Year** - the year of observation;
*  **COU_ORIG** - the ISO3 code for the country of origin;
*  **Origin Country** - the name of the country of origin (country of birth/nationality);
*  **Gender** - the gender of the immigrants;
*  **COU_DEST** - the ISO3 code for the country of destination;
*  **Destination Country** - the name of the country of destination (country of residence);
*  **Immigrant Stock** - the number of immigrants (foreign-born citizens);
*  **Origin Country Population** - the population in the country of origin;
*  **Destination Country Population** - the population in the country of destination.


#### 1.2 Setting up the data transformation strategy

We would like to perform the following changes in the data:

1.   Filter out the records that are not valid, e.g. with invalid values for the year feature; 
2.   Encode categorical items;
3.   Scale the values for origin and destination country population;
4.   Include the Year and Immigrant Stock feature values along with the other processed data; 
5.   Extract a dataset for estimating (or otherwise imputing) the Immigrant Stock missing values.

We would like to do this in a reusable and composable manner, which is what data pipelines offer.


##### 1.2.1 Understanding basic data processing concepts

For data transformation, the sklearn library provides several out of the box, reusable mechanisms:

*    Data pipelines, represented by the [**Pipeline**](https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html) class, used to assemble different data transformation steps into a single flow;
*    Column transformers, represented by the [**ColumnTransformer**](https://scikit-learn.org/stable/modules/generated/sklearn.compose.ColumnTransformer.html) class, used to apply different transformers to different columns of the data;
*    Feature unions, represented by the [**FeatureUnion**](https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.FeatureUnion.html) class, used to join the results of multiple transformers.

It is also possible to define custom data transformers to further customize the data processing mechanisms.

##### 1.2.2 Understanding data transformers

Data transformers are the basic building blocks for implementing data transformation with sklearn.

A transformer is based on the following classes:

*    [**BaseEstimator**](https://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html) that defines the basic functionality shared by all sklearn estimators (such as getting and setting the estimator parameters);
*    [**TransformerMixin**](https://scikit-learn.org/stable/modules/generated/sklearn.base.TransformerMixin.html) which adds support for fitting and transforming data in a single operation.

A data transformer should support the following methods:

*   **fit(x, y = None, **fit_params)** where the transformer learns from the data whatever it needs for later transformations. The x parameter represents the input data and the y parameter represents the labels for the data (as in the case of supervised learning). This method returns the transformer itself, whose state has been updated by fitting the input data. For simple transformations there is nothing to learn, so the fit method does nothing except return the transformer;
*   **transform(x, y = None, **fit_params)** where the transformer receives the input data (x parameter) and returns the transformed data;
*   **get_feature_names_out(input_features=None)** returning the names of the transformed features.

The TransformerMixin also provides the **fit_transform(X, y=None, **fit_params)** method, which calls the **fit** method and afterwards the **transform** method on the same data.

The **fit_params** parameters represent additional keyword arguments that may be provided to the fitting and transforming methods.
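
The contract described above can be sketched with a transformer that actually learns state during fitting. This is only a minimal illustration and not part of the course material: the `MeanCenterTransformer` class and the sample values are hypothetical, and the sketch assumes the input is a two-dimensional NumPy array.

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin


class MeanCenterTransformer(TransformerMixin, BaseEstimator):
    """Hypothetical transformer that subtracts the column means learned in fit."""

    def fit(self, X, y=None):
        # learn the state: one mean value per column
        self.means_ = np.asarray(X, dtype=float).mean(axis=0)
        # fit returns the transformer itself
        return self

    def transform(self, X):
        # apply the learned state to the input data
        return np.asarray(X, dtype=float) - self.means_


sample = np.array([[1.0, 10.0], [3.0, 30.0]])

# fit_transform is inherited from TransformerMixin:
# it calls fit and afterwards transform on the same data
centered = MeanCenterTransformer().fit_transform(sample)
print(centered)
```

Here the fit method has real work to do (computing the means), in contrast with simple transformers where fit only returns the transformer.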


##### 1.2.3 Implementing phase 1: filter out the records that are not valid 

Let's implement the first phase of our data transformation strategy by using a custom transformer.

In [None]:
# Phase 1: Filter out the records that are not valid

class FilterOutTransformer(sklb.BaseEstimator, sklb.TransformerMixin) :
  """
  The filter out transformer will filter out the records that have invalid
  values for a set of specified features
  """
  def __init__(self, features_to_filter_out):
    self.features_to_filter_out = features_to_filter_out

  def fit(self, X, Y = None, ** fit_params) :
    return self

  def transform(self, X, Y = None, ** fit_params) :
    result = X.copy()
    for feature in self.features_to_filter_out :
      # keep only the records with a non-missing value for the feature
      result = result[result[feature].notna()]
    return result

  def get_feature_names_out(self, input_features=None) :
    return input_features

# create a preprocessor for filtering out records
# that have invalid values for Year feature
filter_out_transformer = FilterOutTransformer(["Year"])

# process the data
filter_out_data_processed = filter_out_transformer.fit_transform(loaded_data)
print(
    "A sample of transformed data by filtering out records with invalid Year \
values is \n {} \n with shape \n {}".format(
        filter_out_data_processed[0:10],
        filter_out_data_processed.shape
    )
)

# display a sample of the generated features
print(
      "A sample of the features generated by the transformer is \n{} ".format(
        filter_out_transformer.get_feature_names_out(
            loaded_data.columns
        )[0:10]    
      )
    )

A sample of transformed data by filtering out records with invalid Year values is 
      Year COU_ORIG Origin Country Gender COU_DEST Destination Country  Immigrant Stock  Origin Country Population  Destination Country Population
0  2000.0      AFG    Afghanistan    MEN      AUS           Australia           6500.0                 20779957.0                      19153000.0
1  2001.0      AFG    Afghanistan    MEN      AUS           Australia           7410.0                 21606992.0                      19413000.0
2  2002.0      AFG    Afghanistan    MEN      AUS           Australia           8710.0                 22600774.0                      19651400.0
3  2003.0      AFG    Afghanistan    MEN      AUS           Australia           9260.0                 23680871.0                      19895400.0
4  2004.0      AFG    Afghanistan    MEN      AUS           Australia           9810.0                 24726689.0                      20127400.0
5  2005.0      AFG    Afghanistan    MEN

##### 1.2.4 Implementing phase 2: encode categorical items

The categorical items will be one-hot encoded using out of the box mechanisms: a ColumnTransformer that applies sklearn's standard OneHotEncoder to the selected columns.
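
To make the idea concrete, the following is a minimal pure-Python sketch of what one-hot encoding does to a single categorical column. The `one_hot_encode` helper and the sample values are hypothetical; the actual transformation in this session is performed by sklearn's OneHotEncoder.

```python
def one_hot_encode(values):
    """Return the sorted categories and one indicator row per input value."""
    categories = sorted(set(values))
    rows = [
        [1.0 if value == category else 0.0 for category in categories]
        for value in values
    ]
    return categories, rows


genders = ["MEN", "WMN", "MEN"]
categories, encoded = one_hot_encode(genders)

# each category becomes a column, each record gets a single 1.0
print(categories)
print(encoded)
```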

In [None]:
# Phase 2: Encode categorical items related to gender, origin country and destination country

# determine the categorical features to be one-hot encoded
gender_categorical_features = ["Gender"]
countries_categorical_features = ["COU_ORIG", "COU_DEST"]

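# create a pre-processor for categorical features 
# it is basically a column transformer that applies
# a one-hot encoder for the gender variable 
# and another for the country-related variables
# note: the sparse argument used below matches the sklearn version
# shown above (1.0.2); sklearn 1.2 and later name it sparse_output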
categorical_transformer = sklcompose.ColumnTransformer(
  transformers = [  
    ( 
      "gender_encoder", 
      sklpre.OneHotEncoder(sparse = False), 
      gender_categorical_features
    ),
    (
      "countries_encoder", 
      sklpre.OneHotEncoder(sparse = False), 
      countries_categorical_features
    )
  ]
)

# process the data
categorical_data_preprocessed = categorical_transformer.fit_transform(loaded_data)
print(
    "A sample of transformed data is \n {} \n with shape \n {}".format(
        categorical_data_preprocessed[0:10],
        categorical_data_preprocessed.shape
    )
)

# display a sample of the generated features
print(
      "A sample of the features generated by the transformer is \n{} ".format(
        categorical_transformer.get_feature_names_out(
            loaded_data.columns
        )[0:10]    
      )
    )

A sample of transformed data is 
 [[1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]] 
 with shape 
 (289675, 247)
A sample of the features generated by the transformer is 
['gender_encoder__Gender_MEN' 'gender_encoder__Gender_WMN'
 'gender_encoder__Gender_None' 'countries_encoder__COU_ORIG_AFG'
 'countries_encoder__COU_ORIG_AGO' 'countries_encoder__COU_ORIG_ALB'
 'countries_encoder__COU_ORIG_AND' 'countries_encoder__COU_ORIG_ARE'
 'countries_encoder__COU_ORIG_ARG' 'countries_encoder__COU_ORIG_ARM'] 


##### 1.2.5 Implementing phase 3: scale the values for origin and destination country population 

The numerical values for origin and destination country population will be scaled using out of the box mechanisms. We will again use the ColumnTransformer data transformer, this time with a min-max scaler.
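
Min-max scaling maps each value x to (x - min) / (max - min), so that the smallest value becomes 0 and the largest becomes 1. A minimal sketch of that arithmetic, using a hypothetical `min_max_scale` helper and made-up population values rather than the dataset itself:

```python
def min_max_scale(values):
    """Rescale values linearly so the minimum maps to 0 and the maximum to 1."""
    lowest, highest = min(values), max(values)
    value_range = highest - lowest
    if value_range == 0:
        # a constant column has no spread; this helper maps every value to 0
        return [0.0 for _ in values]
    return [(value - lowest) / value_range for value in values]


populations = [10_000_000.0, 20_000_000.0, 50_000_000.0]
scaled = min_max_scale(populations)
print(scaled)
```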

In [None]:
# Phase 3: Scale the values for origin and destination country population 

# determine the numerical features to be scaled
country_numerical_features = ["Origin Country Population", "Destination Country Population"]

# create a pre-processor for numerical features 
# it is basically a column transformer that applies
# min-max scaling for countries population
country_numerical_transformer = sklcompose.ColumnTransformer(
  transformers = [ 
    (
      "country_population_scaler", 
      sklpre.MinMaxScaler(), 
      country_numerical_features
    )
  ]
)

# process the data
country_numerical_data_preprocessed = country_numerical_transformer.fit_transform(loaded_data)

# display samples of data along with basic output information 
print(
    "A sample of transformed data is \n{}\n with shape {} \n".format(
        country_numerical_data_preprocessed[0:10],
        country_numerical_data_preprocessed.shape
    )
)

# display a sample of the generated features
print(
      "A sample of the features generated by the transformer is \n{} ".format(
        country_numerical_transformer.get_feature_names_out(
            loaded_data.columns
        )[0:10]    
      )
)

A sample of transformed data is 
[[0.01472129 0.05732572]
 [0.01530746 0.05811551]
 [0.01601181 0.05883968]
 [0.01677734 0.05958087]
 [0.01751857 0.0602856 ]
 [0.018176   0.06109786]
 [0.01872797 0.06201857]
 [0.01920105 0.06241255]
 [0.01964172 0.06369322]
 [0.02011837 0.06503738]]
 with shape (289675, 2) 

A sample of the features generated by the transformer is 
['country_population_scaler__Origin Country Population'
 'country_population_scaler__Destination Country Population'] 


##### 1.2.6 Implementing phase 4: include the Year and Immigrant Stock feature values along with the other processed data

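We need to extract the values for Year and Immigrant Stock so that we have all the features needed to assemble the dataset. sklearn does not provide a dedicated transformer for plain column selection (although the "passthrough" option of ColumnTransformer can achieve a similar result), so we will implement a custom transformer for this purpose.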

In [None]:
class SelectColumnsTransformer(sklb.BaseEstimator, sklb.TransformerMixin) :
  """
  The select columns transformer will keep only a set of specified columns
  from the input data
  """
  def __init__(self, columns_to_select):
    self.columns_to_select = columns_to_select

  def fit(self, X, Y = None, ** fit_params) :
    return self

  def transform(self, X, Y = None, ** fit_params) :
    result = X[self.columns_to_select].copy()
    return result

  def get_feature_names_out(self, input_features=None) :
    return self.columns_to_select

# preprocess a data sample
select_columns_transformer = SelectColumnsTransformer(["Immigrant Stock", "Year"])
select_columns_data_processed =  select_columns_transformer.fit_transform(loaded_data)

# display samples of data along with basic output information 
print(
    "A sample of transformed data is \n{}\n with shape {} \n".format(
        select_columns_data_processed[0:10],
        select_columns_data_processed.shape
    )
)

# display a sample of the generated features
print(
      "A sample of the features generated by the transformer is \n{} ".format(
        select_columns_transformer.get_feature_names_out(
            loaded_data.columns
        )[0:10]    
      )
)

A sample of transformed data is 
   Immigrant Stock    Year
0           6500.0  2000.0
1           7410.0  2001.0
2           8710.0  2002.0
3           9260.0  2003.0
4           9810.0  2004.0
5          10600.0  2005.0
6          12170.0  2006.0
7          13280.0  2007.0
8          14230.0  2008.0
9          15360.0  2009.0
 with shape (289675, 2) 

A sample of the features generated by the transformer is 
['Immigrant Stock', 'Year'] 


At this point we have all the segments of data needed to assemble the final dataset:

*    We have the values for Year and Immigrant Stock;
*    We have the one-hot encoded categorical features;
*    We have the scaled values for origin and destination country population.

These segments of data are joined using sklearn's out of the box FeatureUnion, which applies multiple transformers to the same input and concatenates their outputs column-wise.
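
As a minimal illustration of this joining behaviour, the sketch below applies two scalers to the same toy column and lets FeatureUnion place their outputs side by side. The input values and the step names are made up for the example.

```python
import numpy as np
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# a toy dataset with three records and a single feature
toy_data = np.array([[1.0], [2.0], [3.0]])

# each transformer receives the same input;
# the outputs are concatenated column-wise
toy_union = FeatureUnion(
    transformer_list=[
        ("min_max", MinMaxScaler()),
        ("standard", StandardScaler()),
    ]
)
toy_joined = toy_union.fit_transform(toy_data)

# one column per transformer output, same number of rows as the input
print(toy_joined.shape)
```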

In [None]:
# Phase 4: include the Year and Immigrant Stock feature values along with the other processed data

# use FeatureUnion to assemble multiple outcomes from data transformers
# we will pass a list of the transformers for which the outputs
# will be joined
union_transformer = sklpipe.FeatureUnion(
      transformer_list = [
        # provides the Year and Immigrant Stock features
        (
          "select_columns_transformer",
          select_columns_transformer
        ),  
        # provides scaled origin and destination country population values  
        (
          "country_numerical_transformer",
          country_numerical_transformer
        ),
        # provides encoded categorical features
        (
          "categorical_transformer",
          categorical_transformer
        )
      ]
  )

# process the data
union_data_processed = union_transformer.fit_transform(loaded_data)

# display samples of data along with basic output information 
print(
    "A sample of transformed data is \n{}\n with shape {} \n".format(
        union_data_processed[0:10],
        union_data_processed.shape
    )
)

# display a sample of the generated features
print(
      "A sample of the features generated by the transformer is \n{} ".format(
        union_transformer.get_feature_names_out(
            loaded_data.columns
        )[0:10]    
      )
)

A sample of transformed data is 
[[6.50000000e+03 2.00000000e+03 1.47212921e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [7.41000000e+03 2.00100000e+03 1.53074593e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [8.71000000e+03 2.00200000e+03 1.60118097e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 ...
 [1.32800000e+04 2.00700000e+03 1.92010536e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [1.42300000e+04 2.00800000e+03 1.96417158e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [1.53600000e+04 2.00900000e+03 2.01183728e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]]
 with shape (289675, 251) 

A sample of the features generated by the transformer is 
['select_columns_transformer__Immigrant Stock'
 'select_columns_transformer__Year'
 'country_numerical_transformer__country_population_scaler__Origin Country Population'
 'country_numerical_transformer__country_population_scaler__Destination Country Population'
 'categorical_trans

##### 1.2.7 Implementing phase 5: extract a dataset for estimating (or otherwise imputing) the Immigrant Stock missing values

We now have all the pieces needed to implement Phase 5; we only need to put them together. We need to clean up the data and afterwards use this clean data to generate a dataset for Immigrant Stock feature imputation.

For this purpose, we will use another out of the box sklearn mechanism, the Pipeline.

Data pipelines allow the definition and execution of data transformation flows by assembling a sequence of data transformation steps, where the output of each step is the input of the next one.
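
Conceptually, a pipeline passes the output of each step to the next one. The sketch below imitates that flow in plain Python; the `run_steps` helper and both step functions are hypothetical and only illustrate the chaining, not sklearn's actual implementation.

```python
def drop_missing(values):
    """Clean-up step: discard the missing (None) entries."""
    return [value for value in values if value is not None]


def scale_to_unit(values):
    """Processing step: divide every value by the largest one (assumed positive)."""
    largest = max(values)
    return [value / largest for value in values]


def run_steps(steps, data):
    """Apply the named steps in order, feeding each output to the next step."""
    for _name, step in steps:
        data = step(data)
    return data


toy_steps = [
    ("data cleanup", drop_missing),
    ("data processing", scale_to_unit),
]
toy_result = run_steps(toy_steps, [2.0, None, 4.0, 8.0])
print(toy_result)
```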

In [None]:
# create a data transformation pipeline that 
# will clean up the initial data, 
# trigger data processing (e.g. scaling and encoding)
# and finally assemble the result
data_transformation_pipeline = sklpipe.Pipeline(
    steps = [
       ("data cleanup", filter_out_transformer),
       ("data processing and assembly", union_transformer)      
    ]
)

In [None]:
# display the pipeline in textual format
data_transformation_pipeline

Pipeline(steps=[('data cleanup',
                 FilterOutTransformer(features_to_filter_out=['Year'])),
                ('data processing and assembly',
                 FeatureUnion(transformer_list=[('select_columns_transformer',
                                                 SelectColumnsTransformer(columns_to_select=['Immigrant '
                                                                                             'Stock',
                                                                                             'Year'])),
                                                ('country_numerical_transformer',
                                                 ColumnTransformer(transformers=[('country_population_scaler',
                                                                                  MinMaxScaler(),
                                                                                  ['Origin '
                                                                      

In [None]:
# process the dataset
data_processed = data_transformation_pipeline.fit_transform(loaded_data)

# display samples of data along with basic output information 
print(
    "A sample of data generated by the data pipeline is \n{}\n with shape {} \n".format(
        data_processed[0:10],
        data_processed.shape
    )
)

# display a sample of the generated features
print(
      "A sample of the features generated by the data pipeline is \n{}".format(
        data_transformation_pipeline.get_feature_names_out(
            loaded_data.columns
        )[0:10]    
      )
)

A sample of data generated by the data pipeline is 
[[6.50000000e+03 2.00000000e+03 1.47212921e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [7.41000000e+03 2.00100000e+03 1.53074593e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [8.71000000e+03 2.00200000e+03 1.60118097e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 ...
 [1.32800000e+04 2.00700000e+03 1.92010536e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [1.42300000e+04 2.00800000e+03 1.96417158e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [1.53600000e+04 2.00900000e+03 2.01183728e-02 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]]
 with shape (289674, 248) 

A sample of the features generated by the data pipeline is 
['select_columns_transformer__Immigrant Stock'
 'select_columns_transformer__Year'
 'country_numerical_transformer__country_population_scaler__Origin Country Population'
 'country_numerical_transformer__country_population_scaler__Destination Country Populat

In [None]:
# create a dataframe with the processed results
processed_dataset = pd.DataFrame(
  columns = data_transformation_pipeline.get_feature_names_out(loaded_data.columns),
  data = data_processed
) 

# display samples of data along with basic output information 
print(
      "A sample of the processed dataset is \n{} ".format(
          processed_dataset[0:10]
      )
)

# at this point we can use the processed data for any machine learning purposes
# e.g. imputing or predicting values

A sample of the processed dataset is 
   select_columns_transformer__Immigrant Stock  select_columns_transformer__Year  country_numerical_transformer__country_population_scaler__Origin Country Population  country_numerical_transformer__country_population_scaler__Destination Country Population  categorical_transformer__gender_encoder__Gender_MEN  categorical_transformer__gender_encoder__Gender_WMN  categorical_transformer__countries_encoder__COU_ORIG_AFG  categorical_transformer__countries_encoder__COU_ORIG_AGO  categorical_transformer__countries_encoder__COU_ORIG_ALB  categorical_transformer__countries_encoder__COU_ORIG_AND  categorical_transformer__countries_encoder__COU_ORIG_ARE  categorical_transformer__countries_encoder__COU_ORIG_ARG  categorical_transformer__countries_encoder__COU_ORIG_ARM  categorical_transformer__countries_encoder__COU_ORIG_ATG  categorical_transformer__countries_encoder__COU_ORIG_AUS  categorical_transformer__countries_encoder__COU_ORIG_AUT  categorical_transfo
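
One possible way to carry out the final extraction is sketched here on a made-up frame rather than on the real processed dataset. Rows with a known Immigrant Stock value can serve as training data for an estimator, while rows with a missing value are the ones to impute. The column names are taken from the output above; the values and the variable names are hypothetical.

```python
import numpy as np
import pandas as pd

target = "select_columns_transformer__Immigrant Stock"

# a tiny stand-in for the processed dataset
toy_dataset = pd.DataFrame(
    {
        target: [6500.0, np.nan, 8710.0],
        "select_columns_transformer__Year": [2000.0, 2001.0, 2002.0],
    }
)

# records with a known target value can be used for fitting an estimator
known_mask = toy_dataset[target].notna()
training_records = toy_dataset[known_mask]

# records with a missing target value are the ones to be imputed
records_to_impute = toy_dataset[~known_mask]

print(len(training_records), len(records_to_impute))
```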