# Stage personalizado para transformar variables numéricas de alto sesgo (skew)

La creación del objeto personalizado está basada en https://stackoverflow.com/questions/37270446/how-to-create-a-custom-estimator-in-pyspark

Librerías necesarias

In [0]:
from pyspark import keyword_only
from pyspark.ml.param.shared import Params, Param, TypeConverters, HasInputCols, HasOutputCols
from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import functions as f
from pyspark.sql.types import NumericType

Declarar los parámetros personalizados: la lista de sesgos (estimada durante `fit`) y el umbral de sesgo.

In [0]:
class HasSkewnessList(Params):
    """Mixin that adds the ``skewnessList`` param.

    The param stores one skewness value per column of ``inputCols``; the
    values are used to decide whether each column is transformed with log1p.
    Values pass through ``TypeConverters.toListFloat`` when set.
    """

    skewnessList = Param(
        Params._dummy(),
        "skewnessList",
        "List of skewness for each column in InputCols",
        typeConverter=TypeConverters.toListFloat,
    )

    def __init__(self):
        super().__init__()

    def setSkewnessList(self, value):
        """Set :py:attr:`skewnessList` and return ``self``."""
        return self._set(skewnessList=value)

    def getSkewnessList(self):
        """Return the current (or default) value of :py:attr:`skewnessList`."""
        return self.getOrDefault(self.skewnessList)
    

class HasSkewnessThreshold(Params):
    """Mixin that adds the ``skewnessThreshold`` param.

    A column whose skewness is above this threshold is transformed with
    log1p. Values pass through ``TypeConverters.toFloat`` when set.
    """

    skewnessThreshold = Param(
        Params._dummy(),
        "skewnessThreshold",
        "Umbral de sesgo (skewness)",
        typeConverter=TypeConverters.toFloat,
    )

    def __init__(self):
        super().__init__()

    def setSkewnessThreshold(self, value):
        """Set :py:attr:`skewnessThreshold` and return ``self``."""
        return self._set(skewnessThreshold=value)

    def getSkewnessThreshold(self):
        """Return the current (or default) value of :py:attr:`skewnessThreshold`."""
        return self.getOrDefault(self.skewnessThreshold)

Declarar estimador.

In [0]:
class Log1pSkewed(Estimator, HasInputCols, HasOutputCols,
                  HasSkewnessList, HasSkewnessThreshold,  # custom params
        # Available in PySpark >= 2.3.0
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):
    """Estimator that measures the skewness of each input column.

    ``fit`` computes ``pyspark.sql.functions.skewness`` for every column in
    ``inputCols`` and returns a :class:`Log1pSkewedModel` that applies
    ``log1p`` to the columns whose skewness exceeds ``skewnessThreshold``.
    """

    @keyword_only
    def __init__(self, inputCols=None, outputCols=None, skewnessList=None,
                 skewnessThreshold=None):
        super(Log1pSkewed, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    # Required in Spark >= 3.0
    def setInputCols(self, values):
        """
        Sets the value of :py:attr:`inputCols`.
        """
        return self._set(inputCols=values)

    # Required in Spark >= 3.0
    def setOutputCols(self, values):
        """
        Sets the value of :py:attr:`outputCols`.
        """
        return self._set(outputCols=values)

    @keyword_only
    def setParams(self, inputCols=None, outputCols=None, skewnessList=None,
                  skewnessThreshold=None):
        """Set any params passed by keyword and return ``self``."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _fit(self, dataset):
        """Compute per-column skewness and build the fitted model.

        :param dataset: DataFrame containing every column in ``inputCols``.
        :return: a :class:`Log1pSkewedModel` carrying the measured skewness.
        :raises ValueError: if ``inputCols`` and ``outputCols`` differ in
            length, or if any input column is not numeric.
        """
        # Read every required param up front so a missing one fails before
        # the (potentially expensive) Spark aggregation job is launched.
        icols = self.getInputCols()
        ocols = self.getOutputCols()
        threshold = self.getSkewnessThreshold()

        if len(icols) != len(ocols):
            raise ValueError(
                "inputCols y outputCols deben tener la misma longitud "
                "({} != {})".format(len(icols), len(ocols)))

        # NumericType covers Byte/Short/Integer/Long/Float/Double/Decimal; the
        # previous explicit whitelist wrongly rejected Float/Short/Byte columns.
        non_numeric = [field.name
                       for field in dataset.select(icols).schema.fields
                       if not isinstance(field.dataType, NumericType)]
        if non_numeric:
            raise ValueError(
                "No todas las columnas en InputCols son numericas: {}".format(non_numeric))

        if icols:
            # A global aggregation always yields exactly one row (even for an
            # empty DataFrame), so first() never returns None here.
            row = dataset.select(
                *(f.skewness(f.col(c)).alias(c) for c in icols)).first()
            # skewness() yields null for all-null/empty columns, which the
            # float-list param would reject; use NaN instead. Since
            # NaN > threshold is False, such columns are passed through.
            skew = [float("nan") if s is None else float(s) for s in row]
        else:
            skew = []

        return Log1pSkewedModel(
            inputCols=icols, outputCols=ocols, skewnessList=skew,
            skewnessThreshold=threshold)

Definir el transformador (modelo ajustado)

In [0]:
class Log1pSkewedModel(Model, HasInputCols, HasOutputCols,
        HasSkewnessList, HasSkewnessThreshold,  # custom params
        DefaultParamsReadable, DefaultParamsWritable):
    """Fitted transformer produced by :class:`Log1pSkewed`.

    For every (input column, output column, skewness) triple, negative input
    values are clipped to 0. When the skewness is greater than
    ``skewnessThreshold``, ``log1p`` is then applied. When an output name
    differs from its input name, the input column is dropped.
    """

    @keyword_only
    def __init__(self, inputCols=None, outputCols=None,
                 skewnessList=None, skewnessThreshold=None):
        super(Log1pSkewedModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCols=None, outputCols=None,
                  skewnessList=None, skewnessThreshold=None):
        """Set any params passed by keyword and return ``self``."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Apply the clip + optional log1p transformation column by column.

        :param dataset: DataFrame containing every column in ``inputCols``.
        :return: the transformed DataFrame.
        :raises ValueError: if ``inputCols``, ``outputCols`` and
            ``skewnessList`` do not all have the same length.
        """
        icols = self.getInputCols()
        ocols = self.getOutputCols()
        skew = self.getSkewnessList()
        threshold = self.getSkewnessThreshold()

        # Previously a mismatch either raised IndexError or silently ignored
        # the extra columns; fail explicitly instead.
        if not len(icols) == len(ocols) == len(skew):
            raise ValueError(
                "inputCols, outputCols y skewnessList deben tener la misma "
                "longitud ({}, {}, {})".format(len(icols), len(ocols), len(skew)))

        for icol, ocol, s in zip(icols, ocols, skew):
            source = dataset[icol]
            # Clip negatives to 0 so log1p stays within its domain (log1p is
            # undefined below -1).
            # NOTE(review): the clip is also applied to columns that are NOT
            # log-transformed, which alters their negative values — confirm
            # this is intended.
            clipped = f.when(source < 0, 0).otherwise(source)
            # NaN skewness compares False, so such columns are passed through.
            result = f.log1p(clipped) if s > threshold else clipped
            # A single projection per column; equivalent to first overwriting
            # icol with the clipped values and then deriving ocol from it.
            dataset = dataset.withColumn(ocol, result)
            if icol != ocol:
                dataset = dataset.drop(icol)

        return dataset