In [50]:
from pyspark.sql import SparkSession
import pandas as pd

# Initialize a Spark session
spark = SparkSession.builder.appName("AirbnbPriceFilling").getOrCreate()

# Cargar dataset de Madrid
# Load your datasets into Spark DataFrames
orig_mad = spark.read.csv('datasets/datasets_originales/listings-detailed-mad-original.csv', header=True, inferSchema=True, multiLine=True, sep=',', quote='"', escape='\\')
df_mad = spark.read.csv('datasets/listings_prepared_mad.csv', header=True, inferSchema=True)

In [51]:
df_mad.show(5)

+-------------------+--------------------+--------------------+---------+---------------+------------------------------+-----------------+-------+----------------------------+-----------------+-------------------+------------+--------+-------+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
|                 id|         listing_url|                name|  host_id|      host_name|calculated_host_listings_count|host_is_superhost|license|neighbourhood_group_cleansed|         latitude|          longitude|   room_type|bedrooms|  price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|
+-------------------+--------------------+--------------------+---------+---------------+------------------------------+-----------------+-------+----------------------------+-----------------+-------------------+------------+--------+-------+-------+-

In [52]:
orig_mad.printSchema()
from pyspark.sql.functions import col, regexp_extract

# Use a regular expression to filter out non-numeric 'id' values
# Regular expression to match only numeric strings
df_numeric_ids = orig_mad.filter(regexp_extract(col("id"), r'^\d+$', 0) != '')

# Show the result to verify the changes
df_numeric_ids.show(5)

# Optionally, save the cleaned DataFrame to a new CSV file
df_numeric_ids.write.csv("temp_ids_mad.csv", header=True)

root
 |-- id: string (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: string (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_cou

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/Users/kurtiswalton/Ironhack/final-project/temp_ids_mad.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [53]:
df_numeric_ids.count()

                                                                                

26924

In [54]:
# Display the schema of the DataFrame
df_mad.printSchema()

# Get the number of rows and columns
print(f"Number of rows: {df_mad.count()}, Number of columns: {len(df_mad.columns)}")

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- license: string (nullable = true)
 |-- neighbourhood_group_cleansed: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- price: string (nullable = true)
 |-- kitchen: string (nullable = true)
 |-- patio or balcony: string (nullable = true)
 |-- elevator: integer (nullable = true)
 |-- air conditioning: integer (nullable = true)
 |-- long_term: integer (nullable = true)
 |-- short_term: integer (nullable = true)
 |-- possible_long_term: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- review_scores_rating: doubl

In [55]:
# Remove the dollar sign and commas, and convert the column to a numerical format:
from pyspark.sql.functions import regexp_replace, col

df_mad = df_mad.withColumn("price", regexp_replace(col("price"), "[\$,]", "").cast("float"))

In [56]:
#  Identify Missing Values
from pyspark.sql.functions import col, sum as _sum

# Count missing values for each column
df_mad.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df_mad.columns]).show()

+---+-----------+----+-------+---------+------------------------------+-----------------+-------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
| id|listing_url|name|host_id|host_name|calculated_host_listings_count|host_is_superhost|license|neighbourhood_group_cleansed|latitude|longitude|room_type|bedrooms|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|
+---+-----------+----+-------+---------+------------------------------+-----------------+-------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
|  0|          0|   0|      0|        4|                             0|              155|  24174|

In [57]:
### Trying to make the above more readable

from pyspark.sql.functions import col, sum as _sum

# Count missing values for each column
missing_counts = df_mad.select([
    _sum(col(c).isNull().cast("int")).alias(c) for c in df_mad.columns
])

# Transform the result into a more readable format
missing_counts = missing_counts.collect()[0].asDict()

# Convert the dictionary to a DataFrame
missing_df = spark.createDataFrame(
    [(k, v) for k, v in missing_counts.items()],
    ["Column", "MissingValues"]
)

# Optionally, filter out columns with no missing values and sort by missing value count
missing_df = missing_df.filter(col("MissingValues") > 0).orderBy(col("MissingValues").desc())

# Show the result
missing_df.show(truncate=False)

# The dataset has several columns with missing values:

# license shows 24174 missing values. We can drop this column
# review_scores_rating is missing 5856 times. We can replace these missing values with 0
# price: Missing in 5305 entries. Drop or fill in depending on the cases.
# bedrooms missing in 2211 cases.  > check for more than one bedroom > mark 1, if less than 1 > mark 0
# host_is_superhost missing 155 times. Fill in with 'f'
# neighbourhood_group_cleansed: 7 missing > manual review
# host_name missing 4 times. Fill with 'no disponible'
# latitude missing 1 time > manual review > I have checked the row (id 16270115) and the latitude value is not missing
# kitchen missing 1 time > manual review


+----------------------------+-------------+
|Column                      |MissingValues|
+----------------------------+-------------+
|license                     |24174        |
|review_scores_rating        |5856         |
|price                       |5305         |
|bedrooms                    |2211         |
|host_is_superhost           |155          |
|neighbourhood_group_cleansed|7            |
|host_name                   |4            |
|latitude                    |1            |
|kitchen                     |1            |
+----------------------------+-------------+



In [58]:
# Drop the license Column
df_mad = df_mad.drop("license")

In [59]:
# Fill review_scores_rating with 0

df_mad =df_mad.na.fill({"review_scores_rating": 0})

In [60]:
# Fill host_name with 'No Disponible'
df_mad = df_mad.na.fill({"host_name": "No Disponible"})

In [61]:
# Fill host_is_superhost with 'f'
df_mad = df_mad.na.fill({"host_is_superhost": "f"})

In [62]:
# Check by the URL link to fill in the neighbourhood for missing value

# Filter the DataFrame to find the rows with the missing value in 'neighbourhood_group_cleansed'
missing_neighbourhood = df_mad.filter(col("neighbourhood_group_cleansed").isNull())

# Show the row(s) with the missing value
missing_neighbourhood.show()

# It can clearly be seen below that there are values present in neighbourhood_group_cleansed for each of these listings. 
# Unclear why they show as missing values in python.
# I will drop these rows to avoid further complications.

+------------------+--------------------+--------------------+--------------------+---------+------------------------------+-----------------+----------------------------+---------+-----------------+------------------+---------------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
|                id|         listing_url|                name|             host_id|host_name|calculated_host_listings_count|host_is_superhost|neighbourhood_group_cleansed| latitude|        longitude|         room_type|       bedrooms|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|
+------------------+--------------------+--------------------+--------------------+---------+------------------------------+-----------------+----------------------------+---------+-----------------+------------------+---------------+-----+-------+----------

In [63]:
# Drop the rows with 'missing' values to avoid further complications.
df_mad = df_mad.na.drop(subset=['neighbourhood_group_cleansed'])

# Filter the DataFrame to find the rows with the missing value in 'neighbourhood_group_cleansed'
missing_neighbourhood = df_mad.filter(col("neighbourhood_group_cleansed").isNull())

# Show the row(s) with the missing value
missing_neighbourhood.show()

+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
| id|listing_url|name|host_id|host_name|calculated_host_listings_count|host_is_superhost|neighbourhood_group_cleansed|latitude|longitude|room_type|bedrooms|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|
+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+---

In [64]:
# Filter rows where the 'latitude' column is null
rows_with_missing_latitude = df_mad.filter(col("latitude").isNull())

# Show the rows with missing values in 'latitude'
rows_with_missing_latitude.show(truncate=False)

+--------+-------------------------------------+-----------+--------------+--------------------------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
|id      |listing_url                          |name       |host_id       |host_name                 |calculated_host_listings_count|host_is_superhost|neighbourhood_group_cleansed|latitude|longitude|room_type|bedrooms|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|
+--------+-------------------------------------+-----------+--------------+--------------------------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+-----

In [65]:
# I have manually checked the row in our cleaned database (id 16270115) and the value for latitude is NOT missing. 
# Therefore, I will add the value again manually.

# Fill 'missing' values in the 'latitude' column with the value present in the dataset
df_mad = df_mad.fillna({"latitude": 40.40902})

# Filter rows where the 'latitude' column is null
rows_with_missing_latitude = df_mad.filter(col("latitude").isNull())

# Show the rows with missing values in 'latitude'
rows_with_missing_latitude.show(truncate=False)

+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
|id |listing_url|name|host_id|host_name|calculated_host_listings_count|host_is_superhost|neighbourhood_group_cleansed|latitude|longitude|room_type|bedrooms|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|
+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+---

In [66]:
# Repeat the above for the missing 'kitchen' value, as the dataset shows no missing value in this column

# Filter rows where the 'kitchen' column is null
rows_with_missing_kitchen = df_mad.filter(col("kitchen").isNull())

# Show the rows with missing values in 'kitchen'
rows_with_missing_kitchen.show(truncate=False)

+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
|id |listing_url|name|host_id|host_name|calculated_host_listings_count|host_is_superhost|neighbourhood_group_cleansed|latitude|longitude|room_type|bedrooms|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|
+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+---

In [67]:
# The dataset entry for id 631090264341908777 shows '1' in the kitchen column.
# Therefore I will add the value again manually

# Fill missing values in the 'kitchen' column with the value present in the dataset
df_mad = df_mad.fillna({"kitchen": 1})

# Filter rows where the 'kitchen' column is null
rows_with_missing_kitchen = df_mad.filter(col("kitchen").isNull())

# Show the rows with missing values in 'kitchen'
rows_with_missing_kitchen.show(truncate=False)

+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
|id |listing_url|name|host_id|host_name|calculated_host_listings_count|host_is_superhost|neighbourhood_group_cleansed|latitude|longitude|room_type|bedrooms|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|
+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+--------+---------+---------+--------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+
+---+-----------+----+-------+---------+------------------------------+-----------------+----------------------------+---

In [68]:
# To fill in "bedrooms" columns and also encode 'bedroom' and 'room type' with the following theory

# if the type is "shared bedroom" or null, we encode it as 0
# if the typr is "private room" or "hotel room", we encode it as 1
# and if the type if "entire home/apt", we encode it with 2
from pyspark.sql.functions import when, col

# Apply the Encoding Based on room_type
# Apply the encoding to the 'room_type' column
df_mad = df_mad.withColumn("room_type_encoded", 
                   when(col("room_type").isNull(), 0)
                   .when(col("room_type") == "Shared room", 0)
                   .when(col("room_type") == "Private room", 1)
                   .when(col("room_type") == "Hotel room", 1)
                   .when(col("room_type") == "Entire home/apt", 2)
                   .otherwise(0))  # This covers any unexpected or additional room types

# Now combine with the 'bedrooms' logic
df_mad = df_mad.withColumn("bedrooms_encoded", 
                   when(col("bedrooms") > 1, 1).otherwise(0))

# Show the result to verify the changes
df_mad.select("room_type", "room_type_encoded", "bedrooms", "bedrooms_encoded").show(10)

# Then remove 'room_type' and 'bedrooms'
df_mad = df_mad.drop("room_type")
df_mad = df_mad.drop("bedrooms")


+------------+-----------------+--------+----------------+
|   room_type|room_type_encoded|bedrooms|bedrooms_encoded|
+------------+-----------------+--------+----------------+
|Private room|                1|     1.0|               0|
|Private room|                1|     1.0|               0|
|Private room|                1|     2.0|               1|
|Private room|                1|     1.0|               0|
|Private room|                1|     1.0|               0|
|Private room|                1|     1.0|               0|
|Private room|                1|     1.0|               0|
|Private room|                1|     1.0|               0|
|Private room|                1|     1.0|               0|
| Shared room|                0|    NULL|               0|
+------------+-----------------+--------+----------------+
only showing top 10 rows



In [69]:
# Drop rows where 'neighbourhood_group_cleansed' is null in df_mad
df_mad = df_mad.dropna(subset=["neighbourhood_group_cleansed"])

In [70]:
# Fill in "price" column with the following theory

# 1. Mark the rows where the "availability_30" and "availability_60" are both '0' in the orig_mad dataframe
# 2. Found the correspondent rows by using its "id" in df_mad and drop the empty rows
# 3. Fill in the rest of the null values with the average price in the same neighbourhood

In [71]:
# Load the orig_mad dataframe
zero_availability_ids = df_numeric_ids.filter((col("availability_30") == 0) & (col("availability_60") == 0)).select("id")
zero_availability_ids.count()

                                                                                

43

In [72]:
# # Convert zero_availability_ids to a list
zero_availability_ids_list = [row.id for row in zero_availability_ids.distinct().collect()]

# # Filter df_mad to remove rows where id is in zero_availability_ids_list
df_mad_filtered = df_mad.filter(~col("id").isin(zero_availability_ids_list))

                                                                                

In [73]:
df_mad_filtered.count()

26874

In [74]:
from pyspark.sql import functions as F

# # Calculate the average price per neighbourhood
avg_price_neighbourhood = df_mad_filtered.groupBy("neighbourhood_group_cleansed").agg(F.round(F.mean("price")).alias("avg_price"))

# Join the original DataFrame with the average prices DataFrame
df_mad_with_avg = df_mad_filtered.join(avg_price_neighbourhood, on="neighbourhood_group_cleansed", how="left")

# # Fill missing price values with the calculated average price
df_mad_final = df_mad_with_avg.withColumn("price", F.coalesce(col("price"), col("avg_price")))

# # Drop the avg_price column as it's no longer needed
df_mad_final = df_mad_final.drop("avg_price")

# # Show the result to verify the changes
df_mad_final.select("id", "neighbourhood_group_cleansed", "price").show(10)

+-------------------+----------------------------+-----+
|                 id|neighbourhood_group_cleansed|price|
+-------------------+----------------------------+-----+
|1104967621421098322|                      Retiro| 68.0|
| 930941073152690361|          Puente de Vallecas| 16.0|
|           51647382|          Puente de Vallecas| 25.0|
|1026072412626070839|                 Carabanchel| 26.0|
| 766423030884644417|                   Salamanca|100.0|
|           52547327|                      Centro| 45.0|
| 912133850058274259|                      Centro| 25.0|
|           35987869|                  Arganzuela|141.0|
|           36412220|                 Carabanchel| 82.0|
|           19032176|                     Barajas|101.0|
+-------------------+----------------------------+-----+
only showing top 10 rows



In [75]:
df_mad_final.count()

26874

In [76]:
# Count missing values for each column
df_mad_final.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df_mad_final.columns]).show()

+----------------------------+---+-----------+----+-------+---------+------------------------------+-----------------+--------+---------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+-----------------+----------------+
|neighbourhood_group_cleansed| id|listing_url|name|host_id|host_name|calculated_host_listings_count|host_is_superhost|latitude|longitude|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|room_type_encoded|bedrooms_encoded|
+----------------------------+---+-----------+----+-------+---------+------------------------------+-----------------+--------+---------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+-----------------+----------------+
|                           0|  0|          0|   0|      0|        0|    

In [77]:
from pyspark.sql.functions import col

# Create a condition to check for nulls in any column
condition = None
for c in df_mad_final.columns:
    if condition is None:
        condition = col(c).isNull()
    else:
        condition = condition | col(c).isNull()

# Filter rows where the condition is met (i.e., any column is null)
missing_data = df_mad_final.filter(condition)

# Show the rows with missing values
missing_data.show()

+----------------------------+--------+--------------------+-----------+--------------+--------------------+------------------------------+-----------------+--------+---------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+-----------------+----------------+
|neighbourhood_group_cleansed|      id|         listing_url|       name|       host_id|           host_name|calculated_host_listings_count|host_is_superhost|latitude|longitude|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|room_type_encoded|bedrooms_encoded|
+----------------------------+--------+--------------------+-----------+--------------+--------------------+------------------------------+-----------------+--------+---------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+-------------

In [78]:
# I have manually checked the row in our cleaned database (id 16270115) and the value for price is NOT missing. 
# Therefore, I will add the value again manually.

# Fill 'missing' values in the 'price' column with the value present in the dataset
df_mad_final = df_mad_final.fillna({"price": 204})

# Filter rows where the 'price' column is null
rows_with_missing_price = df_mad_final.filter(col("price").isNull())

# Show the rows with missing values in 'price'
rows_with_missing_price.show(truncate=False)

+----------------------------+---+-----------+----+-------+---------+------------------------------+-----------------+--------+---------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+-----------------+----------------+
|neighbourhood_group_cleansed|id |listing_url|name|host_id|host_name|calculated_host_listings_count|host_is_superhost|latitude|longitude|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|room_type_encoded|bedrooms_encoded|
+----------------------------+---+-----------+----+-------+---------+------------------------------+-----------------+--------+---------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+-----------------+----------------+
+----------------------------+---+-----------+----+-------+---------+----

In [79]:
from pyspark.sql.functions import col

# Create a condition to check for nulls in any column
condition = None
for c in df_mad_final.columns:
    if condition is None:
        condition = col(c).isNull()
    else:
        condition = condition | col(c).isNull()

# Filter rows where the condition is met (i.e., any column is null)
missing_data = df_mad_final.filter(condition)

# Show the rows with missing values
missing_data.show()

+----------------------------+---+-----------+----+-------+---------+------------------------------+-----------------+--------+---------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+-----------------+----------------+
|neighbourhood_group_cleansed| id|listing_url|name|host_id|host_name|calculated_host_listings_count|host_is_superhost|latitude|longitude|price|kitchen|patio or balcony|elevator|air conditioning|long_term|short_term|possible_long_term|number_of_reviews|review_scores_rating|room_type_encoded|bedrooms_encoded|
+----------------------------+---+-----------+----+-------+---------+------------------------------+-----------------+--------+---------+-----+-------+----------------+--------+----------------+---------+----------+------------------+-----------------+--------------------+-----------------+----------------+
+----------------------------+---+-----------+----+-------+---------+----

In [80]:
# Coalesce the DataFrame to a single partition
df_mad_final_coalesced = df_mad_final.coalesce(1)

# Save the DataFrame as a single CSV file
df_mad_final_coalesced.write.mode("overwrite").csv("cleaned_listings_mad-coalesced.csv", header=True)


                                                                                

In [None]:
# Save the cleaned final DataFrame to a new CSV file, overwriting if it exists
df_mad_final.write.mode("overwrite").csv("cleaned_listings_mad.csv", header=True)

In [None]:
spark.stop()