# Challenge Avanzado: Análisis de Grandes Bases de Datos con PySpark, Relacionado a la Tesis

## Contenido

1. Fuentes de datos
2. Descripción de los datos
3. Limpieza de datos
4. Valores faltantes
5. Visualización de datos
6. Referencias

## 1. Fuentes de datos

La fuente de datos de este trabajo es **estrucurada** en formato xlsx. Los datos son **privados** y se eliminó toda la información sensible por privacidad de los datos. Esta base de datos contiene información sobre rotación de personal de empresas manufactureras de Tala, Jalisco.

Se trata de una base de datos transversal que comprende del 20 de abril del 2021 al 30 de abril del 2024.

In [1]:
## Creamos una instancia de Pyspark

from pyspark.sql import SparkSession

#spark = SparkSession.builder.appName("TodlerAutism").getOrCreate()

spark = SparkSession.builder.appName("rotacion").master("local[*]").config("spark.executor.memory", "2g").config("spark.driver.memory", "2g").getOrCreate()


cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()

print("Se tienen ", cores, "núcleo(s)")

spark

Se tienen  1 núcleo(s)


In [None]:
# Librerías necesarias

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql import functions as F

from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder # Aqui se incluye la parte de Cross Validation

In [None]:
# Importación de datos como spark DF
df = spark.read.csv('rotacion_personal_clean.csv',inferSchema=True,header=True)

In [31]:
df.printSchema()

root
 |-- FECHA DE INGRESO: date (nullable = true)
 |-- FECHA ULTIMO REGISTRO: date (nullable = true)
 |-- Estatus: string (nullable = true)
 |-- Días Laborados: integer (nullable = true)
 |-- FECHA DE NACIMIENTO: timestamp (nullable = true)
 |-- No de Crédito Infonavit: string (nullable = true)
 |-- PUESTO: string (nullable = true)
 |-- AREA: string (nullable = true)
 |-- TURNO: string (nullable = true)
 |-- MUNICIPIO: string (nullable = true)
 |-- SALARIO MENSUAL: double (nullable = true)
 |-- ESCOLARIDAD: string (nullable = true)
 |-- GENERO: string (nullable = true)
 |-- Tipo de renuncia: string (nullable = true)



Para facilitar el análisis se renombran las columnas con títulos más adecuados y sin carácteres especiales ni espacios.

In [32]:
#Renombramos variables
df = df.withColumnRenamed("FECHA DE INGRESO", "FECHA_INGRESO") \
       .withColumnRenamed("FECHA ULTIMO REGISTRO", "FECHA_ULTIMO_REGISTRO") \
       .withColumnRenamed("Estatus", "ESTATUS") \
       .withColumnRenamed("Días Laborados", "DIAS_LABORADOS") \
       .withColumnRenamed("FECHA DE NACIMIENTO", "FECHA_NACIMIENTO") \
       .withColumnRenamed("No de Crédito Infonavit", "INFONAVIT") \
       .withColumnRenamed("PUESTO", "PUESTO") \
       .withColumnRenamed("AREA", "AREA") \
       .withColumnRenamed("TURNO", "TURNO") \
       .withColumnRenamed("MUNICIPIO", "MUNICIPIO") \
       .withColumnRenamed("SALARIO MENSUAL", "SALARIO_MENSUAL") \
       .withColumnRenamed("ESCOLARIDAD", "ESCOLARIDAD") \
       .withColumnRenamed("GENERO", "GENERO") \
       .withColumnRenamed("Tipo de renuncia", "MOTIVO_RENUNCIA")

df.printSchema()

root
 |-- FECHA_INGRESO: date (nullable = true)
 |-- FECHA_ULTIMO_REGISTRO: date (nullable = true)
 |-- ESTATUS: string (nullable = true)
 |-- DIAS_LABORADOS: integer (nullable = true)
 |-- FECHA_NACIMIENTO: timestamp (nullable = true)
 |-- INFONAVIT: string (nullable = true)
 |-- PUESTO: string (nullable = true)
 |-- AREA: string (nullable = true)
 |-- TURNO: string (nullable = true)
 |-- MUNICIPIO: string (nullable = true)
 |-- SALARIO_MENSUAL: double (nullable = true)
 |-- ESCOLARIDAD: string (nullable = true)
 |-- GENERO: string (nullable = true)
 |-- MOTIVO_RENUNCIA: string (nullable = true)



De las variables con las que contamos las fechas no se pueden procesar en el modelo, así que antes de eliminarlas crearemos una nueva variable edad restando la fecha de nacimiento a la de corte de los datos 2024-04-30. 

También se eliminará la columna Motivo de renuncia ya que tiene muchos datos faltantes.

In [33]:
# Fecha de referencia para calcular la edad
fecha_referencia = '2024-04-30'

# Calcular la diferencia en días entre la fecha de nacimiento y la fecha de referencia
df = df.withColumn(
    "EDAD",
    (F.datediff(F.lit(fecha_referencia), F.col("FECHA_NACIMIENTO")) / 365.25).cast("int")
)

# Eliminar la columna MOTIVO_RENUNCIA Y LAS DE FECHA
df = df.drop("MOTIVO_RENUNCIA", "FECHA_INGRESO", "FECHA_ULTIMO_REGISTRO", "FECHA_NACIMIENTO" )

# Obtener la lista de las columnas actuales
columns = df.columns

# Mover ESTATUS al final
columns.remove("ESTATUS")
columns.append("ESTATUS")

# Reordenar las columnas para que ESTATUS esté al final
df = df.select(*columns)

In [34]:
# Variable objetivo
df.groupBy("ESTATUS").count().show(20)

+-------+-----+
|ESTATUS|count|
+-------+-----+
|   baja|  425|
| activo|   70|
+-------+-----+



In [None]:
# Función de tratamiento de datos

def df_treat(df,input_columns,target,treat_outliers=False,treat_neg=False):

    # Puede ser que target no sea string
    df_renamed = df.withColumn("label_str",df[target].cast(StringType()))

    # variable target string -> numeric/dummy/boolean/onehotencoder
    indexer = StringIndexer(inputCol="label_str",
                            outputCol="label") # por default pyspark target es "label"
    df_indexed = indexer.fit(df_renamed).transform(df_renamed)
    print(df_indexed.groupBy("ESTATUS","label").count().show())

    # identificamos las variables categoricas y numericas
    numeric_input =[]
    string_input = []

    for column in input_columns: # se va por cada variable
        if str(df_indexed.schema[column].dataType)=='StringType()':
            indexer = StringIndexer(inputCol=column,
                                  outputCol=column+'_num') # funciond e indexeo
            df_indexed = indexer.fit(df_indexed).transform(df_indexed)
            new_col_name = column+'_num' # este nuevo nombre es el que se agrega a las listas vacías
            string_input.append(new_col_name)
        else:
           numeric_input.append(column)

    print("numeric_input:", numeric_input)
    print("string_input:", string_input)

    if treat_outliers:

        d = {} #aqui vamos a guardar los percentiles 1 y 99

        for col in numeric_input:
            d[col] = df_indexed.approxQuantile(col,
                                              [0.01,0.99],
                                               0.25)
    # checamos la asimetría de todas las columnas numéricas
        for col in numeric_input:
            skew = df_indexed.agg(skewness(df_indexed[col])).collect()
            skew = skew[0][0]

            if skew > 1: # si se tiene asimetría positiva, se hace tranformación log
                df_indexed = df_indexed.withColumn(col,
                                           log(when(df[col] < d[col][0],d[col][0]).when(df_indexed[col] > d[col][1], d[col][1]).otherwise(df_indexed[col] ) +1).alias(col))
                print("La variable "+col+" ha sido tratada para asimetría positiva con coeficiente = ",skew)
            elif skew < -1: # si se tiene asimetría negativa, se hace tranformación exp
                df_indexed = df_indexed.withColumn(col,
                                           exp(when(df[col] < d[col][0],d[col][0]).when(df_indexed[col] > d[col][1], d[col][1]).otherwise(df_indexed[col] ) +1).alias(col))
                print("La variable "+col+" ha sido tratada para asimetría negativa con coeficiente = ",skew)

    # Calculamos los valores minimos para todas las columnas del dataframe
    minimums = df.select([min(c).alias(c) for c in df.columns if c in numeric_input])

    # Creamos un array con todos los minimos y seleccionamos solo las columnas con valores de entrada
    min_array = minimums.select(array(numeric_input).alias("mins"))

    # Lo siguiente es collectar los minimos globales como objeto en python
    # aqui tambien se utiliza una funcion array_min en lugar de min, para spark sql
    df_minimum = min_array.select(min(min_array.mins)).collect()

    # Ahora tenemos que cortar lo anterior al número que necesitamos
    df_minimum = df_minimum[0][0][0]

    # Si hay algún valor negativo en el DF, se da un mensaje
    if df_minimum < 0:
        print("ATENCION: Existen valores negativos en el DataSet")
    else:
        print("No se encuentran valores negativos en el DataSet")

    # vectorizar
    # Antes de cooregir los valores negativos, se tiene que vectorizar lo que hemos hecho
    features_list = numeric_input + string_input
    # Creamos el objeto de VectorAssembler
    assembler = VectorAssembler(inputCols=features_list,
                                outputCol="features")
    # Utilizamos este assembler para trasformar los datos
    df_output = assembler.transform(df_indexed).select("features",'label')


    if treat_neg:
        # reescalar valores numéricos
        scaler = MinMaxScaler(inputCol='features',
                            outputCol="scaledFeatures") #el vector que ya habiamos creado
        print("Features escalados a un rango de: [%f,%f]" % (scaler.getMin(), scaler.getMax()))

        # Generar el modelo MinMaxScalerModel
        scalerModel = scaler.fit(df_output)

        # Reescalar cada feature al rango indicado
        df_scaled = scalerModel.transform(df_output)

        # DataFrame final
        df_final = df_scaled.select('label','scaledFeatures')
        df_final = df_final.withColumnRenamed('scaledFeatures','features')
        print("Listo")
    else:
        print("WARNING:No hay correcciones de valores negativos")
        df_final = df_output

    return df_final

In [None]:
input_columns = df.columns[1:-1] # la última entrada es la variable objetivo
target = "ESTATUS" # variable objetivo/dependiente

# Llamamos la función de tratamiento de datos
df_final = df_treat(df=df,
                      input_columns=input_columns,
                      target=target,
                      treat_outliers=True,
                      treat_neg=True)

df_final.toPandas()

+-------+-----+-----+
|ESTATUS|label|count|
+-------+-----+-----+
|   baja|  0.0|  425|
| activo|  1.0|   70|
+-------+-----+-----+

None
numeric_input: ['SALARIO_MENSUAL', 'EDAD']
string_input: ['INFONAVIT_num', 'PUESTO_num', 'AREA_num', 'TURNO_num', 'MUNICIPIO_num', 'ESCOLARIDAD_num', 'GENERO_num']
La variable SALARIO_MENSUAL ha sido tratada para asimetría positiva con coeficiente =  2.9469329728756533
No se encuentran valores negativos en el DataSet
Features escalados a un rango de: [0.000000,1.000000]
Listo


Unnamed: 0,label,features
0,0.0,"(0.20310421602746923, 0.5777777777777778, 1.0,..."
1,0.0,"(0.20310421602746923, 0.26666666666666666, 0.0..."
2,0.0,"(0.0, 0.26666666666666666, 0.0, 0.0, 0.0, 0.0,..."
3,0.0,"(0.0, 0.13333333333333333, 0.0, 0.0, 0.0, 0.0,..."
4,0.0,"(0.20310421602746923, 0.06666666666666667, 0.0..."
...,...,...
490,1.0,"(0.0, 0.22222222222222224, 0.0, 0.0, 0.0, 0.0,..."
491,1.0,"[0.796878086847233, 0.22222222222222224, 1.0, ..."
492,1.0,"(0.22773347223802257, 0.33333333333333337, 0.0..."
493,1.0,"[0.22773347223802257, 0.33333333333333337, 1.0..."


In [None]:
# Función de modelos de clasificación 

def ClassTrainEval(classifier,features,classes,folds,train,test):

    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__

        return Mtype

    Mtype = FindMtype(classifier)


    def IntanceFitModel(Mtype,classifier,classes,features,folds,train):

        if Mtype == "OneVsRest":
            # Inicializar modelo base para clasificación
            lr = LogisticRegression()
            # Inicializar one vs rest classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Parametros de validacion cruzada:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            # Inicializar validación cruzada
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=folds)
            # Se realiza la validación cruzada y se regresa el mejor set de parámetros
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # se especifican las capas de la red neuronal
            # Capa de entrada de características de tamaño, dos intermedias de características +1 y del mismo tamaño que las características y salida de tamaño número de clases
            # Nota: validacion cruzada no puede utilizarse aquí
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," Los algoritmos utilizados no pueden utilizarse pues son para clasificación binaria")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):

            # Cambiar los parametros de cada modelo:
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())

            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
                             .build())

            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())

            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())

            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())

            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100, 120, 140, 160, 180, 200, 220, 240]) \
                             .build())

            # Se inicializa validación cruzada
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=folds) # 3 + is best practice
            fitModel = crossval.fit(train)
            return fitModel

    fitModel = IntanceFitModel(Mtype,classifier,classes,features,folds,train)

    # se imprime la selección de features
    if fitModel is not None:

        if Mtype in("OneVsRest"):
            # Tomar el mejor modelo
            BestModel = fitModel.bestModel
            global OVR_BestModel
            OVR_BestModel = BestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Modelos binarios
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept)
                print('\033[1m' + 'Top 20 Coefficients:'+ '\033[0m')
                coeff_array = model.coefficients.toArray()
                coeff_scores = []
                for x in coeff_array:
                    coeff_scores.append(float(x))
                # Se crea un nuevo df
                result = spark.createDataFrame(zip(input_columns,coeff_scores), schema=['feature','coeff'])
                print(result.orderBy(result["coeff"].desc()).show(truncate=False))


        if Mtype == "MultilayerPerceptronClassifier":
            print("")
            print('\033[1m' + Mtype + '\033[0m')
            print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
            print("")
            global MLPC_Model
            MLPC_BestModel = fitModel

        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # Importancia de las variables de entrada
            # Se realiza como la media sobre todos los árboles en el ensamble, normalizado a 1.
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Top 20 Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            featureImportances = BestModel.featureImportances.toArray()
            # numpy array to list
            imp_scores = []
            for x in featureImportances:
                imp_scores.append(float(x))
            # Zip
            result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
            print(result.orderBy(result["score"].desc()).show(truncate=False))

            # Se guardan las importancias de los modelos
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel
            if Mtype in("GBTClassifier"):
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel
            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        # Se imprimen los coeficientes
        if Mtype in("LogisticRegression"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            print("Intercept: " + str(BestModel.interceptVector))
            print('\033[1m' + " Top 20 Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            # numpy array to list
            coeff_array = BestModel.coefficientMatrix.toArray()
            coeff_scores = []
            for x in coeff_array[0]:
                coeff_scores.append(float(x))
            # zip
            result = spark.createDataFrame(zip(input_columns,coeff_scores), schema=['feature','coeff'])
            print(result.orderBy(result["coeff"].desc()).show(truncate=False))
            # Se guardan los coeficientes y valores de los modelos
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        # Se imprimen los coeficientes
        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            print("Intercept: " + str(BestModel.intercept))
            print('\033[1m' + "Top 20 Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
#             print("Coefficients: \n" + str(BestModel.coefficients))
            coeff_array = BestModel.coefficients.toArray()
            coeff_scores = []
            for x in coeff_array:
                coeff_scores.append(float(x))
            # zip
            result = spark.createDataFrame(zip(input_columns,coeff_scores), schema=['feature','coeff'])
            print(result.orderBy(result["coeff"].desc()).show(truncate=False))
            # Se guardan los coeficientes y valores de los modelos
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel


    # Match entre resultados y nombres de columnas, que se juntaran mas adelante:
    columns = ['Classifier', 'Result']

    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # lista
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # string
        score = [str(accuracy)] #string to list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))

    return result
    #También regresa el mejor modelo y pvalores

In [None]:
input_columns = input_columns[1:-1] # la última entrada es la variable objetivo
target = 'ESTATUS' # variable objetivo
estatus_count = df.select(countDistinct("ESTATUS")).collect() #Cuenta valors únicos en la variable objetivo
estatuses = estatus_count[0][0] # Buscamos el valor total del conteo


# Llamamos a la funcion para tratamiento de datos
test1_data = df_treat(df,input_columns,target,treat_outliers=False,treat_neg=False)
test1_data.limit(5).toPandas()

# Modelos a utilizar
classifiers = [
               LogisticRegression()
              ,OneVsRest()
              ,LinearSVC()
              ,NaiveBayes()
              ,RandomForestClassifier()
              ,GBTClassifier()
              ,DecisionTreeClassifier()
              ,MultilayerPerceptronClassifier()
              ]

train,test = test1_data.randomSplit([0.7,0.3])
features = test1_data.select(['features']).collect()
folds = 2

#Tabla de resultados
columns = ['Classifier', 'Resultado']
vals = [("Place Holder","N/A")]
results = spark.createDataFrame(vals, columns)

# Llamamos la función de clasificación
for classifier in classifiers:
    new_result = ClassTrainEval(classifier,features,estatuses,folds,train,test)
    results = results.union(new_result)
results = results.where("Classifier!='Place Holder'")
print("!!!!!Resultados finales!!!!!!!!")
results.show(100,False)

+-------+-----+-----+
|ESTATUS|label|count|
+-------+-----+-----+
|   baja|  0.0|  425|
| activo|  1.0|   70|
+-------+-----+-----+

None
numeric_input: ['SALARIO_MENSUAL']
string_input: ['PUESTO_num', 'AREA_num', 'TURNO_num', 'MUNICIPIO_num', 'ESCOLARIDAD_num', 'GENERO_num']
No se encuentran valores negativos en el DataSet
 
[1mLogisticRegression[0m
Intercept: [-3.11285748834446]
[1m Top 20 Coefficients[0m
You should compares these relative to eachother
+---------------+---------------------+
|feature        |coeff                |
+---------------+---------------------+
|AREA           |0.8779519420801623   |
|MUNICIPIO      |0.14399844787456878  |
|SALARIO_MENSUAL|0.05786109980013657  |
|PUESTO         |2.0744941922979553E-4|
|ESCOLARIDAD    |-0.05590397994692011 |
|GENERO         |-0.6433385030510809  |
|TURNO          |-1.1160111064659852  |
+---------------+---------------------+

None
 
[1mOneVsRest[0m
[1mIntercept: [0m 2.3896296563749555
[1mTop 20 Coefficients:[0m
+--