<a href="https://colab.research.google.com/github/AlbertP54/Proyecto-Final-1/blob/main/Proyecto_Final_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install PySpark into the notebook runtime (IPython shell escape, not plain Python).
!pip install pyspark



**Importando el data set**

In [None]:
from pyspark.sql import SparkSession
from google.colab import files

# Start (or reuse) the Spark session used by the rest of the notebook.
session_builder = SparkSession.builder.appName('binary_class')
spark = session_builder.getOrCreate()

# Ask the user to upload the CSV through the Colab file picker.
uploaded = files.upload()

# Read the uploaded CSV: the first row is the header and column types are inferred.
df = spark.read.csv(
    'classification_data.csv',
    header=True,
    inferSchema=True,
)

Saving classification_data.csv to classification_data.csv


**Imprimimos el número de filas y el número de columnas del data set**

In [None]:
# Report the dataset dimensions as a (rows, columns) tuple.
row_count = df.count()
column_count = len(df.columns)
print((row_count, column_count))

(46751, 12)


**Imprimimos el esquema**

In [None]:
# Display every column with its inferred type and nullability.
df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- loan_purpose: string (nullable = true)
 |-- is_first_loan: integer (nullable = true)
 |-- total_credit_card_limit: integer (nullable = true)
 |-- avg_percentage_credit_card_limit_used_last_year: double (nullable = true)
 |-- saving_amount: integer (nullable = true)
 |-- checking_amount: integer (nullable = true)
 |-- is_employed: integer (nullable = true)
 |-- yearly_salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- dependent_number: integer (nullable = true)
 |-- label: integer (nullable = true)



**Imprimimos las primeras 5 filas del data set**

In [None]:
# Preview the first 5 rows of the DataFrame.
df.show(5)

+-------+------------+-------------+-----------------------+-----------------------------------------------+-------------+---------------+-----------+-------------+---+----------------+-----+
|loan_id|loan_purpose|is_first_loan|total_credit_card_limit|avg_percentage_credit_card_limit_used_last_year|saving_amount|checking_amount|is_employed|yearly_salary|age|dependent_number|label|
+-------+------------+-------------+-----------------------+-----------------------------------------------+-------------+---------------+-----------+-------------+---+----------------+-----+
|    A_1|    personal|            1|                   7900|                                            0.8|         1103|           6393|          1|        16400| 42|               4|    0|
|    A_2|    personal|            0|                   3300|                                           0.29|         2588|            832|          1|        75500| 56|               1|    0|
|    A_3|    personal|            0|    

**Contamos cuántas veces aparece un 1 y un 0 en la columna label**

In [None]:
# Class balance: number of rows for each value of the target column.
label_counts = df.groupBy('label').count()
label_counts.show()

+-----+-----+
|label|count|
+-----+-----+
|    1|16201|
|    0|30550|
+-----+-----+



**Contamos los registros por motivo del préstamo (columna loan_purpose)**

In [None]:
# Number of rows for each distinct loan purpose.
purpose_counts = df.groupBy('loan_purpose').count()
purpose_counts.show()

+------------+-----+
|loan_purpose|count|
+------------+-----+
|      others| 6763|
|   emergency| 7562|
|    property|11388|
|  operations|10580|
|    personal|10458|
+------------+-----+



**Codificamos la variable categórica loan_purpose (índice numérico y codificación one-hot) para poder usarla al implementar el modelo**

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder

# Step 1: map each loan_purpose string to a numeric index column.
indexer_stage = StringIndexer(inputCol="loan_purpose", outputCol="loan_index")
loan_purpose_indexer = indexer_stage.fit(df)
df = loan_purpose_indexer.transform(df)

# Step 2: one-hot encode the numeric index into a vector column.
loan_encoder = OneHotEncoder(
    inputCol="loan_index",
    outputCol="loan_purpose_vec",
)
ohe = loan_encoder.fit(df)
df = ohe.transform(df)

# Show the original, indexed and encoded values side by side, untruncated.
df.select('loan_purpose', 'loan_index', 'loan_purpose_vec').show(3, truncate=False)

+------------+----------+----------------+
|loan_purpose|loan_index|loan_purpose_vec|
+------------+----------+----------------+
|personal    |2.0       |(4,[2],[1.0])   |
|personal    |2.0       |(4,[2],[1.0])   |
|personal    |2.0       |(4,[2],[1.0])   |
+------------+----------+----------------+
only showing top 3 rows



**Creamos un vector único de características con las siguientes columnas para la implementación del modelo**

In [None]:
from pyspark.ml.feature import VectorAssembler

# Input columns, in the order they will occupy inside the assembled vector:
# the numeric predictors first, then the one-hot encoded loan purpose.
feature_columns = [
    'is_first_loan',
    'total_credit_card_limit',
    'avg_percentage_credit_card_limit_used_last_year',
    'saving_amount',
    'checking_amount',
    'is_employed',
    'yearly_salary',
    'age',
    'dependent_number',
    'loan_purpose_vec',
]

# Combine all predictors into a single 'features' vector column.
df_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = df_assembler.transform(df)

# Inspect the assembled vectors next to the target, untruncated.
df.select('features', 'label').show(10, truncate=False)

+--------------------------------------------------------------------+-----+
|features                                                            |label|
+--------------------------------------------------------------------+-----+
|[1.0,7900.0,0.8,1103.0,6393.0,1.0,16400.0,42.0,4.0,0.0,0.0,1.0,0.0] |0    |
|[0.0,3300.0,0.29,2588.0,832.0,1.0,75500.0,56.0,1.0,0.0,0.0,1.0,0.0] |0    |
|[0.0,7600.0,0.9,1651.0,8868.0,1.0,59000.0,46.0,1.0,0.0,0.0,1.0,0.0] |0    |
|[1.0,3400.0,0.38,1269.0,6863.0,1.0,26000.0,55.0,8.0,0.0,0.0,1.0,0.0]|0    |
|[0.0,2600.0,0.89,1310.0,3423.0,1.0,9700.0,41.0,4.0,0.0,0.0,0.0,1.0] |1    |
|[0.0,7600.0,0.51,1040.0,2406.0,1.0,22900.0,52.0,0.0,0.0,1.0,0.0,0.0]|0    |
|[1.0,6900.0,0.82,2408.0,5556.0,1.0,34800.0,48.0,4.0,0.0,1.0,0.0,0.0]|0    |
|[0.0,5700.0,0.56,1933.0,4139.0,1.0,32500.0,64.0,2.0,0.0,0.0,1.0,0.0]|0    |
|[1.0,3400.0,0.95,3866.0,4131.0,1.0,13300.0,23.0,3.0,0.0,0.0,1.0,0.0]|0    |
|[0.0,2900.0,0.91,88.0,2725.0,1.0,21100.0,52.0,1.0,0.0,0.0,1.0,0.0]  |1    |

**Creamos un data frame con solo 2 columnas, 'features' y 'label'.
Dividimos el data frame en un set de entrenamiento y otro de pruebas**

In [None]:
# Keep only the two columns the estimators consume: the assembled feature
# vector and the target.
model_df = df.select(['features', 'label'])

# 75% / 25% train-test split. The fixed seed keeps the split repeatable from
# run to run; without it every execution trains and evaluates on different
# rows, so the metrics printed in the notebook cannot be reproduced.
training_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)

**Implementamos el modelo de LogisticRegression**

In [None]:
from pyspark.ml.classification import LogisticRegression

# Fit a logistic regression with the library's default settings on the
# training split and grab its training summary.
log_reg = LogisticRegression().fit(training_df)
lr_summary = log_reg.summary

# A notebook cell only echoes its final expression, so a bare
# `lr_summary.accuracy` in the middle of the cell is silently discarded.
# Print it explicitly; the area under the ROC curve stays as the cell result.
print(lr_summary.accuracy)
lr_summary.areaUnderROC

0.9585732539995662

**Mostramos el desempeño del modelo entrenado con los hiperparámetros por defecto**

In [None]:
# Per-label precision taken from the logistic regression training summary.
print(lr_summary.precisionByLabel)

[0.9235086168802474, 0.8391563867886987]


In [None]:
# Per-label recall taken from the logistic regression training summary.
print(lr_summary.recallByLabel)

[0.9118237347294939, 0.8589816700610998]


In [None]:
# Score the held-out test split with the fitted logistic regression; the
# result carries rawPrediction, probability and prediction columns.
predictions = log_reg.transform(test_df)
# Preview the first 10 scored rows.
predictions.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13,[0,1,2,3,4,7]...|    1|[-5.3845241436004...|[0.00456607786481...|       1.0|
|(13,[0,1,2,3,4,7]...|    1|[-1.7081464044825...|[0.15340429021031...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-6.9030411053460...|[0.00100371684312...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-6.1450775638483...|[0.00213942264516...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-6.2136499184714...|[0.00199791761251...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-5.8386667317368...|[0.00290426416148...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-3.4109738539574...|[0.03195425935040...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-2.8561121694912...|[0.05436623001303...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-5.7260530248319...|[0.00324932615354...|       1.0|
|(13,[0,1,2,3,4,

**Imprimimos la exactitud (accuracy) del modelo en las predicciones sobre el set de prueba**

In [None]:
# evaluate() scores test_df itself and returns a summary object, so the
# separate transform() call whose result was immediately overwritten has been
# dropped. The name model_predictions is kept because later cells read the
# summary's metrics through it.
model_predictions = log_reg.evaluate(test_df)

# Test-set accuracy (echoed as the cell result).
model_predictions.accuracy

0.8974558670820353

In [None]:
# Area under the ROC curve from the test-set evaluation summary.
model_predictions.areaUnderROC

0.959806919491592

In [None]:
# Per-label recall from the test-set evaluation summary.
print(model_predictions.recallByLabel)

[0.9117955439056357, 0.869587366276108]


In [None]:
# Per-label precision from the test-set evaluation summary.
print(model_predictions.precisionByLabel)

[0.9314499933056634, 0.8353315390261806]


**Ahora implementamos el modelo de RandomForest, que es un ensamble de varios árboles de decisión**

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Baseline random forest: no constructor arguments, so library defaults apply.
rf = RandomForestClassifier()
rf_model = rf.fit(training_df)

# Score the test split with the baseline forest.
# NOTE(review): this rebinds model_predictions, which an earlier cell set to the
# logistic regression evaluation summary; from here on it is a scored DataFrame.
model_predictions = rf_model.transform(test_df)

**Utilizamos validación cruzada para encontrar los hiperparámetros que dan el mejor desempeño del modelo**

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Metric used to rank candidates (evaluator left at its default settings).
evaluator = BinaryClassificationEvaluator()

# Fresh estimator whose params are referenced by the grid below.
rf = RandomForestClassifier()

# Search space: 5 depths x 3 bin counts x 3 forest sizes = 45 combinations.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(rf.maxDepth, [5, 10, 20, 25, 30])
grid_builder.addGrid(rf.maxBins, [20, 30, 40])
grid_builder.addGrid(rf.numTrees, [5, 20, 50])
paramGrid = grid_builder.build()

# 5-fold cross-validation over the whole grid, fitted on the training split.
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)
cv_model = cv.fit(training_df)

**Probamos el modelo con los mejores hiperparámetros en el set de prueba e imprimimos su desempeño sobre la clase positiva**

In [None]:
# Take the best model found by cross-validation and score the test split.
best_rf_model = cv_model.bestModel
model_predictions = best_rf_model.transform(test_df)

# Row conditions for the positive class (label / prediction equal to 1).
is_actual_pos = model_predictions['label'] == 1
is_pred_pos = model_predictions['prediction'] == 1

# Counts needed for recall and precision of the positive class.
true_pos = model_predictions.filter(is_actual_pos & is_pred_pos).count()
actual_pos = model_predictions.filter(is_actual_pos).count()
pred_pos = model_predictions.filter(is_pred_pos).count()

# Recall: share of the actual positives that the model recovered.
recall_rate = float(true_pos) / actual_pos
print(recall_rate)

# Precision: share of the predicted positives that are correct. pred_pos was
# previously counted but never used. Guard the division because a model may
# predict no positives at all.
precision_rate = float(true_pos) / pred_pos if pred_pos else float('nan')
print(precision_rate)

0.9184921039225675
