In [None]:
# Spark application name and the local port for its web UI.
spark_ui_port = 4041
app_name = 'data_preprocessing'

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 58.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=a5026cbd3b5893391d2b3216322147842a31daf25cc5e5f7e07cb6571d5021d5
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
import pyspark

# "local[4]" runs Spark locally (no cluster) with 4 worker threads.
# NOTE(review): in local mode the executor lives inside the driver, so
# spark.driver.memory is presumably what bounds the heap here.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[4]")
    .appName(app_name)
    .config("spark.ui.port", spark_ui_port)
    .config("spark.driver.memory", "15g")
    .config("spark.executor.memory", "15g")
    .getOrCreate()
)

In [None]:
from pyspark import SparkFiles

TRAIN_DATA_URL = 'https://storage.yandexcloud.net/credit-cards-data/train.parquet'
TEST_DATA_URL = 'https://storage.yandexcloud.net/credit-cards-data/test.parquet'

# Ship each remote file through SparkFiles and read it back by its basename.
# Parquet stores its own schema, so CSV-only reader options such as `header`
# and `inferSchema` are not passed.
spark.sparkContext.addFile(TRAIN_DATA_URL)
df_train = spark.read.parquet(SparkFiles.get('train.parquet'))
spark.sparkContext.addFile(TEST_DATA_URL)
df_test = spark.read.parquet(SparkFiles.get('test.parquet'))

# 4 partitions, one per local worker thread (master is local[4]).
df_train = df_train.repartition(4)
df_test = df_test.repartition(4)

# EDA

In [None]:
tuple(frame.count() for frame in (df_train, df_test))

(180000, 100001)

In [None]:
# True when both splits have the same fields (names, types, nullability).
df_test.schema == df_train.schema

True

In [None]:
# Print the column names and Spark types of the training data.
df_train.printSchema()

root
 |-- TransactionID: long (nullable = true)
 |-- isFraud: long (nullable = true)
 |-- TransactionDT: long (nullable = true)
 |-- TransactionAmt: double (nullable = true)
 |-- ProductCD: string (nullable = true)
 |-- card1: long (nullable = true)
 |-- card2: double (nullable = true)
 |-- card3: double (nullable = true)
 |-- card4: string (nullable = true)
 |-- card5: double (nullable = true)
 |-- card6: string (nullable = true)
 |-- addr1: double (nullable = true)
 |-- addr2: double (nullable = true)
 |-- dist1: double (nullable = true)
 |-- dist2: double (nullable = true)
 |-- P_emaildomain: string (nullable = true)
 |-- R_emaildomain: string (nullable = true)
 |-- C1: double (nullable = true)
 |-- C2: double (nullable = true)
 |-- C3: double (nullable = true)
 |-- C4: double (nullable = true)
 |-- C5: double (nullable = true)
 |-- C6: double (nullable = true)
 |-- C7: double (nullable = true)
 |-- C8: double (nullable = true)
 |-- C9: double (nullable = true)
 |-- C10: double (nul

## 1. Creating top-level variables

In [None]:
# Column roles; each is a list so they can be concatenated into exclusion lists.
IDENTIFIERS, TARGET_COLUMN, TIME_COLUMNS = ['TransactionID'], ['isFraud'], ['TransactionDT']

### Binary columns

In [None]:
import pandas as pd
import pyspark.sql.functions as F

pd.set_option("display.max_rows", 1000)

# One flag per column: does it hold exactly two distinct non-null values?
# (collect_set ignores nulls, which is why M1 with null/T/F counts as binary.)
is_binary = df_train.agg(
    *(
        (F.size(F.collect_set(column)) == 2).alias(column)
        for column in df_train.columns
    )
).toPandas()

# Single-row frame -> Series indexed by (column name, row 0).
binary_test = is_binary.unstack()

binary_test.head(6)

TransactionID   0    False
isFraud         0     True
TransactionDT   0    False
TransactionAmt  0    False
ProductCD       0    False
card1           0    False
dtype: bool

In [None]:
exclude_list = IDENTIFIERS + TARGET_COLUMN + TIME_COLUMNS
# binary_test is indexed by (column, row) pairs; keep the column name of every
# flagged entry that is not an id / target / time column.
BINARY_COLUMNS = [
    column
    for (column, _row), flagged in binary_test.items()
    if flagged and column not in exclude_list
]

### Categorical columns

In [None]:
# String-typed columns that were not already classified as binary.
CATEGORICAL_COLUMNS = [
    name for name, dtype in df_train.dtypes
    if dtype == 'string' and name not in BINARY_COLUMNS
]

### Discrete columns

In [None]:
# Flag columns with at most 25 distinct non-null values as discrete candidates.
is_discrete = df_train.agg(
    *(
        (F.size(F.collect_set(column)) <= 25).alias(column)
        for column in df_train.columns
    )
).toPandas()

discrete_test = is_discrete.unstack()

discrete_test.head(6)

TransactionID   0    False
isFraud         0     True
TransactionDT   0    False
TransactionAmt  0    False
ProductCD       0     True
card1           0    False
dtype: bool

In [None]:
exclude_list = (
    IDENTIFIERS + TARGET_COLUMN + TIME_COLUMNS + BINARY_COLUMNS + CATEGORICAL_COLUMNS
)
# Low-cardinality columns that were not already assigned another role.
DISCRETE_COLUMNS = [
    column
    for (column, _row), flagged in discrete_test.items()
    if flagged and column not in exclude_list
]

### Continuous columns

In [None]:
exclude_list.extend(DISCRETE_COLUMNS)
# Every column without another role is treated as continuous.
CONTINUOUS_COLUMNS = [column for column in df_train.columns if column not in exclude_list]

## 2. Weeding out useless records

In [None]:
# Class balance of the target.
df_train.groupBy(*TARGET_COLUMN).count().show()

+-------+------+
|isFraud| count|
+-------+------+
|      0|174859|
|      1|  5141|
+-------+------+



In [None]:
non_id_columns = [c for c in df_train.columns if c not in IDENTIFIERS]
# Drop rows that carry nothing but an id, then rows without a label.
df_train = (
    df_train
    .dropna(how="all", subset=non_id_columns)
    .dropna(subset=TARGET_COLUMN)
)

print(df_train.count(), len(df_train.columns))

180000 394


## 3. Data cleaning and feature selection

### 3.1 Binary features

In [None]:
# Value distribution (including nulls) of each binary column, most frequent first.
for column in BINARY_COLUMNS:
    value_counts = df_train.groupBy(column).count()
    value_counts.orderBy('count', ascending=False).show()

+----+------+
|  M1| count|
+----+------+
|null|118251|
|   T| 61748|
|   F|     1|
+----+------+

+----+------+
|  M2| count|
+----+------+
|null|118251|
|   T| 55657|
|   F|  6092|
+----+------+

+----+------+
|  M3| count|
+----+------+
|null|118251|
|   T| 48603|
|   F| 13146|
+----+------+

+----+------+
|  M5| count|
+----+------+
|null|118297|
|   F| 34262|
|   T| 27441|
+----+------+

+----+-----+
|  M6|count|
+----+-----+
|null|74348|
|   F|56627|
|   T|49025|
+----+-----+

+----+------+
|  M7| count|
+----+------+
|null|148348|
|   F| 27118|
|   T|  4534|
+----+------+

+----+------+
|  M8| count|
+----+------+
|null|148348|
|   F| 19814|
|   T| 11838|
+----+------+

+----+------+
|  M9| count|
+----+------+
|null|148348|
|   T| 26970|
|   F|  4682|
+----+------+

+----+------+
|  V1| count|
+----+------+
|null|120901|
| 1.0| 59094|
| 0.0|     5|
+----+------+

+----+------+
| V14| count|
+----+------+
| 1.0|134252|
|null| 45678|
| 0.0|    70|
+----+------+

+----+------+
| V

#### Drop binary features whose minority value is too rare to be informative

In [None]:
# Binary columns whose minority value is too rare to be informative
# (see the value counts above, e.g. M1 has a single 'F').
too_rare_binary_features = [
    'M1', 'V1', 'V14', 'V41', 'V65', 'V88', 'V305'
]

# Filter with a comprehension instead of a set difference so the column order
# is reproducible across kernel restarts (str hashing is randomized per process).
BINARY_COLUMNS = [c for c in BINARY_COLUMNS if c not in too_rare_binary_features]
BINARY_COLUMNS

['M9', 'M5', 'M2', 'M6', 'M7', 'V94', 'M8', 'M3']

In [None]:
# Re-inspect the remaining binary columns.
for column in BINARY_COLUMNS:
    value_counts = df_train.groupBy(column).count()
    value_counts.orderBy('count', ascending=False).show()

+----+------+
|  M9| count|
+----+------+
|null|148348|
|   T| 26970|
|   F|  4682|
+----+------+

+----+------+
|  M2| count|
+----+------+
|null|118251|
|   T| 55657|
|   F|  6092|
+----+------+

+----+------+
|  M3| count|
+----+------+
|null|118251|
|   T| 48603|
|   F| 13146|
+----+------+

+----+------+
|  M7| count|
+----+------+
|null|148348|
|   F| 27118|
|   T|  4534|
+----+------+

+----+------+
|  M8| count|
+----+------+
|null|148348|
|   F| 19814|
|   T| 11838|
+----+------+

+----+------+
|  M5| count|
+----+------+
|null|118297|
|   F| 34262|
|   T| 27441|
+----+------+

+----+-----+
|  M6|count|
+----+-----+
|null|74348|
|   F|56627|
|   T|49025|
+----+-----+

+----+------+
| V94| count|
+----+------+
| 0.0|110440|
|null| 48827|
| 1.0| 20733|
+----+------+



### 3.2 Categorical features

In [None]:
# Value distribution (including nulls) of each categorical column.
for column in CATEGORICAL_COLUMNS:
    value_counts = df_train.groupBy(column).count()
    value_counts.orderBy('count', ascending=False).show()

+---------+------+
|ProductCD| count|
+---------+------+
|        W|110340|
|        H| 22422|
|        R| 21926|
|        C| 21664|
|        S|  3648|
+---------+------+

+----------------+------+
|           card4| count|
+----------------+------+
|            visa|118295|
|      mastercard| 54501|
|american express|  4818|
|        discover|  2378|
|            null|     8|
+----------------+------+

+---------------+------+
|          card6| count|
+---------------+------+
|          debit|122746|
|         credit| 57229|
|debit or credit|    10|
|    charge card|     8|
|           null|     7|
+---------------+------+

+-------------+-----+
|P_emaildomain|count|
+-------------+-----+
|    gmail.com|67198|
|    yahoo.com|28992|
|         null|28440|
|  hotmail.com|15058|
|anonymous.com|13328|
|      aol.com| 8583|
|  comcast.net| 2916|
|   icloud.com| 1677|
|  outlook.com| 1484|
|      msn.com| 1423|
|      att.net| 1223|
|sbcglobal.net| 1092|
|  verizon.net|  971|
|     live.com|

#### Reclassify categorical variable 'card6' to binary

In [None]:
var = 'card6'
# Recode card6 as binary: 'debit' -> 1, 'credit' -> 0. The rare values
# ('debit or credit', 'charge card') and nulls match no branch and become null.
df_train = df_train.withColumn(
    var,
    F.when(F.col(var) == 'debit', 1).when(F.col(var) == 'credit', 0)
)
BINARY_COLUMNS += [var]

# Filter with a comprehension rather than a set difference so the column order
# is reproducible across kernel restarts (str hashing is randomized per process).
CATEGORICAL_COLUMNS = [column for column in CATEGORICAL_COLUMNS if column != var]
CATEGORICAL_COLUMNS

['P_emaildomain', 'ProductCD', 'M4', 'card4', 'R_emaildomain']

### 3.3. Discrete features

In [None]:
# Value distribution (including nulls) of each discrete column.
for column in DISCRETE_COLUMNS:
    value_counts = df_train.groupBy(column).count()
    value_counts.orderBy('count', ascending=False).show()

+----+------+
|  C3| count|
+----+------+
| 0.0|178740|
| 1.0|  1152|
| 2.0|    76|
| 3.0|    17|
|16.0|     3|
|11.0|     2|
|12.0|     2|
| 8.0|     1|
| 4.0|     1|
|10.0|     1|
|14.0|     1|
|13.0|     1|
|15.0|     1|
|17.0|     1|
| 9.0|     1|
+----+------+

+------------------+------+
|                D9| count|
+------------------+------+
|              null|146404|
|0.7916659712791443|  2660|
|              0.75|  2514|
|0.7083330154418945|  2495|
|0.6666659712791443|  2449|
|0.8333330154418945|  2432|
|             0.875|  2264|
|             0.625|  1987|
|0.9166659712791444|  1968|
|0.9583330154418944|  1927|
|               0.0|  1836|
|0.0416660010814666|  1615|
|0.5833330154418945|  1608|
|0.0833330005407333|  1565|
|             0.125|  1357|
|0.1666660010814666|  1242|
|0.2083330005407333|   892|
|0.5416659712791443|   876|
|              0.25|   496|
|               0.5|   448|
+------------------+------+
only showing top 20 rows

+----+------+
|  V2| count|
+----+-

In [None]:
countinuous = ['D9']
no_info = ['V27', 'V28', 'V68', 'V89', 'V107',  'V108', 'V111', 'V117', 'V118',
           'V119', 'V120', 'V121', 'V122', 'V240', 'V241', ]
# 0, >0
binary_1 = [
    'C3', 'V4', 'V10', 'V11', 'V12', 'V13', 'V15', 'V16', 'V17', 'V18', 'V19', 
    'V20', 'V21', 'V22','V26', 'V29', 'V31', 'V32', 'V33', 'V34', 'V42', 'V50',
    'V57', 'V58', 'V59','V60', 'V61', 'V62', 'V63', 'V64', 'V71', 'V75', 'V79',
    'V80', 'V92', 'V93', 'V98', 'V104', 'V106', 'V138', 'V141', 'V142', 'V146',
    'V147', 'V148', 'V149', 'V153', 'V154', 'V155', 'V156', 'V169', 'V173', 
    'V174', 'V194', 'V195', 'V197', 'V198', 'V217', 'V220', 'V223', 'V226', 
    'V231',  'V235', 'V237', 'V238', 'V239', 'V250', 'V251', 'V284', 'V286', 
    'V300', 'V325', 'V327', 'V328',  'V329', 'V330'
    ]
# 1, >1
binary_2 = [
    'V2', 'V3', 'V5', 'V6', 'V7', 'V8', 'V9',  'V23', 'V24', 'V25', 'V37', 'V44', 
    'V46', 'V47', 'V55', 'V56', 'V77', 'V78', 'V86', 'V87', 'V109','V110', 'V112',
    'V113', 'V114', 'V115', 'V116', 'V123','V124', 'V125', 'V191', 'V228', 'V242',
    'V244', 'V246', 'V247', 'V248', 'V249', 'V252', 'V257',
    ]
# >= 60, < 60
binary_3 = ['V152']

# 0, 1, >1
discrete_1 = [
    'V30', 'V35', 'V36', 'V39', 'V40', 'V43', 'V48', 'V49', 'V51', 'V52', 'V53',
    'V54', 'V66', 'V67', 'V69', 'V70', 'V72', 'V73', 'V74', 'V76', 'V81', 'V82',
    'V83', 'V84', 'V85','V90', 'V91', 'V100', 'V101', 'V157', 'V158', 'V175',
    'V181', 'V184', 'V185', 'V188', 'V189', 'V260', 'V262', 'V281', 'V287',
    'V288', 'V289', 'V297', 'V299', 'V301', 'V302', 'V303', 'V304',
    ]

In [None]:
CONTINUOUS_COLUMNS.extend(countinuous)

#### Reclassify discrete variables to binary 

In [None]:
# Groups 0 / > 0: anything above 0 becomes 1; 0, negatives and nulls pass through.
for var in binary_1:
    df_train = df_train.withColumn(
        var, F.when(F.col(var) > 0, 1).otherwise(F.col(var))
    )

# Groups 1 / > 1: 1 -> 0, above 1 -> 1; every other value (incl. null) -> null.
for var in binary_2:
    df_train = df_train.withColumn(
        var, F.when(F.col(var) == 1, 0).when(F.col(var) > 1, 1)
    )

# Split at 60: <= 60 -> 0, > 60 -> 1 (a value of exactly 60 lands in group 0).
# NOTE(review): the list definition labels these groups ">= 60, < 60" — confirm
# which side of the boundary 60 belongs on.
for var in binary_3:
    df_train = df_train.withColumn(
        var, F.when(F.col(var) <= 60, 0).when(F.col(var) > 60, 1)
    )

BINARY_COLUMNS += binary_1 + binary_2 + binary_3

#### Consolidate discrete variables to 3 classes: 0, 1, >1

In [None]:
# Merge everything above 1 into a single bucket "2"; other values (incl. nulls) are kept.
for var in discrete_1:
    df_train = df_train.withColumn(
        var,
        F.when(F.col(var) > 1, 2).otherwise(F.col(var)),
    )
DISCRETE_COLUMNS = list(discrete_1)

### 3.4 Time features

In [None]:
from pyspark.sql.types import IntegerType

def generate_time_vars(df, time_col=None):
    """Add integer `weekdays`, `hours` and `minutes` columns derived from a time column.

    The time column is treated as a count of seconds (assumed from the
    arithmetic; TODO confirm the unit of TransactionDT).

    Args:
        df: Spark DataFrame that contains the time column.
        time_col: name of the time column; defaults to ``TIME_COLUMNS[0]``.

    Returns:
        ``df`` with three extra columns:
        - weekdays: day index within a 7-day cycle counted from the time
          origin (not necessarily the calendar weekday),
        - hours: hour of the day,
        - minutes: minute of the hour.
    """
    if time_col is None:
        time_col = TIME_COLUMNS[0]

    seconds_per_minute = 60
    seconds_per_hour = 60 * seconds_per_minute
    seconds_per_day = 24 * seconds_per_hour
    seconds_per_week = 7 * seconds_per_day

    t = F.col(time_col)
    # `%` then `/` gives a fractional count; the integer cast truncates it.
    # x % day % hour == x % hour because an hour divides a day evenly.
    return (
        df
        .withColumn('weekdays', (t % seconds_per_week / seconds_per_day).cast(IntegerType()))
        .withColumn('hours', (t % seconds_per_day / seconds_per_hour).cast(IntegerType()))
        .withColumn('minutes', (t % seconds_per_hour / seconds_per_minute).cast(IntegerType()))
    )

In [None]:
df_train, df_test = [generate_time_vars(frame) for frame in (df_train, df_test)]

DISCRETE_COLUMNS.extend(['weekdays', 'hours', 'minutes'])

### 3.5 Chi2 test of independence between the target and the binary, categorical and discrete features

#### 3.5.1. Binary features

In [None]:
# A numeric fill value only touches numeric columns; string binary columns keep
# their nulls (Spark ignores subset columns whose type does not match).
df_train = df_train.fillna(-999, subset=BINARY_COLUMNS)

In [None]:
from pyspark.ml import feature as MF

# Binary columns stored as strings (e.g. the 'T'/'F' M-columns) must be
# index-encoded before they can go into a feature vector. dict.fromkeys drops
# duplicates (e.g. after re-running earlier cells) while keeping order.
BINARY_STRING = list(dict.fromkeys(
    name for name, dtype in df_train[BINARY_COLUMNS].dtypes if dtype == 'string'
))

binary_string_indexer = MF.StringIndexer(
    inputCols=BINARY_STRING,
    outputCols=[var + '_i' for var in BINARY_STRING],
    # 'keep' puts nulls / unseen labels into an extra index instead of failing.
    handleInvalid='keep'
    ).fit(df_train)

df_train = binary_string_indexer.transform(df_train)

# Numeric binary columns keep their names; string ones are replaced by their
# `_i` index column. Built with comprehensions instead of a set difference so
# the order (and hence featureIndex in the chi2 test) is reproducible.
new_binary = [
    var for var in dict.fromkeys(BINARY_COLUMNS) if var not in BINARY_STRING
] + [var + '_i' for var in BINARY_STRING]

In [None]:
import pyspark.sql.functions as F
from pyspark.ml.stat import ChiSquareTest

# Pearson chi-squared test of independence between each binary feature and the
# target; flatten=True returns one row per feature (featureIndex, pValue, ...).
feature_vector = MF.VectorAssembler(inputCols=new_binary, outputCol='features')
df_train = feature_vector.transform(df_train)

result = ChiSquareTest.test(df_train, 'features', TARGET_COLUMN[0], True)
df_train = df_train.drop('features')

rows = result.orderBy("featureIndex").collect()
alpha = 1e-06
# A p-value below alpha rejects independence, i.e. the feature IS associated
# with the target, so those are the features worth keeping (filtering on
# pValue > alpha would keep the features that look unrelated to fraud).
# String columns were tested through their `_i` index; report the source
# column name so later cells can look it up in df_test, which is never indexed.
source_name = {var + '_i': var for var in BINARY_STRING}
BINARY_FEATURES = [
    source_name.get(new_binary[r.featureIndex], new_binary[r.featureIndex])
    for r in rows
    if r.pValue < alpha
]
BINARY_FEATURES

['V26', 'V286']

#### 3.5.2. Categorical features

In [None]:
new_categorical = [var + '_i' for var in CATEGORICAL_COLUMNS]

# Index-encode each categorical column into `<name>_i`; 'keep' puts nulls and
# unseen labels into an extra bucket instead of failing.
categorical_string_indexer = MF.StringIndexer(
    inputCols=CATEGORICAL_COLUMNS,
    outputCols=new_categorical,
    handleInvalid='keep',
).fit(df_train)

df_train = categorical_string_indexer.transform(df_train)

In [None]:
# Pearson chi-squared independence test of each indexed categorical feature
# against the target (one result row per feature).
feature_vector = MF.VectorAssembler(
    inputCols=new_categorical,
    outputCol='features'
    )
df_train = feature_vector.transform(df_train)

result = ChiSquareTest.test(df_train, 'features', TARGET_COLUMN[0], True)
df_train = df_train.drop('features')

rows = result.orderBy("featureIndex").collect()

alpha = 1e-06
# Keep features whose p-value rejects independence from the target; indexing
# by featureIndex maps each result row back to its input column explicitly.
CATEGORICAL_FEATURES = [
    new_categorical[r.featureIndex] for r in rows if r.pValue < alpha
]
CATEGORICAL_FEATURES

[]

#### 3.5.3. Discrete features

In [None]:
# Give missing discrete values their own category (-999) for the chi2 test.
df_train = df_train.fillna(-999, subset=DISCRETE_COLUMNS)

In [None]:
from pyspark.ml.stat import ChiSquareTest

# Pearson chi-squared independence test of each discrete feature against the
# target (one result row per feature).
feature_vector = MF.VectorAssembler(
    inputCols=DISCRETE_COLUMNS,
    outputCol='features'
    )
df_train = feature_vector.transform(df_train)

result = ChiSquareTest.test(df_train, 'features', TARGET_COLUMN[0], True)
df_train = df_train.drop('features')

rows = result.orderBy("featureIndex").collect()

alpha = 1e-06
# Keep features whose p-value rejects independence from the target.
DISCRETE_FEATURES = [
    DISCRETE_COLUMNS[r.featureIndex] for r in rows if r.pValue < alpha
]
DISCRETE_FEATURES

['weekdays', 'minutes']

### 3.6. Chi2 test comparing the train and test distributions of the selected binary features

In [None]:
def get_samples(df_train, df_test, N=1000, seed=25):
    """Draw similarly sized random samples of two Spark DataFrames as pandas frames.

    The target size is ``min(N, count(df_train), count(df_test))``. Each frame
    is sampled with replacement using the fraction that yields that many rows
    in expectation (Spark's ``sample`` does not return an exact row count).

    Args:
        df_train: first Spark DataFrame to sample.
        df_test: second Spark DataFrame to sample.
        N: desired sample size per frame, capped at the smaller frame's size.
        seed: random seed passed to both ``sample`` calls.

    Returns:
        ``(train_sample, test_sample)``, each converted with ``toPandas()``.

    Raises:
        ValueError: if either DataFrame is empty (no fraction can be computed).
    """
    n_train = df_train.count()
    n_test = df_test.count()
    if n_train == 0 or n_test == 0:
        raise ValueError("Cannot sample from an empty DataFrame.")

    target = min(n_train, n_test, N)
    train_sample = df_train.sample(
        withReplacement=True, fraction=target / n_train, seed=seed
        ).toPandas()
    test_sample = df_test.sample(
        withReplacement=True, fraction=target / n_test, seed=seed
        ).toPandas()
    return train_sample, test_sample

In [None]:
import pandas as pd
from scipy.stats import chisquare

def chi2_test(vars, train_sample, test_sample, p=0.025):
  """Keep the variables whose test-sample distribution matches the train sample.

  For each variable, the test value counts (observed) are compared with the
  counts expected from the train proportions, rescaled to the test total
  (Pearson chi-squared goodness of fit). A value seen in only one sample gets
  a pseudo-count of 0.1; nulls are ignored because value_counts drops them.

  Returns:
    The variables, in input order, whose p-value is strictly greater than `p`.
  """
  selected = []
  for name in vars:
    counts = pd.concat(
        [train_sample[name].value_counts(), test_sample[name].value_counts()],
        axis=1,
    ).fillna(1e-01)
    train_counts, test_counts = counts.iloc[:, 0], counts.iloc[:, 1]
    expected = train_counts / train_counts.sum() * test_counts.sum()
    _stat, p_value = chisquare(f_obs=test_counts, f_exp=expected)
    if p_value > p:
      selected.append(name)
  return selected

In [None]:
# Compare train vs test distributions on fresh samples; keep the stable features.
train_sample, test_sample = get_samples(df_train, df_test, N=1000, seed=25)
BINARY_FEATURES = chi2_test(BINARY_FEATURES, train_sample, test_sample, p=0.025)
BINARY_FEATURES

['V286', 'V26']

### 3.7. Continuous features

##### 3.7.1. Taking care of extreme values

In [None]:
summary_stats = ["mean", "stddev", "min", "1%", "5%", "50%", "95%", "99%", "max"]
df_train.select(CONTINUOUS_COLUMNS).summary(*summary_stats).show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+----------------+------------------+-------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+-----------

Cap each continuous column at its 99th percentile (instead of its maximum) when the gap between the maximum and the 99th percentile exceeds one standard deviation.

In [None]:
import numpy as np
from numpy import float32

select_list = ['stddev', '99%', 'max']
# summary() returns one row per statistic plus a leading 'summary' label column;
# transpose so each continuous column becomes a row, then drop the label row.
df = df_train.select(*CONTINUOUS_COLUMNS).summary(select_list).toPandas().T
df.columns = select_list
# summary() yields strings; parse them as float64. A float32 cast would round
# the thresholds, so an uncapped column's "max" could end up slightly below the
# true maximum and clip it when the caps are applied.
df = df.iloc[1:, :].astype(np.float64)
df.head(3)

Unnamed: 0,stddev,99%,max
TransactionAmt,206.51503,1000.0,5278.950195
card1,4910.77832,18246.0,18396.0
card2,158.952835,583.0,600.0


In [None]:
# Per column: cap at the 99th percentile when the max lies more than one
# standard deviation above it; otherwise keep the observed max. A NaN ratio
# (e.g. zero or missing stddev) fails the comparison and keeps the max.
gap_in_stddevs = (df['max'] - df['99%']) / df['stddev']
max_list = np.where(gap_in_stddevs > 1, df['99%'], df['max']).tolist()
maximum = dict(zip(CONTINUOUS_COLUMNS, max_list))

In [None]:
def _cap(column, limit):
  """Spark expression capping `column` at `limit`; nulls stay null."""
  # F.least skips nulls, so without this guard a null would become `limit`.
  return F.when(F.isnull(F.col(column)), F.col(column)).otherwise(
      F.least(F.col(column), F.lit(limit))
  )


for k, v in maximum.items():
  df_train = df_train.withColumn(k, _cap(k, v))
  df_test = df_test.withColumn(k, _cap(k, v))

#### 3.7.2. Statistical check of train and test distributions for continuous variables

In [None]:
from scipy.stats import ttest_ind, mannwhitneyu, ks_2samp, norm

def stat_tests_continuous(train_sample, test_sample, columns=None, p=0.025):
    """Keep continuous columns whose train and test samples look alike.

    Each column (nulls filled with 0) must pass all three two-sample tests —
    Welch-free Student t-test, Kolmogorov-Smirnov and two-sided Mann-Whitney U —
    with a p-value strictly above ``p``.

    Args:
        train_sample: pandas DataFrame drawn from the training data.
        test_sample: pandas DataFrame drawn from the test data.
        columns: columns to test; defaults to ``CONTINUOUS_COLUMNS``.
        p: significance threshold shared by the three tests.

    Returns:
        The columns (in input order) that passed every test.
    """
    if columns is None:
        columns = CONTINUOUS_COLUMNS

    vars_selected = []
    for var in columns:
        a = train_sample[var].fillna(0).values
        b = test_sample[var].fillna(0).values
        _, p_value_ttest = ttest_ind(a, b)
        _, p_value_ks = ks_2samp(a, b)
        # Explicit alternative: older SciPy defaulted to a one-sided test here.
        _, p_value_mw = mannwhitneyu(a, b, alternative='two-sided')

        if p_value_ttest > p and p_value_ks > p and p_value_mw > p:
            vars_selected.append(var)

    return vars_selected

In [None]:
train_sample, test_sample = get_samples(df_train, df_test)
# Keep only continuous columns whose train/test samples pass all three tests.
CONTINUOUS_COLUMNS = stat_tests_continuous(
    train_sample=train_sample, test_sample=test_sample
)
len(CONTINUOUS_COLUMNS)

49

#### 3.7.3. Elimination of highly correlated variables

##### A. Correlation between variables

In [None]:
# Nulls -> 0 before assembling vectors for the correlation matrix.
df_train = df_train.fillna(0, subset=CONTINUOUS_COLUMNS)

In [None]:
from pyspark.ml import feature as MF

# Pack the continuous columns into one vector column for Correlation.corr.
continuous_vector = df_train.select(CONTINUOUS_COLUMNS)
continuous_features = MF.VectorAssembler(
    inputCols=CONTINUOUS_COLUMNS, outputCol='continuous_features'
)
continuous_vector_variable = continuous_features.transform(continuous_vector)

In [None]:
from pyspark.ml.stat import Correlation

# Pearson correlation matrix of all continuous features, returned in one cell.
correlation = Correlation.corr(
    dataset=continuous_vector_variable,
    column='continuous_features',
    method='pearson',
)
correlation.printSchema()

root
 |-- pearson(continuous_features): matrix (nullable = false)



In [None]:
import numpy as np
import pandas as pd

pd.set_option("display.max_rows", 1000)

correlation_array = correlation.head()[0].toArray()
correlation_pd = pd.DataFrame(
    correlation_array,
    index=CONTINUOUS_COLUMNS,
    columns=CONTINUOUS_COLUMNS
    )

# For every pair (i, j) with i < j and |Pearson r| > 0.8, drop the earlier
# column i (the upper-triangle rule, vectorized). NaN correlations, e.g. from
# constant columns, never trigger a drop.
upper_high = np.triu(np.abs(correlation_array) > 0.8, k=1)
correlated_to_drop = [
    name for name, drop in zip(CONTINUOUS_COLUMNS, upper_high.any(axis=1)) if drop
]

# Filter in column order rather than via set difference so the resulting
# feature order (and everything assembled from it) is reproducible.
CONTINUOUS_FEATURES = [
    name for name in CONTINUOUS_COLUMNS if name not in correlated_to_drop
]

len(CONTINUOUS_COLUMNS), len(CONTINUOUS_FEATURES)

(49, 28)

##### B. Correlation to target

In [None]:
# Target first, then the remaining continuous features, in one vector column.
vars = TARGET_COLUMN + CONTINUOUS_FEATURES
continuous_vector_ = df_train.select(vars)
continuous_features = MF.VectorAssembler(inputCols=vars, outputCol='continuous_features_')
continuous_vector_variable_ = continuous_features.transform(continuous_vector_)

In [None]:
correlation_ = Correlation.corr(
    dataset=continuous_vector_variable_,
    column='continuous_features_',
    method='pearson',
)

In [None]:
correlation_array_ = correlation_.head()[0].toArray()
correlation_pd_ = pd.DataFrame(correlation_array_, index=vars, columns=vars)

# Features whose |Pearson r| with the target exceeds 0.05. This also matches
# the target itself, which the CONTINUOUS_COLUMNS filter below removes.
target_corr = correlation_pd_[TARGET_COLUMN[0]]
CONTINUOUS_FEATURES = target_corr.index[target_corr.abs() > 0.05].tolist()

# Restore CONTINUOUS_COLUMNS order.
CONTINUOUS_FEATURES = [name for name in CONTINUOUS_COLUMNS if name in CONTINUOUS_FEATURES]
len(CONTINUOUS_COLUMNS), len(CONTINUOUS_FEATURES)

(49, 12)

In [None]:
(
    correlation_pd_
    .loc[CONTINUOUS_FEATURES, [TARGET_COLUMN[0]]]
    .sort_values(by=TARGET_COLUMN[0], ascending=False)
)

Unnamed: 0,isFraud
C7,0.215502
V236,0.105408
V280,0.09523
V317,0.092926
V308,0.082079
V97,0.078032
V279,0.072887
V183,0.071482
V306,0.067614
V290,0.061227


In [None]:
# Cap thresholds (from `maximum`) for the selected continuous features.
max_thresholds = {var: maximum[var] for var in CONTINUOUS_FEATURES}

max_thresholds

{'C7': 10.0,
 'V183': 4.0,
 'V236': 3.0,
 'V279': 5.0,
 'V280': 9.0,
 'V290': 3.0,
 'V306': 938.0,
 'V308': 1649.5,
 'V317': 1762.0,
 'V97': 7.0,
 'addr1': 540.0,
 'addr2': 87.0}

In [None]:
# Caps for the continuous features kept in EDA, copied from the output above;
# the feature list is the dict's key order.
max_thresholds = {
    'addr1': 540.0,
    'addr2': 87.0,
    'C7': 10.0,
    'V97': 7.0,
    'V183': 4.0,
    'V236': 3.0,
    'V279': 5.0,
    'V280': 9.0,
    'V290': 3.0,
    'V306': 938.0,
    'V308': 1649.5,
    'V317': 1762.0,
}
CONTINUOUS_FEATURES = list(max_thresholds)

# Feature preparation with transformers and estimators

## 1. Custom transformers

### Parameters from EDA

In [None]:
# Column roles
IDENTIFIERS = ['TransactionID']
TARGET_COLUMN = ['isFraud']
TIME_COLUMNS = ['TransactionDT']

# Feature lists and caps below are copied from the EDA outputs above; update
# them whenever the EDA selection is re-run.
# NOTE(review): the binary/categorical/discrete lists came from a
# `pValue > alpha` filter on a feature-vs-target chi2 test, which keeps the
# features that look independent of the target — confirm this is intended.

# Binary columns selected by the chi2 test (EDA)
BINARY_FEATURES = ['V286', 'V26']

# Categorical columns selected by the chi2 test (EDA)
CATEGORICAL_FEATURES = []

# Discrete columns selected by the chi2 test (EDA)
DISCRETE_FEATURES = ['weekdays', 'minutes']

# Continuous columns left after dropping pairs with |correlation| > 0.8 and
# keeping |correlation with the target| > 0.05 (EDA), with their caps;
# the feature order is the dict's key order.
max_thresholds = {
    'addr1': 540.0,
    'addr2': 87.0,
    'C7': 10.0,
    'V97': 7.0,
    'V183': 4.0,
    'V236': 3.0,
    'V279': 5.0,
    'V280': 9.0,
    'V290': 3.0,
    'V306': 938.0,
    'V308': 1649.5,
    'V317': 1762.0,
}
CONTINUOUS_FEATURES = list(max_thresholds)

# Discrete columns to consolidate into groups 0, >0
binary_1 = ['V26', 'V286']

### 1.1 Consolidate the discrete variables selected in EDA into groups 0 and >0

In [None]:
import pyspark.sql.functions as F
from pyspark import keyword_only
from pyspark.ml import Estimator, Model, Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import (
    HasInputCol,
    HasInputCols,
    HasOutputCol,
    HasOutputCols,
)
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class DiscreteToBinaryTransformer(
    Transformer,
    HasInputCol,
    HasOutputCol,
    HasInputCols,
    HasOutputCols,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    """Consolidates discrete variables to groups 0, >0.

    Every value greater than 0 is replaced by 1; all other values (0,
    negatives and nulls) are passed through unchanged.

    Configure either ``inputCol``/``outputCol`` for a single column or
    ``inputCols``/``outputCols`` (lists of equal length) for several.
    """

    @keyword_only
    def __init__(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
    ):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
    ):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def setOutputCols(self, new_outputCols):
        return self.setParams(outputCols=new_outputCols)

    def checkParams(self):
        """Validate the column parameters.

        Raises:
            ValueError: if both or neither of ``inputCol``/``inputCols`` are
                set, if ``inputCols`` is set without ``outputCols``, or if the
                two lists differ in length.
        """
        # Test #1: either inputCol or inputCols can be set (but not both).
        if self.isSet("inputCol") and self.isSet("inputCols"):
            raise ValueError(
                "Only one of `inputCol` and `inputCols` must be set."
            )

        # Test #2: at least one of inputCol or inputCols must be set.
        if not (self.isSet("inputCol") or self.isSet("inputCols")):
            raise ValueError(
                "One of `inputCol` or `inputCols` must be set."
            )

        # Test #3: if `inputCols` is set, then `outputCols`
        # must be set and be a list of the same len()
        if self.isSet("inputCols"):
            if not self.isSet("outputCols"):
                raise ValueError(
                    "`outputCols` must be set when `inputCols` is set."
                )
            if len(self.getInputCols()) != len(self.getOutputCols()):
                raise ValueError(
                    "The length of `inputCols` does not match"
                    " the length of `outputCols`"
                    )

    def _transform(self, dataset):
        self.checkParams()

        # Choose the output parameter that matches the input mode, so a stray
        # `outputCol` cannot silently truncate a multi-column run via zip().
        if self.isSet("inputCol"):
            input_columns = [self.getInputCol()]
            # NOTE(review): when outputCol is unset this presumably falls back to
            # the HasOutputCol default name — confirm for the pyspark version used.
            output_columns = [self.getOutputCol()]
        else:
            input_columns = self.getInputCols()
            output_columns = self.getOutputCols()

        for var_in, var_out in zip(input_columns, output_columns):
            dataset = dataset.withColumn(
                var_out, F.when(F.col(var_in) > 0, 1).otherwise(F.col(var_in))
            )
        return dataset

### 1.2. Cap max values of continuous variables

In [None]:
import pyspark.sql.functions as F
from pyspark import keyword_only
from pyspark.ml import Estimator, Model, Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import (
    HasInputCol,
    HasInputCols,
    HasOutputCol,
    HasOutputCols,
)
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from typing import Iterable

class ContinuousOutliersCapper(
    Transformer,
    HasInputCol,
    HasInputCols,
    DefaultParamsReadable,
    DefaultParamsWritable
    ):
    """
    Caps continuous columns in place at fixed per-column maximum values.

    ``maximum`` is a list of floats aligned with the input columns (a
    one-element list when ``inputCol`` is used). Each non-null value becomes
    ``min(value, maximum)``; nulls are left as nulls. In this notebook the
    thresholds come from EDA: the 99th percentile when the gap between the max
    and the 99th percentile exceeds one standard deviation, else the max.
    """
    maximum = Param(
        Params._dummy(),
       'maximum',
       "Values we want to replace our outliers values with.",
        typeConverter=TypeConverters.toListFloat
        )

    @keyword_only
    def __init__(self, inputCol=None, inputCols=None, maximum=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, inputCols=None, maximum=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def setMaximum(self, new_maximum):
        return self.setParams(maximum=new_maximum)

    def getMaximum(self):
        return self.getOrDefault(self.maximum)

    def checkParams(self):
        """Validate the column and threshold parameters.

        Raises:
            ValueError: if both or neither of ``inputCol``/``inputCols`` are
                set, if ``maximum`` is missing, or if the number of thresholds
                differs from the number of input columns.
        """
        # Test #1: either inputCol or inputCols can be set (but not both).
        if self.isSet("inputCol") and self.isSet("inputCols"):
            raise ValueError(
                "Only one of `inputCol` and `inputCols` must be set."
            )

        # Test #2: at least one of inputCol or inputCols must be set.
        if not (self.isSet("inputCol") or self.isSet("inputCols")):
            raise ValueError(
                "One of `inputCol` or `inputCols` must be set."
            )

        # Test #3: exactly one threshold per input column.
        thresholds = self.getMaximum() if self.isSet("maximum") else None
        if thresholds is None:
            raise ValueError("`maximum` must be set.")
        n_inputs = 1 if self.isSet("inputCol") else len(self.getInputCols())
        if len(thresholds) != n_inputs:
            raise ValueError(
                "The length of `maximum` does not match"
                " the number of input columns"
            )

    def _transform(self, dataset):
        self.checkParams()

        # If `inputCol`, we wrap into a single-item list
        input_columns = (
            [self.getInputCol()]
            if self.isSet("inputCol")
            else self.getInputCols()
            )
        # toListFloat already stores `maximum` as a list, so it is used as-is in
        # both modes (wrapping it again would pair the column with [v], not v).
        max_thresholds = self.getMaximum()
        for k, v in zip(input_columns, max_thresholds):
            # F.least skips nulls, so nulls are passed through explicitly.
            dataset = dataset.withColumn(
                k, F.when(F.isnull(F.col(k)), F.col(k)
                ).otherwise(F.least(F.col(k), F.lit(v)))
                )
        return dataset

### 1.3. Generate time features

In [None]:
import pyspark.sql.functions as F
from pyspark import keyword_only
from pyspark.ml import Estimator, Model, Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import (
    HasInputCol,
    HasInputCols,
    HasOutputCol,
    HasOutputCols,
)
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.types import IntegerType

class TimeFeaturesGenerator(
    Transformer,
    HasInputCol,
    HasInputCols,
    DefaultParamsReadable,
    DefaultParamsWritable
    ):
    """
    Generates weekdays, hours and minutes from time variable.

    The input column is treated as a count of seconds (assumed from the
    arithmetic; TODO confirm the unit of TransactionDT). Three integer columns
    are added:

    - ``weekdays``: day index within a 7-day cycle counted from the time origin
      (not necessarily the calendar weekday),
    - ``hours``: hour of the day,
    - ``minutes``: minute of the hour.

    When ``inputCols`` is used, only its first entry is read.
    """
    @keyword_only
    def __init__(self, inputCol=None, inputCols=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(
        self,
        inputCol=None,
        inputCols=None
        ):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def checkParams(self):
        """Raise ValueError unless exactly one of inputCol / inputCols is set."""
        # Test #1: either inputCol or inputCols can be set (but not both).
        if self.isSet("inputCol") and self.isSet("inputCols"):
            raise ValueError(
                "Only one of `inputCol` and `inputCols` must be set."
            )

        # Test #2: at least one of inputCol or inputCols must be set.
        if not (self.isSet("inputCol") or self.isSet("inputCols")):
            raise ValueError(
                "One of `inputCol` or `inputCols` must be set."
            )

    def _transform(self, dataset):
        self.checkParams()

        # If `inputCol`, we wrap into a single-item list
        input_columns = (
            [self.getInputCol()]
            if self.isSet("inputCol")
            else self.getInputCols()
        )
        seconds_per_minute = 60
        seconds_per_hour = 60 * seconds_per_minute
        seconds_per_day = 24 * seconds_per_hour
        seconds_per_week = 7 * seconds_per_day

        time_col = F.col(input_columns[0])
        # `%` then `/` gives a fractional count; the integer cast truncates it.
        dataset = dataset.withColumn(
            'weekdays',
            (time_col % seconds_per_week / seconds_per_day).cast(IntegerType())
            )
        dataset = dataset.withColumn(
            'hours',
            (time_col % seconds_per_day / seconds_per_hour).cast(IntegerType())
            )
        # x % day % hour == x % hour because an hour divides a day evenly.
        dataset = dataset.withColumn(
            'minutes',
            (time_col % seconds_per_hour / seconds_per_minute).cast(IntegerType())
            )

        return dataset

### 1.4 Fill null values with constants

In [None]:
import pyspark.sql.functions as F
from pyspark import keyword_only
from pyspark.ml import Estimator, Model, Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import (
    HasInputCol,
    HasInputCols,
    HasOutputCol,
    HasOutputCols,
)
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class ScalarNAFiller(
    Transformer,
    HasInputCol,
    HasOutputCol,
    HasInputCols,
    HasOutputCols,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    """Fills the `null` values of inputCol with a scalar value `filler`.

    Works on a single column (`inputCol` / `outputCol`) or on several
    (`inputCols` / `outputCols`). When the output columns differ from the
    input columns, each input is first copied to its output column and only
    the copies are filled, so the input columns are left untouched.
    """

    filler = Param(
        Params._dummy(),
        "filler",
        "Value we want to replace our null values with.",
        typeConverter=TypeConverters.toFloat,
    )

    @keyword_only
    def __init__(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None,
        filler=None,
    ):
        super().__init__()
        self._setDefault(filler=None)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None,
        filler=None,
    ):
        """Set whichever params were passed as keyword arguments."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setFiller(self, new_filler):
        return self.setParams(filler=new_filler)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def setOutputCols(self, new_outputCols):
        return self.setParams(outputCols=new_outputCols)

    def getFiller(self):
        return self.getOrDefault(self.filler)

    def checkParams(self):
        """Validate the params before transforming.

        Raises:
            ValueError: if the input/output column params are inconsistent,
                or if no `filler` value was provided.
        """
        # Test #1: either inputCol or inputCols can be set (but not both).
        if self.isSet("inputCol") and (self.isSet("inputCols")):
            raise ValueError(
                "Only one of `inputCol` and `inputCols` must be set."
            )

        # Test #2: at least one of inputCol or inputCols must be set.
        if not (self.isSet("inputCol") or self.isSet("inputCols")):
            raise ValueError(
                "One of `inputCol` or `inputCols` must be set."
            )

        # Test #3: if `inputCols` is set, then `outputCols`
        # must be a list of the same len()
        if self.isSet("inputCols"):
            # Without this check, getOutputCols() fails with an obscure
            # "no default value" error instead of a clear message.
            if not self.isDefined("outputCols"):
                raise ValueError(
                    "`outputCols` must be set when `inputCols` is set."
                )
            if len(self.getInputCols()) != len(self.getOutputCols()):
                raise ValueError(
                    "The length of `inputCols` does not match"
                    " the length of `outputCols`"
                )

        # Test #4: a filler value is required (its default is None, which
        # Spark's `fillna` does not accept as a fill value).
        if self.getFiller() is None:
            raise ValueError("`filler` must be set.")

    def _transform(self, dataset):
        self.checkParams()

        # If `inputCol` / `outputCol`, we wrap into a single-item list
        input_columns = (
            [self.getInputCol()]
            if self.isSet("inputCol")
            else self.getInputCols()
        )
        output_columns = (
            [self.getOutputCol()]
            if self.isSet("outputCol")
            else self.getOutputCols()
        )

        answer = dataset

        # If input_columns == output_columns, we overwrite and no need to create
        # new columns.
        if input_columns != output_columns:
            for in_col, out_col in zip(input_columns, output_columns):
                answer = answer.withColumn(out_col, F.col(in_col))

        na_filler = self.getFiller()
        # Fill on `answer`, not `dataset`: filling the original frame would
        # discard the output columns copied above.
        return answer.fillna(na_filler, output_columns)

### 1.5. Transform discrete variables to string format (for one-hot encoding)

In [None]:
import pyspark.sql.functions as F
from pyspark import keyword_only
from pyspark.ml import Estimator, Model, Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import (
    HasInputCol,
    HasInputCols,
    HasOutputCol,
    HasOutputCols,
)
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.types import IntegerType, StringType 

class StringFromDiscrete(
    Transformer,
    HasInputCol,
    HasOutputCol,
    HasInputCols,
    HasOutputCols,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    """Transforms discrete variables to string format (for one-hot encoding).

    Each input column is cast to `StringType` and written to the matching
    output column (which may be the input column itself, overwriting it).
    """

    @keyword_only
    def __init__(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
        ):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
        ):
        """Set whichever params were passed as keyword arguments."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def setOutputCols(self, new_outputCols):
        return self.setParams(outputCols=new_outputCols)

    def checkParams(self):
        """Validate the input/output column params.

        Raises:
            ValueError: if both or neither of `inputCol` / `inputCols` are
                set, or if `outputCols` is missing or has a different length
                than `inputCols`.
        """
        # Test #1: either inputCol or inputCols can be set (but not both).
        if self.isSet("inputCol") and (self.isSet("inputCols")):
            raise ValueError(
                "Only one of `inputCol` and `inputCols` must be set."
            )

        # Test #2: at least one of inputCol or inputCols must be set.
        if not (self.isSet("inputCol") or self.isSet("inputCols")):
            raise ValueError(
                "One of `inputCol` or `inputCols` must be set."
            )

        # Test #3: if `inputCols` is set, then `outputCols`
        # must be a list of the same len()
        if self.isSet("inputCols"):
            # Without this check, getOutputCols() fails with an obscure
            # "no default value" error instead of a clear message.
            if not self.isDefined("outputCols"):
                raise ValueError(
                    "`outputCols` must be set when `inputCols` is set."
                )
            if len(self.getInputCols()) != len(self.getOutputCols()):
                raise ValueError(
                    "The length of `inputCols` does not match"
                    " the length of `outputCols`"
                )

    def _transform(self, dataset):
        self.checkParams()

        # If `inputCol` / `outputCol`, we wrap into a single-item list
        input_columns = (
            [self.getInputCol()]
            if self.isSet("inputCol")
            else self.getInputCols()
        )
        output_columns = (
            [self.getOutputCol()]
            if self.isSet("outputCol")
            else self.getOutputCols()
        )

        for var_in, var_out in zip(input_columns, output_columns):
            dataset = dataset.withColumn(
                var_out,
                dataset[var_in].cast(StringType())
            )
        return dataset

## 2. Feature extraction pipeline


### 2.1. Pipeline code

In [None]:
import pyspark.ml.feature as MF
from pyspark.ml import Pipeline

def get_feature_extraction_pipeline():
    """Build the (unfitted) feature-extraction `Pipeline`.

    Relies on notebook-level names defined in earlier cells: `binary_1`,
    `max_thresholds`, `TIME_COLUMNS`, `BINARY_FEATURES`, `DISCRETE_FEATURES`,
    `CONTINUOUS_FEATURES` and the custom transformers
    `DiscreteToBinaryTransformer`, `ContinuousOutliersCapper`,
    `TimeFeaturesGenerator`, `ScalarNAFiller` and `StringFromDiscrete`.

    Stage order: binarize / cap outliers / time features -> fill nulls ->
    cast binary features to strings -> index and one-hot encode them ->
    min-max scale discrete features -> robust-scale continuous features ->
    assemble everything into a single `features` vector column.

    Returns:
        pyspark.ml.Pipeline: a new pipeline, not yet fitted.
    """
    def with_suffix(names, suffix):
        # Derived column names, e.g. 'x' -> 'x_str'.
        return [name + suffix for name in names]

    binary_str_cols = with_suffix(BINARY_FEATURES, '_str')
    binary_index_cols = with_suffix(BINARY_FEATURES, '_index')
    binary_encoded_cols = with_suffix(BINARY_FEATURES, '_encoded')
    nullable_discrete_cols = BINARY_FEATURES + DISCRETE_FEATURES
    assembled_inputs = binary_encoded_cols + [
        'discrete_vector_scaled',
        'continuous_vector_scaled',
    ]

    stages = [
        # Map the `binary_1` columns to binary values, in place.
        DiscreteToBinaryTransformer(
            inputCols=binary_1,
            outputCols=binary_1,
        ),
        # Presumably caps each column at its `max_thresholds` value.
        ContinuousOutliersCapper(
            maximum=list(max_thresholds.values()),
            inputCols=list(max_thresholds.keys()),
        ),
        TimeFeaturesGenerator(inputCols=TIME_COLUMNS),
        # NOTE(review): the -999 sentinel also lands in DISCRETE_FEATURES,
        # which are min-max scaled below together with the real values —
        # confirm this compression of the real range is intended.
        ScalarNAFiller(
            inputCols=nullable_discrete_cols,
            outputCols=nullable_discrete_cols,
            filler=-999,
        ),
        ScalarNAFiller(
            inputCols=CONTINUOUS_FEATURES,
            outputCols=CONTINUOUS_FEATURES,
            filler=0,
        ),
        StringFromDiscrete(
            inputCols=BINARY_FEATURES,
            outputCols=binary_str_cols,
        ),
        MF.StringIndexer(
            inputCols=binary_str_cols,
            outputCols=binary_index_cols,
            handleInvalid='keep',
        ),
        MF.OneHotEncoder(
            inputCols=binary_index_cols,
            outputCols=binary_encoded_cols,
        ),
        MF.VectorAssembler(
            inputCols=DISCRETE_FEATURES,
            outputCol='discrete_assembled',
        ),
        MF.MinMaxScaler(
            inputCol='discrete_assembled',
            outputCol='discrete_vector_scaled',
        ),
        MF.VectorAssembler(
            inputCols=CONTINUOUS_FEATURES,
            outputCol='continuous_assembled',
        ),
        MF.RobustScaler(
            inputCol='continuous_assembled',
            outputCol='continuous_vector_scaled',
        ),
        MF.VectorAssembler(
            inputCols=assembled_inputs,
            outputCol='features',
        ),
    ]
    return Pipeline(stages=stages)

### 2.2. Test pipeline

In [None]:
# Build a fresh, unfitted pipeline (new stage objects on every call).
feature_extraction_pipeline = get_feature_extraction_pipeline()

In [None]:
# Fit the pipeline on the training data only, then apply it to that data.
# NOTE(review): `df_train` is overwritten with the transformed frame, so this
# cell is not idempotent. Re-running it without reloading `df_train` will
# likely fail because the pipeline's output columns (e.g. `features`) already
# exist. Re-run the data-loading cell first.
feature_extraction_pipeline_model = feature_extraction_pipeline.fit(df_train)
df_train = feature_extraction_pipeline_model.transform(df_train)
df_train[IDENTIFIERS + TARGET_COLUMN + ['features']].show(3, truncate=60)

+-------------+-------+------------------------------------------------------------+
|TransactionID|isFraud|                                                    features|
+-------------+-------+------------------------------------------------------------+
|      3166872|      0|(20,[0,3,6,7,8],[1.0,1.0,0.5,0.864406779661017,1.26573426...|
|      3130432|      0|(20,[0,3,6,7,8],[1.0,1.0,1.0,0.3898305084745763,3.5804195...|
|      3031540|      0|(20,[0,3,6,7,8],[1.0,1.0,0.8333333333333333,0.22033898305...|
+-------------+-------+------------------------------------------------------------+
only showing top 3 rows



In [None]:
# Apply the model fitted on the training data to the test set (no refit, so
# the scalers / indexers are not fitted on test data).
# NOTE(review): like the training cell, this overwrites `df_test` and is not
# safe to re-run without reloading it.
df_test = feature_extraction_pipeline_model.transform(df_test)
df_test[IDENTIFIERS + TARGET_COLUMN + ['features']].show(3, truncate=60)

+-------------+-------+------------------------------------------------------------+
|TransactionID|isFraud|                                                    features|
+-------------+-------+------------------------------------------------------------+
|      3335921|      0|(20,[0,3,6,7,8,15,18],[1.0,1.0,0.16666666666666666,0.6101...|
|      3326060|      0|(20,[0,4,6,7,8],[1.0,1.0,0.8333333333333333,0.77966101694...|
|      3334070|      0|(20,[0,3,7,8,15,18],[1.0,1.0,0.15254237288135594,1.0,1.0,...|
+-------------+-------+------------------------------------------------------------+
only showing top 3 rows



### 2.3. Save pipeline

In [None]:
feature_extraction_pipeline.save('feature_extraction_pipeline')
!zip -r feature_extraction_pipeline.zip feature_extraction_pipeline

  adding: feature_extraction_pipeline/ (stored 0%)
  adding: feature_extraction_pipeline/metadata/ (stored 0%)
  adding: feature_extraction_pipeline/metadata/part-00000 (deflated 35%)
  adding: feature_extraction_pipeline/metadata/_SUCCESS (stored 0%)
  adding: feature_extraction_pipeline/metadata/.part-00000.crc (stored 0%)
  adding: feature_extraction_pipeline/metadata/._SUCCESS.crc (stored 0%)
  adding: feature_extraction_pipeline/stages/ (stored 0%)
  adding: feature_extraction_pipeline/stages/10_VectorAssembler_e4ea796d9669/ (stored 0%)
  adding: feature_extraction_pipeline/stages/10_VectorAssembler_e4ea796d9669/metadata/ (stored 0%)
  adding: feature_extraction_pipeline/stages/10_VectorAssembler_e4ea796d9669/metadata/part-00000 (deflated 39%)
  adding: feature_extraction_pipeline/stages/10_VectorAssembler_e4ea796d9669/metadata/_SUCCESS (stored 0%)
  adding: feature_extraction_pipeline/stages/10_VectorAssembler_e4ea796d9669/metadata/.part-00000.crc (stored 0%)
  adding: feature_ex