# Modelado

In [None]:
# Import libraries and create the Optimus handle used to read the data below.
# NOTE(review): `pd` is not referenced in any of the visible cells.
import pandas as pd
from optimus import Optimus
# `op` is the entry point for `op.read.csv(...)` in the data-loading cell.
op = Optimus()

In [None]:
# Read the HR dataset from a CSV file located relative to the notebook.
# Data (and its codebook) from http://rpubs.com/rhuebner/HRCodebook-13
# header=True presumably makes the first row supply the column names — confirm.
df = op.read.csv("data/hr-data.csv", header=True)

In [None]:
# Render the freshly loaded DataFrame as a table for a quick visual check.
df.table()

In [None]:
# Basic data cleaning: cast columns to numeric types, drop fully empty rows,
# derive an Age column from DOB and bucket it into ten-year bands.
from pyspark.sql.functions import when, count, col, isnull


integer_cols = ["MarriedID","MaritalStatusID", "EmpStatusID", "DeptID", "PerfScoreID", "PositionID", "Termd", "ManagerID", 
                "EmpSatisfaction", "SpecialProjectsCount", "DaysLateLast30", "GenderID"]
float_cols = ["PayRate", "EngagementSurvey"]

# Each column is replaced in place (same name) by its numeric cast.
for col_name in integer_cols:
    df = df.withColumn(col_name, col(col_name).cast('int'))

for col_name in float_cols:
    df = df.withColumn(col_name, col(col_name).cast('float'))

# Drop only the rows in which every column is null.
df = df.dropna(how="all")
# Date pattern letters are case-sensitive: "MM" is month-of-year, whereas "mm"
# is minute-of-hour. Using "mm" here would read the month digits as minutes and
# leave the month at its default, skewing every computed age.
df = df.cols.years_between("DOB", date_format="MM/dd/yy",output_cols="Age")

from pyspark.ml.feature import Bucketizer
# Bucket edges: [10,20), [20,30), ..., [50,60), [60,inf].
# NOTE(review): an Age below 10 falls outside these splits; Bucketizer treats
# out-of-range values as errors by default — confirm no such rows exist.
splits = [10, 20, 30, 40, 50, 60, float("inf")]

bucketizer = Bucketizer(splits=splits, inputCol="Age", outputCol="Age_bucket")
df = bucketizer.transform(df)

In [None]:
# Number of missing (null) values per column
from pyspark.sql.functions import when, count, col, isnull

# when() without otherwise() yields null for non-matching rows and count()
# skips nulls, so each aggregate counts exactly the rows where the column is null.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).table()

In [None]:
# Discard rows without a ManagerID; that column is fed to the model as a feature later on.
df = df.na.drop(subset=["ManagerID"])

In [None]:
# Number of missing (null) values per column, re-checked after dropping rows
from pyspark.sql.functions import when, count, col, isnull

# Same aggregation as before: one null counter per column, aliased to the column name.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).table()

In [None]:
from optimus.ml import feature as fe

# Candidate predictors, combined into a single vector column named "features".
feature_cols = [
    "MaritalStatusID",
    "GenderID",
    "DeptID",
    "PayRate",
    "ManagerID",
    "EmpSatisfaction",
    "Age_bucket",
]
df = fe.vector_assembler(df, input_cols=feature_cols, output_col="features")

In [None]:
from pyspark.ml.feature import ChiSqSelector

# Keep the 5 entries of "features" ranked highest by a chi-squared test
# against the Termd label; the reduced vector goes to "selected_features".
# NOTE(review): the chi-squared test treats features as categorical, while
# PayRate is a float column — confirm this is intended.
selector = ChiSqSelector(
    labelCol="Termd",
    featuresCol="features",
    outputCol="selected_features",
    numTopFeatures=5,
)

# Fit the selector, then apply it to the same DataFrame.
selector_model = selector.fit(df)
result = selector_model.transform(df)

In [None]:
# Inspect the DataFrame after feature selection (it now carries "selected_features").
result.table()

In [None]:
# Train a gradient-boosted tree classifier on the full `result` DataFrame.
# The estimator is fitted directly; no indexers or other pipeline stages run here.
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol="selected_features",
    labelCol="Termd",
    maxIter=10,
)
model = gbt.fit(result)

In [None]:
# Make predictions.
# The model is applied to `result`, the same DataFrame it was fitted on,
# so these are in-sample predictions rather than held-out ones.
predictions = model.transform(result)

In [None]:
# Show the predicted label next to the actual one and the feature vector used.
shown_cols = ["prediction", "Termd", "selected_features"]
predictions.select(*shown_cols).table()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy compares the "prediction" column against the Termd label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Termd", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# `predictions` at this point was produced from the very data the model was
# fitted on, so the figure reported is a training error, not a test error.
print("Training Error = %g" % (1.0 - accuracy))

In [None]:
# Display the accuracy value computed by the evaluator in the previous cell.
accuracy

## Mejorando el modelo

In [None]:
# Split the data into training (~70%) and test (~30%) sets.
# A fixed seed keeps the split repeatable; without one, a different split is
# drawn on every run and the reported test error cannot be reproduced.
(training_data, test_data) = result.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Fit a gradient-boosted tree classifier on the training split only.
# The estimator is fitted directly; no indexers or other pipeline stages run here.
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol="selected_features",
    labelCol="Termd",
    maxIter=10,
)
model = gbt.fit(training_data)

# Score the held-out split with the fitted model.
predictions = model.transform(test_data)

In [None]:
# Show the predicted label next to the actual one and the feature vector used.
shown_cols = ["prediction", "Termd", "selected_features"]
predictions.select(*shown_cols).table()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the predictions against the Termd label; its complement is
# reported as the error rate.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="Termd",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)

In [None]:
# Display the fitted model's feature importances.
# The model was trained on "selected_features", so each index refers to a
# position in that reduced vector, not in the original "features" vector.
model.featureImportances