<a href="https://colab.research.google.com/github/alexandergribenchenko/Data_Science_Self_Study/blob/main/01_Prog_Ori_Obj_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformador personalizado multicolumna para PySpark

# A. Librerías de trabajo

In [None]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasInputCols, HasOutputCols, Param, Params, TypeConverters
from pyspark import keyword_only
from pyspark.ml import Pipeline, PipelineModel

import pyspark.sql.functions as F
from pyspark.sql.types import StringType,BooleanType,DateType, FloatType

# 01. Carga del dataset raw

In [None]:
# Folder and file name of the input CSV; they are joined by plain string
# concatenation when the file is loaded.
location_input = '/mnt/'+'adv/Ancillaries/Sample_Input_Data/'
input_name = 'titanic.csv'

In [None]:
# Read the CSV with a header row and ',' as separator, using the
# DROPMALFORMED parse mode, and keep only the first 10 rows.
# No schema is supplied and schema inference is not enabled here.
# NOTE(review): `spark` is not defined in this file; presumably the session
# object injected by the notebook runtime -- confirm.
df_raw = spark.read\
     .format('csv')\
     .option('header', 'true')\
     .option('sep',',')\
     .option('mode', 'DROPMALFORMED')\
     .load(location_input+ input_name).limit(10)

In [None]:
# Display the loaded sample.
df_raw.show()

In [None]:
# Print the column names and types of the loaded sample.
df_raw.printSchema()

# 02. Transformadores

## 02.01. FeatureSelector

In [None]:
class FeatureSelector(Transformer, HasInputCols):
  """Transformer that keeps only the columns named in ``inputCols``.

  The output DataFrame contains exactly the columns listed in ``inputCols``,
  in that order; the input DataFrame is not modified.
  """

  @keyword_only
  def __init__(self, inputCols=None):
    """Create the transformer.

    Args:
      inputCols: names of the columns to keep.
    """
    super().__init__()
    # ``keyword_only`` records the keyword arguments that were explicitly
    # passed in ``_input_kwargs``; forwarding them sets only those params.
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, inputCols=None):
    """Set the params of this transformer and return ``self``."""
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def setInputCols(self, value):
    """Set ``inputCols`` (the columns to keep) and return ``self``."""
    return self.setParams(inputCols=value)

  def setInputCol(self, new_inputCols):
    """Backward-compatible alias of :meth:`setInputCols`.

    Kept because the setter was originally published under this singular
    name even though it sets the multi-column ``inputCols`` param.
    """
    return self.setInputCols(new_inputCols)

  def _transform(self, dataset):
    """Return ``dataset`` restricted to the configured ``inputCols``."""
    # The original code branched on ``isSet("inputCols")`` but both branches
    # were identical, so the lookup is done directly.
    return dataset.select(*self.getInputCols())

In [None]:
# Columns to keep from the raw dataset.
columns_selected = ['PassengerId', 'Pclass', 'Sex', 'Age', 'Fare','Survived']

In [None]:
# Configure the selector with the columns listed above.
Transformer_FeatureSelector = FeatureSelector(inputCols=columns_selected)

In [None]:
# Apply the selector to the raw sample and display the result.
df_transformed_01 = Transformer_FeatureSelector.transform(df_raw)
df_transformed_01.show()

In [None]:
# Display the raw sample again
# (presumably to confirm the transform did not alter it).
df_raw.show()

## 02.02. TypeAssignatorFloat

In [None]:
class TypeAssignatorFloat(Transformer, HasInputCols):
  """Transformer that casts every column in ``inputCols`` to ``FloatType``.

  Each listed column is replaced under the same name by its float cast;
  columns that are not listed are passed through unchanged.
  """

  @keyword_only
  def __init__(self, inputCols=None):
    """Create the transformer.

    Args:
      inputCols: names of the columns to cast to float.
    """
    super().__init__()
    # ``keyword_only`` records the keyword arguments that were explicitly
    # passed in ``_input_kwargs``; forwarding them sets only those params.
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, inputCols=None):
    """Set the params of this transformer and return ``self``."""
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def setInputCols(self, value):
    """Set ``inputCols`` (the columns to cast) and return ``self``."""
    return self.setParams(inputCols=value)

  def setInputCol(self, new_inputCols):
    """Backward-compatible alias of :meth:`setInputCols`.

    Kept because the setter was originally published under this singular
    name even though it sets the multi-column ``inputCols`` param.
    """
    return self.setInputCols(new_inputCols)

  def _transform(self, dataset):
    """Return ``dataset`` with each ``inputCols`` column cast to float."""
    # The original code branched on ``isSet("inputCols")`` but both branches
    # were identical, so the lookup is done directly.
    answer = dataset
    for col_i in self.getInputCols():
      # Reusing the column name makes ``withColumn`` replace the column.
      answer = answer.withColumn(col_i, F.col(col_i).cast(FloatType()))
    return answer

In [None]:
# Display the input of the cast step (the output of the feature selector).
df_transformed_01.show()

In [None]:
# Print its schema before casting.
df_transformed_01.printSchema()

In [None]:
# Configure the float cast for the 'Age' and 'Fare' columns.
Transformer_TypeAssignatorFloat = TypeAssignatorFloat(inputCols=['Age','Fare'])

In [None]:
# Apply the cast and display the result.
df_transformed_02 = Transformer_TypeAssignatorFloat.transform(df_transformed_01)
df_transformed_02.show()

In [None]:
# Print the schema after casting.
df_transformed_02.printSchema()

## 02.03. LogScaler

In [None]:
class LogScaler(Transformer, HasInputCols):
  """Transformer that replaces every ``inputCols`` column by its logarithm.

  Each listed column is replaced under the same name by ``F.log`` of its
  values (natural logarithm); columns that are not listed are passed
  through unchanged.
  """

  @keyword_only
  def __init__(self, inputCols=None):
    """Create the transformer.

    Args:
      inputCols: names of the columns to log-scale.
    """
    super().__init__()
    # ``keyword_only`` records the keyword arguments that were explicitly
    # passed in ``_input_kwargs``; forwarding them sets only those params.
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, inputCols=None):
    """Set the params of this transformer and return ``self``."""
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def setInputCols(self, value):
    """Set ``inputCols`` (the columns to log-scale) and return ``self``."""
    return self.setParams(inputCols=value)

  def setInputCol(self, new_inputCols):
    """Backward-compatible alias of :meth:`setInputCols`.

    Kept because the setter was originally published under this singular
    name even though it sets the multi-column ``inputCols`` param.
    """
    return self.setInputCols(new_inputCols)

  def _transform(self, dataset):
    """Return ``dataset`` with each ``inputCols`` column log-transformed."""
    # The original code branched on ``isSet("inputCols")`` but both branches
    # were identical, so the lookup is done directly.
    answer = dataset
    for col_i in self.getInputCols():
      # Reusing the column name makes ``withColumn`` replace the column.
      # NOTE(review): non-positive values have no real logarithm; confirm
      # how Spark represents them before relying on the output.
      answer = answer.withColumn(col_i, F.log(F.col(col_i)))
    return answer

In [None]:
# Display the input of the log step (the output of the float cast).
df_transformed_02.show()

In [None]:
# Print its schema before scaling.
df_transformed_02.printSchema()

In [None]:
# Configure the log transform for the 'Age' and 'Fare' columns.
Transformer_LogScaler = LogScaler(inputCols=['Age','Fare'])

In [None]:
# Apply the log transform and display the result.
df_transformed_03 = Transformer_LogScaler.transform(df_transformed_02)
df_transformed_03.show()

# 03. Pipelines

## 03.01. Pipeline

In [None]:
# Chain the three transformers configured above into one pipeline, in the
# same order they were applied by hand: select, cast to float, log-scale.
pipeline_my = Pipeline(stages=[
  Transformer_FeatureSelector,
  Transformer_TypeAssignatorFloat, 
  Transformer_LogScaler
])

In [None]:
# Inspect the type of the unfitted pipeline object.
type(pipeline_my)

In [None]:
# Fit the pipeline on the raw sample to obtain the fitted model object.
pipelineModel = pipeline_my.fit(df_raw)

In [None]:
# Inspect the type of the fitted pipeline object.
type(pipelineModel)

In [None]:
# List the attributes and methods of the unfitted pipeline.
dir(pipeline_my)

In [None]:
# List the attributes and methods of the fitted pipeline.
dir(pipelineModel)

In [None]:
# Run all stages on the raw sample in a single call and display the result.
transformedDF = pipelineModel.transform(df_raw)

transformedDF.show()

In [None]:
# Inspect the type of the raw DataFrame.
type(df_raw)

In [None]:
# List the attributes and methods of the raw DataFrame.
dir(df_raw)