In [3]:
import pyspark

In [4]:
sc = pyspark.SparkContext('local[*]')

In [5]:
sc

In [6]:
from pyspark.sql import SparkSession

In [7]:
spark = SparkSession(sc)
spark

In [8]:
prestamos = spark.read.csv("Default_Fin.csv", header=True)

In [9]:
prest2= prestamos.withColumnRenamed("Bank Balance", "Balance_Cuenta").withColumnRenamed("Annual Salary","Salario_Anual").withColumnRenamed("Defaulted?","Mora")

In [10]:
prestmora = prest2.drop("Index")

In [11]:
from pyspark.sql.functions import col, explode, array, lit

In [12]:
#Para lidiar con el Dataset desbalanceado hacemos un OVERSAMPLING. Smote para pyspark no existe, si existe en Scala.
major_df = prestmora.filter(prestmora.Mora == 0)
minor_df = prestmora.filter(prestmora.Mora == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

ratio: 29


In [13]:
a = range(ratio)
# duplicate the minority rows
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

# combinar la minoria oversampleada con la mayoria  
df_combinado = major_df.unionAll(oversampled_df)
df_combinado.show()

+--------+--------------+-------------+----+
|Employed|Balance_Cuenta|Salario_Anual|Mora|
+--------+--------------+-------------+----+
|       1|       8754.36|    532339.56|   0|
|       0|       9806.16|    145273.56|   0|
|       1|       12882.6|    381205.68|   0|
|       1|          6351|    428453.88|   0|
|       1|       9427.92|       461562|   0|
|       0|      11035.08|     89898.72|   0|
|       1|       9906.12|    298862.76|   0|
|       0|       9704.04|     211205.4|   0|
|       1|      13932.72|    449622.36|   0|
|       1|             0|    351303.24|   0|
|       0|             0|    262452.84|   0|
|       0|      14646.96|    159222.72|   0|
|       1|        2844.6|     339020.4|   0|
|       1|       7280.88|    539934.72|   0|
|       1|      13355.64|    285722.04|   0|
|       1|       3434.76|    540508.92|   0|
|       1|             0|    603183.72|   0|
|       0|       6330.48|    211638.48|   0|
|       1|       5831.28|    738793.32|   0|
|       1|

In [14]:
from pyspark.ml.feature import RFormula

In [15]:
supervised = RFormula(formula = 'Mora ~. + Employed:Balance_Cuenta + Employed:Salario_Anual')

In [16]:
fittedRF = supervised.fit(df_combinado)

In [17]:
preparedDF = fittedRF.transform(df_combinado)

In [18]:
preparedDF.show(truncate=False)

+--------+--------------+-------------+----+--------------------------------------------------------+-----+
|Employed|Balance_Cuenta|Salario_Anual|Mora|features                                                |label|
+--------+--------------+-------------+----+--------------------------------------------------------+-----+
|1       |8754.36       |532339.56    |0   |(57647,[0,8429,16902,27643,45344],[1.0,1.0,1.0,1.0,1.0])|0.0  |
|0       |9806.16       |145273.56    |0   |(57647,[9102,9818,37543,48249],[1.0,1.0,1.0,1.0])       |0.0  |
|1       |12882.6       |381205.68    |0   |(57647,[0,2367,13738,21581,42180],[1.0,1.0,1.0,1.0,1.0])|0.0  |
|1       |6351          |428453.88    |0   |(57647,[0,7045,14612,26259,43054],[1.0,1.0,1.0,1.0,1.0])|0.0  |
|1       |9427.92       |461562       |0   |(57647,[0,8855,15374,28069,43816],[1.0,1.0,1.0,1.0,1.0])|0.0  |
|0       |11035.08      |89898.72     |0   |(57647,[1243,19200,29684,57631],[1.0,1.0,1.0,1.0])      |0.0  |
|1       |9906.12       |298

In [19]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [20]:
lr = LogisticRegression().setLabelCol('label').setFeaturesCol('features')
dt = DecisionTreeClassifier().setLabelCol('label').setFeaturesCol('features')

In [21]:
stages = [lr]

pipeline= Pipeline().setStages(stages)

In [22]:
params = ParamGridBuilder()\
    .addGrid(lr.elasticNetParam,[0.0,0.5])\
    .addGrid(lr.regParam,[0.1,2.0])\
    .build()

In [23]:
evaluator = BinaryClassificationEvaluator()\
    .setMetricName('areaUnderROC')\
    .setRawPredictionCol('prediction')\
    .setLabelCol('label')

In [24]:
#Ensamblamos los componentes
tvs = TrainValidationSplit()\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(params)\
    .setEstimator(pipeline)\
    .setEvaluator(evaluator)

In [25]:
dataTrain, dataTest = preparedDF.randomSplit([0.7,0.3])

In [26]:
tvsFitted = tvs.fit(dataTrain)

In [27]:
dataTestPrediction = tvsFitted.transform(dataTest)
dataTestPrediction.show()

+--------+--------------+-------------+----+--------------------+-----+--------------------+--------------------+----------+
|Employed|Balance_Cuenta|Salario_Anual|Mora|            features|label|       rawPrediction|         probability|prediction|
+--------+--------------+-------------+----+--------------------+-----+--------------------+--------------------+----------+
|       0|             0|    111857.52|   0|(57647,[1,9603,28...|  0.0|[1.28549303730463...|[0.78338336208691...|       0.0|
|       0|             0|    123315.12|   0|(57647,[1,9644,28...|  0.0|[1.28549303730463...|[0.78338336208691...|       0.0|
|       0|             0|    160010.88|   0|(57647,[1,9983,28...|  0.0|[1.28549303730463...|[0.78338336208691...|       0.0|
|       0|             0|    160893.12|   0|(57647,[1,9997,28...|  0.0|[1.28549303730463...|[0.78338336208691...|       0.0|
|       0|             0|    163461.72|   0|(57647,[1,10031,2...|  0.0|[1.28549303730463...|[0.78338336208691...|       0.0|


In [28]:
#Hacemos una evaluacion de tipo area under ROc sobre las columnas prediction y label
evaluator.evaluate(dataTestPrediction)

0.8640776699029127

In [29]:
tvsFitted.params

[Param(parent='TrainValidationSplitModel_90d4214e1ce1', name='estimator', doc='estimator to be cross-validated'),
 Param(parent='TrainValidationSplitModel_90d4214e1ce1', name='estimatorParamMaps', doc='estimator param maps'),
 Param(parent='TrainValidationSplitModel_90d4214e1ce1', name='evaluator', doc='evaluator used to select hyper-parameters that maximize the validator metric'),
 Param(parent='TrainValidationSplitModel_90d4214e1ce1', name='seed', doc='random seed.'),
 Param(parent='TrainValidationSplitModel_90d4214e1ce1', name='trainRatio', doc='Param for ratio between train and     validation data. Must be between 0 and 1.')]

In [30]:
print(dt.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: label)
leafCol: Leaf indices column name. Predicted leaf index of each instance in each tree by preorder. (default: )
maxBins: Max number of bins for discreti

In [31]:
stages2 = [dt]
pipeline2= Pipeline().setStages(stages2)

In [32]:
params2 = ParamGridBuilder()\
    .addGrid(dt.maxDepth,[2, 3])\
    .build()

In [33]:
evaluator2 = BinaryClassificationEvaluator()\
    .setMetricName('areaUnderROC')\
    .setRawPredictionCol('prediction')\
    .setLabelCol('label')

In [34]:
tvs2 = TrainValidationSplit()\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(params2)\
    .setEstimator(pipeline2)\
    .setEvaluator(evaluator2)

In [35]:
tvsFitted2 = tvs2.fit(dataTrain)

In [36]:
dataTestPrediction2 = tvsFitted2.transform(dataTest)
dataTestPrediction2.show()

+--------+--------------+-------------+----+--------------------+-----+-------------+-----------+----------+
|Employed|Balance_Cuenta|Salario_Anual|Mora|            features|label|rawPrediction|probability|prediction|
+--------+--------------+-------------+----+--------------------+-----+-------------+-----------+----------+
|       0|             0|    111857.52|   0|(57647,[1,9603,28...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    123315.12|   0|(57647,[1,9644,28...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    160010.88|   0|(57647,[1,9983,28...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    160893.12|   0|(57647,[1,9997,28...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    163461.72|   0|(57647,[1,10031,2...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    188387.76|   0|(57647,[1,10452,2...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|         

In [37]:
evaluator2.evaluate(dataTestPrediction2)

0.544982037006299

In [38]:
print("Test Area Under ROC: " + str(evaluator2.evaluate(dataTestPrediction, {evaluator2.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.8640776699029127


In [39]:
from pyspark.ml.classification import RandomForestClassifier

In [40]:
rfc = RandomForestClassifier()

In [41]:
rfModel = rfc.fit(dataTrain)

In [42]:
DataTestPrediction3 = rfModel.transform(dataTest)
DataTestPrediction3.show()

+--------+--------------+-------------+----+--------------------+-----+--------------------+--------------------+----------+
|Employed|Balance_Cuenta|Salario_Anual|Mora|            features|label|       rawPrediction|         probability|prediction|
+--------+--------------+-------------+----+--------------------+-----+--------------------+--------------------+----------+
|       0|             0|    111857.52|   0|(57647,[1,9603,28...|  0.0|[11.1099342781369...|[0.55549671390684...|       0.0|
|       0|             0|    123315.12|   0|(57647,[1,9644,28...|  0.0|[11.1099342781369...|[0.55549671390684...|       0.0|
|       0|             0|    160010.88|   0|(57647,[1,9983,28...|  0.0|[11.1099342781369...|[0.55549671390684...|       0.0|
|       0|             0|    160893.12|   0|(57647,[1,9997,28...|  0.0|[11.1099342781369...|[0.55549671390684...|       0.0|
|       0|             0|    163461.72|   0|(57647,[1,10031,2...|  0.0|[11.1099342781369...|[0.55549671390684...|       0.0|


In [43]:
evaluator2.evaluate(DataTestPrediction3)

0.5921230561189993

In [44]:
from pyspark.ml.classification import NaiveBayes

In [48]:
nb = NaiveBayes()
stages4 = [nb]
pipeline4= Pipeline().setStages(stages2)

In [49]:
params4 = ParamGridBuilder()\
    .addGrid(nb.modelType,['multinomial','gaussian','bernoulli'])\
    .build()

In [50]:
tvs4 = TrainValidationSplit()\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(params4)\
    .setEstimator(pipeline4)\
    .setEvaluator(evaluator2)

In [51]:
tvsFitted4 = tvs4.fit(dataTrain)

In [55]:
dataTestPrediction4 = tvsFitted4.transform(dataTest)
dataTestPrediction4.show()

+--------+--------------+-------------+----+--------------------+-----+-------------+-----------+----------+
|Employed|Balance_Cuenta|Salario_Anual|Mora|            features|label|rawPrediction|probability|prediction|
+--------+--------------+-------------+----+--------------------+-----+-------------+-----------+----------+
|       0|             0|    111857.52|   0|(57647,[1,9603,28...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    123315.12|   0|(57647,[1,9644,28...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    160010.88|   0|(57647,[1,9983,28...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    160893.12|   0|(57647,[1,9997,28...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    163461.72|   0|(57647,[1,10031,2...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|             0|    188387.76|   0|(57647,[1,10452,2...|  0.0|  [345.0,0.0]|  [1.0,0.0]|       0.0|
|       0|         

In [54]:
evaluator2.evaluate(dataTestPrediction4)

0.546165268919754

In [62]:
tvsFitted.save('demo_data/best-model')

In [64]:
from pyspark.ml.tuning import TrainValidationSplitModel

model = TrainValidationSplitModel.load('demo_data/best-model')

In [68]:
model.transform(preparedDF).show()

+--------+--------------+-------------+----+--------------------+-----+--------------------+--------------------+----------+
|Employed|Balance_Cuenta|Salario_Anual|Mora|            features|label|       rawPrediction|         probability|prediction|
+--------+--------------+-------------+----+--------------------+-----+--------------------+--------------------+----------+
|       1|       8754.36|    532339.56|   0|(57647,[0,8429,16...|  0.0|[2.66561947473234...|[0.93496718688331...|       0.0|
|       0|       9806.16|    145273.56|   0|(57647,[9102,9818...|  0.0|[-0.0533260676229...|[0.48667164139847...|       1.0|
|       1|       12882.6|    381205.68|   0|(57647,[0,2367,13...|  0.0|[2.66561947473234...|[0.93496718688331...|       0.0|
|       1|          6351|    428453.88|   0|(57647,[0,7045,14...|  0.0|[2.66561947473234...|[0.93496718688331...|       0.0|
|       1|       9427.92|       461562|   0|(57647,[0,8855,15...|  0.0|[2.66561947473234...|[0.93496718688331...|       0.0|
