In [1]:
# Create SparkSession from builder
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace,cast,trim
from pyspark.sql.functions import split, col,expr,when
import warnings
warnings.filterwarnings("ignore")

In [2]:
spark = SparkSession.builder \
                .master('yarn') \
                .appName('de-capstone-project') \
                .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.27.0') \
                .getOrCreate()

24/06/17 20:35:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
df = spark.read.csv('gs://de-zoomcamp-bucket-1/2024-05-21/*', inferSchema=True , header=True, multiLine=True) 

                                                                                

In [4]:
df.show(5)

                                                                                

+----------+---------+--------+-----------------+-----------+---------------+--------------------+---------------+---------+--------------------+-------------+
|      area|bathrooms|bedrooms|completion_status| furnishing|       latitude|            location|      longitude|    price|   property_keywords|property_type|
+----------+---------+--------+-----------------+-----------+---------------+--------------------+---------------+---------+--------------------+-------------+
|  694 sqft|  2 Baths|   1 Bed|            Ready|Unfurnished|25.064106143447|Gardenia Livings,...|55.235191930164|  900,000|Brand new| Closed...|    Apartment|
|1,461 sqft|  3 Baths|  2 Beds|            Ready|Unfurnished|      25.075682|Al Sahab Tower 2,...|      55.135403|2,500,000|Vacant | Two Bed ...|    Apartment|
|1,104 sqft|  2 Baths|  2 Beds|            Ready|  Furnished|25.078159674258|Goldcrest Views 1...|55.151089601233|1,600,000|2 Bed | Lake View...|    Apartment|
|1,289 sqft|  2 Baths|  2 Beds|         

In [5]:
df.printSchema()

root
 |-- area: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- completion_status: string (nullable = true)
 |-- furnishing: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- location: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- price: string (nullable = true)
 |-- property_keywords: string (nullable = true)
 |-- property_type: string (nullable = true)



## Data Cleaning

In [6]:
# Drop duplicates values 
df = df.dropDuplicates()

### AREA 

In [7]:
# Clean area column
df = df.withColumn("area", regexp_replace("area", ",", ""))
df = df.withColumn("area", regexp_replace("area", " sqft", "").cast("int"))

### PROPERTY TYPE

In [8]:
# Define valid property types to keep
valid_property_types = [
    "Apartment", "Villa", "Townhouse", "Penthouse", 
    "Hotel Apartment", "Residential Building", "Residential Floor", "Villa Compound"
]

# Filter DataFrame to keep only valid property types
df = df.filter(df["property_type"].isin(valid_property_types))


In [9]:
df.groupby('property_type').count().orderBy(col('count').desc()).show()

[Stage 6:>                                                          (0 + 2) / 2]

+--------------------+-----+
|       property_type|count|
+--------------------+-----+
|           Apartment|27643|
|               Villa| 4543|
|           Townhouse| 2970|
|           Penthouse|  335|
|     Hotel Apartment|  233|
|Residential Building|   24|
|   Residential Floor|   19|
|      Villa Compound|    9|
+--------------------+-----+




                                                                                

### BEDROOM

In [10]:
df.groupby('bedrooms').count().show()

[Stage 12:>                                                         (0 + 2) / 2]

+--------------------+-----+
|            bedrooms|count|
+--------------------+-----+
|              4 Beds| 3424|
|              3 Beds| 5191|
|               1 Bed|11052|
|          2,275 sqft|    1|
|             4 Baths|    1|
|              8 Beds|   25|
|            11 Baths|    2|
|              6 Beds|  589|
|             5 Baths|    1|
|         12,354 sqft|    1|
|          5,785 sqft|    1|
|         61,677 sqft|    1|
|.b18418aa{min-wid...|    5|
|             11 Beds|   18|
|              2 Beds| 9119|
|         10,108 sqft|    1|
|             10 Beds|   14|
|              5 Beds| 1474|
|._5a63199c{displa...|    4|
|              Studio| 4691|
+--------------------+-----+
only showing top 20 rows




                                                                                

In [11]:
### Values have sqft are |Residential Building and Residential Floor

In [12]:
# Replace sqft values of bedrooms by 0 bedrooms
df = df.withColumn('bedrooms', when(col("bedrooms").contains("sqft"), 0).otherwise(col("bedrooms")))

In [13]:
df.groupby('bedrooms').count().orderBy(col('count').desc()).show()

24/06/17 20:36:47 WARN YarnAllocator: Container from a bad node: container_1718650127279_0002_01_000002 on host: cluster-111-m.us-central1-f.c.de-zoomcamp-capstone-project.internal. Exit status: 143. Diagnostics: [2024-06-17 20:36:47.555]Container killed on request. Exit code is 143
[2024-06-17 20:36:47.555]Container exited with a non-zero exit code 143. 
[2024-06-17 20:36:47.556]Killed by external signal
.
24/06/17 20:36:47 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container from a bad node: container_1718650127279_0002_01_000002 on host: cluster-111-m.us-central1-f.c.de-zoomcamp-capstone-project.internal. Exit status: 143. Diagnostics: [2024-06-17 20:36:47.555]Container killed on request. Exit code is 143
[2024-06-17 20:36:47.555]Container exited with a non-zero exit code 143. 
[2024-06-17 20:36:47.556]Killed by external signal
.
24/06/17 20:36:47 ERROR YarnScheduler: Lost executor 2 on cluster-111-m.us-central1-f.c.de-zoomcamp

+--------------------+-----+
|            bedrooms|count|
+--------------------+-----+
|               1 Bed|11052|
|              2 Beds| 9119|
|              3 Beds| 5191|
|              Studio| 4691|
|              4 Beds| 3424|
|              5 Beds| 1474|
|              6 Beds|  589|
|              7 Beds|  145|
|              8 Beds|   25|
|             11 Beds|   18|
|             10 Beds|   14|
|              9 Beds|   11|
|                   0|    9|
|.b18418aa{min-wid...|    5|
|._5a63199c{displa...|    4|
|            11 Baths|    2|
|             4 Baths|    1|
|             5 Baths|    1|
|             2 Baths|    1|
+--------------------+-----+




                                                                                

In [14]:
# Define valid bedrooms number to keep
valid_bedrooms_number = [
    0, "Studio", "1 Bed", "2 Beds", "3 Beds", "4 Beds","5 Beds"
    "6 Beds", "7 Beds", "8 Beds", "9 Beds", "10 Beds","11 Beds"
]

# Filter DataFrame to keep only valid bedrooms number
df = df.filter(df["bedrooms"].isin(valid_bedrooms_number))


In [15]:
df.groupby('bedrooms').count().orderBy(col('count').desc()).show()



+--------+-----+
|bedrooms|count|
+--------+-----+
|   1 Bed|11052|
|  2 Beds| 9119|
|  3 Beds| 5191|
|  Studio| 4691|
|  4 Beds| 3424|
|  7 Beds|  145|
|  8 Beds|   25|
| 11 Beds|   18|
| 10 Beds|   14|
|  9 Beds|   11|
|       0|    9|
+--------+-----+




                                                                                

In [16]:
# Clean bedrooms column 
df = df.withColumn("bedrooms",regexp_replace("bedrooms","(Beds|Bed)",""))

In [17]:
df.groupby('bedrooms').count().orderBy(col('count').desc()).show()



+--------+-----+
|bedrooms|count|
+--------+-----+
|      1 |11052|
|      2 | 9119|
|      3 | 5191|
|  Studio| 4691|
|      4 | 3424|
|      7 |  145|
|      8 |   25|
|     11 |   18|
|     10 |   14|
|      9 |   11|
|       0|    9|
+--------+-----+




                                                                                

### BATHROOMS

In [18]:
df.groupby('bathrooms').count().show()



+--------------------+-----+
|           bathrooms|count|
+--------------------+-----+
|             3 Baths| 7089|
|             6 Baths|  422|
|             9 Baths|   27|
|             4 Baths| 4345|
|            11 Baths|   33|
|                   3|    3|
|             8 Baths|   59|
|             7 Baths|  113|
|             5 Baths| 2141|
|              1 Bath| 8708|
|.b18418aa{min-wid...|   12|
|            10 Baths|   28|
|._5a63199c{displa...|   34|
|             2 Baths|10676|
|                   2|    7|
|                   6|    1|
|                   4|    1|
+--------------------+-----+




[Stage 39:>                                                         (0 + 1) / 1]

                                                                                

In [19]:
# Clean bathrooms column 
df = df.withColumn("bathrooms",trim(regexp_replace("bathrooms", "(Baths|Bath)","")))

In [20]:
df = df.filter(df['bathrooms'].isin(list(range(1,12))))

In [21]:
df.groupby('bathrooms').count().orderBy(col("count").desc()).show()

24/06/17 20:37:21 WARN YarnAllocator: Container from a bad node: container_1718650127279_0002_01_000001 on host: cluster-111-m.us-central1-f.c.de-zoomcamp-capstone-project.internal. Exit status: 143. Diagnostics: [2024-06-17 20:37:21.236]Container killed on request. Exit code is 143
[2024-06-17 20:37:21.236]Container exited with a non-zero exit code 143. 
[2024-06-17 20:37:21.237]Killed by external signal
.
24/06/17 20:37:21 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container from a bad node: container_1718650127279_0002_01_000001 on host: cluster-111-m.us-central1-f.c.de-zoomcamp-capstone-project.internal. Exit status: 143. Diagnostics: [2024-06-17 20:37:21.236]Container killed on request. Exit code is 143
[2024-06-17 20:37:21.236]Container exited with a non-zero exit code 143. 
[2024-06-17 20:37:21.237]Killed by external signal
.
24/06/17 20:37:21 ERROR YarnScheduler: Lost executor 1 on cluster-111-m.us-central1-f.c.de-zoomcamp

+---------+-----+
|bathrooms|count|
+---------+-----+
|        2|10683|
|        1| 8708|
|        3| 7092|
|        4| 4346|
|        5| 2141|
|        6|  423|
|        7|  113|
|        8|   59|
|       11|   33|
|       10|   28|
|        9|   27|
+---------+-----+




[Stage 45:>                                                         (0 + 1) / 1]

                                                                                

### PRICE

In [22]:
# clean the price column and convert to integer 
df= df.withColumn("price",regexp_replace("price", ",","").cast("int"))

In [23]:
df.select('price').describe().show()

24/06/17 20:37:45 WARN YarnAllocator: Container from a bad node: container_1718650127279_0002_01_000003 on host: cluster-111-m.us-central1-f.c.de-zoomcamp-capstone-project.internal. Exit status: 137. Diagnostics: [2024-06-17 20:37:45.695]Container killed on request. Exit code is 137
[2024-06-17 20:37:45.695]Container exited with a non-zero exit code 137. 
[2024-06-17 20:37:45.696]Killed by external signal
.
24/06/17 20:37:45 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 3 for reason Container from a bad node: container_1718650127279_0002_01_000003 on host: cluster-111-m.us-central1-f.c.de-zoomcamp-capstone-project.internal. Exit status: 137. Diagnostics: [2024-06-17 20:37:45.695]Container killed on request. Exit code is 137
[2024-06-17 20:37:45.695]Container exited with a non-zero exit code 137. 
[2024-06-17 20:37:45.696]Killed by external signal
.
24/06/17 20:37:45 ERROR YarnScheduler: Lost executor 3 on cluster-111-m.us-central1-f.c.de-zoomcamp

+-------+--------------------+
|summary|               price|
+-------+--------------------+
|  count|               33635|
|   mean|   3512180.373569199|
| stddev|1.4893529375864023E7|
|    min|              250000|
|    max|          2000000000|
+-------+--------------------+




                                                                                

In [24]:
df.filter(df['price'] == 2000000000).show()



+-------+---------+--------+-----------------+----------+---------------+--------------------+---------------+----------+--------------------+---------------+
|   area|bathrooms|bedrooms|completion_status|furnishing|       latitude|            location|      longitude|     price|   property_keywords|  property_type|
+-------+---------+--------+-----------------+----------+---------------+--------------------+---------------+----------+--------------------+---------------+
|1015596|       11|     11 |            Ready|      NULL|25.186637082868|Palm Jumeirah, Dubai|55.293192662827|2000000000|Luxurious 5 Star ...|Hotel Apartment|
+-------+---------+--------+-----------------+----------+---------------+--------------------+---------------+----------+--------------------+---------------+




                                                                                

### LONGITUDE, LATITUDE

In [25]:
# convert the columns latitude, longitude to float 
df = df.withColumn("latitude", df["latitude"].cast("float"))
df = df.withColumn("longitude", df["longitude"].cast("float"))

### LOCATION

In [26]:
# split the location column and create a column for district 
df = df.withColumn('location', split(df['location'], ', '))
df = df.withColumn('district', trim(expr("location[size(location) - 2]")))
df = df.drop('location')

In [27]:
df.show(5)



+----+---------+--------+-----------------+-----------+---------+---------+-------+--------------------+-------------+--------------------+
|area|bathrooms|bedrooms|completion_status| furnishing| latitude|longitude|  price|   property_keywords|property_type|            district|
+----+---------+--------+-----------------+-----------+---------+---------+-------+--------------------+-------------+--------------------+
|1158|        2|      1 |            Ready|  Furnished| 25.12959|55.153034|2958000|Full Sea View | B...|    Apartment|       Palm Jumeirah|
|1317|        3|      2 |            Ready|Unfurnished|25.081251|55.139626|2875000|Upgraded | Amazin...|    Apartment|Jumeirah Beach Re...|
|1881|        5|      3 |            Ready|Unfurnished|25.120432| 55.25815|1750000|Great Location| E...|    Townhouse|DAMAC Hills 2 (Ak...|
| 736|        1|      1 |            Ready|  Furnished|25.056181| 55.20421|1050000|Ready to Move in ...|    Apartment|Jumeirah Village ...|
|1100|        3|    


[Stage 57:>                                                         (0 + 1) / 1]

                                                                                

In [28]:
# property types 
df.groupBy('district').count().show(30)



+--------------------+-----+
|            district|count|
+--------------------+-----+
|   The World Islands|   61|
|           The Acres|   21|
|               Mudon|  235|
|      Emirates Hills|    4|
|               Deira|    6|
|           Wasl Gate|   93|
|           Expo City|   56|
|           Al Mizhar|    1|
|             Al Twar|    2|
|   Arabian Ranches 2|   63|
|       Jumeirah Park|  110|
| Dubai Maritime City|  158|
|             Remraam|  105|
|Dubai Silicon Oas...|  201|
|           Al Barari|   36|
|       Palm Jumeirah| 1233|
|  Dubai Science Park|  403|
|         Dubai South|  583|
|           Al Warqaa|    2|
|                Reem|  134|
|     Green Community|   28|
|       Tilal Al Ghaf|  175|
|Dubai Production ...|  384|
|      Living Legends|   34|
|             Al Mina|   11|
| Dubai Creek Harbour| 1130|
|      Sobha Hartland|  811|
|Barsha Heights (T...|   56|
|               Majan|  262|
|Jumeirah Lake Tow...| 1025|
+--------------------+-----+
only showing t


                                                                                

### FURNISHING

In [29]:
# Check values in furnishing
df.groupby('furnishing').count().show()



+-----------+-----+
| furnishing|count|
+-----------+-----+
|       NULL| 3973|
|Unfurnished|19741|
|  Furnished| 9939|
+-----------+-----+




                                                                                

In [30]:
# Replace null values by 0 
df = df.withColumn('furnishing', when(df['furnishing'].isNull(),'Unknown').otherwise(df['furnishing']))

In [31]:
df.groupby('furnishing').count().show()

24/06/17 20:38:24 WARN YarnAllocator: Container from a bad node: container_1718650127279_0002_01_000004 on host: cluster-111-m.us-central1-f.c.de-zoomcamp-capstone-project.internal. Exit status: 143. Diagnostics: [2024-06-17 20:38:23.905]Container killed on request. Exit code is 143
[2024-06-17 20:38:23.905]Container exited with a non-zero exit code 143. 
[2024-06-17 20:38:23.905]Killed by external signal
.
24/06/17 20:38:24 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 4 for reason Container from a bad node: container_1718650127279_0002_01_000004 on host: cluster-111-m.us-central1-f.c.de-zoomcamp-capstone-project.internal. Exit status: 143. Diagnostics: [2024-06-17 20:38:23.905]Container killed on request. Exit code is 143
[2024-06-17 20:38:23.905]Container exited with a non-zero exit code 143. 
[2024-06-17 20:38:23.905]Killed by external signal
.
24/06/17 20:38:24 ERROR YarnScheduler: Lost executor 4 on cluster-111-m.us-central1-f.c.de-zoomcamp

+-----------+-----+
| furnishing|count|
+-----------+-----+
|Unfurnished|19741|
|  Furnished| 9939|
|    Unknown| 3973|
+-----------+-----+




[Stage 75:>                                                         (0 + 1) / 1]

                                                                                

### COMPLETION STATUS

In [32]:
# Check values in completion_status
df.groupby('completion_status').count().show()



+-----------------+-----+
|completion_status|count|
+-----------------+-----+
|         Off-Plan|15802|
|            Ready|17851|
+-----------------+-----+




                                                                                

In [33]:
# Check null values in latitude and longitude and drop these values
(df.filter(col('longitude').isNull()).count(),  df.filter(col('latitude').isNull()).count())

                                                                                

(1781, 1781)

In [34]:
# Drop Null values 
df = df.dropna(subset=['price','area', 'longitude','latitude'])

In [35]:
# Cleaned Dataframe 
cleaned_df = df.select(['area','bathrooms','bedrooms','longitude','latitude','completion_status','furnishing',\
                       'property_type','district','price'])

In [36]:
cleaned_df.show(5)

[Stage 96:>                                                         (0 + 1) / 1]

+----+---------+--------+---------+---------+-----------------+-----------+-------------+-------------------+-------+
|area|bathrooms|bedrooms|longitude| latitude|completion_status| furnishing|property_type|           district|  price|
+----+---------+--------+---------+---------+-----------------+-----------+-------------+-------------------+-------+
|1392|        3|      2 |55.135094|25.075712|            Ready|  Furnished|    Apartment|       Dubai Marina|2600000|
|1139|        3|      2 |  55.3442|25.201546|         Off-Plan|Unfurnished|    Apartment|Dubai Creek Harbour|2500000|
|4905|        4|      3 | 55.24493| 25.10581|            Ready|Unfurnished|        Villa| Dubai Hills Estate|6750000|
|1289|        3|      2 |55.351616|25.206316|         Off-Plan|Unfurnished|    Apartment|Dubai Creek Harbour|2953888|
|1450|        2|      2 |55.278976|25.186949|            Ready|Unfurnished|    Apartment|     Downtown Dubai|3200000|
+----+---------+--------+---------+---------+-----------


                                                                                

In [37]:
# Save the data to bucket
cleaned_df.write.parquet('gs://de-zoomcamp-bucket-1/clean_data', mode='overwrite')

                                                                                

In [39]:
spark.stop()