# Final project
---
This notebook is submitted as the final project for module 3 of the MLDS training program. It was prepared by:



*   Juan Camilo Gutierrez
*   Yenny Paola Dorado





**Installing the PySpark and Python modules**

In [None]:
# Install OpenJDK 8 with apt-get.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Set the Java environment variable.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"


In [None]:
# Install the PySpark package.
!pip install -q pyspark



In [None]:
# Install the PySpark and findspark packages.
!pip install -q pyspark
!pip install -q findspark

In [None]:
# Import pyspark
import pyspark

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Basic data analysis libraries.
import numpy as np
import pandas as pd
import matplotlib as mpl

In [None]:
# Versions of the libraries used.
!python --version
print('PySpark', pyspark.__version__)
print('NumPy', np.__version__)
print('Pandas', pd.__version__)
print('Matplotlib', mpl.__version__)

Python 3.7.13
PySpark 3.3.0
NumPy 1.21.6
Pandas 1.3.5
Matplotlib 3.2.2


# Dataset description


---


The dataset used in this project comes from the Airbnb listings database, which is available at this [link](http://insideairbnb.com/get-the-data.html).
Specifically, the listings file for London, England, was selected; it contains detailed information on the listings published for the city.

# Problem


---

The analysis from the previous project is repeated here using **Apache Spark**, a framework for distributed data processing. Spark is used to process the data, and Machine Learning techniques are applied to analyze the distribution of prices and reviews of the listings in **London**, which is the city with the most listings on Airbnb.



# Objectives


---


# General

*   Understand the distribution of prices in the city of London.

# Specific

*   Describe the data with the help of Apache Spark.

*   Model the prices using a Machine Learning technique.



# Data loading

---



Reading the data

In [None]:
lista_original = pd.read_csv('./sample_data/listings.csv')

In [None]:
# Work on a copy so the original DataFrame stays untouched.
lista = lista_original.copy()

In [None]:
lista=lista.drop(['amenities','name','description','neighborhood_overview','host_about'],axis=1)

In [None]:
# Strip the currency symbol and thousands separators, then cast to float.
lista['price'] = (
    lista['price']
    .str.replace('$', '', regex=False)
    .str.replace(',', '', regex=False)
    .astype('float64')
)
lista['longitude'] = lista['longitude'].astype('float64')
lista['latitude'] = lista['latitude'].astype('float64')



In [None]:
lista.to_csv('./sample_data/listingsSpark.csv')
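
The price-cleaning step above can also be written as a small pure-Python helper, which makes the intended transformation explicit. This is a minimal sketch: `parse_price` is a hypothetical name, and the sample strings are made up to mimic values that contain a `$` symbol and `,` thousands separators.

```python
def parse_price(raw: str) -> float:
    """Convert a price string such as '$1,250.00' to a float."""
    # Remove the currency symbol and the thousands separator before casting.
    return float(raw.replace('$', '').replace(',', ''))


# Illustrative values only, not rows taken from the dataset.
samples = ['$195.00', '$1,250.00', '$72.00']
prices = [parse_price(s) for s in samples]
print(prices)
```

A helper like this could be applied to the column with `lista['price'].map(parse_price)`, assuming every value in the column is a string.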

# Data exploration
---

In [None]:
lista.shape

(66632, 69)

In [None]:
archivo = './sample_data/listingsSpark.csv'
df_spark = spark.read.csv(archivo, inferSchema=True, header=True,multiLine=True)

# Print the object type
print(type(df_spark))

<class 'pyspark.sql.dataframe.DataFrame'>


In [None]:
df_spark

DataFrame[_c0: int, id: bigint, listing_url: string, scrape_id: bigint, last_scraped: timestamp, picture_url: string, host_id: int, host_url: string, host_name: string, host_since: timestamp, host_location: string, host_response_time: string, host_response_rate: string, host_acceptance_rate: string, host_is_superhost: string, host_thumbnail_url: string, host_picture_url: string, host_neighbourhood: string, host_listings_count: double, host_total_listings_count: double, host_verifications: string, host_has_profile_pic: string, host_identity_verified: string, neighbourhood: string, neighbourhood_cleansed: string, neighbourhood_group_cleansed: string, latitude: double, longitude: double, property_type: string, room_type: string, accommodates: int, bathrooms: string, bathrooms_text: string, bedrooms: double, beds: double, price: double, minimum_nights: int, maximum_nights: int, minimum_minimum_nights: double, maximum_minimum_nights: double, minimum_maximum_nights: double, maximum_maximum_n

Number of records in the dataset

In [None]:
df_spark.count()

66632

DataFrame structure

In [None]:
df_spark.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: timestamp (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: timestamp (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: double (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- host_verifications: string (nullable = true)
 |-- host_has_profile_pic: string (nu

Column names of the DataFrame

In [None]:
df_spark.columns

['_c0',
 'id',
 'listing_url',
 'scrape_id',
 'last_scraped',
 'picture_url',
 'host_id',
 'host_url',
 'host_name',
 'host_since',
 'host_location',
 'host_response_time',
 'host_response_rate',
 'host_acceptance_rate',
 'host_is_superhost',
 'host_thumbnail_url',
 'host_picture_url',
 'host_neighbourhood',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'host_has_profile_pic',
 'host_identity_verified',
 'neighbourhood',
 'neighbourhood_cleansed',
 'neighbourhood_group_cleansed',
 'latitude',
 'longitude',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bathrooms_text',
 'bedrooms',
 'beds',
 'price',
 'minimum_nights',
 'maximum_nights',
 'minimum_minimum_nights',
 'maximum_minimum_nights',
 'minimum_maximum_nights',
 'maximum_maximum_nights',
 'minimum_nights_avg_ntm',
 'maximum_nights_avg_ntm',
 'calendar_updated',
 'has_availability',
 'availability_30',
 'availability_60',
 'availability_90',
 'availability_365',
 'calendar_last_scr

The first 20 records of the dataset are shown below

In [None]:
df_spark.show()


In [None]:
df=df_spark.drop(*('_c0','listing_url','scrape_id','last_scraped','picture_url','host_url','host_since','host_response_time','host_response_rate','host_acceptance_rate','host_is_superhost','host_thumbnail_url','host_picture_url','host_neighbourhood','host_listings_count','host_total_listings_count','host_verifications','host_has_profile_pic','host_identity_verified','neighbourhood_cleansed','neighbourhood_group_cleansed','review_scores_checkin','review_scores_communication','review_scores_location','review_scores_value','license','instant_bookable','calculated_host_listings_count','calculated_host_listings_count_entire_homes','calculated_host_listings_count_private_rooms','calculated_host_listings_count_shared_rooms','reviews_per_month','minimum_minimum_nights','maximum_minimum_nights','minimum_maximum_nights','maximum_maximum_nights','minimum_nights_avg_ntm','maximum_nights_avg_ntm','calendar_updated','has_availability','availability_30','availability_60','availability_90','availability_365','calendar_last_scraped','number_of_reviews_ltm','bathrooms_text','number_of_reviews','bathrooms','number_of_reviews_l30d','first_review','last_review','review_scores_cleanliness','review_scores_accuracy','id', 'name', 'description', 'neighborhood_overview', 'host_id','host_name', 'host_location', 'host_about', 'neighbourhood'))

In [None]:
df=df.na.drop(how="any")
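
`na.drop(how="any")` discards every row that has a null in at least one of the remaining columns, which is why the summaries further down report 44997 rows instead of 66632. The plain-Python analogue below is a sketch of that rule, not Spark code: `drop_rows` is a hypothetical helper and the rows are made up.

```python
def drop_rows(rows, how="any"):
    """Mimic the 'any' / 'all' null-dropping rule on a list of dict rows."""
    if how == "any":
        # Drop a row as soon as one value is missing.
        return [r for r in rows if all(v is not None for v in r.values())]
    if how == "all":
        # Drop a row only when every value is missing.
        return [r for r in rows if any(v is not None for v in r.values())]
    raise ValueError("how must be 'any' or 'all'")


# Illustrative rows only.
rows = [
    {"price": 195.0, "beds": 2.0, "review_scores_rating": 4.8},
    {"price": 72.0, "beds": None, "review_scores_rating": 4.5},
    {"price": None, "beds": None, "review_scores_rating": None},
]
kept_any = drop_rows(rows, how="any")
kept_all = drop_rows(rows, how="all")
print(len(kept_any), len(kept_all))
```

With `how="any"` a single missing field is enough to lose the listing, so columns with many nulls have a large effect on the final row count.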

Definition of some of the variables

* **id:** Unique identifier of the listing
* **name:** Title of the listing
* **description:** Description of the listing
* **neighborhood_overview:** Description of the neighbourhood provided by the host
* **host_id:** Unique identifier of the host
* **host_name:** Name of the host
* **host_location:** Location of the host
* **host_about:** Description of the host
* **neighbourhood:** Name of the neighbourhood
* **latitude:** Latitude of the accommodation
* **longitude:** Longitude of the accommodation
* **property_type:** Type of property (house, building, hotel …)
* **room_type:** Type of room (shared, private, hotel …)
* **accommodates:** Maximum number of guests
* **bedrooms:** Number of bedrooms
* **beds:** Number of beds
* **price:** Price per night
* **minimum_nights:** Minimum number of nights
* **maximum_nights:** Maximum number of nights
* **review_scores_rating:** Users' rating of the property



# Descriptive analysis of the variables
---
Below, some descriptive statistics are computed for the quantitative variables.

In [None]:
df.describe().show()

+-------+-------------------+-------------------+-------------+---------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|summary|           latitude|          longitude|property_type|      room_type|      accommodates|          bedrooms|              beds|             price|    minimum_nights|    maximum_nights|review_scores_rating|
+-------+-------------------+-------------------+-------------+---------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|  count|              44997|              44997|        44997|          44997|             44997|             44997|             44997|             44997|             44997|             44997|               44997|
|   mean|  51.50893200362299|-0.1270199451892754|         null|           null|3.1411649665533257|1.4916549992221704| 1.798186545769718|128.

In [None]:
df.describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| latitude | 44997 | 51.50893200362299 | 0.04805062705041062 | 51.29057 | 51.68522 |
| longitude | 44997 | -0.1270199451892754 | 0.09737045904210184 | -0.51065 | 0.27896 |
| property_type | 44997 | | | Boat | Yurt |
| room_type | 44997 | | | Entire home/apt | Shared room |
| accommodates | 44997 | 3.1411649665533257 | 1.9450132958090918 | 1 | 16 |
| bedrooms | 44997 | 1.4916549992221704 | 0.8662193434820611 | 1.0 | 22.0 |
| beds | 44997 | 1.798186545769718 | 1.3075096800014594 | 1.0 | 60.0 |
| price | 44997 | 128.13067537835855 | 262.7594767420607 | 8.0 | 18679.0 |
| minimum_nights | 44997 | 5.319487965864391 | 23.773350967402173 | 1 | 1124 |
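
In the `price` row the standard deviation (about 263) is roughly twice the mean (about 128) and the maximum is 18679, which suggests a long right tail. The sketch below shows the same pattern on made-up numbers with the standard library. It assumes that the `stddev` reported by `describe()` is the sample standard deviation (dividing by n - 1).

```python
import statistics

# Made-up nightly prices with one extreme value, to mimic a long right tail.
prices = [40.0, 55.0, 60.0, 75.0, 120.0, 2000.0]

mean = statistics.mean(prices)
stddev = statistics.stdev(prices)    # sample standard deviation (n - 1)
median = statistics.median(prices)   # much less sensitive to the outlier

print(f"mean={mean:.2f} stddev={stddev:.2f} median={median:.2f}")
```

When a single value dominates like this, the median is often a more representative summary of a typical price than the mean.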


# Analysis using Machine Learning models
---

This section uses two supervised learning models, linear regression and random forests, to predict the prices of listings in the city of London.

# Linear regression model

First, all the feature columns are combined into a single vector column, which is named 'Attributes' through the `outputCol` parameter.

In [None]:
from pyspark.ml.feature import VectorAssembler
# Combine all the features into a single column

assembler = VectorAssembler(inputCols=['latitude', 'longitude', 'accommodates', 'bedrooms', 'beds','review_scores_rating'], outputCol = 'Attributes')

output = assembler.transform(df)

# Input features vs. output label
finalized_data = output.select("Attributes","price")

finalized_data.show()

+--------------------+-----+
|          Attributes|price|
+--------------------+-----+
|[51.48085,-0.2808...|195.0|
|[51.58478,-0.1605...| 72.0|
|[51.47119,-0.1625...|250.0|
|[51.57438,-0.2108...| 29.0|
|[51.4878,-0.16813...| 75.0|
|[51.52195,-0.1409...|307.0|
|[51.53403,0.02709...| 33.0|
|[51.54092,-0.0590...| 76.0|
|[51.46416,-0.3255...| 65.0|
|[51.50681,-0.2334...| 62.0|
|[51.50701,-0.2336...|190.0|
|[51.58684,-0.0863...| 45.0|
|[51.54796,-0.1646...|110.0|
|[51.52139,-0.1393...|400.0|
|[51.41565,-0.0771...| 40.0|
|[51.4786,-0.06114...| 42.0|
|[51.60268,-0.2626...| 25.0|
|[51.52605,-0.1994...| 42.0|
|[51.59031,-0.0940...| 43.0|
|[51.574,-0.21058,...| 25.0|
+--------------------+-----+
only showing top 20 rows



Next, the data is split into training and test sets (0.8 and 0.2 in this case).

In [None]:
from pyspark.ml.regression import LinearRegression
# Split the data into training and test sets
train_data,test_data = finalized_data.randomSplit([0.8,0.2])


regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'price')

# Fit the model on the training set
regressor = regressor.fit(train_data)

# Evaluate the fitted model on the test set
pred = regressor.evaluate(test_data)

# Show the predictions
pred.predictions.show()

+--------------------+-----+------------------+
|          Attributes|price|        prediction|
+--------------------+-----+------------------+
|[51.30085,0.03037...| 75.0|115.44957308830249|
|[51.30137,0.05199...| 43.0| 145.7300144564083|
|[51.302101,0.0617...|231.0| 58.65941744646011|
|[51.30514,0.02887...| 32.0| 39.83022795907027|
|[51.30587,0.02067...| 25.0|36.229528994565726|
|[51.30675,0.03485...| 55.0| 61.55241236217262|
|[51.30918,-0.1139...| 35.0| 47.98998354518608|
|[51.31459,-0.1576...| 15.0| 50.00484174019789|
|[51.31459,-0.1576...| 20.0| 76.52558717726174|
|[51.32016,-0.1162...| 75.0|227.20313055252632|
|[51.32026,-0.1531...|357.0| 76.35614731400369|
|[51.32213,-0.1369...|160.0|160.95235146004984|
|[51.32238,-0.0982...| 20.0|  45.7877357952525|
|[51.33159,-0.1082...|190.0|220.27003936447977|
|[51.33843,-0.1052...| 47.0| 73.15780258387645|
|[51.33917,0.10434...|121.0|140.35235990238561|
|[51.34034,-0.1155...| 28.0| 72.38678974000209|
|[51.3425,-0.32074...| 87.0| 281.8588402

The model's coefficients and intercept were obtained with the following commands:

In [None]:
# Regression model coefficients
coeff = regressor.coefficients

# Intercept
intr = regressor.intercept

print("The model coefficients are: %a" % coeff)
print("The intercept is: %f" % intr)

The model coefficients are: DenseVector([33.3773, -79.532, 26.5207, 35.4531, -2.6748, -4.9735])
The intercept is: -1709.707135
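
To make the printed numbers concrete, a linear model's prediction is the intercept plus the sum of each coefficient times its feature value, with features taken in the order given to `VectorAssembler`. The sketch below reuses the rounded coefficients and intercept printed above. The listing is hypothetical, and `predict_price` is an illustrative name, not part of PySpark.

```python
# Feature order as passed to VectorAssembler in this notebook.
FEATURES = ['latitude', 'longitude', 'accommodates',
            'bedrooms', 'beds', 'review_scores_rating']
# Coefficients and intercept as printed above (rounded by the display).
COEFFICIENTS = [33.3773, -79.532, 26.5207, 35.4531, -2.6748, -4.9735]
INTERCEPT = -1709.707135


def predict_price(listing: dict) -> float:
    """Linear model: intercept + sum(coefficient * feature value)."""
    return INTERCEPT + sum(
        c * listing[name] for name, c in zip(FEATURES, COEFFICIENTS)
    )


# Hypothetical listing, not a row from the dataset.
listing = {'latitude': 51.5, 'longitude': -0.12, 'accommodates': 2,
           'bedrooms': 1.0, 'beds': 1.0, 'review_scores_rating': 4.8}
predicted = predict_price(listing)
print(f"{predicted:.2f}")
```

Because the coefficients are rounded, this will not reproduce the `prediction` column exactly, but it shows how each feature shifts the estimate: for instance, each extra guest in `accommodates` adds about 26.5 to the predicted price.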


Finally, the model is evaluated with the **RegressionEvaluator** class from PySpark.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
# Named `evaluator` to avoid shadowing Python's built-in eval().
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

# Root mean squared error
rmse = evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean squared error
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean absolute error
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)


RMSE: 260.821
MSE: 68027.442
MAE: 68.411
r2: 0.083
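
The four metrics can be reproduced by hand on a small example, which helps when reading the values above. This is a sketch under the usual textbook definitions, with made-up numbers; `regression_metrics` is a hypothetical helper, not a PySpark function.

```python
import math


def regression_metrics(y_true, y_pred):
    """Compute RMSE, MSE, MAE and R^2 from two equal-length lists."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)                  # residual sum of squares
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)   # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return {"rmse": math.sqrt(mse), "mse": mse, "mae": mae, "r2": r2}


# Illustrative values only.
y_true = [50.0, 100.0, 150.0]
good = regression_metrics(y_true, [60.0, 100.0, 140.0])
bad = regression_metrics(y_true, [150.0, 100.0, 50.0])
print(good["r2"], bad["r2"])
```

Under these definitions R² is 1 for a perfect fit, 0 for a model that always predicts the mean of the labels, and negative when the predictions are worse than that constant baseline. An RMSE much larger than the MAE, as in the output above, typically points to a few very large errors.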


# Random Forest

In [None]:
df2 = df.select('latitude', 'longitude', 'accommodates', 'bedrooms', 'beds','review_scores_rating','price','property_type','room_type')
cols = df2.columns
df2.printSchema()

root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- price: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)



In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
categoricalColumns = ['property_type','room_type']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'price', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['latitude', 'longitude', 'accommodates', 'bedrooms', 'beds','review_scores_rating']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
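
The stages above index each categorical column, one-hot encode the index, and then concatenate the encoded vectors with the numeric columns. The plain-Python sketch below illustrates the last two steps for `room_type` only. It assumes the encoder drops the last category (the default `dropLast=True` behaviour of `OneHotEncoder`); the index order, the helper names and the numeric values are hypothetical.

```python
# Hypothetical index order for room_type (most frequent category first).
ROOM_TYPE_INDEX = {'Entire home/apt': 0, 'Private room': 1,
                   'Hotel room': 2, 'Shared room': 3}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]


def assemble(room_type, numeric_values):
    """Concatenate the encoded category with the numeric columns."""
    encoded = one_hot(ROOM_TYPE_INDEX[room_type], len(ROOM_TYPE_INDEX))
    return encoded + list(numeric_values)


# Numeric order: latitude, longitude, accommodates, bedrooms, beds, rating.
features = assemble('Private room', [51.5, -0.12, 2, 1.0, 1.0, 4.8])
print(len(features), features)
```

With four categories and the last one dropped, the encoded part has three slots, followed by the six numeric columns. The same arithmetic would be consistent with the size-102 `features` vectors shown further down if `property_type` had 94 distinct values (93 + 3 + 6), although that count is an inference from the output rather than something the notebook prints.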

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df2)
df2 = pipelineModel.transform(df2)
selectedCols = ['features','label'] + cols
df2 = df2.select(selectedCols)
df2.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- price: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)



In [None]:
train, test = df2.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 31568
Test Dataset Count: 13429
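
The two counts add up to 44997, but the training share is about 70.2% rather than exactly 70%. One likely explanation, stated here as an assumption about `randomSplit`, is that each row is assigned to a split independently with the given probability, so the sizes are only approximately proportional to the weights. The sketch below imitates that behaviour in plain Python; `random_split` is a hypothetical helper and does not reproduce Spark's actual random number stream.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any value left over by rounding.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(10_000))
train_rows, test_rows = random_split(rows, [0.7, 0.3], seed=2018)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split repeatable for the same input, which is what `seed = 2018` does in the cell above.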


In [None]:
train.show()

+--------------------+-----+--------+---------+------------+--------+----+--------------------+-----+------------------+---------------+
|            features|label|latitude|longitude|accommodates|bedrooms|beds|review_scores_rating|price|     property_type|      room_type|
+--------------------+-----+--------+---------+------------+--------+----+--------------------+-----+------------------+---------------+
|(102,[0,93,96,97,...| 32.0|51.41639| -0.20367|           4|     2.0| 2.0|                 0.0|180.0|Entire rental unit|Entire home/apt|
|(102,[0,93,96,97,...|  1.0|51.41744| -0.17846|           3|     2.0| 2.0|                 0.0|100.0|Entire rental unit|Entire home/apt|
|(102,[0,93,96,97,...| 68.0|51.42018| -0.08408|           3|     1.0| 1.0|                 0.0| 58.0|Entire rental unit|Entire home/apt|
|(102,[0,93,96,97,...| 64.0|51.42175| -0.19843|           4|     2.0| 2.0|                 0.0|149.0|Entire rental unit|Entire home/apt|
|(102,[0,93,96,97,...| 22.0| 51.4239|  -0

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.select('label','price', 'rawPrediction', 'prediction', 'probability').show()

+-----+------+--------------------+----------+--------------------+
|label| price|       rawPrediction|prediction|         probability|
+-----+------+--------------------+----------+--------------------+
|  0.0|  40.0|[0.16818638094707...|       1.0|[0.00840931904735...|
| 16.0|  85.0|[0.16818638094707...|       1.0|[0.00840931904735...|
|106.0|  57.0|[0.16818638094707...|       1.0|[0.00840931904735...|
|  5.0|  60.0|[0.18219518957572...|       1.0|[0.00910975947878...|
| 19.0| 200.0|[0.09193604373618...|       1.0|[0.00459680218680...|
| 11.0|  70.0|[0.14330083794453...|       1.0|[0.00716504189722...|
|  7.0| 120.0|[0.09193604373618...|       1.0|[0.00459680218680...|
| 21.0| 140.0|[0.16818638094707...|       1.0|[0.00840931904735...|
| 22.0|  99.0|[0.08615436647441...|       1.0|[0.00430771832372...|
| 19.0| 200.0|[0.09193604373618...|       1.0|[0.00459680218680...|
| 13.0|  75.0|[0.22855267056560...|       6.0|[0.01142763352828...|
|398.0| 396.0|[0.16818638094707...|       1.0|[0

Classifier evaluation



In [None]:
# Named `evaluator` to avoid shadowing Python's built-in eval().
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

# Root mean squared error
rmse = evaluator.evaluate(predictions)
print("RMSE: %.3f" % rmse)

# Mean squared error
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean absolute error
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)


RMSE: 309.985
MSE: 96090.624
MAE: 125.339
r2: -0.187
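
When reading these metrics, note that `label` was produced by applying `StringIndexer` to `price`, so it is a class index rather than an amount (the predictions table shows label 0.0 next to price 40.0). `RandomForestClassifier` returns `prediction` on that same index scale, while the evaluator compares it against `price`. The sketch below illustrates the mismatch on made-up prices, assuming the indexer's default ordering by descending frequency; `frequency_index` is a hypothetical helper.

```python
from collections import Counter


def frequency_index(values):
    """Index distinct values by descending frequency (ties broken by string order)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], str(v)))
    return {value: float(i) for i, value in enumerate(ordered)}


# Made-up prices: 40.0 is the most common value, 200.0 the rarest.
prices = [40.0, 40.0, 40.0, 85.0, 85.0, 200.0]
label_of = frequency_index(prices)
labels = [label_of[p] for p in prices]

# Even a classifier that predicted every label correctly would look
# wrong when its class indices are compared directly against prices.
squared_gaps = [(p - l) ** 2 for p, l in zip(prices, labels)]
print(labels, squared_gaps)
```

A regressor such as `RandomForestRegressor` from `pyspark.ml.regression`, trained with `labelCol='price'`, would keep predictions and labels in the same units. That alternative is not run in this notebook, so no results are reported for it.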


# Conclusions
---

Both supervised models produced a coefficient of determination below 20% (0.083 for linear regression and -0.187 for the random forest), which indicates that neither model predicts listing prices well. This is attributed to the low correlation between the variables, an issue already observed in the previous module's project, where different models were used to obtain better predictions. The aim of this exercise, however, was to apply what was learned about Machine Learning with Apache Spark. The workflow turned out to be very similar, but more powerful when running the models, which is reflected in their training and evaluation times.

# Video
---
The video uploaded to YouTube is available at the following link:
https://youtu.be/NdH5OrEMPO8