## Proyecto

Se desea desarrollar un modelo de machine learning a escala que cuente con la capacidad de evaluar de manera acelerada y precisa el valor de diversos bienes inmuebles para ofrecer recomendaciones de compra-venta con el objetivo de **aplicar estrategias de [House Flipping](https://www.lahaus.mx/blog/tendencias-del-mercado/house-flipping-invertir-mexico)**.

**Conjunto de Datos**

[KC House Dataset](https://www.kaggle.com/datasets/shivachandel/kc-house-data)

El conjunto de datos consiste en los precios de las viviendas del condado de King, un área en el estado de Washington, EE. UU. entre mayo de 2014 y mayo de 2015, estos datos cubren también el área de Seattle. El conjunto de datos consta de 21 variables y 21,613 observaciones.

**Diccionario de Datos**
    
* id - Identificador único para cada casa vendida
* date - Fecha de la venta
* price (target) - Precio de cada vivienda vendida
* bedrooms - Número de dormitorios
* bathrooms - Número de baños, donde 0.5 representa una habitación con inodoro pero sin ducha
* sqft_living - Pies cuadrados del espacio habitable interior de los apartamentos
* sqft_lot - Pies cuadrados del terreno
* floors - Número de pisos
* waterfront - Variable dummy que indica si el apartamento tiene vista al mar o no
* view - Índice de 0 a 4 de qué tan buena es la vista de la propiedad
* condition - Índice de 1 a 5 sobre la condición del apartamento,
* grade - Índice del 1 al 13, en el que de 1 a 3 no llega a la construcción y diseño de edificios, 7 tiene un nivel promedio de construcción y diseño y 11 a 13 tiene un alto nivel de calidad de construcción y diseño.
* sqft_above - Pies cuadrados del espacio interior de la vivienda que está por encima del nivel del suelo
* sqft_basement - Pies cuadrados del espacio interior de la vivienda que está por debajo del nivel del suelo
* yr_built - Año en que se construyó la casa inicialmente
* yr_renovated - Año de la última renovación de la casa
* zipcode - Área de código postal donde se encuentra la casa
* lat - Latitud
* long - Longitud
* sqft_living15 - Pies cuadrados del espacio habitable interior de la vivienda para los 15 vecinos más cercanos
* sqft_lot15 - Pies cuadrados de los lotes de terreno de los 15 vecinos más cercanos.


## Librerias

In [None]:
%pip install -q keplergl
%pip install -q pyod
%pip install -q hyperopt
%pip install -q mlflow

In [None]:
#sql.functions
from pyspark.sql.functions import col, substring, to_date, year, lit

#manipulación de datos
import numpy as np
import json

#visualización
from keplergl import KeplerGl

#
from pyod.models.lof import LOF

from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

from pyspark.ml.clustering import KMeans

## Lectura de datos

In [None]:
"""
generation = "mod4gen17"
input_cont = f"/mnt/{generation}/input/kc_house_data.csv"
"""
input_cont = f"/mnt/input/kc_house_data.csv"
output_cont = '/dbfs/mnt/output/'

In [None]:
data = spark.read.csv(input_cont, inferSchema=True, header=True)
type(data)

## EDA

In [None]:
# Tamaño del DataFrame
print((data.count(), len(data.columns)))

In [None]:
# Visualización general
data.limit(20).display()

Tipos de variables

In [None]:
data.printSchema()

Años registrados

In [None]:
data.withColumn("Años", substring(col("date"), 1, 4)).groupBy("Años").count().display()

![Años](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/Imagen_1.png)

Cantidad de habitaciones

In [None]:
data.groupBy("bedrooms").count().display()

![Habitaciones](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen_2.1.png)

Precio vs Cantidad de habitaciones

In [None]:
data.groupBy('bedrooms').mean('price').display()

![Price v Bedrooms](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen_2.png)

Cantidad de baños

In [None]:
data.groupBy("bathrooms").count().display()

![Baños](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen_3.png)

Precio vs Cantidad de Baños

In [None]:
data.groupBy('bathrooms').mean('price').display()

![Price v Bathrooms](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen_4.png)

Número de pisos

In [None]:
data.groupBy("floors").count().display()

![Floors](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen_5.png)

Precio vs Número de pisos

In [None]:
data.groupBy('floors').mean('price').display()

![Price v Floors](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen_6.png)

Precio vs Pies cuadrados del espacio habitable

In [None]:
data.groupBy('sqft_living').mean('price').display()

![Sqft](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen_7.png)

Precio vs Año de construcción

In [None]:
data.groupBy('yr_built').mean('price').display()

![Price v Sqft](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen_8.png)

> Observamos la relación entre el precio de la vivienda y algunas de las características más importantes de un inmueble, existe una relación de incremento en el precio mientras mejor sean las características relacionadas con el tamaño de la vivienda.

Precio de cada vivienda

In [None]:
data.select('price').summary("count", "mean", "min", "10%", "20%", "30%", "40%", "50%", "60%", "70%", "80%", "90%", "max").display()

In [None]:
display(data.select('price'))

![Prices](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/imagen9.png)

### Kepler

In [None]:
ubs = data.select('lat', 'long', 'price')

In [None]:
### Name of the Display
name = "House_Prices_HeatPoints"
cve = "heatpoints_map"

### Load a pre-saved Config of the Display
json_file_path = output_cont+cve+'.json'
# Load the JSON file
with open(json_file_path, "r") as json_file:
    json_data = json.load(json_file)
# Convert the JSON data to a dictionary
config = dict(json_data)

In [None]:
kpl_heatpoints = KeplerGl(height=500)
kpl_heatpoints.add_data(data=ubs.toPandas(), name=name)
kpl_heatpoints.config = config
kpl_heatpoints

![Mapa](heatpntsclusters_map_static.png)

> Podemos observar claramente la distribución del precio de viviendas por zonas, entonces el zipcode podria darnos información util sobre el precio. Identificamos zonas que se rodean de viviendas con costos elevados, por ejemplo, las viviendas cercanas al mar.

Podríamos identificar una mejor delimitación de zonas acorde al precio si creamos clústeres respecto al precio de las viviendas. Aplicando un algoritmo simple de K-means podemos lograr este objetivo.

In [None]:

ubs_assembler = VectorAssembler(inputCols=["price"], outputCol="features")
ubs_kmeans = KMeans(k=3, seed=123)

ubs_pipeline = Pipeline(stages=[ubs_assembler,ubs_kmeans])

aux = data.select('lat','long', 'price')

ubs_model = ubs_pipeline.fit(aux)

aux = ubs_model.transform(aux)
aux = aux.withColumnRenamed('prediction','Ubs_cl')

In [None]:
aux.groupBy('Ubs_cl').mean('price').orderBy('avg(price)').display()

In [None]:
aux = aux.select('lat','long', 'price', col('Ubs_cl').cast('string'))
display(aux)

In [None]:
### Name of the Display
name = "House_PricesCls_HeatPoints"
cve = "heatpntsclusters_map"

### Load a pre-saved Config of the Display
json_file_path = output_cont+cve+'.json'
# Load the JSON file
with open(json_file_path, "r") as json_file:
    json_data = json.load(json_file)
# Convert the JSON data to a dictionary
config = dict(json_data)

In [None]:
kpl_heatpntsclusters_map = KeplerGl(height=500)
kpl_heatpntsclusters_map.add_data(data=aux.toPandas(), name=name)
kpl_heatpntsclusters_map.config = config
kpl_heatpntsclusters_map

![Mapa](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/heatpntsclusters_map_static.png)

> Vemos en el mapa que los clústeres separaron de manera similar al gráfico anterior pero con una clara identificación de tres grupos de acuerdo al rango de precio del inmueble.

## Limpieza

#### Cast de variables

In [None]:
data.dtypes

In [None]:
new_data = (data.withColumn('id', col('id').cast('string'))
                .withColumn('zipcode', col('zipcode').cast('string'))
                .withColumn('lat', col('lat').cast('string'))
                .withColumn('long', col('long').cast('string')))

#### Fechas

In [None]:
new_data = new_data.withColumn( "date", to_date( substring(col("date"), 1, 8), "yyyyMMdd" ) )

In [None]:
new_data.dtypes

In [None]:
new_data.limit(10).display()

## Ingeniería de variables

Antigüedad del inmueble

In [None]:
new_data = new_data.withColumn('age_of_property', year(new_data["date"]) - new_data['yr_built'])

Años desde la última renovación

In [None]:
new_data.createOrReplaceTempView("data")
new_data = spark.sql("SELECT *, \
                        YEAR(date) - GREATEST(yr_built, yr_renovated) as yrs_last_renovated  \
                        FROM data")

In [None]:
new_data.limit(10).display()

### Selección de variables

In [None]:
varc = ['bedrooms','bathrooms','sqft_living','sqft_lot','floors',
        'view','condition','grade','sqft_above','sqft_basement',
        'sqft_living15','sqft_lot15', 'age_of_property', 'yrs_last_renovated']

vard = ['zipcode','waterfront']

um = ['id']

tgt = ['price']

### Ausentes

In [None]:
new_data.count()

In [None]:
for c in um+varc+vard+tgt:
    print(f'Nulos en {c}',1-new_data.na.drop(subset=[c]).count() / new_data.count())

In [None]:
new_data.filter("sqft_above is null").display()

In [None]:
new_data = new_data.na.drop()

### Zero's

In [None]:
display(new_data)

Analizando el Data Profile, notemos los siguientes datos para las variables que tienen al menos un zero, o algún dato missing:
* [waterfront, view, yr_renovated] tienen más del 90% de valores ausentes -> **Más del 90% de las casas listadas no tienen vista al mar, una pobre vista, o no han tenido renovaciones**
* [sqft_basement] tiene el 60% de zeros -> **El 60% de las casas listadas no tienen sótanos**
* [bedrooms] tiene el 0.05% de zeros y máx de 33 -> **Hay alguna casa que tiene hasta 33 habitaciones y hay un 0.05% de casas que se listaron como habitación/loft.**
* [bathrooms] tiene 0.03% de zeros -> **Hay algunas casas que no tienen baños** Como queremos ganar con nuestra inversión, instalar nuevos servicios puede resultar costoso, por lo que decidiremos eliminar los inmuebles que no cuenten con al menos, medio baño.
* [sqft_above] tiene el 0.01% de missings -> **Buscamos ofrecer beneficios al vender el inmueble, por lo que decidimos no ofrecer sótanos para habitar**

## Extremos

In [None]:
#Conjunto con variables predictoras
X = new_data.select(varc)
X.summary('0.1%','1%',"10%", "20%", "30%", "40%", "50%", "60%", "70%", "80%", "90%",'99%','99.9%').display()

In [None]:
X.toPandas().hist(figsize=(10,10));

### Local Outlier Factor

In [None]:
#Convertimos el DataFrame a un arreglo de numpy
data_np = np.array(X.select('*').collect())

#Instanciamos Y ajustamos el modelo Local Outlier Factor
model = LOF()
model.fit(data_np)

In [None]:
#Puntajes de anomalía para los puntos de datos
scores = model.decision_scores_

#Umbral para determinar los valores atípicos
threshold = model.threshold_

In [None]:
#Agregamos la columna de sore para cada registro
new_data = new_data.toPandas()
new_data['outlier_score'] = scores

new_data = spark.createDataFrame(new_data)

In [None]:
new_data.limit(5).display()

In [None]:
new_data = new_data.filter(new_data.outlier_score <= threshold)

In [None]:
print((new_data.count(), len(new_data.columns)))

In [None]:
my_final_data = new_data.select('*').drop('outlier_score')

In [None]:
my_final_data.toPandas().hist(figsize=(10,10));

## Modelo

In [None]:
from pyspark.ml.feature import  VectorIndexer, OneHotEncoder, StringIndexer

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import DecisionTreeRegressor

In [None]:
zip_indexer = StringIndexer(inputCol="zipcode", outputCol="zipcodeIndex")
zip_encoder = OneHotEncoder(inputCol="zipcodeIndex", outputCol="zipcodeVec")

water_indexer = StringIndexer(inputCol="waterfront", outputCol="waterfrontIndex")
water_encoder = OneHotEncoder(inputCol="waterfrontIndex", outputCol="waterfrontVec")

In [None]:
assembler = VectorAssembler(inputCols=varc+['waterfrontVec','zipcodeVec'], 
                            outputCol="features")

In [None]:
train_data, val_data = my_final_data.randomSplit([0.8, 0.2])

In [None]:
train_data.display()

In [None]:
train_data.columns

In [None]:
lr = LinearRegression(featuresCol="features", labelCol="price")
rf = RandomForestRegressor(featuresCol="features", labelCol="price")
tree = DecisionTreeRegressor(featuresCol="features", labelCol="price")

In [None]:
lr_pipeline = Pipeline(stages=[zip_indexer,
                            water_indexer,
                            zip_encoder,
                            water_encoder,
                            assembler,
                            lr])
rf_pipeline = Pipeline(stages=[zip_indexer,
                            water_indexer,
                            zip_encoder,
                            water_encoder,
                            assembler,
                            rf])
tree_pipeline = Pipeline(stages=[zip_indexer,
                            water_indexer,
                            zip_encoder,
                            water_encoder,
                            assembler,
                            tree])

In [None]:
fit_lr = lr_pipeline.fit(train_data)
lr_predict_val = fit_lr.transform(val_data)
lr_predict_train = fit_lr.transform(train_data)
lr_predict_val.select('price','prediction').display()

In [None]:
fit_rf = rf_pipeline.fit(train_data)
rf_predict_val = fit_rf.transform(val_data)
rf_predict_train = fit_rf.transform(train_data)
rf_predict_val.select('price','prediction').display()

In [None]:
fit_tree = tree_pipeline.fit(train_data)
tree_predict_val = fit_tree.transform(val_data)
tree_predict_train = fit_tree.transform(train_data)
tree_predict_val.select('price','prediction').display()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_mse = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
evaluator_mae = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
evaluator_rmse = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")


In [None]:
model = ['Linear Regressor','Random Forest Regressor','Decision Tree Regressor']
train = [lr_predict_train,rf_predict_train,tree_predict_train]
test = [lr_predict_val,rf_predict_val,tree_predict_val]

In [None]:
for i in range(3):
    print('\t',model[i])
    mse_t = evaluator_mse.evaluate(train[i])
    mae_t = evaluator_mae.evaluate(train[i])
    r2_t = evaluator_r2.evaluate(train[i])
    rmse_t = evaluator_rmse.evaluate(train[i])
    mse_v = evaluator_mse.evaluate(test[i])
    mae_v = evaluator_mae.evaluate(test[i])
    r2_v = evaluator_r2.evaluate(test[i])
    rmse_v = evaluator_rmse.evaluate(test[i])
    print("Train Mean Squared Error (MSE): ", mse_t)
    print("Test Mean Squared Error (MSE): ", mse_v)
    print("Train R2: ", r2_t)
    print("Test R2: ", r2_v)
    print("Train Mean Absolute Error (MAE): ", mae_t)
    print("Test Mean Absolute Error (MAE): ", mae_v)
    print("Train Root Mean Squared Error (RMSE): ", rmse_t)
    print("Test Root Mean Squared Error (RMSE): ", rmse_v)
    print('---------------------------------------------')




> En general, tanto el MAE como el R2 indican que el Linear Regressor tiene un mejor desempeño en comparación con los otros modelos. El Linear Regressor muestra un MAE más bajo, lo que implica una mayor precisión en las predicciones, y tiene valores más altos de R2, lo que indica una mejor capacidad para explicar la variabilidad de los datos.

> Por lo tanto, basándonos en estas métricas, el Linear Regressor sería la elección preferida como el mejor modelo para predecir el precio los inmuebles.

In [None]:
import mlflow
import mlflow.pyspark.ml
import warnings
warnings.filterwarnings("ignore")

# Información del entrenamiento.
mlflow.pyspark.ml.autolog()

In [None]:
help(LinearRegression)

In [None]:
assembler = VectorAssembler(inputCols=varc+['waterfrontVec','zipcodeVec'], 
                            outputCol="features")

zip_indexer = StringIndexer(inputCol="zipcode", outputCol="zipcodeIndex")
zip_encoder = OneHotEncoder(inputCol="zipcodeIndex", outputCol="zipcodeVec")

water_indexer = StringIndexer(inputCol="waterfront", outputCol="waterfrontIndex")
water_encoder = OneHotEncoder(inputCol="waterfrontIndex", outputCol="waterfrontVec")


def train_linear_regression(maxIter, regParam, elasticNetParam, fitIntercept, solver, tol):

    # Especifique "nested=True" ya que esto se registrará como una ejecución secundaria dentro de Hyperopt.
    with mlflow.start_run(run_name="LinearRegression", nested=True):

        linear_reg = LinearRegression(featuresCol="features",
                                      labelCol="price",
                                      maxIter=maxIter,
                                      regParam=regParam,
                                      elasticNetParam=elasticNetParam,
                                      fitIntercept=fitIntercept,
                                      solver=solver,
                                      tol=tol)
        linear_reg_pipeline = Pipeline(stages=[zip_indexer,
                                               water_indexer,
                                               zip_encoder,
                                               water_encoder,
                                                assembler,
                                                linear_reg])

        model = linear_reg_pipeline.fit(train_data)
        evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
        predictions = model.transform(val_data)
        validation_metric = evaluator.evaluate(predictions)
        
        mlflow.log_metric("val_r2_score", validation_metric)

    return model, validation_metric

In [None]:
initial_model, val_metric = train_linear_regression(maxIter=100,
                                                    regParam=0.0,
                                                    elasticNetParam=0.0,
                                                    fitIntercept=True,
                                                    solver='auto',
                                                    tol=1e-6)

print(f"El modelo de regresión lineal entrenado logró un r2-score de {val_metric} en la porción de validación")

In [None]:
from hyperopt import fmin, tpe, hp, STATUS_OK

In [None]:
hyperspace = {
    "maxIter": hp.uniform("maxIter", 100, 10000),
    "regParam": hp.uniform("regParam", 0.0, 1.0),
    "elasticNetParam": hp.uniform("elasticNetParam", 0.0, 1.0),
    "fitIntercept": hp.choice("fitIntercept", [True, False]),
    "solver": hp.choice("solver", ["auto", "normal", "l-bfgs"]),
    "tol": hp.uniform("tol", 1e-6, 1e-3)
}

In [None]:
def train_with_hyperopt(hyperparams):
    maxIter = int(hyperparams["maxIter"])
    regParam = float(hyperparams["regParam"])
    elasticNetParam = float(hyperparams["elasticNetParam"])
    tol = float(hyperparams["tol"])
    solver = hyperparams["solver"]
    fitIntercept = hyperparams["fitIntercept"]

    model, r2_score = train_linear_regression(maxIter, regParam, elasticNetParam, fitIntercept, solver, tol)

    # Hyperopt asume el trabajo de minimizar una función de pérdida por lo que se toma el negativo del r2-score
    loss = -r2_score
    return {'loss': loss, 'status': STATUS_OK}

In [None]:
search_algorithm = tpe.suggest

with mlflow.start_run():
    best_params = fmin(fn=train_with_hyperopt, space=hyperspace, algo=search_algorithm, max_evals=8)

In [None]:
best_params

In [None]:
best_maxIter = int(best_params["maxIter"])
best_regParam = float(best_params["regParam"])
best_elasticNetParam = float(best_params["elasticNetParam"])
best_tol = float(best_params["tol"])
best_solver = ["auto", "normal", "l-bfgs", "auto-batch"][best_params["solver"]]
best_fitIntercept = [True, False][best_params["fitIntercept"]]


In [None]:
lr = LinearRegression(featuresCol="features", labelCol="price")
lr_pipeline = Pipeline(stages=[zip_indexer,
                               water_indexer,
                               zip_encoder,
                               water_encoder,
                               assembler,
                               lr])

best_lr = LinearRegression(featuresCol="features", labelCol="price",
                            maxIter=best_maxIter,
                            regParam=best_regParam,
                            elasticNetParam=best_elasticNetParam,
                            fitIntercept=best_fitIntercept,
                            solver=best_solver,
                            tol=best_tol)
best_lr_pipeline = Pipeline(stages=[zip_indexer,
                                    water_indexer,
                                    zip_encoder,
                                    water_encoder,
                                    assembler,
                                    best_lr])

In [None]:
model = rf_pipeline.fit(train_data)
rf_predict_val = model.transform(val_data)
rf_predict_train = model.transform(train_data)

b_model = best_lr_pipeline.fit(train_data)
b_rf_predict_val = b_model.transform(val_data)
b_rf_predict_train = b_model.transform(train_data)

In [None]:
model = ['Linear Regressor','Hyper Linear Regressor']
train = [lr_predict_train,b_rf_predict_train]
test = [lr_predict_val,b_rf_predict_val]
for i in range(2):
    print('\t',model[i])
    r2_t = evaluator_r2.evaluate(train[i])
    r2_v = evaluator_r2.evaluate(test[i])
    print("Train R2: ", r2_t)
    print("Test R2: ", r2_v)

El mejor modelo resulo ser el modelo hiperparanetrizado ya que sobreajusta menos

In [None]:
best_model = b_model

In [None]:
best_model.save(output_cont+'/House_Prices_Model/')

In [None]:
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load(output_cont+'/House_Prices_Model/')

In [None]:
predict_val = loaded_model.transform(val_data)
predict_val.select('price','prediction').display()

### Entrenamiento del Modelo

**Adjuntamos el siguiente documento para detallar la estrategia de la toma de decisiones. https://rebecas17.notion.site/House-Flipping-52fb32339c124dd9a7d0e778b431ea87**

In [None]:
# Obtiener los coeficientes del modelo LinearRegression
betas = loaded_model.stages[-1].coefficients

# Obtiener los nombres de las variables de entrada del modelo
inputs = loaded_model.stages[-2].getInputCols()

print("Valores de las betas:")
print(betas)

print("Variables de entrada:")
print(inputs)

In [None]:
# Asociar los nombres de las columnas con los valores de beta
betas_por_variable = zip([x for x in inputs if x not in ['waterfrontVec','zipcodeVec']], [abs(x) for x in betas])

print("Valores de las betas por variable:")
for var, beta in betas_por_variable:
    print(f"{var}: {beta}")

> Podemos observar que las betas de las variables bedrooms, bathrooms y floors tienen un alto valor absoluto,  esto implica que estas variables tienen una buena contribución en la predicción del modelo. Específicamente, estas variables corresponden a características que se pueden "mejorar" en una vivienda para aumentar su valor.

Crearemos un modelo de Kmeans a partir de estas variables para identificar las características de las viviendas y la relación con el precio.

In [None]:
ubs_assembler = VectorAssembler(inputCols=["bedrooms", "bathrooms", "floors", "price"], outputCol="features")
ubs_kmeans = KMeans(k=3, seed=123)

ubs_pipeline = Pipeline(stages=[ubs_assembler,ubs_kmeans])

data_cluster = new_data.select('lat','long', "bedrooms", "bathrooms", "floors", 'price')

ubs_model = ubs_pipeline.fit(data_cluster)

data_cluster = ubs_model.transform(data_cluster)
data_cluster = data_cluster.withColumnRenamed('prediction','Ubs_cl')

In [None]:
data_cluster.display()

In [None]:
display(data_cluster)

![Histograma](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/modelcls_hist.png)
![Hists c Medians](https://raw.githubusercontent.com/RebecaOsorio/houseflipping/main/vizs/modelcls_medians.png)

Cluster 0

In [None]:
data_cluster.select('bedrooms', 'bathrooms', 'floors', 'price').filter("Ubs_cl = 0").summary().display()

Cluster 1

In [None]:
data_cluster.select('bedrooms', 'bathrooms', 'floors', 'price').filter("Ubs_cl = 1").summary().display()

Cluster 2

In [None]:
data_cluster.select('bedrooms', 'bathrooms', 'floors', 'price').filter("Ubs_cl = 2").summary().display()

Con los tres clusters resultantes podemos observar las siguientes características: 

Inmuebles lujosos (Cluster 2)
* Cantidad promedio de baños: 4
* Cantidad promedio de habitaciones: 3.13
* Cantidad promedio de pisos: 1.77
* Precio promedio: 1,656,006

Inmuebles promedio (Cluster 1)
* Cantidad promedio de baños: 3.6
* Cantidad promedio de habitaciones: 2.4
* Cantidad promedio de pisos: 1.6
* Precio promedio: 727,761

Inmuebles baratos (Cluster 0)
* Cantidad promedio de baños: 3.1
* Cantidad promedio de habitaciones: 1.8
* Cantidad promedio de pisos: 1.3
* Precio promedio: 356,570

Con esto, podemos identificar de manera clara las características que determinan el valor de un inmueble. Así, podemos afirmar con mayor seguridad que el valor de una propiedad puede aumentar significativamente al realizar ampliaciones, como agregar un cuarto adicional o un baño extra. Incluso, podríamos considerar una inversión aún mayor y añadir un piso extra.

In [None]:
aux = data_cluster.select('lat','long','price','Ubs_cl','bedrooms','bathrooms','floors')
aux = aux.withColumn("lat", col("lat").cast('float'))
aux = aux.withColumn("long", col("long").cast('float'))

In [None]:
### Name of the Display
name = "Model_Clusters"
cve = "model_cls_map"

### Load a pre-saved Config of the Display
json_file_path = output_cont+cve+'.json'
# Load the JSON file
with open(json_file_path, "r") as json_file:
    json_data = json.load(json_file)
# Convert the JSON data to a dictionary
config = dict(json_data)

In [None]:
kpl_clmodel = KeplerGl(height=500)
kpl_clmodel.add_data(data=aux.select('lat','long','price','Ubs_cl','bedrooms','bathrooms','floors').toPandas(),
                     name='House_Cluster')
kpl_clmodel.config = config
kpl_clmodel