# Data Loading
---
Installing the libraries

In [1]:
%%bash
download_latest_file() {
    current_date=$(date +%Y-%m-%d)
    base_url="https://github.com/ggurjar333/real-estate-analysis-dubai/releases/download"
    release="release-${current_date}"
    file="dld_rent_contracts_${current_date}.parquet"

    wget "${base_url}/${release}/${file}"
}
download_latest_file

--2025-02-23 12:10:36--  https://github.com/ggurjar333/real-estate-analysis-dubai/releases/download/release-2025-02-23/dld_rent_contracts_2025-02-23.parquet
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/640962286/4f04a9f1-07e6-49c1-919c-50b9e650e78c?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250223%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250223T121036Z&X-Amz-Expires=300&X-Amz-Signature=65ec1419ec2232959c1c785216778665fb5ca356b75a28330d528aee8a4cd2e7&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Ddld_rent_contracts_2025-02-23.parquet&response-content-type=application%2Foctet-stream [following]
--2025-02-23 12:10:36--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/640962286/4f04

## I have saved the dataset on Google Drive for faster access. If you haven't please don't run the following cell.

In [1]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pandas

import findspark
findspark.init()

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Dubai Land Development") \
    .getOrCreate()



In [2]:
from datetime import date

In [3]:
rent_contracts_df = spark.read.parquet(f"dld_rent_contracts_{date.today()}.parquet")
rent_contracts_df.printSchema()

root
 |-- contract_id: string (nullable = true)
 |-- contract_reg_type_id: long (nullable = true)
 |-- contract_reg_type_ar: string (nullable = true)
 |-- contract_reg_type_en: string (nullable = true)
 |-- contract_start_date: string (nullable = true)
 |-- contract_end_date: string (nullable = true)
 |-- contract_amount: long (nullable = true)
 |-- annual_amount: long (nullable = true)
 |-- no_of_prop: long (nullable = true)
 |-- line_number: long (nullable = true)
 |-- is_free_hold: long (nullable = true)
 |-- ejari_bus_property_type_id: long (nullable = true)
 |-- ejari_bus_property_type_ar: string (nullable = true)
 |-- ejari_bus_property_type_en: string (nullable = true)
 |-- ejari_property_type_id: long (nullable = true)
 |-- ejari_property_type_en: string (nullable = true)
 |-- ejari_property_type_ar: string (nullable = true)
 |-- ejari_property_sub_type_id: long (nullable = true)
 |-- ejari_property_sub_type_en: string (nullable = true)
 |-- ejari_property_sub_type_ar: string (

In [4]:
rent_contracts_df.show(5)

+-------------+--------------------+--------------------+--------------------+-------------------+-----------------+---------------+-------------+----------+-----------+------------+--------------------------+--------------------------+--------------------------+----------------------+----------------------+----------------------+--------------------------+--------------------------+--------------------------+-----------------+-----------------+--------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+---------------+---------------+--------------+--------------+--------------+
|  contract_id|contract_reg_type_id|contract_reg_type_ar|contract_reg_type_en|contract_start_date|contract_end_date|contract_amount|annual_amount|no_of_prop|line_number|is_free_hold|ejari_bus_property_type_id|ejari_bus_property_

In [6]:
from pyspark.sql.functions import col
# Convert contract start and end dates to datetime objects if they aren't already
rent_contracts_df = rent_contracts_df.withColumn("contract_start_date", col("contract_start_date").cast("date"))
rent_contracts_df = rent_contracts_df.withColumn("contract_end_date", col("contract_end_date").cast("date"))


In [7]:
# prompt: 2. Descriptive Statistics
# Contract Amounts: Calculate the average, median, and range of contract_amount to understand pricing trends.
# Contract Duration: Analyze the duration of contracts by calculating the difference between contract_end_date and contract_start_date.
# Property Types: Count the occurrences of each property type (ejari_property_type_en) to identify the most common types of properties rented.

from pyspark.sql.functions import avg, max, min, col, datediff, count

# Calculate average, median, and range of contract_amount
contract_amount_stats = rent_contracts_df.select(
    avg("contract_amount").alias("avg_contract_amount"),
    max("contract_amount").alias("max_contract_amount"),
    min("contract_amount").alias("min_contract_amount")
)

contract_amount_stats.show()

# Calculate contract duration
rent_contracts_df = rent_contracts_df.withColumn(
    "contract_duration", datediff(col("contract_end_date"), col("contract_start_date"))
)
rent_contracts_df.show(5)

# Count occurrences of each property type
property_type_counts = rent_contracts_df.groupBy("ejari_property_type_en").agg(
    count("*").alias("property_count")
)

property_type_counts.show()


+-------------------+-------------------+-------------------+
|avg_contract_amount|max_contract_amount|min_contract_amount|
+-------------------+-------------------+-------------------+
|  730937.7223864113|         4200000003|                  0|
+-------------------+-------------------+-------------------+

+-------------+--------------------+--------------------+--------------------+-------------------+-----------------+---------------+-------------+----------+-----------+------------+--------------------------+--------------------------+--------------------------+----------------------+----------------------+----------------------+--------------------------+--------------------------+--------------------------+-----------------+-----------------+--------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+----------------

In [8]:
# prompt: 3. Trends Over Time
# Contract Start and End Dates: Analyze the distribution of contract start dates to identify peak rental periods.
# Renewals vs. New Contracts: Compare the number of new contracts versus renewals to assess tenant retention and market stability.

from pyspark.sql.functions import month, year

# Analyze contract start date distribution
start_date_counts = rent_contracts_df.groupBy(month("contract_start_date"), year("contract_start_date")).count().orderBy(year("contract_start_date"),month("contract_start_date"))
start_date_counts.show(100)


# Analyze contract renewal vs. new contracts (assuming you have a column indicating this)
#  Replace "is_renewal" with the actual column name in your DataFrame.
# If there's no such column, you need to engineer it based on your data.
if "contract_reg_type_en" in rent_contracts_df.columns:
    reg_type_counts = rent_contracts_df.groupBy("contract_reg_type_en").count()
    reg_type_counts.show()
else:
    print("No 'contract_reg_type_en' column found. Please add a column to identify contract renewals.")


+--------------------------+-------------------------+-------+
|month(contract_start_date)|year(contract_start_date)|  count|
+--------------------------+-------------------------+-------+
|                      NULL|                     NULL|8554502|
+--------------------------+-------------------------+-------+

+--------------------+-------+
|contract_reg_type_en|  count|
+--------------------+-------+
|               Renew|4092412|
|                 New|4462090|
+--------------------+-------+



In [10]:
# prompt: 4. Property Usage Analysis
# Residential vs. Commercial: Compare the average contract amounts for residential and commercial properties to understand market dynamics.
# Property Subtypes: Analyze the distribution of property subtypes (e.g., 1 bed room, 2 bed rooms) to identify popular configurations.

# Analyze residential vs. commercial property contract amounts
residential_commercial_avg = rent_contracts_df.groupBy("ejari_property_type_en").agg(
    avg("contract_amount").alias("avg_contract_amount")
)
residential_commercial_avg.show()

# Analyze the distribution of property subtypes (assuming you have a 'property_subtype' column)
if "ejari_property_sub_type_en" in rent_contracts_df.columns:
    property_subtype_counts = rent_contracts_df.groupBy("ejari_property_sub_type_en").agg(
        count("*").alias("property_subtype_count")
    )
    property_subtype_counts.show()
else:
    print("No 'ejari_property_sub_type_en' column found. Please add a relevant column to your DataFrame.")


+----------------------+-------------------+
|ejari_property_type_en|avg_contract_amount|
+----------------------+-------------------+
|               Parking|  457364.4467979068|
|           Health club|   1415430.74108053|
|                  Bank|  3337451.146067416|
|    Resturants Complex|  813715.2333333333|
|        Medical center| 1151954.0466101696|
|                  Farm|  637142.8571428572|
|                  Desk|  8807.157894736842|
|                   Spa|  643067.9722222222|
|                   ATM|  61935.57249070632|
|     Complex Warehouse|  195500.7550347858|
|         swimming pool|           875151.0|
|            Open space| 464621.38119911175|
|         Land Parking | 147998.73684210525|
|                  Hall| 387615.95454545453|
|                Office| 304438.17869835155|
|               Laundry| 154757.14285714287|
|  Supermarket, a mu...|  3225602.027586207|
|                School|  1.0064177715625E7|
|           Supermarket| 2226240.6923076925|
|         

In [12]:
# 5. Geographic Insights
# Area Analysis: Group data by area_name_en to identify which areas have the highest number of contracts and average contract amounts.
# Proximity to Landmarks: Analyze how proximity to landmarks (e.g., malls, metro stations) affects rental prices.
# nearest_landmark_en, nearest_metro_en, nearest_mall_en
area_analysis = rent_contracts_df.groupBy("area_name_en").agg(
    count("*").alias("contract_count"),
    avg("contract_amount").alias("avg_contract_amount")
)
area_analysis.show()

proximity_analysis = rent_contracts_df.groupBy("nearest_landmark_en", "nearest_metro_en", "nearest_mall_en").agg(
    avg("contract_amount").alias("avg_contract_amount")
)
proximity_analysis.show()

+--------------------+--------------+-------------------+
|        area_name_en|contract_count|avg_contract_amount|
+--------------------+--------------+-------------------+
|    Um Hurair Second|         20510| 379722.88941979525|
|         Al Khabeesi|         53867|  104748.6792655986|
|        Al Rashidiya|         19621| 101958.25284134346|
|Al Barsha South F...|         20854| 107605.65536587706|
|          Al Kheeran|         15332|  702968.7926558831|
|       Al Goze Third|        132335|  2099899.820652133|
|      Al Twar Second|           746| 301938.76273458445|
|             Al Ttay|         13332|  900543.4243924392|
|     Um Suqaim First|         13621|   631140.102341972|
|  Nad Al Shiba First|         11790| 184939.69304495334|
|       Lehbab Second|           919| 46045.532100108816|
|      Zareeba Duviya|            37|   47297.2972972973|
|      Madinat Hind 2|            27| 252629.62962962964|
|       Al Yelayiss 1|         14103| 130560.74154435226|
|       Saih S

In [14]:
# prompt: 6. Tenant Insights
# Tenant Types: Analyze the distribution of tenant types to understand the demographics of renters.
# Contract Amounts by Tenant Type: Compare average contract amounts across different tenant types to identify potential market segments.

# Tenant Insights
# Tenant Types: Analyze the distribution of tenant types to understand the demographics of renters.
tenant_type_distribution = rent_contracts_df.groupBy("tenant_type_en").count().orderBy("count", ascending=False)
tenant_type_distribution.show()

# Contract Amounts by Tenant Type: Compare average contract amounts across different tenant types to identify potential market segments.
avg_contract_by_tenant_type = rent_contracts_df.groupBy("tenant_type_en").agg(avg("contract_amount").alias("avg_contract_amount"))
avg_contract_by_tenant_type.show()


+--------------+-------+
|tenant_type_en|  count|
+--------------+-------+
|        Person|4266033|
|     Authority|3478256|
|              | 810213|
+--------------+-------+

+--------------+-------------------+
|tenant_type_en|avg_contract_amount|
+--------------+-------------------+
|        Person| 208105.18177519957|
|     Authority| 1474688.7088517924|
|              |   290886.205738491|
+--------------+-------------------+



In [21]:
# prompt: 8. Predictive Analysis
# Price Prediction: Use regression analysis to predict contract amounts based on features such as property type, area, and contract duration.
# Churn Prediction: Analyze factors that may lead to tenant churn (e.g., contract renewals) to develop strategies for tenant retention.

from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col

# Filter out rows where 'ejari_property_type_en' or 'area_name_en' is null or empty
cleaned_data = rent_contracts_df.filter(
    col('ejari_property_type_en').isNotNull() & (col('ejari_property_type_en') != "") &
    col('area_name_en').isNotNull() & (col('area_name_en') != "")
)

# Handle categorical columns (ejari_property_type_en, area_name_en) with StringIndexer and OneHotEncoder

area_indexer = StringIndexer(inputCol="area_name_en", outputCol="area_index")
property_type_indexer = StringIndexer(inputCol="ejari_property_type_en", outputCol="property_type_index")

area_encoder = OneHotEncoder(inputCol="area_index", outputCol="area_encoded")
property_type_encoder = OneHotEncoder(inputCol="property_type_index", outputCol="property_type_encoded")

# Assemble features
assembler = VectorAssembler(inputCols=["area_encoded", "property_type_encoded", "contract_duration", "contract_amount"], outputCol="features")

# StandardScaler (optional but can improve model performance)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Regression model
lr = LinearRegression(featuresCol="scaled_features", labelCol="contract_amount")

# Pipeline
pipeline = Pipeline(stages=[area_indexer, property_type_indexer, area_encoder, property_type_encoder, assembler, scaler, lr])

# Split the data into training and testing sets
train_data, test_data = cleaned_data.randomSplit([0.8, 0.2], seed=42)

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="contract_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Py4JJavaError: An error occurred while calling o623.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 60.0 failed 1 times, most recent failure: Lost task 1.0 in stage 60.0 (TID 61) (401c0e10757c executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$4144/0x000000084179d040`: (struct<area_encoded:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,property_type_encoded:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,contract_duration_double_VectorAssembler_016393c5230b:double,contract_amount_double_VectorAssembler_016393c5230b:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:880)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:880)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$4144/0x000000084179d040`: (struct<area_encoded:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,property_type_encoded:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,contract_duration_double_VectorAssembler_016393c5230b:double,contract_amount_double_VectorAssembler_016393c5230b:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:880)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:880)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 26 more


# Data Exploration

### Save the raw data (bronze layer) to Hive data warehouse.


In [None]:
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")
rent_contract_df.write.mode("overwrite").saveAsTable("bronze.rent_contracts")

# **Data Cleaning**

## Summary
Use of `summary` function  to find the number of non-null values, which can be used to infer missing values. The following cell takes ~14 minutes on Google Colab.

In [None]:
rent_contracts_summary = rent_contracts_df.describe()
rent_contracts_summary.show()


+-------+------------+--------------------+--------------------+--------------------+-------------------+-----------------+-----------------+-----------------+------------------+------------------+-------------------+--------------------------+--------------------------+--------------------------+----------------------+----------------------+----------------------+--------------------------+--------------------------+--------------------------+-----------------+-----------------+------------------+--------------------+--------------------+--------------------+-----------------+-----------------+------------+--------------+------------------+--------------------+--------------------+------------------+-------------------+---------------+------------------+------------------+--------------+--------------+
|summary| contract_id|contract_reg_type_id|contract_reg_type_ar|contract_reg_type_en|contract_start_date|contract_end_date|  contract_amount|    annual_amount|        no_of_prop|       

## Phase 1
---
Let's separate numeric and string columns
* Fill NULLs in string columns with "Unknown"
* Fill NULLs in numeric columns with 0


In [None]:
from pyspark.sql.functions import when, StringType, to_date

# Separate numeric and string columns
string_columns = [col for col, dtype in rent_contracts_df.dtypes if dtype == "string"]
numeric_columns = [col for col, dtype in rent_contracts_df.dtypes if dtype in ["int", "double"]]

# Fill NULLs in string columns with "Unknown"
rent_contracts_df_filled = rent_contracts_df.na.fill("Unknown", subset=string_columns)

# Fill NULLs in numeric columns with 0
rent_contracts_df_filled = rent_contracts_df_filled.na.fill(0, subset=numeric_columns)
rent_contracts_df_filled.show(5)

+-------------+--------------------+--------------------+--------------------+-------------------+-----------------+---------------+-------------+----------+-----------+------------+--------------------------+--------------------------+--------------------------+----------------------+----------------------+----------------------+--------------------------+--------------------------+--------------------------+-----------------+-----------------+--------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+---------------+---------------+--------------+--------------+--------------+
|  contract_id|contract_reg_type_id|contract_reg_type_ar|contract_reg_type_en|contract_start_date|contract_end_date|contract_amount|annual_amount|no_of_prop|line_number|is_free_hold|ejari_bus_property_type_id|ejari_bus_property_

## Phase 2
---
Let's convert date columns from StringType to DateType
* contract_start_date
* contract_end_date


In [None]:
from pyspark.sql.functions import to_date, year, month, dayofmonth

# Convert date columns from StringType to DateType
rent_contracts_df_filled = rent_contracts_df_filled.withColumn(
    "contract_start_date", to_date(rent_contracts_df_filled["contract_start_date"], "dd-MM-yyyy")
)
rent_contracts_df_filled = rent_contracts_df_filled.withColumn(
    "contract_end_date", to_date(rent_contracts_df_filled["contract_end_date"], "dd-MM-yyyy")
)

# Create contract_start_year, contract_start_month, contract_start_day columns from contract_start_date
rent_contracts_df_filled = rent_contracts_df_filled.withColumn(
    "contract_start_year", year("contract_start_date")
    )
rent_contracts_df_filled = rent_contracts_df_filled.withColumn(
    "contract_start_month", month("contract_start_date"))
rent_contracts_df_filled = rent_contracts_df_filled.withColumn(
    "contract_start_day", dayofmonth("contract_start_date"))

# Create contract_end_year, contract_end_month, contract_end_day columns from contract_end_date
rent_contracts_df_filled = rent_contracts_df_filled.withColumn(
    "contract_end_year", year("contract_end_date"))
rent_contracts_df_filled = rent_contracts_df_filled.withColumn(
    "contract_end_month", month("contract_end_date"))
rent_contracts_df_filled = rent_contracts_df_filled.withColumn(
    "contract_end_day", dayofmonth("contract_end_date"))

rent_contracts_df_filled.show(5)

+-------------+--------------------+--------------------+--------------------+-------------------+-----------------+---------------+-------------+----------+-----------+------------+--------------------------+--------------------------+--------------------------+----------------------+----------------------+----------------------+--------------------------+--------------------------+--------------------------+-----------------+-----------------+--------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+---------------+---------------+--------------+--------------+--------------+-------------------+--------------------+------------------+-----------------+------------------+----------------+
|  contract_id|contract_reg_type_id|contract_reg_type_ar|contract_reg_type_en|contract_start_date|contract_end_dat

## Save cleaned data (silver layer) to Hive data warehouse.


In [None]:
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")
rent_contracts_df_filled.write.mode("overwrite").saveAsTable("silver.rent_contracts")

# Basic Analysis

## Average Rental Price by Property Type

In [None]:
average_rent_by_property = spark.sql("""
    SELECT
          ejari_property_type_en
        , ROUND(AVG(annual_amount), 2) AS average_rent
    FROM silver.rent_contracts
    GROUP BY ejari_property_type_en
    ORDER BY average_rent DESC
""")
average_rent_by_property.show()

+----------------------+------------+
|ejari_property_type_en|average_rent|
+----------------------+------------+
|            University|   6000000.0|
|                School|  5899303.22|
|              Hospital|  3752910.85|
|                  Bank|  3328262.96|
|                 Hotel|  3272997.37|
|               College|  3176737.31|
|      Hotel apartments|  2757311.98|
|  Supermarket, a mu...|  1621205.04|
|       Shopping Center|   1606000.0|
|           Labor Camps|  1567115.49|
|              Building|  1525777.98|
|           Sports Club|  1346740.16|
|        hotel building|   1305000.0|
|                Cinema|  1065034.94|
|           Supermarket|  1054737.95|
|        Service Center|  1042636.49|
|            Portacabin|   917946.46|
|   Staff Accommodation|    775152.4|
|     Comercial Complex|   758445.21|
|        Medical center|   592220.22|
+----------------------+------------+
only showing top 20 rows



## Total Number of Contracts Per Year

In [None]:
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")
# Aggregate the number of rental contracts by year
no_of_contracts_by_year = spark.sql("""
    SELECT
          contract_end_year
        , COUNT(*) AS total_contracts
    FROM silver.rent_contracts
    WHERE contract_end_year <= year(current_date())
    GROUP BY contract_end_year
    ORDER BY total_contracts DESC
""")
no_of_contracts_by_year.show(5)

+-----------------+---------------+
|contract_end_year|total_contracts|
+-----------------+---------------+
|             2025|        1011515|
|             2024|         997647|
|             2023|         897390|
|             2022|         762576|
|             2020|         643837|
+-----------------+---------------+
only showing top 5 rows



## Plot

In [None]:
import plotly.express as px

# Convert the Spark DataFrame to a Pandas DataFrame for plotting
pandas_df = no_of_contracts_by_year.toPandas()

# Create the plot
fig = px.line(
    pandas_df,
    x='contract_end_year',
    y='total_contracts',
    title='Trend of Rental Contracts Over Time',
    markers=True
)

fig.update_layout(
    xaxis_title="Year",
    yaxis_title="Number of Contracts (In Millions)"
)

fig.show()

## Save analyzed data (gold layer) to Hive data warehouse.

In [None]:
# Save metrics to Hive data warehouse
no_of_contracts_by_year.write.mode("overwrite").saveAsTable("gold.no_of_contracts_by_year")
average_rent_by_property.write.mode("overwrite").saveAsTable("gold.average_rent_by_property")

# Data Visualization

## Top 5 Areas by the number of contracts

In [None]:
no_of_contracts_by_area = spark.sql("""
    SELECT
          area_name_en
        , COUNT(*) AS total_contracts
    FROM silver.rent_contracts
    GROUP BY area_name_en
    ORDER BY total_contracts DESC
""")
no_of_contracts_by_area.show(5)
no_of_contracts_by_area.write.mode("overwrite").saveAsTable("gold.no_of_contracts_by_area")

+--------------------+---------------+
|        area_name_en|total_contracts|
+--------------------+---------------+
|     Al Warsan First|         318073|
|Jabal Ali Industr...|         289570|
|     Jabal Ali First|         275205|
|   Muhaisanah Second|         269075|
|           Al Karama|         225229|
+--------------------+---------------+
only showing top 5 rows



In [None]:
import plotly.express as px

# Convert the Spark DataFrame to a Pandas DataFrame for plotting
pandas_df = no_of_contracts_by_area.toPandas()

# Create the plot
fig = px.bar(
    pandas_df.head(5),
    x='area_name_en',
    y='total_contracts',
    color='area_name_en',
    title='Top 5 Areas by the Number of Contracts'
)

fig.update_layout(
    xaxis_title='Area',
    yaxis_title='Number of Contracts',
    showlegend=False  # Hide the legend
)

fig.show()

## The average rental price per year and area name

In [None]:
# Calculate average rental price per year per area
average_rent_by_year_and_area = spark.sql("""
    SELECT
          contract_end_year as year
        , area_name_en AS area_name
        , ROUND(AVG(annual_amount)) AS average_rent
    FROM silver.rent_contracts
    WHERE contract_end_year <= year(current_date())
    GROUP BY contract_end_year, area_name_en
    ORDER BY contract_end_year DESC
""")
average_rent_by_year_and_area.write.mode("overwrite").saveAsTable("gold.average_rent_by_year_and_area")

In [None]:
import plotly.express as px

# Convert to Pandas DataFrame for plotting
average_rent_pandas = average_rent_by_year_and_area.toPandas()

fig = px.line(
    average_rent_pandas,
    x='year',
    y='average_rent',
    color='area_name',
    title='Yearly Trend of Average Rental Price by Area'
)

fig.update_layout(
    xaxis_title='Year',
    yaxis_title='Average Rental Price',
    legend_title='Area Name',
    legend=dict(x=1, y=1)  # Place legend outside to the right
)

fig.show()

---
Author: Gaurav Gurjar

Email: ggurjar333@gmail.com

Date: 09/01/2025