In [129]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
import pandas as pd

In [130]:
spark = SparkSession.builder.appName('Decision Tree Regression').getOrCreate()

In [131]:
data = spark.read.csv('bank_data.csv', header = True, inferSchema = True)
data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [132]:
pd.DataFrame(data.take(12), columns=data.columns).transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11
age,59,56,41,55,54,42,56,60,37,28,38,30
job,admin.,admin.,technician,services,admin.,management,management,retired,technician,services,admin.,blue-collar
marital,married,married,married,married,married,single,married,divorced,married,single,single,married
education,secondary,secondary,secondary,secondary,tertiary,tertiary,tertiary,secondary,secondary,secondary,secondary,secondary
default,no,no,no,no,no,no,no,no,no,no,no,no
balance,2343,45,1270,2476,184,0,830,545,1,5090,100,309
housing,yes,no,yes,yes,no,yes,yes,yes,yes,yes,yes,yes
loan,no,no,no,no,no,yes,yes,no,no,no,no,no
contact,unknown,unknown,unknown,unknown,unknown,unknown,unknown,unknown,unknown,unknown,unknown,unknown
day,5,5,5,5,5,5,6,6,6,6,7,7


Filtramos los datos que son de tipo numérico, ya sea enteros o dobles para poder utilizar Panda para describirlo y sacar los promedios, el mínimo y máximo de los datos, entre otros.

In [133]:
numeric_features = [i[0] for i in data.dtypes if i[1] == 'int' or i[1] == 'double']
data.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
age,11162,41.231947679627304,11.913369192215518,18,95
balance,11162,1528.5385235620856,3225.413325946149,-6847,81204
day,11162,15.658036194230425,8.420739541006462,1,31
duration,11162,371.99381831213043,347.12838571630687,2,3881
campaign,11162,2.508421429851281,2.7220771816614824,1,63
pdays,11162,51.33040673714388,108.75828197197717,-1,854
previous,11162,0.8325568894463358,2.292007218670508,0,58


## Procesamiento de los datos para crear el modelo  

Creación de los pipelines que nos ayudará realizar multiples pasos, la salida que tenemos de cada uno de los pasos es la entrada del siguiente

In [134]:
pipeline_stages=[] #arreglo donde se almacenarán los pipelines stages posteriormente

In [135]:
# Separar las columnas de tipo categóricas
categorical_columns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome','deposit']

In [136]:
# Ciclo for para ir usando StringIndexer y especificar los nombres de las columnas categóricas que convertiremos
# Estas se van añadiendo una vez convertidas a los pipelines stages que utilizaremos luego
for value in categorical_columns:
    string_indexer = StringIndexer(inputCol=value, outputCol=value+"Indexer")
    pipeline_stages += [string_indexer]

In [137]:
numerical_columns = ['age', 'duration', 'campaign', 'pdays', 'previous'] # Agrupar nombres de las columnas de tipo numérico

In [138]:
# Lista para combinar los nombres de las columnas categóricas (que al transformarse les agregamos el sufijo Indexer)
# con los de las numéricas
combined_columns = [columns + "Indexer" for columns in categorical_columns] + numerical_columns
combined_columns

['jobIndexer',
 'maritalIndexer',
 'educationIndexer',
 'defaultIndexer',
 'housingIndexer',
 'loanIndexer',
 'contactIndexer',
 'poutcomeIndexer',
 'depositIndexer',
 'age',
 'duration',
 'campaign',
 'pdays',
 'previous']

In [139]:
# Especificar variables de entrada para el vector assembler
vector_assembler = VectorAssembler(inputCols=combined_columns, outputCol="features")

# Añadir el vector assembler a los pipelines stages
pipeline_stages += [vector_assembler]

In [140]:
pipeline = Pipeline(stages = pipeline_stages) # Pasos que quiero ejecutar para obtener la información, stages

In [141]:
pipeline_fit = pipeline.fit(data).transform(data)

In [152]:
pipeline_fit

DataFrame[age: int, job: string, marital: string, education: string, default: string, balance: int, housing: string, loan: string, contact: string, day: int, month: string, duration: int, campaign: int, pdays: int, previous: int, poutcome: string, deposit: string, jobIndexer: double, maritalIndexer: double, educationIndexer: double, defaultIndexer: double, housingIndexer: double, loanIndexer: double, contactIndexer: double, poutcomeIndexer: double, depositIndexer: double, features: vector]

In [142]:
data = pipeline_fit.select(["features","balance"])
data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- balance: integer (nullable = true)



In [174]:
# Separar los datos
training_data, testing_data = data.randomSplit([0.7, 0.3])
print("Training Dataset Count: " + str(training_data.count()))
print("Test Dataset Count: " + str(testing_data.count()))

Training Dataset Count: 7836
Test Dataset Count: 3326


## Ajuste e implementación del modelo

In [178]:
decisiontreeregressor = DecisionTreeRegressor(featuresCol = 'features', labelCol = 'balance', maxDepth = 10)
model = decisiontreeregressor.fit(training_data)

## Predicción del modelo

In [176]:
predictions = model.transform(testing_data)
predictions.select('balance','prediction').show(10)

+-------+------------------+
|balance|        prediction|
+-------+------------------+
|   3014|             715.5|
|   -295| 635.7724137931034|
|    426| 635.7724137931034|
|      1|             715.5|
|      0|1695.8507462686566|
|    258|             715.5|
|    910|1125.6614583333333|
|   2744|1125.6614583333333|
|    195|1125.6614583333333|
|   9851|1125.6614583333333|
+-------+------------------+
only showing top 10 rows



In [177]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='balance', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print("Margen de error = %g" % rmse)

Margen de error = 4421.27


## NEW BANK DATA
Aplicar el modelo creado para hacer predicciones de nuevos datos

In [147]:
new_clients = spark.read.csv('new_bank_data.csv',inferSchema=True, header=True)

In [148]:
new_clients.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [149]:
test_new_clients =  pipeline.fit(new_clients).transform(new_clients)

In [150]:
# Utilizamos el modelo de DecisionTreeRegressor creado anteriormente
final_predictions = model.transform(test_new_clients)

In [151]:
final_predictions.select('features','prediction').show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[0.0,0.0,1.0,0.0,...|1923.1463963963963|
|[1.0,1.0,0.0,0.0,...|             347.2|
|(14,[0,1,3,7,9,10...| 968.3629242819843|
|(14,[0,1,7,9,10,1...| 968.3629242819843|
|(14,[2,3,4,9,10,1...|  594.303448275862|
|(14,[2,5,8,9,10,1...| 662.8115942028985|
|[1.0,2.0,0.0,0.0,...| 662.8115942028985|
|(14,[0,1,3,9,10,1...|            385.75|
|[3.0,1.0,0.0,1.0,...|1591.7884615384614|
|(14,[0,2,7,8,9,10...| 1411.364406779661|
+--------------------+------------------+



## Conclusión

Al analizar los datos, ciertos factores de una muestra de clientes, ya sea su edad, trabajom estado civil, nivel de educación, impago, prestamos y la fecha del último contacto podemos llegar a la conclusion que el saldo medio anual de una persona tomando en cuanta los factores antes mencionados en promedio es de 1,528 dólares, con un margen de error de 4,421 dólares. 

Si no tuvieramos las anomalias se encuentran en los datos, como outcome o contact, en donde el estado no se conoce al tener contacto con el cliente o no se sabe si este contacto fue exitoso o falló, el margen de error aumenta porque estos datos son de importancia para motivar a los clientes a hacer depósitos y terner un registro de si esto funcionó o no, y así estos tengan un mayor número de saldo medio anual en sus registros. Se recomendaría obtener esta información para mejorar las predicciones del modelo creado.
