# Predictive analysis 

### Import libraries

In [2]:
from utils import *

from time import strftime, gmtime
from datetime import datetime
import os
from hdfs import InsecureClient

import re
import os
from time import strftime, gmtime
from datetime import datetime
import shutil

import findspark
import warnings
from pyspark.sql.functions import split, col, avg, count
from pyspark.ml.feature import FeatureHasher

import pyspark as py
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

### Read login information

In [34]:
%run utils.py

log = log_config("My.log")
logging_info = logging_creation()

user_name = logging_info["user_name"]
host = logging_info["host"]

### Download data from formatted zone

In [55]:
%run data_SelectorsAndFormatters.py

inp_path="model_temp"
master_files = {
    "hotels": {"Description": "Information about hotels in neighbourhoods"},
    "renda_familiar": {
        "keywords": ["renda_familiar"],
        "Description": "Data from Open Barcelona with incomes of families. Can be joined with 'idealista' file using a lookup file",
    },
    "idealista": {
        "keywords": ["idealista"],
        "Description": "Appartments from idealista. Can be joined with 'renta familiar' file using a lookup file",
    },
    "lookup_renta_idealista": {
        "keywords": ["extended"],
        "Description": "Lookup datable to join 'Idealista' and 'renda familiar'",
    },
}

In [56]:
# Load last versions of all dataframes from explotation zone
for key_file in master_files.keys():
    formatted_data_selector(log, user_name, host, key_file)

### Load dataframes

In [57]:
# Ensure that the spark session is correctly loaded,
# then create a sesion
findspark.init("D:\spark\spark-3.5.1-bin-hadoop3")

conf = (
    SparkConf()
        .set("spark.master", "local")
        .set("spark.app.name", "Formatted zone loader")
    )
spark = SparkSession.builder.config(conf=conf).getOrCreate()
log.info("Spark sesion correctly initialized")

# Load all dataframes in a dictionary
dfs = {}
file_names = os.listdir(inp_path)

for file in file_names:
    full_in_path = inp_path + "/" + file

    # Load dataframes
    short_name = file.split("-", maxsplit=2)[2]
    short_name = "".join(short_name)

    dfs[short_name] = (
            spark.read.option("multiline", "true").format("parquet").load(full_in_path)
    )


### Preprocessing

In [58]:
# Define from all files usefull columns for the analysis
# {file1: [columns], file2: [columns]}
selected_fields = {
    "renda_familiar": [
            "Nom_Districte",
            "Codi_Districte",
            "Índex RFD Barcelona = 100",
        ],
        "idealista": ["district", "numPhotos", "price", "floor", "size", "bathrooms"],
        "lookup_renta_idealista": ["district", "district_id"],
        "hotels": [
            "addresses_district_id",
            "name",
            "secondary_filters_name",
            "geo_epgs_4326_lat",
            "geo_epgs_4326_lon",
        ],
    }


In [59]:
for key, df in dfs.items():

    # Select columns and remove Nan
    df = df.select(selected_fields[key])
    print(f"From '{key}' dataframe, selected the following schema")
    df.printSchema()
    n_col_raw = df.count()
    df = df.na.drop()

    print(
        f"Removed {((n_col_raw - df.count())/n_col_raw) * 100} % rows that contained NA's"
    )

    dfs[key] = df

# Calculate KPI's
dfs["hotels"] = (
    dfs["hotels"]
    .withColumn(
        "stars",
        split(col("secondary_filters_name"), " ").cast("array<int>"),
    )
    .withColumn("stars", col("stars")[1])
)
dfs["hotels"] = dfs["hotels"].drop("secondary_filters_name")

# Declare numeric variables
dfs["hotels"] = (
    dfs["hotels"]
    .withColumn(
        "addresses_district_id",
        dfs["hotels"]["addresses_district_id"].cast(IntegerType()),
    )
    .withColumn(
        "geo_epgs_4326_lat",
        dfs["hotels"]["geo_epgs_4326_lat"].cast(IntegerType()),
    )
    .withColumn(
        "geo_epgs_4326_lon",
        dfs["hotels"]["geo_epgs_4326_lon"].cast(IntegerType()),
    )
)

dfs["idealista"] = dfs["idealista"].withColumn(
    "floor",
    dfs["idealista"]["floor"].cast(IntegerType()),
)

# Transform "districts names" from "renda familiar" 
# so that they match the lookup table
dfs["renda_familiar"] = (
    dfs["renda_familiar"]
    .withColumn(
        "Nom_Districte",
        split(col("Nom_Districte"), '"'),
    )
    .withColumn("Nom_Districte", col("Nom_Districte")[1])
    .withColumn(
        "Índex RFD Barcelona = 100",
        split(col("Índex RFD Barcelona = 100"), '"'),
    )
    .withColumn("Índex RFD Barcelona = 100", col("Índex RFD Barcelona = 100")[1])
)

# Group dataframes by district to reduce cardinality of joins
dfs["hotels"] = (
    dfs["hotels"]
    .groupBy("addresses_district_id")
    .agg(
        avg("stars").alias("Avg_stars"),
        count("name").alias("N_hotels"),
        avg("geo_epgs_4326_lat").alias("Avg_lat"),
        avg("geo_epgs_4326_lon").alias("Avg_long"),
    )
)

dfs["renda_familiar"] = (
    dfs["renda_familiar"]
    .groupBy(["Nom_Districte", "Codi_Districte"])
    .agg(avg("Índex RFD Barcelona = 100").alias("Avg_Index_RFD"))
)

## Join files (remove useless attributes after each join)
# 'renta familiar' and hotels
dfs["model"] = dfs["hotels"].join(
    dfs["renda_familiar"],
    dfs["hotels"].addresses_district_id == dfs["renda_familiar"].Codi_Districte,
    "inner",
)

dfs["model"] = dfs["model"].drop(*["Codi_Districte", "addresses_district_id"])

# join lookup tables
dfs["model"] = dfs["model"].join(
    dfs["lookup_renta_idealista"],
    dfs["model"].Nom_Districte == dfs["lookup_renta_idealista"].district,
    "inner",
)

dfs["model"] = dfs["model"].drop("district_id")

# join idealista
dfs["model"] = dfs["model"].join(
    dfs["idealista"],
    dfs["model"].district == dfs["idealista"].district,
    "inner",
)
dfs["model"] = dfs["model"].drop("district")
dfs["model"] = dfs["model"].drop("Nom_districte")

From 'renda_familiar' dataframe, selected the following schema
root
 |-- Nom_Districte: string (nullable = true)
 |-- Codi_Districte: string (nullable = true)
 |-- Índex RFD Barcelona = 100: string (nullable = true)

Removed 0.0 % rows that contained NA's
From 'idealista' dataframe, selected the following schema
root
 |-- district: string (nullable = true)
 |-- numPhotos: long (nullable = true)
 |-- price: double (nullable = true)
 |-- floor: string (nullable = true)
 |-- size: double (nullable = true)
 |-- bathrooms: long (nullable = true)

Removed 23.0026251919362 % rows that contained NA's
From 'lookup_renta_idealista' dataframe, selected the following schema
root
 |-- district: string (nullable = true)
 |-- district_id: string (nullable = true)

Removed 0.0 % rows that contained NA's
From 'hotels' dataframe, selected the following schema
root
 |-- addresses_district_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- secondary_filters_name: string (nullable = true

### Upload preprocessed dataframe to the explotation zone

In [60]:
model_name = "GLM_V1.parquet"
explotation_zone_path = "user/bdm/Model_explotation_zone" 

In [64]:
hdfs_client = InsecureClient(host, user=user_name)
out_full_path = explotation_zone_path + "/" + model_name 

dfs["model"].write.parquet(model_name)
hdfs_client.upload(out_full_path, model_name, overwrite=True)

log.info(f"Model {file} uploaded correctly at '{out_full_path}' path")

### Create the model

In [70]:
features = [
    "Avg_stars",
    "N_hotels",
    "Avg_lat",
    "Avg_lat",
    "numPhotos",
    "floor",
    "size",
    "Avg_Index_RFD",
    "numPhotos",
    "bathrooms",
    "Avg_Index_RFD",
]

df = dfs["model"]

In [71]:
vectorAssembler = VectorAssembler(
    inputCols=features,
    outputCol="features",
    handleInvalid="skip",
)
df = vectorAssembler.transform(df)

# Divide dataset
train, test = df.randomSplit([0.7, 0.3], seed=123)
print(f"Data splitted in Test {train.count()} rows and Train {test.count()} rows")

# Define the model and train it
glr = GeneralizedLinearRegression(
    family="gaussian", link="identity", maxIter=10, regParam=0.3, labelCol="price"
)

model1 = glr.fit(train)

Data splitted in Test 38520 rows and Train 16720 rows


In [72]:
predictions = model1.transform(train)

# Evaluate the model
print("Evaluating results of the model...")

metrics = ["r2", "rmse"]

for metric in metrics:
    evaluator = RegressionEvaluator(metricName=metric, labelCol="price")
    results = evaluator.evaluate(predictions)

    print(f"{metric} = {results}")

Evaluating results of the model...
r2 = 0.7444120529989751
rmse = 211984.0575014574
