In [20]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession,Row
from pyspark.sql.functions import col


In [2]:
# Iniciar una sesión de PySpark
spark = SparkSession.builder.appName("MortgagePrediction").getOrCreate()

In [35]:
# Cargar los datos
file_path = "/content/drive/My Drive/clean_mortgage_data.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)
data = data.filter(col("MARKET").rlike(r"\("))
data=data.orderBy(["YEAR", "MONTH"], ascending=[True, True])
# Mostrar las primeras filas
data.show(5)

+------+---------+-------------+---------------+-----+-------------+--------------------+------+----+-------+-----+----------+-----------+-----------+------------------+
|SOURCE|FREQUENCY|     SERIESID|       GEOLEVEL|GEOID|      GEONAME|              MARKET|PERIOD|YEAR|QUARTER|MONTH|SUPPRESSED|MORT_NUMBER|DOLLAR_MEAN|        VOL_DOLLAR|
+------+---------+-------------+---------------+-----+-------------+--------------------+------+----+-------+-----+----------+-----------+-----------+------------------+
|  NMDB|Quarterly|PCT_CLTV_8090|       National|  USA|United States|Conventional Conf...|1998Q1|1998|      1|    3|         0|       16.9|       20.1|            6793.8|
|  NMDB|Quarterly|     TOT_ORIG|  Census Region|  RNE|    Northeast|All Mortgages (Ho...|1998Q1|1998|      1|    3|         0|      145.0|    16994.0|         4.92826E7|
|  NMDB|Quarterly|PCT_CLTV_8090|  Census Region|  RNE|    Northeast|Conventional Conf...|1998Q1|1998|      1|    3|         0|        9.4|       12.1|

In [36]:
# Manejo de valores nulos (si los hay)
data = data.dropna()

In [9]:
# Transformar columnas categóricas en índices numéricos
indexer_geoname = StringIndexer(inputCol="GEONAME", outputCol="GEONAME_Index")
indexer_market = StringIndexer(inputCol="MARKET", outputCol="MARKET_Index")

In [37]:
original_data=data

In [10]:
data = indexer_geoname.fit(data).transform(data)
data = indexer_market.fit(data).transform(data)

In [11]:
# Crear un vector de características
assembler = VectorAssembler(
    inputCols=["GEONAME_Index", "MARKET_Index", "YEAR"],
    outputCol="features"
)
data = assembler.transform(data)

In [13]:
# Seleccionar características y etiqueta
data = data.select("features", "MORT_NUMBER", "YEAR")

In [14]:
# Ordenar los datos por YEAR en orden ascendente
data = data.orderBy("YEAR")
# Dividir el conjunto de datos en 80% entrenamiento y 20% prueba basado en la temporalidad
# Calcular el límite para el 80% de los datos
total_count = data.count()
train_count = int(total_count * 0.8)
# Dividir los datos
train_data = data.limit(train_count)  # Los primeros años (80%)
test_data = data.subtract(train_data)  # El resto (20%)

In [15]:
# Crear y entrenar el modelo de regresión lineal
lr = LinearRegression(featuresCol="features", labelCol="MORT_NUMBER")
lr_model = lr.fit(train_data)


In [16]:
# Hacer predicciones en el conjunto de prueba
predictions = lr_model.transform(test_data)

In [17]:
# Evaluar el modelo
evaluator = RegressionEvaluator(
    labelCol="MORT_NUMBER",
    predictionCol="prediction",
    metricName="rmse"
)

In [18]:
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Mostrar predicciones
predictions.select("features", "MORT_NUMBER", "prediction").show(100)

Root Mean Squared Error (RMSE): 118.78156380764898
+-----------------+-----------+------------------+
|         features|MORT_NUMBER|        prediction|
+-----------------+-----------+------------------+
| [0.0,0.0,2018.0]|        6.3| 40.72878024569286|
| [0.0,0.0,2018.0]|       11.9| 40.72878024569286|
| [0.0,0.0,2018.0]|       13.4| 40.72878024569286|
| [0.0,0.0,2018.0]|       14.2| 40.72878024569286|
| [0.0,0.0,2018.0]|       23.1| 40.72878024569286|
| [0.0,0.0,2018.0]|       25.2| 40.72878024569286|
| [0.0,0.0,2018.0]|       33.5| 40.72878024569286|
| [0.0,0.0,2018.0]|       36.6| 40.72878024569286|
| [0.0,0.0,2018.0]|       51.7| 40.72878024569286|
| [0.0,0.0,2018.0]|       87.6| 40.72878024569286|
| [0.0,1.0,2018.0]|        5.4|40.395873579257795|
| [0.0,1.0,2018.0]|       19.6|40.395873579257795|
| [0.0,1.0,2018.0]|       23.1|40.395873579257795|
| [0.0,1.0,2018.0]|       25.7|40.395873579257795|
| [0.0,1.0,2018.0]|       34.2|40.395873579257795|
| [0.0,1.0,2018.0]|       43.3|

In [54]:
input_data = [
    Row(GEONAME="Northeast", MARKET="All Mortgages (Home Purchase)", YEAR=2023)
]

In [55]:
input_df = spark.createDataFrame(input_data)

In [56]:
input_df

DataFrame[GEONAME: string, MARKET: string, YEAR: bigint]

In [57]:
# Transformar GEONAME y MARKET al mismo formato que el modelo entrenado
input_df = indexer_geoname.fit(original_data).transform(input_df)
input_df = indexer_market.fit(original_data).transform(input_df)


In [58]:
# Crear el vector de características
input_df = assembler.transform(input_df)

# Seleccionar solo las columnas necesarias
input_df = input_df.select("features")

# Realizar la predicción
predictions = lr_model.transform(input_df)



In [59]:
# Mostrar la predicción
predictions.select("features", "prediction").show()

+----------------+------------------+
|        features|        prediction|
+----------------+------------------+
|[6.0,0.0,2023.0]|40.350187068881496|
+----------------+------------------+

