<a href="https://colab.research.google.com/github/Yanina-Kutovaya/credit-cards-project/blob/main/Data_Preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
app_name = 'data_imputing'
spark_ui_port = 4043

In [2]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
import pyspark

# Build (or reuse) the notebook's SparkSession; app name and UI port come from
# the configuration cell above.
spark = (
    pyspark.sql.SparkSession.builder
        .appName(app_name)
        .master("local[4]") # local mode, 4 worker threads
        .config("spark.executor.memory", "15g")
        .config("spark.driver.memory", "15g")
        .config("spark.ui.port", spark_ui_port)
        .getOrCreate()
)

In [4]:
from pyspark import SparkFiles

# Remote locations of the train / test CSV files.
TRAIN_DATA_URL = 'https://storage.yandexcloud.net/credit-cards-data/train.csv'
TEST_DATA_URL = 'https://storage.yandexcloud.net/credit-cards-data/test.csv'

# addFile registers the URL with Spark; SparkFiles.get resolves the local copy
# by file name. Column types are inferred from the data (inferSchema).
spark.sparkContext.addFile(TRAIN_DATA_URL)
df_train = spark.read.csv(SparkFiles.get("train.csv"), header=True, inferSchema= True)

spark.sparkContext.addFile(TEST_DATA_URL)
df_test = spark.read.csv(SparkFiles.get("test.csv"), header=True, inferSchema= True)
# Preview only the first five rows as a pandas frame.
df_train.limit(5).toPandas()

Unnamed: 0,TransactionID,isFraud,TransactionDT,TransactionAmt,ProductCD,card1,card2,card3,card4,card5,...,V330,V331,V332,V333,V334,V335,V336,V337,V338,V339
0,2987000,0,86400,68.5,W,13926,,150.0,discover,142.0,...,,,,,,,,,,
1,2987001,0,86401,29.0,W,2755,404.0,150.0,mastercard,102.0,...,,,,,,,,,,
2,2987002,0,86469,59.0,W,4663,490.0,150.0,visa,166.0,...,,,,,,,,,,
3,2987003,0,86499,50.0,W,18132,567.0,150.0,mastercard,117.0,...,,,,,,,,,,
4,2987004,0,86506,50.0,H,4497,514.0,150.0,mastercard,102.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


# EDA

In [5]:
df_train.count(), df_test.count()

(180000, 100001)

## Schema

In [7]:
df_train.schema == df_test.schema

True

In [6]:
df_train.printSchema()

root
 |-- TransactionID: integer (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- TransactionDT: integer (nullable = true)
 |-- TransactionAmt: double (nullable = true)
 |-- ProductCD: string (nullable = true)
 |-- card1: integer (nullable = true)
 |-- card2: double (nullable = true)
 |-- card3: double (nullable = true)
 |-- card4: string (nullable = true)
 |-- card5: double (nullable = true)
 |-- card6: string (nullable = true)
 |-- addr1: double (nullable = true)
 |-- addr2: double (nullable = true)
 |-- dist1: double (nullable = true)
 |-- dist2: double (nullable = true)
 |-- P_emaildomain: string (nullable = true)
 |-- R_emaildomain: string (nullable = true)
 |-- C1: double (nullable = true)
 |-- C2: double (nullable = true)
 |-- C3: double (nullable = true)
 |-- C4: double (nullable = true)
 |-- C5: double (nullable = true)
 |-- C6: double (nullable = true)
 |-- C7: double (nullable = true)
 |-- C8: double (nullable = true)
 |-- C9: double (nullable = true)
 |-- C10:

In [32]:
def get_schema(df, id ='TransactionID', target='isFraud', time_var='TransactionDT'):
  """Group the columns of ``df`` by role and by dtype.

  Args:
    df: object exposing ``dtypes`` as an iterable of ``(column, dtype)`` pairs.
    id: name of the identifier column, stored under the key ``'id'``.
    target: name of the target column, stored under the key ``'target'``.
    time_var: name of the time column, stored under the key ``'time'``.

  Returns:
    dict mapping ``'id'`` / ``'target'`` / ``'time'`` to the matching column
    name (only when that column is present) and every dtype string to the
    list of remaining columns having that dtype, in column order.
  """
  # Later keys overwrite earlier ones, so listing the roles in reverse keeps
  # the precedence id > target > time if two role names coincide.
  roles = {time_var: 'time', target: 'target', id: 'id'}

  grouped = {}
  for name, kind in df.dtypes:
    role = roles.get(name)
    if role is not None:
      grouped[role] = name
    else:
      grouped.setdefault(kind, []).append(name)

  return grouped

schema = get_schema(df_train)
schema.keys()

dict_keys(['id', 'target', 'time', 'double', 'string', 'int'])

## Target variable

In [8]:
target = 'isFraud'
df_train.groupBy(target).count().show()

+-------+------+
|isFraud| count|
+-------+------+
|      1|  5141|
|      0|174859|
+-------+------+



## Independent variables

__Train and test samples__

In [9]:
def get_samples(N=1000, seed=25):
  """Draw pandas samples of roughly ``N`` rows from ``df_train`` and ``df_test``.

  Reads the notebook-level Spark frames ``df_train`` and ``df_test``. The
  requested size is capped at the smaller of the two row counts, and each
  frame is sampled without replacement using the fraction ``size / row_count``.

  Args:
    N: requested number of rows per sample.
    seed: seed passed to both ``sample`` calls.

  Returns:
    ``(train_sample, test_sample)`` as pandas DataFrames.
  """
  train_rows = df_train.count()
  test_rows = df_test.count()

  # min() over all three values already covers the case N >= min(row counts).
  size = min(train_rows, test_rows, N)
  train_fraction = size / train_rows
  test_fraction = size / test_rows

  sampled = [
      frame.sample(withReplacement=False, fraction=fraction, seed=seed).toPandas()
      for frame, fraction in ((df_train, train_fraction), (df_test, test_fraction))
  ]

  return sampled[0], sampled[1]

__Chi2 test__

In [10]:
from scipy.stats import chisquare
import pandas as pd

def chi2_test(train_sample, test_sample, vars, p=0.025):
  """Keep the variables whose category frequencies agree between two samples.

  For each variable the test-sample value counts are the observed
  frequencies; the expected frequencies are the train-sample proportions
  rescaled to the test-sample total. Categories missing from one sample get
  the pseudo-count 0.1.

  Args:
    train_sample: pandas DataFrame providing the reference distribution.
    test_sample: pandas DataFrame providing the observed distribution.
    vars: column names to test.
    p: p-value threshold; a variable is kept when its p-value exceeds it.

  Returns:
    list of the variables from ``vars`` with a chi-square p-value above ``p``.
  """
  def p_value_for(var):
    counts = pd.concat(
        [train_sample[var].value_counts(), test_sample[var].value_counts()],
        axis=1,
    ).fillna(1e-01)
    reference, observed = counts.iloc[:, 0], counts.iloc[:, 1]
    expected = reference / reference.sum() * observed.sum()
    return chisquare(f_obs=observed, f_exp=expected)[1]

  # Evaluate every variable first, then filter; NaN p-values never pass.
  p_values = [p_value_for(var) for var in vars]

  return [var for var, p_value in zip(vars, p_values) if p_value > p]

__Statistical tests for continuous variables:__

In [11]:
from scipy.stats import ttest_ind, mannwhitneyu, ks_2samp

def stat_tests_continuous(train_sample, test_sample, vars, p=0.025):
  """Keep the variables on which three two-sample tests all exceed ``p``.

  Missing values are replaced by 0 before testing. A variable is kept only
  when the t-test, the Kolmogorov-Smirnov test and the Mann-Whitney U test
  each return a p-value greater than ``p``.

  Args:
    train_sample: pandas DataFrame with the first sample.
    test_sample: pandas DataFrame with the second sample.
    vars: column names to test.
    p: p-value threshold.

  Returns:
    list of the variables from ``vars`` that pass all three tests.
  """
  two_sample_tests = (ttest_ind, ks_2samp, mannwhitneyu)

  def passes_all(var):
    first = train_sample[var].fillna(0).values
    second = test_sample[var].fillna(0).values
    # Element 1 of each result is the p-value.
    p_values = [test(first, second)[1] for test in two_sample_tests]
    return all(p_value > p for p_value in p_values)

  return [var for var in vars if passes_all(var)]

### 1. Time variables

In [12]:
from pyspark.sql import functions as F 
from pyspark.sql.types import IntegerType

def get_time_features(df, time_var=None):
  """Add ``weekdays``, ``hours`` and ``minutes`` columns derived from a
  seconds-based time column.

  Args:
    df: Spark DataFrame containing the time column.
    time_var: name of the time column. When omitted, the notebook-level
      ``schema['time']`` is used, as before.

  Returns:
    ``df`` with three additional integer columns:
      * ``weekdays`` - day index within a 7-day cycle, rotated by 3 days;
      * ``hours``    - hour of the day (0-23), rotated by 7 hours;
      * ``minutes``  - minute within the hour.
  """
  w = 60 * 60 * 24 * 7
  d = 60 * 60 * 24
  h = 60 * 60
  m = 60
  if time_var is None:
    time_var = schema['time']

  df = df.withColumn('weekdays', ((df[time_var] % w) / d)\
                     .cast(IntegerType()))
  # Rotate by 3 days with wrap-around: 3..6 -> 0..3 and 0..2 -> 4..6.
  df = df.withColumn(
      'weekdays', F.when(F.col('weekdays') > 2, df['weekdays'] - 3)\
      .otherwise(df['weekdays'] + 4))

  df = df.withColumn('hours', ((df[time_var] % d) / h)\
                     .cast(IntegerType()))
  # Rotate by 7 hours with wrap-around: 7..23 -> 0..16 and 0..6 -> 17..23.
  # The wrap offset is 24 - 7 = 17; the previous offset of 14 mapped hours
  # 0..2 onto the same values as hours 21..23 and never produced 21..23.
  df = df.withColumn(
      'hours', F.when(F.col('hours') < 7, df['hours'] + 17)\
      .otherwise(df['hours'] - 7))

  df = df.withColumn('minutes', (((df[time_var] % d) % h) / m)\
                     .cast(IntegerType()))

  return df

In [13]:
df_train = get_time_features(df_train)
df_test = get_time_features(df_test)

In [62]:
train_sample, test_sample = get_samples(N=1000, seed=18)

### 2. Categorical variables


In [35]:
categorical_vars = schema['string']
len(categorical_vars)

14

In [36]:
for var in categorical_vars:
  df_train.groupBy(var).count().orderBy('count', ascending=False).show()

+---------+------+
|ProductCD| count|
+---------+------+
|        W|110340|
|        H| 22422|
|        R| 21926|
|        C| 21664|
|        S|  3648|
+---------+------+

+----------------+------+
|           card4| count|
+----------------+------+
|            visa|118295|
|      mastercard| 54501|
|american express|  4818|
|        discover|  2378|
|            null|     8|
+----------------+------+

+---------------+------+
|          card6| count|
+---------------+------+
|          debit|122746|
|         credit| 57229|
|debit or credit|    10|
|    charge card|     8|
|           null|     7|
+---------------+------+

+-------------+-----+
|P_emaildomain|count|
+-------------+-----+
|    gmail.com|67198|
|    yahoo.com|28992|
|         null|28440|
|  hotmail.com|15058|
|anonymous.com|13328|
|      aol.com| 8583|
|  comcast.net| 2916|
|   icloud.com| 1677|
|  outlook.com| 1484|
|      msn.com| 1423|
|      att.net| 1223|
|sbcglobal.net| 1092|
|  verizon.net|  971|
|     live.com|

In [63]:
schema['categorical'] = chi2_test(train_sample, test_sample, schema['string'], p=0.025)
schema['categorical']

['M2', 'M3', 'M5', 'M6', 'M7', 'M8', 'M9']

### 3. Discrete independent variables

In [64]:
f1 = 5000 / df_train.count()
df = df_train.sample(withReplacement=False, fraction=f1, seed=0).toPandas()
n_unique = df.nunique()

In [65]:
# Columns with at most 25 distinct values in the sampled frame are treated as
# discrete; the target and the string (categorical) columns are excluded.
discrete_list = n_unique[n_unique <= 25].index.tolist()
exceptions_list = [target] + categorical_vars 

discrete_vars = [var for var in discrete_list if not var in exceptions_list]
len(discrete_vars)

259

In [66]:
schema['discrete'] = chi2_test(train_sample, test_sample, discrete_vars, p=0.025)
len(schema['discrete'])

92

### 4. Continuous independent variables

In [67]:
# Columns with more than 25 distinct values in the sampled frame are treated
# as continuous; only the string columns are excluded here.
# NOTE(review): the id and time columns are not in exceptions_list, so they
# may end up in continuous_vars - confirm whether that is intended.
continuous_list = n_unique[n_unique > 25].index.tolist()
exceptions_list = schema['string'] 

continuous_vars = [var for var in continuous_list if not var in exceptions_list]
len(continuous_vars)

123

In [68]:
schema['continuous'] = stat_tests_continuous(train_sample, test_sample, continuous_vars)
len(schema['continuous'])

48

In [73]:
schema.keys()

dict_keys(['id', 'target', 'time', 'double', 'string', 'int', 'categorical', 'discrete', 'continuous'])

# Data validator

In [5]:
import pandas as pd
from scipy.stats import ttest_ind, mannwhitneyu, chisquare, ks_2samp, norm

class DataValidator():
  """Select the variables whose distributions agree between train and test.

  ``fit`` groups the columns of the train frame by dtype, draws pandas samples
  from both frames and keeps, per variable family, only the variables that
  pass the corresponding statistical tests at threshold ``p``.

  The resulting ``schema`` dict contains:
    * ``'id'``, ``'target'``, ``'time_var'`` - the role column names;
    * ``'time'`` - the names of the derived time features;
    * one key per Spark dtype string - the columns of that dtype;
    * ``'categorical'``, ``'discrete'``, ``'continuous'`` - selected variables.
  """

  def __init__(self, id='TransactionID', target='isFraud', 
               time_var='TransactionDT', sample_size=1000, p=0.025, seed=18):    
    """
    Args:
      id: identifier column name.
      target: target column name.
      time_var: raw time column name.
      sample_size: requested number of rows in each test sample.
      p: p-value threshold used by all statistical tests.
      seed: seed used when sampling the train / test frames.
    """
    self.schema = {}
    self.schema['id'] = id
    self.schema['target'] = target
    self.schema['time_var'] = time_var
    self.schema['time'] = ['weekdays', 'hours', 'minutes']
    # Role columns are never treated as candidate features.
    self.ignore_list = [id, target, time_var]    
    
    self.N = sample_size
    self.p = p
    self.seed = seed
    
    self.train_test_schemas_the_same = False
    self.n_train = None
    self.n_test = None 
    self.train_sample = None  
    self.test_sample = None


  def check_train_test_schemas(self, df_train, df_test):
    """Record whether train and test have equal schemas, target excluded."""
    target = self.schema['target']
    train_cols = [var for var in df_train.columns if var != target]
    test_cols = [var for var in df_test.columns if var != target]
    self.train_test_schemas_the_same = (
        df_train[train_cols].schema == df_test[test_cols].schema)
     

  def get_schema(self, df_train):    
    """Group the non-role columns of ``df_train`` by dtype into ``self.schema``.

    The per-dtype lists are rebuilt on every call, so fitting the same
    validator twice does not duplicate column names.
    """
    by_dtype = {}
    for (var, dtype) in df_train.dtypes:
      if var in self.ignore_list:
        continue
      by_dtype.setdefault(dtype, []).append(var)

    self.schema.update(by_dtype)


  def get_samples(self, df_train, df_test):
    """Draw pandas samples of about ``self.N`` rows from both frames.

    ``self.N`` is capped at the smaller row count. Row counts are stored in
    ``self.n_train`` / ``self.n_test`` and the samples in
    ``self.train_sample`` / ``self.test_sample``.
    """
    n_train = df_train.count()
    n_test = df_test.count()

    # Cap the requested size at the smaller frame.
    self.N = min(n_train, n_test, self.N)

    f1 = self.N / n_train
    f2 = self.N / n_test  
    # NOTE(review): sampling is done with replacement here, whereas the
    # notebook's stand-alone get_samples samples without replacement.
    self.train_sample = df_train.sample(
        withReplacement=True, fraction=f1, seed=self.seed
        ).toPandas()
    self.test_sample = df_test.sample(
        withReplacement=True, fraction=f2, seed=self.seed
        ).toPandas() 
    self.n_train = n_train
    self.n_test = n_test
    

  def chi2_test(self, vars):  
    """Return the variables whose category frequencies agree across samples.

    Observed frequencies are the test-sample value counts; expected ones are
    the train-sample proportions rescaled to the test total. Categories
    missing from one sample get the pseudo-count 0.1. A variable is kept
    when its chi-square p-value exceeds ``self.p``.
    """
    p_values = []
    for var in vars:
      a = self.train_sample[var].value_counts()
      b = self.test_sample[var].value_counts()
      df = pd.concat([a, b], axis=1).fillna(1e-01)
      observed = df.iloc[:, 1]
      expected = df.iloc[:, 0] / df.iloc[:, 0].sum() * df.iloc[:, 1].sum()
      (_, p_value_chi2) = chisquare(f_obs=observed, f_exp=expected)
      p_values.append(p_value_chi2)

    # NaN p-values compare as False and are therefore dropped.
    return [var for var, p_value in zip(vars, p_values) if p_value > self.p]


  def stat_tests_continuous(self, vars):
    """Return the variables passing the t-test, KS test and Mann-Whitney test.

    Missing values are replaced by 0; a variable is kept only when all three
    p-values exceed ``self.p``.
    """
    p = self.p
    vars_selected = []
    for var in vars:
      a = self.train_sample[var].fillna(0).values
      b = self.test_sample[var].fillna(0).values
      _, p_value_ttest = ttest_ind(a, b)        
      _, p_value_ks = ks_2samp(a, b) 
      _, p_value_mw = mannwhitneyu(a, b)
      
      if (p_value_ttest > p and p_value_ks > p and p_value_mw > p): 
        vars_selected.append(var) 

    return vars_selected


  def get_numerical_variables(self, df_train):    
    """Split non-string columns into discrete / continuous and test them.

    Cardinality is measured on a sample of about 5000 train rows: columns
    with at most 25 distinct values are discrete, the rest continuous. The
    selected variables are stored in ``self.schema['discrete']`` and
    ``self.schema['continuous']``.
    """
    # Allow calling this method without a prior get_samples().
    if self.n_train is None:
      self.n_train = df_train.count()

    f1 = min(5000, self.n_train) / self.n_train
    df = df_train.sample(withReplacement=False, fraction=f1, seed=0).toPandas()
    
    n_unique = df.nunique()
    discrete_list = n_unique[n_unique <= 25].index.tolist()
    continuous_list = n_unique[n_unique > 25].index.tolist()
    # A frame without string columns has no 'string' key in the schema.
    ignore_list = self.ignore_list + self.schema.get('string', [])

    discrete_vars = [var for var in discrete_list if var not in ignore_list]  
    continuous_vars = [var for var in continuous_list if var not in ignore_list]  
    
    self.schema['discrete'] = self.chi2_test(discrete_vars)
    self.schema['continuous'] = self.stat_tests_continuous(continuous_vars)


  def fit(self, df_train, df_test): 
    """Build and return the schema of validated variables.

    When the train and test schemas differ (target excluded), no variable
    selection is performed: a warning is issued and the schema is returned
    without the ``'categorical'``, ``'discrete'`` and ``'continuous'`` keys.
    """
    self.check_train_test_schemas(df_train, df_test)
    if not self.train_test_schemas_the_same:
      import warnings
      warnings.warn(
          'Train and test schemas differ; variable selection was skipped.')
      return self.schema

    self.get_schema(df_train)
    self.get_samples(df_train, df_test)
    self.schema['categorical'] = self.chi2_test(self.schema.get('string', []))
    self.get_numerical_variables(df_train)    

    return self.schema

# Custom Transformers

In [None]:
#from pyspark.ml import Pipeline, Transformer, Estimator
#class CustomTransformer(Transformer):
#  def _transform(self, df) -> DataFrame:

#class CustomEstimator(Estimator):
#  def _fit(self, df) -> CustomTransformer:

In [7]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml import Pipeline, Transformer 
from typing import Iterable


class StringFromDiscrete(Transformer):
    """
    A custom Transformer which generates string variables from discrete.

    For every variable ``v`` in ``var_list`` a column ``v_str`` is added that
    holds ``v`` cast to string.
    """

    def __init__(self, var_list: Iterable[str]):
        """
        Args:
            var_list: names of the columns to cast. Any iterable is accepted;
                it is copied into a list so that a one-shot iterator is not
                exhausted by the first ``transform`` call.
        """
        super(StringFromDiscrete, self).__init__()
        self.var_list = list(var_list)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with one ``<var>_str`` column per listed variable."""
        for var in self.var_list:
            df = df.withColumn(var + '_str', df[var].cast(StringType()))

        return df


class TimeFeaturesGenerator(Transformer):
    """
    A custom Transformer which generates weekdays, hours and minutes from time 
    variable.

    The time variable is interpreted as a number of seconds. Three integer
    columns are added:
      * ``weekdays`` - day index within a 7-day cycle, rotated by 3 days;
      * ``hours``    - hour of the day (0-23), rotated by 7 hours;
      * ``minutes``  - minute within the hour.
    """

    def __init__(self, time_var: str):
        """
        Args:
            time_var: name of the seconds-based time column.
        """
        super(TimeFeaturesGenerator, self).__init__()
        self.time_var = time_var

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with the ``weekdays``, ``hours`` and ``minutes`` columns."""
        w = 60 * 60 * 24 * 7
        d = 60 * 60 * 24
        h = 60 * 60
        m = 60
        time_var = self.time_var

        df = df.withColumn('weekdays', (
            (df[time_var] % w) / d).cast(IntegerType())
            )
        # Rotate by 3 days with wrap-around: 3..6 -> 0..3 and 0..2 -> 4..6.
        df = df.withColumn(
            'weekdays', F.when(F.col('weekdays') > 2, df['weekdays'] - 3)\
            .otherwise(df['weekdays'] + 4)
            )
        df = df.withColumn('hours', (
            (df[time_var] % d) / h).cast(IntegerType())
            )
        # Rotate by 7 hours with wrap-around: 7..23 -> 0..16, 0..6 -> 17..23.
        # The wrap offset is 24 - 7 = 17; the previous offset of 14 mapped
        # hours 0..2 onto the same values as hours 21..23.
        df = df.withColumn(
            'hours', F.when(F.col('hours') < 7, df['hours'] + 17)\
            .otherwise(df['hours'] - 7)
            )
        df = df.withColumn(
            'minutes', (((df[time_var] % d) % h) / m).cast(IntegerType())
            )

        return df

# Pipeline

In [8]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, RobustScaler

# Select the variables whose train / test distributions agree.
validator = DataValidator()
schema = validator.fit(df_train, df_test)

# Custom stages: '<var>_str' copies of the discrete variables and the
# weekdays / hours / minutes features derived from the time variable.
make_string_columns_from_discrete = StringFromDiscrete(var_list=schema['discrete'])
get_time_features = TimeFeaturesGenerator(time_var=schema['time_var'])

# Categorical (string) variables: index, then one-hot encode.
string_indexer = StringIndexer(
    inputCols=schema['categorical'], 
    outputCols=[i + '_index' for i in schema['categorical']],
    handleInvalid='keep'
    )
string_one_hot_encoder = OneHotEncoder(
    inputCols=[i + '_index' for i in schema['categorical']], 
    outputCols=[i + '_encoded' for i in schema['categorical']]
    )
# Discrete variables as numbers (together with the time features):
# impute with the mode, assemble into one vector, min-max scale.
discrete_vars = schema['discrete'] + schema['time']
discrete_imputer = Imputer(
    inputCols=discrete_vars , 
    outputCols=[var + '_imputed' for var in discrete_vars ],
    strategy='mode'
    )
discrete_features_assembler = VectorAssembler(
    inputCols=[var + '_imputed' for var in discrete_vars], 
    outputCol='discrete_assembled'
    )
discrete_minmax_scaler = MinMaxScaler(
    inputCol='discrete_assembled', 
    outputCol='discrete_vector_scaled'
    )
# Discrete variables as categories: index the '<var>_str' columns, then
# one-hot encode (time features are not included here).
discrete_string_indexer = StringIndexer(
    inputCols=[i + '_str' for i in schema['discrete']], 
    outputCols=[i + '_index' for i in schema['discrete']],
    handleInvalid='keep'
    )
discrete_one_hot_encoder = OneHotEncoder(
    inputCols=[i + '_index' for i in schema['discrete']], 
    outputCols=[i + '_encoded' for i in schema['discrete']],    
    )
# Continuous variables: impute with the median, assemble, robust-scale.
continuous_imputer = Imputer(
    inputCols=schema['continuous'], 
    outputCols=[i + '_imputed' for i in schema['continuous']],
    strategy='median'
    )
continuous_features_assembler = VectorAssembler(
    inputCols=[i + '_imputed' for i in schema['continuous']], 
    outputCol='continuous_assembled'
    )
continuous_robust_scaler = RobustScaler(
    inputCol='continuous_assembled', 
    outputCol='continuous_vector_scaled'
    )

# Final feature vector: encoded categoricals, scaled + encoded discrete
# variables and scaled continuous variables, in that order.
cat_vars = [var + '_encoded' for var in schema['categorical']]
disc_vars = ['discrete_vector_scaled'] + [var + '_encoded' for var in schema['discrete']]
cont_vars = ['continuous_vector_scaled']  
vars = cat_vars + disc_vars + cont_vars
features_assembler = VectorAssembler(
    inputCols=vars, 
    outputCol='features'
    )

In [9]:
# Chain all stages and fit them on the train frame only. The custom stages
# come first because later stages read the columns they create
# ('<var>_str' and the time features).
feat_ext_pipe = Pipeline(
    stages=[make_string_columns_from_discrete,
            get_time_features,
            string_indexer,
            string_one_hot_encoder,
            discrete_imputer,
            discrete_features_assembler,
            discrete_minmax_scaler,
            discrete_string_indexer,
            discrete_one_hot_encoder,
            continuous_imputer,
            continuous_features_assembler,
            continuous_robust_scaler,
            features_assembler]
            ).fit(df_train)
    
# Apply the fitted pipeline to both frames; show the first assembled vectors.
df1 = feat_ext_pipe.transform(df_train)
df2 = feat_ext_pipe.transform(df_test)
df1.select('features').show(10)

+--------------------+
|            features|
+--------------------+
|(1212,[0,3,5,8,13...|
|(1212,[2,6,8,13,1...|
|(1212,[0,2,5,7,9,...|
|(1212,[2,6,7,13,1...|
|(1212,[13,15,20,2...|
|(1212,[0,4,5,8,13...|
|(1212,[0,2,5,7,10...|
|(1212,[2,5,7,13,1...|
|(1212,[13,15,20,2...|
|(1212,[0,2,6,8,13...|
+--------------------+
only showing top 10 rows



# Notes

## Categorical variables

In [8]:
from pyspark.ml.feature import StringIndexer

# Stand-alone version of the categorical indexing step: fit the indexer on
# the train frame and add one '<var>_index' column per categorical variable.
# Note that df_train is reassigned, so re-running this cell operates on a
# frame that already contains the '_index' columns.
string_indexer = StringIndexer(
    inputCols=schema['categorical'], 
    outputCols=[i + '_index' for i in schema['categorical']],
    handleInvalid='keep'
    )
string_indexer_model = string_indexer.fit(df_train)
df_train = string_indexer_model.transform(df_train)

df_train.select([i + '_index' for i in schema['categorical']]).show(10)

+--------+--------+--------+--------+--------+--------+
|M2_index|M4_index|M5_index|M6_index|M7_index|M8_index|
+--------+--------+--------+--------+--------+--------+
|     0.0|     1.0|     0.0|     1.0|     2.0|     2.0|
|     2.0|     0.0|     1.0|     1.0|     2.0|     2.0|
|     0.0|     0.0|     0.0|     0.0|     0.0|     0.0|
|     2.0|     0.0|     1.0|     0.0|     2.0|     2.0|
|     2.0|     3.0|     2.0|     2.0|     2.0|     2.0|
|     0.0|     2.0|     0.0|     1.0|     2.0|     2.0|
|     0.0|     0.0|     0.0|     0.0|     1.0|     1.0|
|     2.0|     0.0|     0.0|     0.0|     2.0|     2.0|
|     2.0|     3.0|     2.0|     2.0|     2.0|     2.0|
|     0.0|     0.0|     1.0|     1.0|     2.0|     2.0|
+--------+--------+--------+--------+--------+--------+
only showing top 10 rows



In [9]:
from pyspark.ml.feature import OneHotEncoder

string_one_hot_encoder = OneHotEncoder(
    inputCols=[i + '_index' for i in schema['categorical']], 
    outputCols=[i + '_encoded' for i in schema['categorical']]
    )
string_one_hot_encoder_model = string_one_hot_encoder.fit(df_train)
df_train = string_one_hot_encoder_model.transform(df_train)
df_train.select([i + '_encoded' for i in schema['categorical']]).show(10)

+-------------+-------------+-------------+-------------+-------------+-------------+
|   M2_encoded|   M4_encoded|   M5_encoded|   M6_encoded|   M7_encoded|   M8_encoded|
+-------------+-------------+-------------+-------------+-------------+-------------+
|(2,[0],[1.0])|(3,[1],[1.0])|(2,[0],[1.0])|(2,[1],[1.0])|    (2,[],[])|    (2,[],[])|
|    (2,[],[])|(3,[0],[1.0])|(2,[1],[1.0])|(2,[1],[1.0])|    (2,[],[])|    (2,[],[])|
|(2,[0],[1.0])|(3,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|
|    (2,[],[])|(3,[0],[1.0])|(2,[1],[1.0])|(2,[0],[1.0])|    (2,[],[])|    (2,[],[])|
|    (2,[],[])|    (3,[],[])|    (2,[],[])|    (2,[],[])|    (2,[],[])|    (2,[],[])|
|(2,[0],[1.0])|(3,[2],[1.0])|(2,[0],[1.0])|(2,[1],[1.0])|    (2,[],[])|    (2,[],[])|
|(2,[0],[1.0])|(3,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|(2,[1],[1.0])|(2,[1],[1.0])|
|    (2,[],[])|(3,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|    (2,[],[])|    (2,[],[])|
|    (2,[],[])|    (3,[],[])|    (2,[],[])|    (2,[],[

## Discrete variables

### As numeric

In [30]:
from pyspark.ml.feature import Imputer, VectorAssembler, MinMaxScaler

discrete_imputer = Imputer(
    inputCols=schema['discrete'], 
    outputCols=[var + '_imputed' for var in schema['discrete']],
    strategy='mode'
    )
discrete_imputer_model = discrete_imputer.fit(df_train)
df_train = discrete_imputer_model.transform(df_train)
#df_train.select([i + '_imputed' for i in schema['continuous']]).show(10)

In [31]:
discrete_features_assembler = VectorAssembler(
    inputCols=[var + '_imputed' for var in schema['discrete']], 
    outputCol='discrete_assembled')

df_train = discrete_features_assembler.transform(df_train)

In [33]:
discrete_minmax_scaler = MinMaxScaler(
    inputCol='discrete_assembled', outputCol='discrete_vector_scaled')

discrete_minmax_scaler_model = discrete_minmax_scaler.fit(df_train)
df_train = discrete_minmax_scaler_model.transform(df_train)

df_train.select('discrete_vector_scaled').show(10)

+----------------------+
|discrete_vector_scaled|
+----------------------+
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
|  (69,[0,2,7,8,12,1...|
+----------------------+
only showing top 10 rows



### As categories

In [15]:
# Stand-alone version of the "discrete as categories" indexing step.
# NOTE(review): the inputs are the '<var>_str' columns; no cell in this Notes
# section creates them on df_train, so this presumably relies on
# StringFromDiscrete having been applied beforehand - confirm.
discrete_string_indexer = StringIndexer(
    inputCols=[i + '_str' for i in schema['discrete']], 
    outputCols=[i + '_index' for i in schema['discrete']],
    handleInvalid='keep'
    )
discrete_string_indexer_model = discrete_string_indexer.fit(df_train)
df_train = discrete_string_indexer_model.transform(df_train)

#df_train.select([i + '_index' for i in schema['discrete']]).show(10)

In [18]:
discrete_one_hot_encoder = OneHotEncoder(
    inputCols=[i + '_index' for i in schema['discrete']], 
    outputCols=[i + '_encoded' for i in schema['discrete']],    
    )
discrete_one_hot_encoder_model = discrete_one_hot_encoder.fit(df_train)
df_train = discrete_one_hot_encoder_model.transform(df_train)

df_train.select([i + '_encoded' for i in schema['discrete']]).show(10)

+-------------+-------------+-------------+-------------+--------------+-------------+-------------+-------------+--------------+-------------+-------------+--------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-------------+--------------+--------------+--------------+-------------+--------------+---------------+--------------+--------------+--------------+-------------+--------------+
|

## Continuous variables

In [23]:
from pyspark.ml.feature import Imputer

continuous_imputer = Imputer(
    inputCols=schema['continuous'], 
    outputCols=[i + '_imputed' for i in schema['continuous']],
    strategy='median'
    )
continuous_imputer_model = continuous_imputer.fit(df_train)
df_train = continuous_imputer_model.transform(df_train)
#df_train.select([i + '_imputed' for i in schema['continuous']]).show(10)

In [25]:
from pyspark.ml.feature import VectorAssembler

continuous_features_assembler = VectorAssembler(
    inputCols=[i + '_imputed' for i in schema['continuous']], 
    outputCol='continuous_assembled')

df_train = continuous_features_assembler.transform(df_train)

In [27]:
from pyspark.ml.feature import RobustScaler

continuous_robust_scaler = RobustScaler(
    inputCol='continuous_assembled', outputCol='continuous_vector_scaled')

continuous_robust_scaler_model = continuous_robust_scaler.fit(df_train)

df_train = continuous_robust_scaler_model.transform(df_train)

df_train.select('continuous_vector_scaled').show(10)

+------------------------+
|continuous_vector_scaled|
+------------------------+
|    (63,[0,1,2,3,4,6,...|
|    (63,[0,1,2,3,4,6]...|
|    (63,[0,1,2,3,4,6,...|
|    (63,[0,1,2,3,4,6,...|
|    (63,[0,1,2,3,4,6,...|
|    (63,[0,1,2,3,4,6,...|
|    (63,[0,1,2,4,6,62...|
|    (63,[0,1,2,3,4,6,...|
|    (63,[0,1,2,3,4,6,...|
|    (63,[0,1,2,3,4,6,...|
+------------------------+
only showing top 10 rows



## Feature assembling

In [29]:
# Collect the names of all prepared feature columns: one-hot encoded
# categoricals, the scaled discrete vector plus the one-hot encoded discrete
# variables, and the scaled continuous vector.
cat_vars = [var + '_encoded' for var in schema['categorical']]
disc_vars = ['discrete_vector_scaled'] + [var + '_encoded' for var in schema['discrete']]
cont_vars = ['continuous_vector_scaled']  
vars = cat_vars + disc_vars + cont_vars  

# Assemble them into a single 'features' vector column.
features_assembler = VectorAssembler(inputCols=vars, outputCol='features')