## <font color='blue'>Pyspark : Création d'estimators et de transformers </font>

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Configure the application name, then get (or reuse) a session built from that configuration.
conf = SparkConf()
conf.setAppName('Exple')
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

Un pipeline est une séquence finie d'étapes ; chaque étape est soit un transformer, soit un estimator.
* Un transformer est un processus qui transforme un dataframe en un autre ; il nécessite l'implémentation d'une méthode `transform()` :
    * Exemples
        * Tokenization: segmenter un texte en une liste de mots.
        * On peut créer un transformer qui supprime les variables corrélées d'un dataframe
* Un estimator est un opérateur qui prend en entrée un dataframe et retourne un transformer. Il nécessite l'implémentation d'un `fit()` (et d'un `transform()` pour le transformer retourné)
    * Exemples :
        * VectorIndexer: coder des catégories
        * Un algorithme ML de SparkML

Après le fitting, un pipeline retourne un pipelineModel (transformer). Chaque estimator du pipeline initial devient un transformer dans le pipelineModel. 

Spark dispose d'une API spécifique permettant le partage de paramètres entre estimators et transformers.

In [3]:
# Build a DataFrame from an in-memory sequence of (ID, CATEGORY) pairs.

seq = [
    (0, "aa"),
    (1, "ab"),
    (2, "cd"),
    (3, "ad"),
    (4, "aa"),
]
dfm = spark.createDataFrame(seq)
dfm = dfm.toDF("ID", "CATEGORY")
dfm.show(5)

+---+--------+
| ID|CATEGORY|
+---+--------+
|  0|      aa|
|  1|      ab|
|  2|      cd|
|  3|      ad|
|  4|      aa|
+---+--------+



**Pyspark Estimators**

In [4]:
from pyspark import keyword_only
from pyspark.sql.functions import col, create_map, lit
from pyspark.ml.param import Params
from pyspark.ml.param.shared import TypeConverters, Param
from pyspark.ml.pipeline import Estimator, Model, Pipeline, DefaultParamsReadable, DefaultParamsWritable
from itertools import chain

class HasInOutputCol(Params):
    """
    Mixin declaring the ``inputCol`` and ``outputCol`` string params.
    """

    inputCol = Param(
        Params._dummy(),
        "inputCol",
        "Input column",
        typeConverter=TypeConverters.toString,
    )
    outputCol = Param(
        Params._dummy(),
        "outputCol",
        "Output column name",
        typeConverter=TypeConverters.toString,
    )

    def __init__(self):
        super(HasInOutputCol, self).__init__()
        # Both params default to None until a value is explicitly set.
        self._setDefault(outputCol=None, inputCol=None)

    def getInputCol(self):
        """
        Gets the value of inputCol or its default value.
        """
        input_param = self.inputCol
        return self.getOrDefault(input_param)

    def getOutputCol(self):
        """
        Gets the value of outputCol or its default value.
        """
        output_param = self.outputCol
        return self.getOrDefault(output_param)
    
class HasMappingCategories(Params):
    """
    Mixin declaring the ``mappingCat`` list param.
    """

    mappingCat = Param(
        Params._dummy(),
        "mappingCat",
        "Mapping category <=> Double",
        typeConverter=TypeConverters.toList,
    )

    def __init__(self):
        super(HasMappingCategories, self).__init__()
        # No mapping is available until one is explicitly set.
        self._setDefault(mappingCat=None)

    def getMappingCat(self):
        """
        Gets the value of mappingCat or its default value.
        """
        mapping_param = self.mappingCat
        return self.getOrDefault(mapping_param)

    def setMappingCat(self, value):
        """
        Sets the value of :py:attr:`mappingCat`.
        """
        updated = self._set(mappingCat=value)
        return updated

class CustomVectorIndexer(Estimator, HasInOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """
    Estimator that assigns an integer index to each distinct value of ``inputCol``.

    Fitting collects the distinct values of the input column, sorts them and
    numbers them from 0 in that order. The fitted ``CustomVectorIndexerModel``
    receives the resulting pairs through its ``mappingCat`` param.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        """
        __init__(self, inputCol=None, outputCol=None)
        """
        super(CustomVectorIndexer, self).__init__()
        # Declare defaults first, then apply whatever the caller passed.
        self._setDefault(inputCol=None, outputCol=None)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """
        setParams(self, inputCol=None, outputCol=None)
        Sets the params that were passed as keyword arguments.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _fit(self, dataset):
        """
        Builds the category-to-index mapping from ``dataset``.

        :param dataset: DataFrame containing the input column.
        :return: a CustomVectorIndexerModel configured with the same columns
                 and the learned mapping.
        """
        column = self.getInputCol()
        distinct_rows = dataset.select(column).distinct().collect()
        # Nulls are excluded: sorted() cannot order None against other values
        # in Python 3, so a single null in the column would make fitting fail.
        categories = sorted(
            row[column] for row in distinct_rows if row[column] is not None
        )
        # Each entry is (category, index), with indexes following the sorted order.
        mapping_cat = [(category, index) for index, category in enumerate(categories)]
        return CustomVectorIndexerModel(inputCol=self.getInputCol(),
                                        outputCol=self.getOutputCol(),
                                        mappingCat=mapping_cat)


class CustomVectorIndexerModel(Model, HasMappingCategories, HasInOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """
    Model that writes, in ``outputCol``, the index mapped to each ``inputCol`` value.

    The mapping comes from the ``mappingCat`` param, a list of
    (category, index) pairs.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, mappingCat=None):
        """
        __init__(self, inputCol=None, outputCol=None, mappingCat=None)
        """
        super(CustomVectorIndexerModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, mappingCat=None):
        """
        setParams(self, inputCol=None, outputCol=None, mappingCat=None)
        Sets the params that were passed as keyword arguments.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """
        Adds the output column holding the mapped index of each input value.

        :param dataset: DataFrame containing the input column.
        :return: ``dataset`` with the output column added.
        """
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        # create_map expects a flat key1, value1, key2, value2, ... sequence.
        # The loop variable is deliberately not named `col`, so it cannot be
        # confused with (or shadow) the col() function used just below.
        flat_pairs = chain.from_iterable(self.getMappingCat())
        mapping = create_map([lit(item) for item in flat_pairs])
        return dataset.withColumn(output_col, mapping[col(input_col)])


In [5]:
# Fit the indexer on CATEGORY, then display the DataFrame with the indexed column.
vectorizer = (
    CustomVectorIndexer()
    .setInputCol('CATEGORY')
    .setOutputCol('CATEGORY_FITTED')
)
model = vectorizer.fit(dfm)
model.transform(dfm).show(truncate=False)

+---+--------+---------------+
|ID |CATEGORY|CATEGORY_FITTED|
+---+--------+---------------+
|0  |aa      |0              |
|1  |ab      |1              |
|2  |cd      |3              |
|3  |ad      |2              |
|4  |aa      |0              |
+---+--------+---------------+



**Pyspark - Transformers**

In [6]:
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml import Pipeline, Transformer, Estimator, Model

class HasDropCol(Params):
    """
    Mixin declaring the ``column`` string param (the column to drop).
    """

    column = Param(
        Params._dummy(),
        "column",
        "Column to drop",
        typeConverter=TypeConverters.toString,
    )

    def __init__(self):
        super(HasDropCol, self).__init__()
        # No column is selected until one is explicitly set.
        self._setDefault(column=None)

    def getDropCol(self):
        """
        Gets the value of column or its default value.
        """
        drop_param = self.column
        return self.getOrDefault(drop_param)

class ColumnFilter(Transformer, HasDropCol, DefaultParamsReadable, DefaultParamsWritable):
    """
    Transformer that drops a single column from a DataFrame.
    """
    def __init__(self, column=None):
        """
        __init__(self, column=None)

        :param column: name of the column to drop; None leaves the param unset.
        """
        super(ColumnFilter, self).__init__()
        self._setDefault(column=None)
        # A constructor argument is a user-supplied value, so it is stored with
        # _set (like setDropCol does) rather than being recorded as the default.
        if column is not None:
            self._set(column=column)

    def setDropCol(self, value):
        """
        Sets the value of :py:attr:`column`.
        """
        return self._set(column=value)

    def _transform(self, dataset):
        """
        Drops the configured column.

        :param dataset: input DataFrame.
        :return: ``dataset`` without the configured column, or ``dataset``
                 unchanged when no column is configured.
        """
        column = self.getDropCol()
        if not column:
            # Nothing configured (None or empty name): pass the data through.
            return dataset
        return dataset.drop(column)


In [7]:
# Drop the ID column and display what remains.
filter_ = (
    ColumnFilter()
    .setDropCol('ID')
)
filter_.transform(dfm).show()

+--------+
|CATEGORY|
+--------+
|      aa|
|      ab|
|      cd|
|      ad|
|      aa|
+--------+



In [8]:
# Chain the indexer and the column filter in one pipeline, fit it, then display the result.
vectorizer = (
    CustomVectorIndexer()
    .setInputCol('CATEGORY')
    .setOutputCol('CATEGORY_FITTED')
)
filter_ = (
    ColumnFilter()
    .setDropCol('CATEGORY')
)
model = Pipeline(
    stages=[vectorizer, filter_],
).fit(dfm)
model.transform(dfm).show(truncate=False)

+---+---------------+
|ID |CATEGORY_FITTED|
+---+---------------+
|0  |0              |
|1  |1              |
|2  |3              |
|3  |2              |
|4  |0              |
+---+---------------+



#### <font color='red'>Remarque : </font>

1. Les estimators et transformers fonctionnent correctement. Cependant, les modèles créés ne peuvent pas être sauvegardés et utilisés ultérieurement.


2. Une solution serait de s'inspirer du développement de Pyspark :    
    a. Les estimators et transformers sont créés en Scala    
    b. Pour utiliser ces objets avec Python, on crée un wrapper