In [1]:
from typing import List
# from databricks import koalas as ks  -- not needed: Koalas is now built into Spark (and Databricks) as pyspark.pandas
import pyspark.pandas as ps
from pyspark.sql import DataFrame as SparkDataFrame

from pyspark.ml.feature import VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer
from pyspark.ml import Pipeline 



In [2]:

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0 
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class Connector(Transformer, HasInputCol, HasOutputCol,
                DefaultParamsReadable, DefaultParamsWritable):
    """Pass-through (identity) pipeline stage.

    ``_transform`` hands the incoming DataFrame back untouched. ``_pipe`` uses
    it in place of every optional stage that is switched off, presumably so
    the stage list keeps a fixed layout.

    Credits: https://stackoverflow.com/a/52467470
    by https://stackoverflow.com/users/234944/benjamin-manns
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set whichever of ``inputCol`` / ``outputCol`` were passed as keywords."""
        return self._set(**self._input_kwargs)

    # Required in Spark >= 3.0: the shared HasInputCol / HasOutputCol mixins
    # no longer provide these setters.
    def setInputCol(self, value):
        """Set :py:attr:`inputCol`."""
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """Set :py:attr:`outputCol`."""
        return self._set(outputCol=value)

    def _transform(self, dataset):
        # Identity stage: nothing to compute.
        return dataset

In [3]:

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0 
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class ColumnHandler(
        Transformer, HasInputCol, HasOutputCol,
        DefaultParamsReadable, DefaultParamsWritable):
    """Replace one column by another.

    ``_transform`` drops ``delete_col`` and then renames ``replace_col`` to
    ``delete_col``. ``_pipe`` places it after ``Normalizer`` /
    ``StandardScaler``, which write to ``numeric_features1``, so the result
    is exposed again under ``numeric_features``. ``inputCol`` / ``outputCol``
    are accepted for API symmetry but are not used by ``_transform``.
    """

    delete_col = Param(Params._dummy(), "delete_col",
                       "column to drop; its name is reused for replace_col",
                       typeConverter=TypeConverters.toString)

    replace_col = Param(Params._dummy(), "replace_col",
                        "column renamed to delete_col after the drop",
                        typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, delete_col=None, replace_col=None):
        super(ColumnHandler, self).__init__()
        # Params.__init__ already copies the class-level Params above onto the
        # instance. Re-creating them here (as the code used to) discarded their
        # typeConverter and doc.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, delete_col=None, replace_col=None):
        """Set whichever params were passed as keywords."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setDeleteCol(self, value):
        """Set :py:attr:`delete_col`."""
        return self._set(delete_col=value)

    def getDeleteCol(self):
        """Return the value of :py:attr:`delete_col`."""
        return self.getOrDefault(self.delete_col)

    def setReplaceCol(self, value):
        """Set :py:attr:`replace_col`."""
        return self._set(replace_col=value)

    def getReplaceCol(self):
        """Return the value of :py:attr:`replace_col`."""
        return self.getOrDefault(self.replace_col)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        """Drop ``delete_col`` and rename ``replace_col`` into its place."""
        delete_col = self.getDeleteCol()
        replace_col = self.getReplaceCol()
        # Pass the name as one argument. The former `drop(*delete_col)` unpacked
        # the string into one-character names, so the column was never dropped
        # and the rename below produced a duplicate column name.
        dataset = dataset.drop(delete_col)
        return dataset.withColumnRenamed(replace_col, delete_col)

In [45]:

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0 
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class Custom_OneHotEncoder(
        Transformer, HasInputCol, HasOutputCol,
        DefaultParamsReadable, DefaultParamsWritable):
    """Factory for StringIndexer -> OneHotEncoder -> VectorAssembler stages.

    The stages are built as follows:

    * For every column ``c`` in ``input_columns``, ``c_indexer`` holds the
      category index.
    * ``c_encoded`` holds the one-hot vector. It has one slot per category,
      because ``dropLast=False``.
    * All ``*_encoded`` vectors are assembled into ``output_column``.

    NOTE(review): despite being a Transformer, ``transform()`` returns the
    *unfitted* tuple ``(indexer, encoder, assembler)`` and ignores the dataset
    it receives. This class therefore cannot be used as a Pipeline stage
    itself. The return value is kept because the notebook unpacks it. Fit the
    stages separately, e.g. ``Pipeline(stages=[...]).fit(df)``.
    ``inputCol`` / ``outputCol`` are accepted but unused.
    """

    input_columns = Param(Params._dummy(), "input_columns",
                          "categorical columns to index, one-hot encode and assemble",
                          typeConverter=TypeConverters.toListString)

    output_column = Param(Params._dummy(), "output_column",
                          "name of the assembled one-hot feature vector column",
                          typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, input_columns=None, output_column=None):
        super(Custom_OneHotEncoder, self).__init__()
        # Params.__init__ already copies the class-level Params above onto the
        # instance. Re-creating them here (as the code used to) dropped their
        # typeConverter, so e.g. a bare string for input_columns was accepted
        # and later iterated character by character.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, input_columns=None, output_column=None):
        """Set whichever params were passed as keywords."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInput_columns(self, value):
        """Set :py:attr:`input_columns`."""
        return self._set(input_columns=value)

    def getInput_columns(self):
        """Return the value of :py:attr:`input_columns`."""
        return self.getOrDefault(self.input_columns)

    def setOutput_column(self, value):
        """Set :py:attr:`output_column`."""
        return self._set(output_column=value)

    def getOutput_column(self):
        """Return the value of :py:attr:`output_column`."""
        return self.getOrDefault(self.output_column)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        """Return the unfitted ``(indexer, encoder, assembler)`` stages.

        ``dataset`` is not used.
        """
        input_cols = self.getInput_columns()
        output_col = self.getOutput_column()

        # Intermediate column names, computed once and shared by the stages.
        indexed_cols = [column + "_indexer" for column in input_cols]
        encoded_cols = [column + "_encoded" for column in input_cols]

        indexer = StringIndexer(inputCols=input_cols, outputCols=indexed_cols)
        # dropLast=False keeps one slot per category (no implicit reference level).
        encoder = OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols,
                                dropLast=False)
        assembler = VectorAssembler(inputCols=encoded_cols, outputCol=output_col)

        return indexer, encoder, assembler

In [4]:
from SparkAutoML.ml_module.preprocessing_module.preprocess_file import Preprocessor
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

In [5]:
# Load the raw credit dataset as a pandas-on-Spark DataFrame.
CREDIT_CSV_PATH = "credit.csv"
df = ps.read_csv(CREDIT_CSV_PATH)

In [6]:
# Inspect the columns loaded from credit.csv.
df.columns

Index(['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_1', 'PAY_2',
       'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2',
       'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1',
       'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6', 'default'],
      dtype='object')

In [7]:
def bool_function(df):
    """Append a 0/1 flag column ``new_bol_sql`` that is 1 when ``mean_bal > 50000``.

    NOTE(review): ``mean_bal`` is not a column of credit.csv (see the
    ``df.columns`` output), so callers must add it before using this function.

    Parameters
    ----------
    df : pyspark.pandas.DataFrame
        Frame referenced as ``{df}`` in the SQL query.

    Returns
    -------
    pyspark.pandas.DataFrame
        All columns of ``df`` plus ``new_bol_sql``.
    """
    query = (
        "select *,\n"
        "  case when mean_bal > 50000 then 1 else 0 end as new_bol_sql\n"
        "  from {df}"
    )
    # Bind the frame explicitly. Since Spark 3.3, ps.sql resolves `{name}`
    # placeholders only from keyword arguments. Implicitly capturing the
    # caller's local variables is the deprecated legacy behaviour.
    return ps.sql(query, df=df)

In [8]:
# Method chaining with .pipe(): each step runs a SQL query over the previous
# result. pandas-on-Spark (formerly Koalas) substitutes `{x}` with the frame
# passed as the `x=` keyword argument. The explicit binding is required on
# Spark >= 3.3.
df = (
    df[['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE']]
    # Bucket the credit limit into low / medium / high.
    .pipe(lambda x: ps.sql(""" select * , case when LIMIT_BAL < 100000 then 'low' when LIMIT_BAL between 100000 and 200000 then 'medium' else 'high' end as BAL_CAT from {x}""", x=x))
    # Bucket age into youth / yong / mature.
    # NOTE(review): 'yong' is presumably a typo for 'young'; left as-is because
    # the outputs below already use that label.
    .pipe(lambda x: ps.sql(""" select * , case when AGE < 30 then 'youth' when AGE between 30 and 40 then 'yong' else 'mature' end as maturity from {x}""", x=x))
)

In [9]:
# Preview the selected columns plus the derived BAL_CAT / maturity buckets.
df.head()

Unnamed: 0,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,BAL_CAT,maturity
0,20000,2,2,1,24,low,youth
1,90000,2,2,2,34,low,yong
2,50000,2,2,1,37,low,yong
3,50000,1,2,1,57,low,mature
4,50000,1,1,2,37,low,yong


In [10]:
from pyspark.ml.feature import (
    VectorAssembler,
    VectorIndexer,
    OneHotEncoder,
    StringIndexer,
    Imputer,
    Normalizer,
    StandardScaler,
)
from pyspark.ml import Pipeline

In [10]:
# Scratch check: this only displays a fresh StandardScaler's uid; no later cell uses it.
StandardScaler()

StandardScaler_3e9ad9ef8f3f

In [11]:
import numpy as np
# Earlier experiments, kept for reference:
# df.iloc[0,4] = np.nan
# df.AGE = df.AGE.astype(float)
# spark_df = spark_df.withColumn('AGE', spark_df['AGE'].cast(IntegerType()))

# Convert to a plain Spark DataFrame and split 70/30 with a fixed seed.
spark_df = df.to_spark()
SPLIT_WEIGHTS = [0.7, 0.3]
SPLIT_SEED = 123
train, test = spark_df.randomSplit(SPLIT_WEIGHTS, seed=SPLIT_SEED)

In [12]:
# Confirm the Spark DataFrame carries the same columns as the pandas-on-Spark frame.
spark_df.columns

['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'BAL_CAT', 'maturity']

In [29]:
def _pipe(num_cols, cat_cols=None, impute=False, impute_strategy='mean', normalize=False, normalize_p=2,
          standard_scale=False, standard_scaler_withMean=False, standard_scaler_withStd=True):
    """Build an (unfitted) preprocessing Pipeline for numeric features.

    Stages, in order: imputer -> VectorAssembler -> normalizer -> column
    handler -> standard scaler -> column handler. Each optional stage that is
    switched off is replaced by a pass-through ``Connector()``, so the
    pipeline always has six stages. The assembler writes
    ``numeric_features``. Each enabled transform writes
    ``numeric_features1``, and the following ``ColumnHandler`` is configured
    to drop ``numeric_features`` and rename ``numeric_features1`` to it.

    Parameters
    ----------
    num_cols : list of str
        Numeric columns to (optionally) impute and then assemble.
    cat_cols : list of str, optional
        Currently unused. TODO: categorical handling.
    impute : bool
        If True, fill missing values of ``num_cols`` in place with ``Imputer``.
    impute_strategy : str
        Strategy passed to ``Imputer`` (e.g. ``'mean'``).
    normalize : bool
        If True, apply ``Normalizer`` to ``numeric_features``.
    normalize_p : float
        The p-norm used by ``Normalizer``.
    standard_scale : bool
        If True, apply ``StandardScaler`` to ``numeric_features``.
    standard_scaler_withMean, standard_scaler_withStd : bool
        Forwarded to ``StandardScaler`` as ``withMean`` / ``withStd``.

    Returns
    -------
    pyspark.ml.Pipeline
    """
    if impute:
        imputer = Imputer(inputCols=num_cols, outputCols=num_cols, strategy=impute_strategy)
    else:
        imputer = Connector()

    # Always needed: the later stages operate on a single vector column.
    assemble_numeric = VectorAssembler(inputCols=num_cols, outputCol='numeric_features')

    if normalize:
        # `normalize_p` used to be accepted but never forwarded, so the default
        # L2 norm was always applied.
        normalizer = Normalizer(inputCol='numeric_features', outputCol='numeric_features1',
                                p=normalize_p)
        column_handler1 = ColumnHandler(delete_col='numeric_features', replace_col='numeric_features1')
    else:
        normalizer = Connector()
        column_handler1 = Connector()

    if standard_scale:
        standard_scaler = StandardScaler(inputCol='numeric_features', outputCol='numeric_features1',
                                         withMean=standard_scaler_withMean,
                                         withStd=standard_scaler_withStd)
        column_handler2 = ColumnHandler(delete_col='numeric_features', replace_col='numeric_features1')
    else:
        standard_scaler = Connector()
        column_handler2 = Connector()

    return Pipeline(stages=[imputer, assemble_numeric, normalizer, column_handler1,
                            standard_scaler, column_handler2])
    

In [30]:
# Demo: numeric pipeline on AGE and SEX with normalization only (p=2).
pipe = _pipe(
    num_cols=['AGE', 'SEX'],
    cat_cols=None,
    impute=False,
    impute_strategy='mean',
    normalize=True,
    normalize_p=2,
    standard_scale=False,
)
fitted = pipe.fit(spark_df)
fitted.transform(spark_df).show()

+---------+---+---------+--------+---+-------+--------+--------------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|BAL_CAT|maturity|    numeric_features|
+---------+---+---------+--------+---+-------+--------+--------------------+
|    20000|  2|        2|       1| 24|    low|   youth|[0.99654575824487...|
|    90000|  2|        2|       2| 34|    low|    yong|[0.99827437317499...|
|    50000|  2|        2|       1| 37|    low|    yong|[0.99854227327750...|
|    50000|  1|        2|       1| 57|    low|  mature|[0.99984614201001...|
|    50000|  1|        1|       2| 37|    low|    yong|[0.99963496987285...|
|   100000|  2|        2|       2| 23| medium|   youth|[0.99624058819568...|
|   140000|  2|        3|       1| 28| medium|   youth|[0.99745869983073...|
|    20000|  1|        3|       2| 35|    low|    yong|[0.99959208646069...|
|   200000|  2|        3|       2| 34| medium|    yong|[0.99827437317499...|
|   260000|  2|        1|       2| 51|   high|  mature|[0.99923195073154...|

In [28]:
# Configure the SparkAutoML Preprocessor on the train / hold-out split.
pipe = Preprocessor(
    training_data=train,
    hold_out_data=test,
    target_feature='LIMIT_BAL',
    numeric_features=['AGE', 'SEX'],
    categorical_features=["BAL_CAT", "maturity"],
    impute_missing_values=True,
    missing_value_strategy="mean",
    normalize=True,
    standard_scale=True,
)

In [29]:
# Presumably builds the preprocessing stages from the configuration above (SparkAutoML API; verify).
pipe.run_pipeline()

In [30]:
# Presumably fits the stages and populates `train_data_transformed`, which the next cell displays.
pipe.fit_transform()

In [31]:
# Inspect the transformed training data produced by the Preprocessor.
pipe.train_data_transformed.show()

+---------+---+---------+--------+---+-------+--------+--------------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|BAL_CAT|maturity|    numeric_features|
+---------+---+---------+--------+---+-------+--------+--------------------+
|  10000.0|  1|        1|       1| 38|    low|    yong|[954.733573425178...|
|  10000.0|  1|        1|       1| 40|    low|    yong|[954.765784924959...|
|  10000.0|  1|        1|       1| 48|    low|  mature|[954.856907896601...|
|  10000.0|  1|        1|       1| 50|    low|  mature|[954.873147092114...|
|  10000.0|  1|        1|       2| 22|    low|   youth|[954.078992409441...|
|  10000.0|  1|        1|       2| 22|    low|   youth|[954.078992409441...|
|  10000.0|  1|        1|       2| 23|    low|   youth|[954.162673354240...|
|  10000.0|  1|        1|       2| 23|    low|   youth|[954.162673354240...|
|  10000.0|  1|        1|       2| 23|    low|   youth|[954.162673354240...|
|  10000.0|  1|        1|       2| 23|    low|   youth|[954.162673354240...|

In [47]:
# Custom_OneHotEncoder.transform() returns the unfitted stages
# (indexer, encoder, assembler) rather than a DataFrame.
oneh = Custom_OneHotEncoder(input_columns=['BAL_CAT', 'maturity'], output_column='cat_features')
indx, enc, ass = oneh.transform(train)

# Pipeline is an Estimator: fit it first, then transform. The stage list used
# to reference the undefined name `index` and call a non-existent
# Pipeline.transform().
cat_pipeline_model = Pipeline(stages=[indx, enc, ass]).fit(train)
cat_pipeline_model.transform(train).show()

In [14]:
# Index the two categorical columns; a trailing "_" marks the index column.
ind = StringIndexer(
    inputCols=["BAL_CAT", "maturity"],
    outputCols=["BAL_CAT_", "maturity_"],
)

In [18]:
indexer_model = ind.fit(train)
t = indexer_model.transform(train)

In [19]:
# Preview the indexed columns BAL_CAT_ / maturity_.
t.show()

+---------+---+---------+--------+---+-------+--------+--------+---------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|BAL_CAT|maturity|BAL_CAT_|maturity_|
+---------+---+---------+--------+---+-------+--------+--------+---------+
|    10000|  1|        1|       1| 38|    low|    yong|     0.0|      0.0|
|    10000|  1|        1|       1| 40|    low|    yong|     0.0|      0.0|
|    10000|  1|        1|       1| 48|    low|  mature|     0.0|      2.0|
|    10000|  1|        1|       1| 50|    low|  mature|     0.0|      2.0|
|    10000|  1|        1|       2| 22|    low|   youth|     0.0|      1.0|
|    10000|  1|        1|       2| 22|    low|   youth|     0.0|      1.0|
|    10000|  1|        1|       2| 23|    low|   youth|     0.0|      1.0|
|    10000|  1|        1|       2| 23|    low|   youth|     0.0|      1.0|
|    10000|  1|        1|       2| 23|    low|   youth|     0.0|      1.0|
|    10000|  1|        1|       2| 23|    low|   youth|     0.0|      1.0|
|    10000|  1|        1|

In [43]:
# One-hot encode the indexed columns; dropLast=False keeps a slot for every category.
enco = OneHotEncoder(
    inputCols=["BAL_CAT_", "maturity_"],
    outputCols=['bal_enc', 'maturity_enc'],
    dropLast=False,
)
encoder_model = enco.fit(t)
e = encoder_model.transform(t)
e.show()

+---------+---+---------+--------+---+-------+--------+--------+---------+-------------+-------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|BAL_CAT|maturity|BAL_CAT_|maturity_|      bal_enc| maturity_enc|
+---------+---+---------+--------+---+-------+--------+--------+---------+-------------+-------------+
|    10000|  1|        1|       1| 38|    low|    yong|     0.0|      0.0|(3,[0],[1.0])|(3,[0],[1.0])|
|    10000|  1|        1|       1| 40|    low|    yong|     0.0|      0.0|(3,[0],[1.0])|(3,[0],[1.0])|
|    10000|  1|        1|       1| 48|    low|  mature|     0.0|      2.0|(3,[0],[1.0])|(3,[2],[1.0])|
|    10000|  1|        1|       1| 50|    low|  mature|     0.0|      2.0|(3,[0],[1.0])|(3,[2],[1.0])|
|    10000|  1|        1|       2| 22|    low|   youth|     0.0|      1.0|(3,[0],[1.0])|(3,[1],[1.0])|
|    10000|  1|        1|       2| 22|    low|   youth|     0.0|      1.0|(3,[0],[1.0])|(3,[1],[1.0])|
|    10000|  1|        1|       2| 23|    low|   youth|     0.0|      1.0

In [44]:
# Concatenate the one-hot vectors into a single categorical feature vector.
vec = VectorAssembler(inputCols=['bal_enc', 'maturity_enc'], outputCol='cat_feature')
assembled = vec.transform(e)
assembled.show()

+---------+---+---------+--------+---+-------+--------+--------+---------+-------------+-------------+-------------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|BAL_CAT|maturity|BAL_CAT_|maturity_|      bal_enc| maturity_enc|        cat_feature|
+---------+---+---------+--------+---+-------+--------+--------+---------+-------------+-------------+-------------------+
|    10000|  1|        1|       1| 38|    low|    yong|     0.0|      0.0|(3,[0],[1.0])|(3,[0],[1.0])|(6,[0,3],[1.0,1.0])|
|    10000|  1|        1|       1| 40|    low|    yong|     0.0|      0.0|(3,[0],[1.0])|(3,[0],[1.0])|(6,[0,3],[1.0,1.0])|
|    10000|  1|        1|       1| 48|    low|  mature|     0.0|      2.0|(3,[0],[1.0])|(3,[2],[1.0])|(6,[0,5],[1.0,1.0])|
|    10000|  1|        1|       1| 50|    low|  mature|     0.0|      2.0|(3,[0],[1.0])|(3,[2],[1.0])|(6,[0,5],[1.0,1.0])|
|    10000|  1|        1|       2| 22|    low|   youth|     0.0|      1.0|(3,[0],[1.0])|(3,[1],[1.0])|(6,[0,4],[1.0,1.0])|
|    10000|  1| 

In [35]:
# Scratch cell: sample names for the str.join experiment below.
li = ['k','h']

In [38]:
"_ind".join(li)

'k_indh'