autor: @LuisFalva

### SMOTE es una técnica para balancear datos. Normalmente, a la hora de entrenar un modelo tenemos que generar nuestra variable *target* [0,1] con el cual podremos calcular una predicción apartir de los registros observados, ¿pero que pasa cuando el 'target' que nos interesa es la clase minoritaria? Esto es un problema típico que muchos modelos sufren, dado que nuestra clase de interés será, en la mayoría de los casos, la clase minoritaria, tenemos que buscar una técnica para implementar un sobremuestreo sin perder información.

<img src="src/smote.gif" width="750" align="center">

### Dentro de este notebook, están las notas de estudio respecto a la técnica Synthetic Minority Oversampling Technique [SMOTE] la cual hace uso del algoritmo de k-NN para encontrar los vecinos más cercanos a la clase minoritaria, i.e. la clase de los positivos '1'.

**img link: [The main issue with identifying Financial Fraud using Machine Learning (and how to address it)](https://towardsdatascience.com/the-main-issue-with-identifying-financial-fraud-using-machine-learning-and-how-to-address-it-3b1bf8fa1e0c)**

In [1]:
import random
import numpy as np

from sklearn import neighbors
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import when, col
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import VectorAssembler

### Este notebook involucra el uso mixto de sklearn y pyspark, la idea de esta solución es publicar una forma de tantas posibles para implementar un método de muestreo de forma distribuida, por tal razón tenemos que definir a continuación el contexto de Spark.

In [2]:
spark = SparkSession.builder.appName("SMOTE").getOrCreate()

### Para la construcción de la función que nos ayudará a generar nuestras muestras sintéticas, vamos a cargar la tabla **"src/data/"**, la cual contiene una cantidad de variables que describen las caracteristicas principales de un cliente por cada renglón. El dataframe que usaremos mantendrá de origen las siguientes variables numéricas:
- **[age, child, saving, insight, backup, marital]**

In [3]:
arr_col = ["age", "child", "saving", "insight", "backup", "marital"]
smote_test = spark.read.parquet("src/data/").select(*arr_col)
smote_test.show(5, False)

+---+-----+------+-------+------+-------+
|age|child|saving|insight|backup|marital|
+---+-----+------+-------+------+-------+
|59 |1    |0     |1      |1     |married|
|56 |0    |1     |0      |1     |married|
|41 |1    |1     |0      |0     |married|
|55 |1    |0     |0      |1     |married|
|54 |1    |0     |0      |1     |married|
+---+-----+------+-------+------+-------+
only showing top 5 rows



In [4]:
test = smote_test.select("*", (when(col("marital") == "divorced", 1).otherwise(0)).alias("target")).drop("marital")
test.groupBy("target").count().show()
test.where(col("target") == 1).show(5)

+------+-----+
|target|count|
+------+-----+
|     1| 1293|
|     0| 9869|
+------+-----+

+---+-----+------+-------+------+------+
|age|child|saving|insight|backup|target|
+---+-----+------+-------+------+------+
| 60|    0|     1|      0|     0|     1|
| 35|    0|     1|      1|     1|     1|
| 49|    1|     1|      1|     0|     1|
| 28|    0|     0|      0|     0|     1|
| 43|    1|     1|      0|     1|     1|
+---+-----+------+-------+------+------+
only showing top 5 rows



### Lo que nosotros buscamos para entrenar nuestro modelo de k vecinos cercanos [i.e. k-NN] es un objeto de tipo numpy array con los valores de cada registro, algo similar a esto:

In [5]:
np.array(test.where(col("target") == 1).drop("target").collect())

array([[60,  0,  1,  0,  0],
       [35,  0,  1,  1,  1],
       [49,  1,  1,  1,  0],
       ...,
       [52,  0,  0,  0,  0],
       [38,  0,  1,  0,  1],
       [60,  1,  1,  0,  1]])

**NOTA: Sin embargo, para convertir de un Spark Dataframe a un objeto de tipo numpy.array es conveniente antes transformarlo a RDD, por lo que los métodos de las clase SparkSMOTE realizarán internamente esos parseos.**

### Para entrenar el modelo de k-NN necesitaremos convertir nuestro spark Dataframe a un objeto de tipo numpy array, y para ello debemos bajar nuestra estructura dataframe a rdd's, para que la estructura de datos al ser transformada ésta sea de manera distribuida.

In [6]:
def vector_assembling(data_input, target_name):
    """
    Vector assembling function will create a vector filled with features for each row
    
    :param data_input: df, spark Dataframe with target label
    :param target_name: str, string name from target label
    :return: Dataframe, table that includes the feature vector and label
    """
    
    if data_input.select(target_name).distinct().count() != 2:
        raise ValueError("Target field must have only 2 distinct classes")
    
    column_names = list(data_input.drop(target_name).columns)
    vec_assembler = VectorAssembler(inputCols=column_names, outputCol='features')
    vec_transform = vec_assembler.transform(data_input)
    vec_feature = vec_transform.select('features', (vec_transform[target_name]).alias("label"))
    
    return vec_feature

def split_target(df, field, minor=1, major=0):
    """
    Split target will split in two distinct Dataframe from label 1 and 0
    
    :param df: Dataframe, spark Dataframe with target label
    :param field: str, string name from taget label
    :param minor: int, integer number for minority class; '1' set as default
    :param major: int, integer number for majority class; '0' set as default
    :return: dict, python dictionary with separated Dataframe
    """
    minor = df[df[field] == minor]
    major = df[df[field] == major]
    
    return {"minor": minor, "major": major}

def spkdf_to_nparr(df, feature):
    """
    Spkdf to nparr function will help to parse from spark Dataframe to numpy array
    in a distributed way
    
    :param df: Dataframe, spark Dataframe with features column
    :param feature: str, string name of column features name
    :return: np.array, numpy array object with features
    """
    feature_df = df.select(feature)
    
    return np.asarray(feature_df.rdd.map(lambda x: x[0]).collect())

def nparr_to_spkdf(spark_session, arr, feat="features", label="label"):
    """
    Nparr to spkdf function will help to parse from numpy array to spark Dataframe
    in a distributed way
    
    :param df: Dataframe, spark Dataframe with features column
    :param feat: str, string name of column features name; 'features' set as default
    :param label: str, string name of column label name; 'label' set as default
    :return: Dataframe, with feautures and label
    """
    sc = spark_session.sparkContext
    data_set = sc.parallelize(arr)
    data_rdd = data_set.map(lambda x: (Row(fatures=DenseVector(x), label=1)))
    
    return data_rdd.toDF()

def smote_sampling(spark, df, k=2, algth="auto", pct_over_min=100, pct_under_max=100):
    """
    Smote sampling function will create an oversampling with SMOTE technique
    
    :param df: Dataframe, spark Dataframe with features column
    :param k: int, integer k folds for KNN's groups; '2' set as default
    :param algrth: str, string name for KNN's algorithm choice; 'auto' set as default
    :param pct_over_min: int, integer number for sampling minority class; '100' set as default
    :param pct_under_max: int, integer number for sampling majority class; '100' set as default
    :return: Dataframe, with new SMOTE features sampled
    """
    def k_neighbor(k, algth, feature):
        """
        k neighbor will compute Nearest Neighbors sklearn algorithm

        :param k: int, integer number for k nearest neighbors groups
        :param feature: str, string name of column features name
        :return: list, python list with numpy array object for each neighbor
        """
        neighbor = neighbors.NearestNeighbors(n_neighbors=k, algorithm=algth)
        model_fit = neighbor.fit(feature)
        
        return model_fit.kneighbors(feature)
    
    def compute_smo(neighbor_list, min_pct, min_arr):
        """
        Compute smo function will compute the SMOTE oversampling technique

        :param neighbor_list: list, python list with numpy array object for each neighbor
        :param min_pct: int, integer pct for over min
        :param min_arr: list, python list with minority class rows
        :param k: int, integer number for k nearest neighbors groups
        :return: list, python list with sm class oversampled
        """
        if min_pct < 100:
            raise ValueError("pct_over_min can't be less than 100")
        
        smo = []
        counter = 0
        pct_over = int(min_pct / 100)
        
        while len(min_arr) > counter:
            for i in range(pct_over):
                random_neighbor = random.randint(0, len(neighbor)-1)
                diff = neighbor_list[random_neighbor][0] - min_arr[i][0]
                new_record = (min_arr[i][0] + random.random() * diff)
                smo.insert(0, (new_record))
            counter+=1
            
        return smo
    
    data_input_min = split_target(df=df, field="label")["minor"]
    data_input_max = split_target(df=df, field="label")["major"]
    
    feature_mat = spkdf_to_nparr(df=data_input_min, feature="features")
    neighbor = k_neighbor(k=k, algth=algth, feature=feature_mat)[1]
    
    min_array = data_input_min.drop("label").rdd.map(lambda x : list(x)).collect()
    new_row = compute_smo(neighbor_list=neighbor, min_pct=pct_over_min, min_arr=min_array)
    smo_data_df = nparr_to_spkdf(spark_session=spark, arr=new_row)
    smo_data_minor = data_input_min.unionAll(smo_data_df)
    
    if (pct_under_max < 10) | (pct_under_max > 100):
        raise ValueError("pct_under_max can't be less than 10 either higher than 100")
    new_data_major = data_input_max.sample(False, (float(pct_under_max / 100)))
    
    return new_data_major.unionAll(smo_data_minor)

### Para computar nuestras muestras sintéticas debemos antes vectorizar los atributos que tengamos en nuestra tabla de datos, esto significa que debemos tomar los valores de cada columna y crear vectores de longitud **$p$**. Este método asume tres principales puntos:

- Normalización y estandarización de variables
- Mapeo de cada valor por columna a codificaciones binarias (StringIndexer, OneHotEncoder)
- Spark Dataframe vectorizado, i.e. con columna de vectores densos y escasos (features), y columna dicotómica (label)

In [7]:
vector_assemble = vector_assembling(test, "target")
vector_assemble.show(5, False)

+----------------------+-----+
|features              |label|
+----------------------+-----+
|[59.0,1.0,0.0,1.0,1.0]|0    |
|[56.0,0.0,1.0,0.0,1.0]|0    |
|[41.0,1.0,1.0,0.0,0.0]|0    |
|[55.0,1.0,0.0,0.0,1.0]|0    |
|[54.0,1.0,0.0,0.0,1.0]|0    |
+----------------------+-----+
only showing top 5 rows



### Como muestra de su funcionamiento, para aplicar el método *smote_sampling* requerimos de la tabla anterior con variables previamente standarizados, codificados y vectorizados. Como se puede ver, el método recibe los argumentos 'pct_over_min' y 'pct_under_max' configurados por default en [100, 100] respectivamente, cada uno de esos argumentos ayudarán a manipular el submuestreo o sobremuestreo de ambas clases que se ven en la siguiente tabla.

**pct_over_min; modificará la cantidad de registros que existe para la clase minoritaria sobremuestreando los registros con valores sintéticos, en este caso, la clase '1'**

**pct_under_max; modificará la cantidad de registros que existe para la clase mayoritaria submuestreando los registros, en este caso, la clase '0'**

In [8]:
smote_sample = smote_sampling(spark, vector_assemble, pct_over_min=600, pct_under_max=100)
smote_sample.groupBy("label").count().show()
smote_sample.where(col("label") == 1).orderBy(col("features").desc()).limit(5).toPandas()

+-----+-----+
|label|count|
+-----+-----+
|    0| 9869|
|    1| 9051|
+-----+-----+



Unnamed: 0,features,label
0,"[1271.0918104712482, 1270.6470927128566, 1270....",1
1,"[1252.0007268819193, 1251.6117668010615, 1251....",1
2,"[1248.406327167742, 1248.385651994479, 1248.38...",1
3,"[1247.931361874167, 1247.8941456210284, 1247.8...",1
4,"[1230.4068770858046, 1229.7593553922754, 1229....",1


#### Referencias:
- https://rikunert.com/SMOTE_explained
- https://bmcbioinformatics.biomedcentral.com/articles/10.1186/1471-2105-14-106
- https://machinelearningmastery.com/smote-oversampling-for-imbalanced-classification/