## Imports

In [21]:
import csv
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, sum, mean
import math


from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

IP = '10.4.41.42'
PORT = '9870'
USER = 'bdm'
HDFS_PATH = '/user/bdm/idealista_income_musica_i_copes.csv'
LOCAL_PATH = 'aux_data/'


In [22]:
def download_csv_from_hdfs(hdfs_path, local_path):
    # Create an HDFS client
    client = InsecureClient(f'http://{IP}:{PORT}', user=USER)

    # Download the CSV file from HDFS
    client.download(hdfs_path, local_path, overwrite=True)

In [23]:
# Download CSV from HDFS
download_csv_from_hdfs(HDFS_PATH, LOCAL_PATH)

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Read the CSV file into a Spark DataFrame
df = spark.read.csv(LOCAL_PATH, header=True, inferSchema=True)
for column in df.columns:
    new_column = column.replace(".", "")  # Remove the dot character
    df = df.withColumnRenamed(column, new_column)
# Show the DataFrame
df.show()

+---------+-------+--------+----------+---------------+----------------------+---------+-----+--------------------+-----------------+--------------------------------+--------+----------------------------+------------------------------+-----------------------+-----------------------------+-----------------------------+-----------------+-----+------+---------+-------+----------+------------+-----------+---------+-----------------------+--------------------+-----------------------------------+---------------------------------+--------------------------+----------------------+---------------------------------+---------------------------+----------------------------------------------+-----------------------------+-------------------------------------------+---------------------+-----------------------------+-----------------------------------+-------------------+------------+----------------------------+------------------------------+--------------------------+------------------------------

In [86]:
# Get the column data types
column_types = df.dtypes

# Display the column data types
for column_name, data_type in column_types:
    print(f"Column '{column_name}': {data_type}")

Column 'bathrooms': string
Column 'country': string
Column 'distance': string
Column 'latitude': string
Column 'neighborhood_id': string
Column 'newDevelopmentFinished': string
Column 'numPhotos': string
Column 'size': string
Column 'thumbnail': string
Column 'topNewDevelopment': string
Column 'establishments_Bars de copes': string
Column 'establishments_Bars copes i Pubs': string
Column 'pop': string
Column 'establishments_Cuina dietètica': string
Column 'establishments_07 Teatres': string
Column 'establishments_Karaokes': string
Column 'establishments_Salons de ball': string
Column 'establishments_Bufets lliures': string
Column 'externalReference': string
Column 'floor': string
Column 'has360': string
Column 'has3DTour': string
Column 'hasLift': string
Column 'hasStaging': string
Column 'municipality': string
Column 'priceByArea': string
Column 'province': string
Column 'suggestedTexts_subtitle': string
Column 'url': string
Column 'establishments_Bars i pubs musicals': string
Column 

In [4]:
num_cols = ['bathrooms',
 'distance',
 'latitude',
 'numPhotos',
 'size',
 'establishments_Bars de copes',
 'establishments_Bars copes i Pubs',
 'pop',
 'establishments_Cuina dietètica',
 'establishments_07 Teatres',
 'establishments_Karaokes',
 'establishments_Salons de ball',
 'establishments_Bufets lliures',
 'floor',
 'priceByArea',
 'establishments_Bars i pubs musicals',
 'establishments_Cuina mediterrània',
 'establishments_Teatres',
 'establishments_Multiespais',
 'establishments_Auditoris i sales de concert',
 'establishments_Cuina mexicana',
 'establishments_Música',
 'establishments_Música llatina',
 'establishments_Oci i lleure',
 'establishments_Patrimoni cultural',
 'establishments_Espanyol - Flamenc - Sevillanes',
 'establishments_Humanitats i Lletres',
 'RFD',
 'establishments_Tapes',
 'establishments_Cocteleries',
 'establishments_Bars i Cafeteries',
 'establishments_Cuina andalusa',
 'establishments_Auditoris i Sales de Concerts',
 "establishments_Sales d'exposicions",
 "establishments_Punts d'interès cultural",
 'establishments_Associacions',
 'establishments_Tallers, estudis de creació',
 'establishments_11 Equipaments esportius',
 'establishments_Instal·lacions esportives',
 'establishments_Centres Espais Socials Culturals',
 'establishments_Centres i espais socials, culturals',
 'price',
 'establishments_Locals de musica en viu',
 'establishments_Locals Música en Viu',
 'establishments_Locals de música en viu',
 'establishments_Restaurants',
 'establishments_Cocteleries i xampanyeries',
 'establishments_Xampanyeries',
 'establishments_Tablaos flamencs',
 'establishments_Cuina regional',
 'establishments_Sales',
 'establishments_Carns a la brasa',
 'establishments_Fitness',
 'establishments_Espais Alternatius',
 'longitude',
 'establishments_Cuina naturista, vegetariana, biològica',
 'establishments_Ambient flamenc',
 'establishments_Sandvitxeries',
 'establishments_Cuina basca',
 'establishments_Nivell 3',
 'establishments_Altres esports',
 'establishments_Sales de festes',
 'establishments_Llesqueries',
 'establishments_Locals musicals ocasionals',
 'rooms',
 'establishments_Discoteques',
 'establishments_Sales de Música en Viu',
 'establishments_Cuina de mercat',
 'establishments_Cuina internacional',
 'establishments_Cuina clàssica',
 'establishments_Moto-ball',
 'establishments_Esglèsies i Locals Música Ocasional',
 'establishments_Carpes']

## Prepocessing

In [5]:
for column in num_cols:
    df = df.withColumn(column, col(column).cast('double'))

In [6]:
# Columns containing "establishment" in their names
columns_to_impute = [col_name for col_name in df.columns if "establishment" in col_name]

# Impute null values with 0 in the selected columns
df = df.na.fill(0, subset=columns_to_impute)

df.show()

+---------+-------+--------+----------+---------------+----------------------+---------+-----+--------------------+-----------------+--------------------------------+----------------------------+--------+------------------------------+-----------------------+-----------------------------+-----------------------------+-----------------+-----+------+---------+-------+----------+------------+-----------+---------+-----------------------+--------------------+-----------------------------------+---------------------------------+----------------------+--------------------------+-------------------------------------------+-----------------------------+---------------------+-----------------------------+---------------------------------+---------------------------+----------------------------------------------+-----------------------------------+---------+------------+------------+------+----------------------------+------------------------------+--------------------------------+--------------

In [7]:
# Count the number of null values in each column
null_counts = df.select([col(c).isNull().cast("int").alias(c) for c in df.columns])

# Compute the sum of null values for each column
column_null_counts = null_counts.agg(*[sum(col(c)).alias(c) for c in null_counts.columns])

# Extract the null count values
null_count_row = column_null_counts.collect()[0]

# Display the null counts for each column
for column in null_count_row.asDict():
    count = null_count_row[column]
    print(f"Column '{column}': {count}")





Column 'bathrooms': 14480
Column 'country': 13479
Column 'distance': 14480
Column 'latitude': 14480
Column 'neighborhood_id': 14411
Column 'newDevelopmentFinished': 29749
Column 'numPhotos': 14480
Column 'size': 14480
Column 'thumbnail': 14501
Column 'topNewDevelopment': 14477
Column 'establishments_Bars copes i Pubs': 0
Column 'establishments_Bars de copes': 0
Column 'pop': 14480
Column 'establishments_Cuina dietètica': 0
Column 'establishments_Karaokes': 0
Column 'establishments_Salons de ball': 0
Column 'establishments_Bufets lliures': 0
Column 'externalReference': 20025
Column 'floor': 21383
Column 'has360': 15435
Column 'has3DTour': 14478
Column 'hasLift': 15310
Column 'hasStaging': 14895
Column 'municipality': 14479
Column 'priceByArea': 19641
Column 'province': 14479
Column 'suggestedTexts_subtitle': 14479
Column 'url': 14479
Column 'establishments_Bars i pubs musicals': 0
Column 'establishments_Cuina mediterrània': 0
Column 'establishments_Teatres': 0
Column 'establishments_Mul

                                                                                

In [8]:
# Filter rows where "neighborhood" is null
null_neighborhood_df = df.filter(col("neighborhood").isNull())

# Show the rows with null "neighborhood"
null_neighborhood_df.show()

+---------+-----------------+--------+--------+---------------+----------------------+---------+----+---------+-----------------+--------------------------------+----------------------------+----+------------------------------+-----------------------+-----------------------------+-----------------------------+-----------------+-----+------+---------+-------+----------+------------+-----------+--------+-----------------------+----+-----------------------------------+---------------------------------+----------------------+--------------------------+-------------------------------------------+-----------------------------+---------------------+-----------------------------+---------------------------------+---------------------------+----------------------------------------------+-----------------------------------+--------+------------+------------+----+----------------------------+------------------------------+--------------------------------+--------------------------+---------------

In [9]:
# Filter rows where "neighborhood" is null
df = df.filter(col("neighborhood").isNotNull())
df = df.filter(col("price").isNotNull())

# Count the number of null values in each column
null_counts = df.select([col(c).isNull().cast("int").alias(c) for c in df.columns])

# Compute the sum of null values for each column
column_null_counts = null_counts.agg(*[sum(col(c)).alias(c) for c in null_counts.columns])

# Extract the null count values
null_count_row = column_null_counts.collect()[0]

# Display the null counts for each column
for column in null_count_row.asDict():
    count = null_count_row[column]
    print(f"Column '{column}': {count}")


Column 'bathrooms': 0
Column 'country': 0
Column 'distance': 0
Column 'latitude': 0
Column 'neighborhood_id': 0
Column 'newDevelopmentFinished': 10190
Column 'numPhotos': 0
Column 'size': 0
Column 'thumbnail': 16
Column 'topNewDevelopment': 0
Column 'establishments_Bars copes i Pubs': 0
Column 'establishments_Bars de copes': 0
Column 'pop': 0
Column 'establishments_Cuina dietètica': 0
Column 'establishments_Karaokes': 0
Column 'establishments_Salons de ball': 0
Column 'establishments_Bufets lliures': 0
Column 'externalReference': 586
Column 'floor': 3002
Column 'has360': 0
Column 'has3DTour': 0
Column 'hasLift': 832
Column 'hasStaging': 0
Column 'municipality': 0
Column 'priceByArea': 0
Column 'province': 0
Column 'suggestedTexts_subtitle': 0
Column 'url': 0
Column 'establishments_Bars i pubs musicals': 0
Column 'establishments_Cuina mediterrània': 0
Column 'establishments_Teatres': 0
Column 'establishments_Multiespais': 0
Column 'establishments_Auditoris i sales de concert': 0
Column 

In [10]:
has_cols = [col for col in df.columns if col.startswith('has')] + ['exterior']

df = df.na.fill(False, subset=has_cols)

for column in has_cols:
    df = df.withColumn(column, col(column).cast('boolean').cast('int'))

# Calculate the mean value of the "floor" column
mean_value = math.floor(df.select(mean(col('floor'))).first()[0])

# Impute null values in "floor" column with the mean value
df = df.na.fill(mean_value, subset=['floor'])

## Feature Selection

In [94]:
df.columns

['bathrooms',
 'country',
 'distance',
 'latitude',
 'neighborhood_id',
 'newDevelopmentFinished',
 'numPhotos',
 'size',
 'thumbnail',
 'topNewDevelopment',
 'establishments_Bars de copes',
 'establishments_Bars copes i Pubs',
 'pop',
 'establishments_Cuina dietètica',
 'establishments_07 Teatres',
 'establishments_Karaokes',
 'establishments_Salons de ball',
 'establishments_Bufets lliures',
 'externalReference',
 'floor',
 'has360',
 'has3DTour',
 'hasLift',
 'hasStaging',
 'municipality',
 'priceByArea',
 'province',
 'suggestedTexts_subtitle',
 'url',
 'establishments_Bars i pubs musicals',
 'establishments_Cuina mediterrània',
 'establishments_Teatres',
 'establishments_Multiespais',
 'establishments_Auditoris i sales de concert',
 'establishments_Cuina mexicana',
 'establishments_Música',
 'establishments_Música llatina',
 'establishments_Oci i lleure',
 'establishments_Patrimoni cultural',
 'establishments_Espanyol - Flamenc - Sevillanes',
 'establishments_Humanitats i Lletres'

In [11]:
final_cols = ['bathrooms',
 'latitude',
 'numPhotos',
 'size',
 'establishments_Bars de copes',
 'establishments_Bars copes i Pubs',
 'pop',
 'establishments_Cuina dietètica',
 'establishments_07 Teatres',
 'establishments_Karaokes',
 'establishments_Salons de ball',
 'establishments_Bufets lliures',
 'floor',
 'has360',
 'has3DTour',
 'hasLift',
 'hasStaging',
 'establishments_Bars i pubs musicals',
 'establishments_Cuina mediterrània',
 'establishments_Teatres',
 'establishments_Multiespais',
 'establishments_Auditoris i sales de concert',
 'establishments_Cuina mexicana',
 'establishments_Música',
 'establishments_Música llatina',
 'establishments_Oci i lleure',
 'establishments_Patrimoni cultural',
 'establishments_Espanyol - Flamenc - Sevillanes',
 'establishments_Humanitats i Lletres',
 'district',
 'RFD',
 'establishments_Tapes',
 'establishments_Cocteleries',
 'establishments_Bars i Cafeteries',
 'establishments_Cuina andalusa',
 'establishments_Auditoris i Sales de Concerts',
 "establishments_Sales d'exposicions",
 "establishments_Punts d'interès cultural",
 'establishments_Associacions',
 'establishments_Tallers, estudis de creació',
 'establishments_11 Equipaments esportius',
 'establishments_Instal·lacions esportives',
 'establishments_Centres Espais Socials Culturals',
 'establishments_Centres i espais socials, culturals',
 'hasPlan',
 'hasVideo',
 'price',
 'establishments_Locals de musica en viu',
 'establishments_Locals Música en Viu',
 'establishments_Locals de música en viu',
 'establishments_Restaurants',
 'establishments_Cocteleries i xampanyeries',
 'establishments_Xampanyeries',
 'establishments_Tablaos flamencs',
 'establishments_Cuina regional',
 'establishments_Sales',
 'establishments_Carns a la brasa',
 'establishments_Fitness',
 'establishments_Espais Alternatius',
 'longitude',
 'propertyType',
 'establishments_Cuina naturista, vegetariana, biològica',
 'establishments_Ambient flamenc',
 'establishments_Sandvitxeries',
 'establishments_Cuina basca',
 'establishments_Nivell 3',
 'establishments_Altres esports',
 'establishments_Sales de festes',
 'establishments_Llesqueries',
 'establishments_Locals musicals ocasionals',
 'exterior',
 'rooms',
 'establishments_Discoteques',
 'establishments_Sales de Música en Viu',
 'establishments_Cuina de mercat',
 'establishments_Cuina internacional',
 'establishments_Cuina clàssica',
 'establishments_Moto-ball',
 'establishments_Esglèsies i Locals Música Ocasional',
 'establishments_Carpes']

In [12]:
final_df = df.select(*final_cols)

# Count the number of null values in each column
null_counts = df.select([col(c).isNull().cast("int").alias(c) for c in final_df.columns])

# Compute the sum of null values for each column
column_null_counts = null_counts.agg(*[sum(col(c)).alias(c) for c in null_counts.columns])

# Extract the null count values
null_count_row = column_null_counts.collect()[0]

# Display the null counts for each column
for column in null_count_row.asDict():
    count = null_count_row[column]
    print(f"Column '{column}': {count}")


Column 'bathrooms': 0
Column 'latitude': 0
Column 'numPhotos': 0
Column 'size': 0
Column 'establishments_Bars de copes': 0
Column 'establishments_Bars copes i Pubs': 0
Column 'pop': 0
Column 'establishments_Cuina dietètica': 0
Column 'establishments_07 Teatres': 0
Column 'establishments_Karaokes': 0
Column 'establishments_Salons de ball': 0
Column 'establishments_Bufets lliures': 0
Column 'floor': 0
Column 'has360': 0
Column 'has3DTour': 0
Column 'hasLift': 832
Column 'hasStaging': 0
Column 'establishments_Bars i pubs musicals': 0
Column 'establishments_Cuina mediterrània': 0
Column 'establishments_Teatres': 0
Column 'establishments_Multiespais': 0
Column 'establishments_Auditoris i sales de concert': 0
Column 'establishments_Cuina mexicana': 0
Column 'establishments_Música': 0
Column 'establishments_Música llatina': 0
Column 'establishments_Oci i lleure': 0
Column 'establishments_Patrimoni cultural': 0
Column 'establishments_Espanyol - Flamenc - Sevillanes': 0
Column 'establishments_H

### One hot encoding

In [13]:
# Define the columns to perform one-hot encoding
columns_to_encode = ['district', 'propertyType']

# Create a list to store the stages of the pipeline
stages = []

# Apply one-hot encoding on the specified columns
for column in columns_to_encode:
    # Create a StringIndexer to convert the column values to numeric indices
    indexer = StringIndexer(inputCol=column, outputCol=column + "_index")
    
    # Create an instance of the OneHotEncoderEstimator to perform one-hot encoding
    encoder = OneHotEncoder(inputCols=[indexer.getOutputCol()], outputCols=[column + "_encoded"])
    
    # Add the stages to the pipeline
    stages += [indexer, encoder]

# Create a pipeline with the stages
pipeline = Pipeline(stages=stages)

# Fit the pipeline to the DataFrame
pipeline_model = pipeline.fit(final_df)

# Transform the DataFrame using the pipeline
final_df = pipeline_model.transform(final_df)

# Show the transformed DataFrame
final_df.show()

+---------+----------+---------+-----+----------------------------+--------------------------------+--------+------------------------------+-------------------------+-----------------------+-----------------------------+-----------------------------+-----+------+---------+-------+----------+-----------------------------------+---------------------------------+----------------------+--------------------------+-------------------------------------------+-----------------------------+---------------------+-----------------------------+---------------------------+---------------------------------+----------------------------------------------+-----------------------------------+---------+------+--------------------+--------------------------+--------------------------------+-----------------------------+--------------------------------------------+----------------------------------+---------------------------------------+---------------------------+-----------------------------------------

### Removing Price Outliers

In [14]:
# Calculate the quartiles and IQR
quartiles = final_df.approxQuantile('price', [0.25, 0.75], 0.01)
q1 = quartiles[0]
q3 = quartiles[1]
iqr = q3 - q1

# Define the lower and upper bounds for outliers
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filter out the outliers
final_df = final_df.filter((col('price') >= lower_bound) & (col('price') <= upper_bound))

# Show the DataFrame without outliers
final_df.show()

+---------+----------+---------+-----+----------------------------+--------------------------------+--------+------------------------------+-------------------------+-----------------------+-----------------------------+-----------------------------+-----+------+---------+-------+----------+-----------------------------------+---------------------------------+----------------------+--------------------------+-------------------------------------------+-----------------------------+---------------------+-----------------------------+---------------------------+---------------------------------+----------------------------------------------+-----------------------------------+---------+------+--------------------+--------------------------+--------------------------------+-----------------------------+--------------------------------------------+----------------------------------+---------------------------------------+---------------------------+-----------------------------------------

### Splitting dataset

In [15]:
# Select the input features and target variable columns
remove_cols = ['price', 'district_encoded', 'propertyType_encoded', 'district', 'propertyType']
feature_columns = [col for col in final_df.columns if col not in remove_cols]
target_column = 'price'

# Create a VectorAssembler to combine the feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features', handleInvalid='skip')
assembled_df = assembler.transform(final_df)

# Split the DataFrame into train, validation, and test sets
train_ratio = 0.7
validation_ratio = 0.15
test_ratio = 0.15

train_df, validation_df, test_df = assembled_df.randomSplit([train_ratio, validation_ratio, test_ratio], seed=42)

## Fitting and Testing Model

In [16]:
# Create a LinearRegression model
lr = LinearRegression(featuresCol='features', labelCol=target_column)

# Fit the model on the training data
lr_model = lr.fit(train_df)

# Make predictions on the validation data
validation_predictions = lr_model.transform(validation_df)

# Evaluate the model on the validation data
evaluator = RegressionEvaluator(labelCol=target_column, metricName='rmse')
rmse = evaluator.evaluate(validation_predictions)
print('RMSE on validation data:', rmse)

# Make predictions on the test data
test_predictions = lr_model.transform(test_df)

# Evaluate the model on the test data
test_rmse = evaluator.evaluate(test_predictions)
print('RMSE on test data:', test_rmse)



CodeCache: size=131072Kb used=53929Kb max_used=53929Kb free=77142Kb
 bounds [0x000000010898c000, 0x000000010be8c000, 0x000000011098c000]
 total_blobs=18752 nmethods=16901 adapters=1763
 compilation: disabled (not enough contiguous free space left)


                                                                                

RMSE on validation data: 188061.4118205334
RMSE on test data: 192959.90459739906


In [17]:
test_predictions.select('price', 'prediction').show()

+--------+------------------+
|   price|        prediction|
+--------+------------------+
|280000.0|260201.23347252607|
|280000.0|259728.79762396216|
|280000.0| 260157.6893774271|
|290000.0| 176109.0258719623|
|269000.0|   69035.623539567|
|269000.0|   69035.623539567|
|280000.0|251639.62920114398|
|187500.0| 21043.23250862956|
|298000.0| 247862.0072915554|
|135000.0| 70487.37421530485|
|180000.0|217899.06791976094|
|280000.0| 245695.3214840293|
|180000.0| 237058.5717896521|
|180000.0| 223143.6949121058|
|180000.0| 223143.6949121058|
|180000.0| 245345.7787320614|
|240000.0|112426.12909033895|
|245000.0|38445.950350135565|
|245000.0| 32904.84639945626|
|219000.0|177137.58787724376|
+--------+------------------+
only showing top 20 rows



## Saving Model

In [18]:
# Save the linear regression model weights to disk
model_path = "./models"
lr_model.write().overwrite().save(model_path)

hdfs_con = InsecureClient(f'http://{IP}:9870', user='bdm')
    
# Upload the CSV file to HDFS
hdfs_con.upload('/user/bdm/models', model_path, overwrite=True)

                                                                                

'/user/bdm/models'