# Transformadores Genéricos

In [0]:
from pyspark.sql.types import StringType, BooleanType, FloatType, IntegerType, DoubleType, DateType
import pyspark.sql.functions as F
from pyspark.sql.functions import sum, col, desc, asc, count, countDistinct, round, max, min, avg
from pyspark.sql.functions import to_timestamp,date_format
from pyspark.sql.window import Window

from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasInputCols, HasOutputCols, Param, Params, TypeConverters
from pyspark import keyword_only
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml import Model
from pyspark.ml import Estimator

from datetime import datetime
import numpy as np

## Preprocesamiento

### FeatureSelector

In [0]:
class FeatureSelector(Transformer, HasInputCols):
  """Project the dataset onto the columns listed in ``inputCols`` (in that order)."""

  @keyword_only
  def __init__(self, inputCols=None):
    super().__init__()
    self.setParams(**self._input_kwargs)

  @keyword_only
  def setParams(self, inputCols=None):
    """Set only the params that were passed explicitly as keyword arguments."""
    return self._set(**self._input_kwargs)

  def setInputCol(self, new_inputCols):
    """Replace ``inputCols``; despite the singular name this expects a list of column names."""
    return self.setParams(inputCols=new_inputCols)

  def _transform(self, dataset):
    selected_columns = self.getInputCols()
    return dataset.select(*selected_columns)

### DfFilterByDate

In [0]:
class DfFilterByDate(Transformer, HasInputCols):
  """Keep rows whose first ``inputCols`` column lies inside an inclusive date range.

  ``list_date`` is a sequence ``[date_from, date_to]`` of ``'YYYYMMDD'`` strings.
  Only ``inputCols[0]`` is filtered; further entries in ``inputCols`` are ignored.

  Raises:
    ValueError: if ``list_date`` is unset/None or has fewer than two entries.
  """

  list_date = Param(Params._dummy(),"list_date","Lista [fecha_desde, fecha_hasta] en formato 'YYYYMMDD' (rango inclusivo)")

  @keyword_only
  def __init__(self, list_date=None, inputCols=None):
    super().__init__()
    self._setDefault(list_date=None)
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, list_date=None, inputCols=None):
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def setlist_date(self, new_list_date):
    return self.setParams(list_date=new_list_date)

  def setInputCol(self, new_inputCols):
    """Replace ``inputCols``; despite the singular name this expects a list of column names."""
    return self.setParams(inputCols=new_inputCols)

  def getlist_date(self):
    return self.getOrDefault(self.list_date)

  def _transform(self, dataset):
    list_date = self.getlist_date()
    if list_date is None or len(list_date) < 2:
      # Previously this surfaced as an opaque "'NoneType' object is not subscriptable".
      raise ValueError("DfFilterByDate: 'list_date' must be [date_from, date_to] as 'YYYYMMDD' strings.")

    date_column = self.getInputCols()[0]
    date_from = datetime.strptime(list_date[0], '%Y%m%d')
    date_to = datetime.strptime(list_date[1], '%Y%m%d')

    # Column.between(lo, hi) is (col >= lo) & (col <= hi): both bounds inclusive.
    # NOTE(review): bounds are midnight datetimes; if the column is a timestamp rather than
    # a date, rows later in the day of date_to are excluded — confirm the column is DateType.
    return dataset.filter(dataset[date_column].between(date_from, date_to))

### DfFilterNulls

In [0]:
class DfFilterNulls(Transformer, HasInputCols):
  """Drop every row that is null in any of the ``inputCols`` columns."""

  @keyword_only
  def __init__(self, inputCols=None):
    super().__init__()
    self.setParams(**self._input_kwargs)

  @keyword_only
  def setParams(self, inputCols=None):
    """Set only the params that were passed explicitly as keyword arguments."""
    return self._set(**self._input_kwargs)

  def setInputCol(self, new_inputCols):
    """Replace ``inputCols``; despite the singular name this expects a list of column names."""
    return self.setParams(inputCols=new_inputCols)

  def _transform(self, dataset):
    result = dataset
    for name in self.getInputCols():
      # Successive filters AND together: a row survives only if every listed column is non-null.
      result = result.filter(result[name].isNotNull())
    return result

### TypeAssignatorDate

In [0]:
class TypeAssignatorDate(Transformer, HasInputCols):
  """Parse each ``inputCols`` column from a ``'YYYYMMDD'`` string into a DateType column.

  Nulls stay null. A malformed non-null value makes ``datetime.strptime`` raise inside the
  UDF, failing the job rather than silently becoming null.
  """

  @keyword_only
  def __init__(self, inputCols=None):
    super().__init__()
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, inputCols=None):
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def setInputCol(self, new_inputCols):
    """Replace ``inputCols``; despite the singular name this expects a list of column names."""
    return self.setParams(inputCols=new_inputCols)

  def _transform(self, dataset):
    # `udf` is not imported at file level, so reach it through the `F` alias
    # (a bare `udf(...)` raised NameError here).
    udf_str_to_date = F.udf(
      lambda x: datetime.strptime(x, '%Y%m%d').date() if x is not None else None,
      DateType(),
    )
    answer = dataset
    for col_i in self.getInputCols():
      answer = answer.withColumn(col_i, udf_str_to_date(col_i))
    return answer

### TypeAssignatorFloat

In [0]:
class TypeAssignatorFloat(Transformer, HasInputCols):
  """Cast every ``inputCols`` column to FloatType, keeping the column names."""

  @keyword_only
  def __init__(self, inputCols=None):
    super().__init__()
    self.setParams(**self._input_kwargs)

  @keyword_only
  def setParams(self, inputCols=None):
    """Set only the params that were passed explicitly as keyword arguments."""
    return self._set(**self._input_kwargs)

  def setInputCol(self, new_inputCols):
    """Replace ``inputCols``; despite the singular name this expects a list of column names."""
    return self.setParams(inputCols=new_inputCols)

  def _transform(self, dataset):
    casted = dataset
    float_type = FloatType()
    for name in self.getInputCols():
      casted = casted.withColumn(name, F.col(name).cast(float_type))
    return casted

### ColumnsRenamer

In [0]:
class ColumnsRenamer(Transformer):
  """Rename columns according to ``RenameDict`` (``{old_name: new_name}``).

  Renames are applied one after another in the dict's iteration order.
  """

  RenameDict = Param(Params._dummy(),"RenameDict","Diccionario base para renombrar")

  @keyword_only
  def __init__(self, RenameDict=None):
    super().__init__()
    self._setDefault(RenameDict=None)
    self.setParams(**self._input_kwargs)

  @keyword_only
  def setParams(self, RenameDict=None):
    """Set only the params that were passed explicitly as keyword arguments."""
    return self._set(**self._input_kwargs)

  def setRenameDict(self, new_RenameDict):
    return self.setParams(RenameDict=new_RenameDict)

  def getRenameDict(self):
    return self.getOrDefault(self.RenameDict)

  def _transform(self, dataset):
    renamed = dataset
    for source_name, target_name in self.getRenameDict().items():
      renamed = renamed.withColumnRenamed(source_name, target_name)
    return renamed

### ColumnsRounder

In [0]:
class ColumnsRounder(Transformer, HasInputCols):
  """Round every ``inputCols`` column to ``RoundValue`` decimal places (in place)."""

  RoundValue = Param(Params._dummy(),"RoundValue","Decimal a redondear", typeConverter=TypeConverters.toInt)

  @keyword_only
  def __init__(self, inputCols=None, RoundValue=None):
    super().__init__()
    self._setDefault(RoundValue=None)
    self.setParams(**self._input_kwargs)

  @keyword_only
  def setParams(self, inputCols=None, RoundValue=None):
    """Set only the params that were passed explicitly as keyword arguments."""
    return self._set(**self._input_kwargs)

  def setRoundValue(self, new_RoundValue):
    return self.setParams(RoundValue=new_RoundValue)

  def setInputCol(self, new_inputCols):
    """Replace ``inputCols``; despite the singular name this expects a list of column names."""
    return self.setParams(inputCols=new_inputCols)

  def getRoundValue(self):
    return self.getOrDefault(self.RoundValue)

  def _transform(self, dataset):
    target_columns = self.getInputCols()
    scale = self.getRoundValue()
    rounded = dataset
    for name in target_columns:
      # F.round is the same Spark function the file imports (shadowing the builtin) as `round`.
      rounded = rounded.withColumn(name, F.round(F.col(name), scale))
    return rounded

### DfFilterDuplicates

In [0]:
class DfFilterDuplicates(Transformer):
  """Remove exact duplicate rows, comparing all columns."""

  @keyword_only
  def __init__(self):
    super().__init__()
    self.setParams(**self._input_kwargs)

  @keyword_only
  def setParams(self):
    """Accepts no params; kept for the standard Transformer construction pattern."""
    return self._set(**self._input_kwargs)

  def _transform(self, dataset):
    return dataset.distinct()

### DfFilterLessEqValue

In [0]:
class DfFilterLessEqValue(Transformer, HasInputCols):
  """Drop rows whose first ``inputCols`` column is less than or equal to ``Value``.

  Only ``inputCols[0]`` is used. Rows where that column is null are dropped too, because
  the comparison evaluates to null.

  Raises:
    ValueError: if ``Value`` is unset/None. Comparing against a null literal would
      otherwise silently drop every row.
  """

  Value = Param(Params._dummy(),"Value","Umbral: se descartan las filas cuyo valor es <= Value")

  @keyword_only
  def __init__(self, inputCols=None, Value=None):
    super().__init__()
    self._setDefault(Value=None)
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, inputCols=None, Value=None):
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def setValue(self, new_Value):
    return self.setParams(Value=new_Value)

  def setInputCol(self, new_inputCols):
    """Replace ``inputCols``; despite the singular name this expects a list of column names."""
    return self.setParams(inputCols=new_inputCols)

  def getValue(self):
    return self.getOrDefault(self.Value)

  def _transform(self, dataset):
    value = self.getValue()
    if value is None:
      raise ValueError("DfFilterLessEqValue: 'Value' must be set to a threshold.")
    target = dataset[self.getInputCols()[0]]
    # `target > value` keeps exactly the rows where `~(target <= value)` is true
    # (nulls are dropped by both forms).
    return dataset.filter(target > value)

# Transformadores RFM

## RFMAggregatorPre

In [0]:
class RFMAggregatorPre(Transformer):
  """Aggregate transaction rows into one RFM row per ``Customer_ID``.

  Reads the columns ``Customer_ID``, ``R_base``, ``F_base`` and ``M_base`` and produces:
    * ``R_agg``: days between the customer's latest ``R_base`` and the latest ``R_base``
      in the whole dataset;
    * ``F_agg``: number of distinct ``F_base`` values;
    * ``M_agg``: sum of ``M_base`` rounded to 0 decimals.
  Rows are sorted by ``M_agg`` descending. Triggers one extra Spark job to find the global
  maximum ``R_base``.
  """

  @keyword_only
  def __init__(self):
    super().__init__()
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self):
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def _transform(self, dataset):
    # Per-customer aggregates (explicit F.* instead of the builtin-shadowing imports).
    answer = (
      dataset.groupBy("Customer_ID")
      .agg(F.max("R_base").alias("R_agg"),
           F.countDistinct("F_base").alias("F_agg"),
           F.round(F.sum("M_base"), 0).alias("M_agg"))
      .sort(F.desc("M_agg"))
    )

    # Reference point for recency: the latest R_base anywhere in the dataset.
    max_R_base = dataset.agg(F.max('R_base')).collect()[0][0]
    # `udf` is not imported at file level, so use F.udf (bare `udf` raised NameError).
    # NOTE(review): assumes R_base is a date/timestamp so the difference has `.days`.
    udf_delta_R = F.udf(lambda x: (max_R_base - x).days if x is not None else None, IntegerType())
    return answer.withColumn('R_agg', udf_delta_R('R_agg'))

## RFMSegmentator

In [0]:
class RFMSegmentator(Transformer):
  """Label customers with R/F/M scores using quantile cuts computed on the input itself.

  With ``Value = p``:
    * ``R`` = 1 when ``R_agg`` < the p-quantile of ``R_agg`` (lower recency is better), else 2;
    * ``F`` = 1 when ``F_agg`` > the (1-p)-quantile of ``F_agg``, else 2;
    * ``M`` = 1 when ``M_agg`` > the (1-p)-quantile of ``M_agg``, else 2;
    * ``RFM`` = string concatenation of R, F and M.
  Null inputs fall into group 2. Quantiles are approximate (1% relative error).

  Raises:
    ValueError: if ``Value`` is unset, or a column has no non-null values to cut on.
  """

  Value = Param(Params._dummy(),"Value","Percentil (entre 0 y 1) usado como umbral de corte para RFM")

  @keyword_only
  def __init__(self, Value=None):
    super().__init__()
    self._setDefault(Value=None)
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, Value=None):
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def setValue(self, new_Value):
    return self.setParams(Value=new_Value)

  def getValue(self):
    return self.getOrDefault(self.Value)

  @staticmethod
  def _quantile(dataset, column, probability):
    """Return the approximate ``probability``-quantile of ``column`` (1% relative error)."""
    result = dataset.approxQuantile(column, [probability], 0.01)
    if not result:
      # approxQuantile returns [] when the column is empty or all null/NaN.
      raise ValueError("RFMSegmentator: column '%s' has no values to compute a cut." % column)
    return result[0]

  def _transform(self, dataset):
    umbral_de_corte = self.getValue()
    if umbral_de_corte is None:
      raise ValueError("RFMSegmentator: 'Value' must be set to a percentile in [0, 1].")
    # Same split as RFMSegmentatorEstimator: R cut at p, F and M cut at 1 - p
    # (previously these were computed but ignored, and p was used for all three).
    R_umbral_de_corte = umbral_de_corte
    F_umbral_de_corte = 1 - umbral_de_corte
    M_umbral_de_corte = 1 - umbral_de_corte

    R_percentile_cut = self._quantile(dataset, "R_agg", R_umbral_de_corte)
    F_percentile_cut = self._quantile(dataset, "F_agg", F_umbral_de_corte)
    M_percentile_cut = self._quantile(dataset, "M_agg", M_umbral_de_corte)

    # Native expressions instead of Python UDFs (bare `udf` was never imported); a null
    # comparison is not true, so nulls land in group 2 instead of crashing the worker.
    df_output = dataset
    df_output = df_output.withColumn('R', F.when(F.col('R_agg') < R_percentile_cut, 1).otherwise(2))
    df_output = df_output.withColumn('F', F.when(F.col('F_agg') > F_percentile_cut, 1).otherwise(2))
    df_output = df_output.withColumn('M', F.when(F.col('M_agg') > M_percentile_cut, 1).otherwise(2))
    df_output = df_output.withColumn('RFM', F.concat(df_output.R, df_output.F, df_output.M))

    print(umbral_de_corte, R_percentile_cut, F_percentile_cut, M_percentile_cut)
    return df_output

## RFMAggregatorPost

In [0]:
class RFMAggregatorPost(Transformer):
  """Summarise RFM-labelled customers into one row per (RFM, R, F, M) segment.

  Reads the columns ``Customer_ID``, ``RFM``, ``R``, ``F``, ``M``, ``R_agg``, ``F_agg`` and ``M_agg``. For each segment it outputs
  counts, mean/min/max recency, mean/summed frequency and mean/summed monetary value.
  ``M_acom`` is in millions. It also outputs each segment's percentage of the global
  customer count, frequency total and monetary total (the ``*_rel`` columns). The global
  totals are printed.
  """

  @keyword_only
  def __init__(self):
    super().__init__()
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self):
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def _transform(self, dataset):
    # One Spark job for all three global totals (previously three separate collect() calls).
    totals = dataset.agg(
      F.countDistinct('Customer_ID').alias('N_Total'),
      F.sum('F_agg').alias('F_Total'),
      F.sum('M_agg').alias('M_Total'),
    ).collect()[0]

    m_sum = totals['M_Total']
    Valores_globales = {
      'N_Total': totals['N_Total'],
      'F_Total': totals['F_Total'],
      # Millions, 2 decimals. Sums are None on an empty dataset; the old `None / 1E6` crashed.
      'M_Total': None if m_sum is None else np.round(m_sum / 1E6, 2),
    }

    df_output = (
      dataset.groupBy('RFM', 'R', 'F', 'M')
      .agg(F.countDistinct('Customer_ID').alias('N_Customer_ID'),
           F.round(F.avg('R_agg'), 2).alias('R_mean'),
           F.min('R_agg').alias('R_min'),
           F.max('R_agg').alias('R_max'),
           F.round(F.avg('F_agg'), 2).alias('F_mean'),
           F.sum('F_agg').alias('F_acom'),
           F.round(F.avg('M_agg'), 5).alias('M_mean'),
           F.round(F.sum('M_agg') / 1E6, 2).alias('M_acom'))
      .sort(['RFM'], ascending=[True])
    )

    # Percentages relative to the global totals, rounded to 2 decimals.
    df_output = df_output.withColumn('N_Customer_ID_rel', F.round(100 * F.col('N_Customer_ID') / Valores_globales['N_Total'], 2))
    df_output = df_output.withColumn('F_acom_rel', F.round(100 * F.col('F_acom') / Valores_globales['F_Total'], 2))
    df_output = df_output.withColumn('M_acom_rel', F.round(100 * F.col('M_acom') / Valores_globales['M_Total'], 2))

    print('N_Total:', Valores_globales['N_Total'])
    print('F_Total:', Valores_globales['F_Total'])
    print('M_Total:', Valores_globales['M_Total'])

    return df_output

# Estimadores RFM

### RFMSegmentatorEstimator

In [0]:
class _RFMSegmentatorEstimatorParams(Params):
  """Params shared by RFMSegmentatorEstimator and RFMSegmentatorEstimatorModel.

  The class inherits from ``Params``, following the pyspark mixin convention (e.g. ``HasInputCols``).
  This places it *before* ``Params`` in its subclasses' MRO. As a base-less mixin it came after
  pyspark's ``Identifiable``, whose ``__init__`` does not call ``super().__init__()``. That meant
  this ``__init__`` never ran and the 0.6 default was never registered.
  """

  percentile_threshold = Param(Params._dummy(), "percentile_threshold", "Percentil que se empleará como umbral de corte para RFM. Default = 0.6.", TypeConverters.toFloat)

  def __init__(self, *args):
    super().__init__(*args)
    self._setDefault(percentile_threshold=0.6)

  def getpercentile_threshold(self):
    """Return the percentile threshold (0.6 unless explicitly set)."""
    return self.getOrDefault(self.percentile_threshold)

In [0]:
class RFMSegmentatorEstimatorModel(Model, _RFMSegmentatorEstimatorParams):
  """Fitted RFM segmentation produced by ``RFMSegmentatorEstimator._fit``.

  It applies the learned cut values to add these columns:
    * ``R`` = 1 when ``R_agg`` < ``R_value_thereshold_``, else 2;
    * ``F`` = 1 when ``F_agg`` > ``F_value_thereshold_``, else 2;
    * ``M`` = 1 when ``M_agg`` > ``M_value_thereshold_``, else 2;
    * ``RFM`` = string concatenation of R, F and M.
  Null inputs fall into group 2.

  Raises:
    ValueError: if any threshold was explicitly set to None.
  """

  R_value_thereshold_ = Param(Params._dummy(), "R_value_thereshold_", "Valor de corte para la variable R: R=1 si R_agg < valor, si no 2.", TypeConverters.toFloat)
  F_value_thereshold_ = Param(Params._dummy(), "F_value_thereshold_", "Valor de corte para la variable F: F=1 si F_agg > valor, si no 2.", TypeConverters.toFloat)
  M_value_thereshold_ = Param(Params._dummy(), "M_value_thereshold_", "Valor de corte para la variable M: M=1 si M_agg > valor, si no 2.", TypeConverters.toFloat)

  @keyword_only
  def __init__(self, percentile_threshold=None, R_value_thereshold_=None, F_value_thereshold_=None, M_value_thereshold_=None):
    super().__init__()
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, percentile_threshold=None, R_value_thereshold_=None, F_value_thereshold_=None, M_value_thereshold_=None):
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def _transform(self, dataset):
    R_value_thereshold = self.getOrDefault('R_value_thereshold_')
    F_value_thereshold = self.getOrDefault('F_value_thereshold_')
    M_value_thereshold = self.getOrDefault('M_value_thereshold_')
    if any(v is None for v in (R_value_thereshold, F_value_thereshold, M_value_thereshold)):
      raise ValueError("RFMSegmentatorEstimatorModel: R/F/M thresholds must all be set.")

    # Native expressions instead of Python UDFs (bare `udf` was never imported); a null
    # comparison is not true, so nulls land in group 2 instead of crashing the worker.
    df_output = dataset
    df_output = df_output.withColumn('R', F.when(F.col('R_agg') < R_value_thereshold, 1).otherwise(2))
    df_output = df_output.withColumn('F', F.when(F.col('F_agg') > F_value_thereshold, 1).otherwise(2))
    df_output = df_output.withColumn('M', F.when(F.col('M_agg') > M_value_thereshold, 1).otherwise(2))
    df_output = df_output.withColumn('RFM', F.concat(df_output.R, df_output.F, df_output.M))

    print(R_value_thereshold, F_value_thereshold, M_value_thereshold)

    return df_output

In [0]:
class RFMSegmentatorEstimator(Estimator, _RFMSegmentatorEstimatorParams):
  """Learn R/F/M cut values from data and return an ``RFMSegmentatorEstimatorModel``.

  With ``percentile_threshold = p``, the R cut is the p-quantile of ``R_agg``. The F and M
  cuts are the (1-p)-quantiles of ``F_agg`` and ``M_agg``. Quantiles are approximate with
  a relative error of ``_QUANTILE_RELATIVE_ERROR``.

  Raises:
    ValueError: if a column has no non-null values to compute a cut from.
  """

  # Relative error passed to approxQuantile. The old value 1 imposed no precision bound,
  # so the "cuts" were essentially arbitrary values of each column.
  _QUANTILE_RELATIVE_ERROR = 0.01

  @keyword_only
  def __init__(self, percentile_threshold=None):
    super().__init__()
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, percentile_threshold=None):
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def setpercentile_threshold(self, new_percentile_threshold):
    return self.setParams(percentile_threshold=new_percentile_threshold)

  def _quantile(self, dataset, column, probability):
    """Return the approximate ``probability``-quantile of ``column``."""
    result = dataset.approxQuantile(column, [probability], self._QUANTILE_RELATIVE_ERROR)
    if not result:
      # approxQuantile returns [] when the column is empty or all null/NaN.
      raise ValueError("RFMSegmentatorEstimator: column '%s' has no values to compute a cut." % column)
    return result[0]

  def _fit(self, dataset):
    percentile_threshold = self.getpercentile_threshold()

    R_percentil_de_corte = percentile_threshold
    F_percentil_de_corte = 1 - percentile_threshold
    M_percentil_de_corte = 1 - percentile_threshold

    R_value_thereshold = self._quantile(dataset, "R_agg", R_percentil_de_corte)
    F_value_thereshold = self._quantile(dataset, "F_agg", F_percentil_de_corte)
    M_value_thereshold = self._quantile(dataset, "M_agg", M_percentil_de_corte)

    return RFMSegmentatorEstimatorModel(R_value_thereshold_=R_value_thereshold, F_value_thereshold_=F_value_thereshold, M_value_thereshold_=M_value_thereshold, percentile_threshold=percentile_threshold)

In [0]:
# class RFMSegmentatorEstimator(Transformer):
  
#   Value = Param(Params._dummy(),"Value","Decimal a redondear")
  
#   @keyword_only
#   def __init__(self, Value=None):
#     super().__init__()
#     self._setDefault(Value=None)
#     kwargs = self._input_kwargs
#     self.setParams(**kwargs)
    
#   @keyword_only
#   def setParams(self, Value=None):
#       kwargs = self._input_kwargs
#       return self._set(**kwargs)
#   def setValue(self, new_Value):
#     return self.setParams(Value=new_Value)
  
#   def getValue(self):
#     return self.getOrDefault(self.Value)

#   def _transform(self, dataset):
#     value = self.getValue()
#     # Calcula los valores agregados para el RFM
#     umbral_de_corte = value 
#     R_umbral_de_corte = umbral_de_corte
#     F_umbral_de_corte = 1 - umbral_de_corte
#     M_umbral_de_corte = 1 - umbral_de_corte
    
#     R_percentile_cut = dataset.approxQuantile("R_agg", [umbral_de_corte], 0.01)[0]
#     F_percentile_cut = dataset.approxQuantile("F_agg", [umbral_de_corte], 0.01)[0]
#     M_percentile_cut = dataset.approxQuantile("M_agg", [umbral_de_corte], 0.01)[0]
    
#     df_output = dataset
    
#     udf_delta_R = udf(lambda x : 1 if x<R_percentile_cut else 2, IntegerType())
#     udf_delta_F = udf(lambda x : 1 if x>F_percentile_cut else 2, IntegerType())
#     udf_delta_M = udf(lambda x : 1 if x>M_percentile_cut else 2, IntegerType())
    
#     df_output = df_output.withColumn('R', udf_delta_R('R_agg'))
#     df_output = df_output.withColumn('F', udf_delta_F('F_agg'))
#     df_output = df_output.withColumn('M', udf_delta_M('M_agg'))
#     df_output = df_output.withColumn('RFM', F.concat(df_output.R,df_output.F, df_output.M))
    
#     print(umbral_de_corte, R_percentile_cut, F_percentile_cut, M_percentile_cut)

# #     return df_output, umbral_de_corte, R_percentile_cut
#     return df_output

# Espacio de pruebas

In [0]:
# Small in-memory sample (with duplicate rows and a tiny salary) to exercise the transformers.
columns = ["employee_name", "department", "salary"]
data = [
  ("James", "Sales", 3000),
  ("Michael", "Sales", 4600),
  ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000),
  ("James", "Sales", 3000),
  ("Scott", "Finance", 3300),
  ("Jen", "Finance", 3900),
  ("Jeff", "Marketing", 3000),
  ("Perro", "Sales", 15),
  ("Perro", "Sales", 15),
]
df = spark.createDataFrame(data=data, schema=columns)

In [0]:
df.show(n=20)  # full sample (default of 20 rows)

In [0]:
df.limit(num=3).show()  # first three rows only

In [0]:
# Demo parameters: drop rows whose salary is <= 4000.
params_DfFilterLessEqValue = dict(
  Value=4000,
  inputCols=['salary'],
)

In [0]:
Transformer_DfFilterLessEqValue = DfFilterLessEqValue(
  inputCols=params_DfFilterLessEqValue['inputCols'],
  Value=params_DfFilterLessEqValue['Value'],
)

In [0]:
Transformer_DfFilterLessEqValue.transform(dataset=df).show(n=20)  # rows with salary > 4000