In [1]:
!pip install pyspark



In [1]:
import pandas as pd
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the Spark session used by every Spark cell below.
spark = (
    SparkSession.builder
    .appName("HousingMarketAnalysis")
    .getOrCreate()
)

In [3]:
# Load the housing CSV into pandas; this frame is only used for quick
# inspection (head / null counts / summary statistics).
df = pd.read_csv(
    "/content/drive/MyDrive/housing_bda_project/Housing.csv",
)

In [4]:
# Load the same CSV as a Spark DataFrame: the first row supplies the column
# names and column types are inferred from the data.
dataset = spark.read.csv(
    "/content/drive/MyDrive/housing_bda_project/Housing.csv",
    header=True,
    inferSchema=True,
)

In [5]:
# Preview the first five rows of the pandas frame.
df.head(n=5)

Unnamed: 0,price,area,bedrooms,bathrooms,stories,mainroad,guestroom,basement,hotwaterheating,airconditioning,parking,prefarea,furnishingstatus
0,13300000,7420,4,2,3,yes,no,no,no,yes,2,yes,furnished
1,12250000,8960,4,4,4,yes,no,no,no,yes,3,no,furnished
2,12250000,9960,3,2,2,yes,no,yes,no,no,2,yes,semi-furnished
3,12215000,7500,4,2,2,yes,no,yes,no,yes,3,yes,furnished
4,11410000,7420,4,1,2,yes,yes,yes,no,yes,2,no,furnished


In [6]:
# Count missing values per column.
df.isna().sum()

Unnamed: 0,0
price,0
area,0
bedrooms,0
bathrooms,0
stories,0
mainroad,0
guestroom,0
basement,0
hotwaterheating,0
airconditioning,0


In [7]:
# Yes/no string columns that must be turned into numeric indices before they
# can be used as model features.
binary_columns = [
    "mainroad",
    "guestroom",
    "basement",
    "hotwaterheating",
    "airconditioning",
    "prefarea",
]

# One StringIndexer per column, each writing to "<column>_index".
# NOTE: the index values are not a consistent yes/no coding across columns --
# in the preview output below 0.0 stands for "yes" in mainroad but for "no" in
# guestroom (StringIndexer orders labels by frequency by default).
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index")
    for name in binary_columns
]

In [8]:
# furnishingstatus has more than two categories: index it first, then
# one-hot encode the index into a vector column.
furnishing_indexer = StringIndexer(
    inputCol="furnishingstatus",
    outputCol="furnishingstatus_index",
)
furnishing_encoder = OneHotEncoder(
    inputCol="furnishingstatus_index",
    outputCol="furnishingstatus_encoded",
)

In [9]:
# Chain all preprocessing stages: the binary-column indexers first, then the
# furnishingstatus indexer followed by its one-hot encoder.
pipeline = Pipeline(
    stages=[*indexers, furnishing_indexer, furnishing_encoder],
)

In [10]:
# Fit every preprocessing stage on the full dataset, then apply the fitted
# stages to add the *_index and furnishingstatus_encoded columns.
# NOTE(review): the stages are fit before the train/test split performed
# later, so the label indices are learned from rows that end up in the test set.
model = pipeline.fit(dataset)
transformed_data = model.transform(dataset)

In [11]:
# Spot-check a few of the generated columns on the first five rows.
transformed_data.select(
    "mainroad_index",
    "guestroom_index",
    "furnishingstatus_encoded",
).show(n=5)

+--------------+---------------+------------------------+
|mainroad_index|guestroom_index|furnishingstatus_encoded|
+--------------+---------------+------------------------+
|           0.0|            0.0|               (2,[],[])|
|           0.0|            0.0|               (2,[],[])|
|           0.0|            0.0|           (2,[0],[1.0])|
|           0.0|            0.0|               (2,[],[])|
|           0.0|            1.0|               (2,[],[])|
+--------------+---------------+------------------------+
only showing top 5 rows



In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [13]:
# Input columns for the regression models, in the order they are assembled
# into the feature vector.
feature_columns = [
    "area",
    "bedrooms",
    "bathrooms",
    "stories",
    "mainroad_index",
    "guestroom_index",
    "basement_index",
    "hotwaterheating_index",
    "airconditioning_index",
    "parking",
    "prefarea_index",
    "furnishingstatus_encoded",
]

In [14]:
# Pack all regression inputs into a single vector column named "features",
# which is the column the estimators below read from.
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
)
assembled_data = assembler.transform(transformed_data)

In [15]:
# 80/20 train/test split; the fixed seed keeps the split stable between runs.
train_data, test_data = assembled_data.randomSplit(
    [0.8, 0.2],
    seed=42,
)

In [16]:
# Baseline 1: Linear Regression with default hyperparameters, predicting
# "price" from the assembled "features" vector; trained on the training split.
lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
)
lr_model = lr.fit(train_data)

In [17]:
# Baseline 2: Decision Tree Regressor with default hyperparameters, using the
# same feature and label columns; trained on the training split.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="price",
)
dt_model = dt.fit(train_data)

In [18]:
# Score the held-out test split with both baseline models. Each result is
# test_data with the model's output column added (the evaluators below read
# it as "prediction").
lr_predictions = lr_model.transform(test_data)
dt_predictions = dt_model.transform(test_data)

In [19]:
# RMSE evaluator comparing the "prediction" column against the "price" label.
evaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse",
)

In [20]:
# Test-set RMSE of the baseline Linear Regression model.
# Assumes `evaluator` is still the RMSE RegressionEvaluator; the name is
# rebound to a clustering evaluator later in the notebook.
lr_rmse = evaluator.evaluate(lr_predictions)
print(f"Linear Regression RMSE: {lr_rmse}")

Linear Regression RMSE: 1118877.0537406444


In [21]:
# Test-set RMSE of the baseline Decision Tree model.
# Assumes `evaluator` is still the RMSE RegressionEvaluator; the name is
# rebound to a clustering evaluator later in the notebook.
dt_rmse = evaluator.evaluate(dt_predictions)
print(f"Decision Tree Regressor RMSE: {dt_rmse}")

Decision Tree Regressor RMSE: 1386979.7810884733


In [22]:
# Second metric for the baselines: mean absolute error on the test split.
evaluator_mae = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="mae",
)
lr_mae = evaluator_mae.evaluate(lr_predictions)
dt_mae = evaluator_mae.evaluate(dt_predictions)

In [23]:
# Report the baseline test-set MAE for both models.
print(f"Linear Regression MAE: {lr_mae}")
print(f"Decision Tree Regressor MAE: {dt_mae}")

Linear Regression MAE: 857745.3626048075
Decision Tree Regressor MAE: 971361.7740006715


In [24]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [25]:
# Fresh, unfitted estimators for hyperparameter tuning. This rebinds `lr` and
# `dt`; the fitted baselines remain available as lr_model / dt_model.
lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
)
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="price",
)

In [26]:
# Linear Regression search space: 3 regularisation strengths x 3 elastic-net
# mixing values = 9 candidate settings.
lr_param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Decision Tree search space: 3 maximum depths x 3 minimum node sizes
# = 9 candidate settings.
dt_param_grid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [3, 5, 7])
    .addGrid(dt.minInstancesPerNode, [1, 5, 10])
    .build()
)

In [27]:
# RMSE evaluator used both to select hyperparameters during cross-validation
# and to score the tuned models on the test split.
evaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse",
)

In [28]:
# 5-fold cross-validation over each model's parameter grid, selecting by the
# metric of `evaluator` (assumed to be the RMSE RegressionEvaluator).
# The explicit seed fixes the random assignment of rows to folds so the
# selected hyperparameters are reproducible across runs; 42 matches the seed
# already used for the train/test split.

# Cross-validator for Linear Regression
lr_cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=lr_param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)
# Cross-validator for the Decision Tree Regressor
dt_cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=dt_param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [29]:
# Run both cross-validated grid searches on the training split only; the test
# split is not passed to the search.
lr_cv_model = lr_cv.fit(train_data)
dt_cv_model = dt_cv.fit(train_data)

In [30]:
# Extract the best model found by each cross-validated search.
best_lr_model = lr_cv_model.bestModel
best_dt_model = dt_cv_model.bestModel

In [31]:
# Score the held-out test split with the tuned models (same test_data as used
# for the baseline models, so the metrics are directly comparable).
best_lr_predictions = best_lr_model.transform(test_data)
best_dt_predictions = best_dt_model.transform(test_data)

In [32]:
# Test-set RMSE of the tuned models.
# Assumes `evaluator` is still the RMSE RegressionEvaluator; the name is
# rebound to a clustering evaluator later in the notebook.
best_lr_rmse = evaluator.evaluate(best_lr_predictions)
best_dt_rmse = evaluator.evaluate(best_dt_predictions)

In [33]:
# Report the tuned test-set RMSE for comparison with the baseline figures.
print(f"Tuned Linear Regression RMSE: {best_lr_rmse}")
print(f"Tuned Decision Tree Regressor RMSE: {best_dt_rmse}")

Tuned Linear Regression RMSE: 1118877.0863359626
Tuned Decision Tree Regressor RMSE: 1332150.998376276


In [34]:
# Second metric for the tuned models: mean absolute error on the test split.
evaluator_mae = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="mae",
)
best_lr_mae = evaluator_mae.evaluate(best_lr_predictions)
best_dt_mae = evaluator_mae.evaluate(best_dt_predictions)

In [35]:
# Report the tuned test-set MAE for comparison with the baseline figures.
print(f"Tuned Linear Regression MAE: {best_lr_mae}")
print(f"Tuned Decision Tree Regressor MAE: {best_dt_mae}")

Tuned Linear Regression MAE: 857745.4757071654
Tuned Decision Tree Regressor MAE: 945359.5482679035


In [38]:
# Summary statistics of the pandas frame; gives the scale of each numeric
# column ahead of clustering.
df.describe()

Unnamed: 0,price,area,bedrooms,bathrooms,stories,parking
count,545.0,545.0,545.0,545.0,545.0,545.0
mean,4766729.0,5150.541284,2.965138,1.286239,1.805505,0.693578
std,1870440.0,2170.141023,0.738064,0.50247,0.867492,0.861586
min,1750000.0,1650.0,1.0,1.0,1.0,0.0
25%,3430000.0,3600.0,2.0,1.0,1.0,0.0
50%,4340000.0,4600.0,3.0,1.0,2.0,0.0
75%,5740000.0,6360.0,3.0,2.0,2.0,1.0
max,13300000.0,16200.0,6.0,4.0,4.0,3.0


In [51]:
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [44]:
# Numeric columns used for clustering. This rebinds `feature_columns`, which
# previously held the regression feature list.
feature_columns = [
    "area",
    "bedrooms",
    "bathrooms",
    "stories",
    "parking",
]

In [45]:
# Build the clustering feature vector directly from the raw Spark DataFrame
# (not from the indexed/encoded frame). This rebinds `assembler`.
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
)
dataset_with_features = assembler.transform(dataset)

In [46]:
# K-Means with k=3 clusters on the assembled "features" vector.
# K-Means initialisation is randomised; the explicit seed makes the cluster
# assignments (and the silhouette score computed from them) reproducible
# across runs. 42 matches the seed already used for the train/test split.
kmeans = KMeans(featuresCol="features", k=3, seed=42)  # Set k to the desired number of clusters
kmeans_model = kmeans.fit(dataset_with_features)
# Adds the assigned cluster for each row as an extra column.
kmeans_predictions = kmeans_model.transform(dataset_with_features)

In [54]:
# Bisecting K-Means with k=5 clusters on the same "features" vector.
# The explicit seed makes the resulting clusters (and the silhouette score
# computed from them) reproducible across runs. 42 matches the seed already
# used for the train/test split.
bisecting_kmeans = BisectingKMeans(featuresCol="features", k=5, seed=42)  # Adjust k as needed
bisecting_kmeans_model = bisecting_kmeans.fit(dataset_with_features)
# Adds the assigned cluster for each row as an extra column.
bisecting_kmeans_predictions = bisecting_kmeans_model.transform(dataset_with_features)

In [48]:
# Silhouette evaluator (squared Euclidean distance) shared by both clustering
# models. This rebinds `evaluator`, which previously held the RMSE
# RegressionEvaluator.
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(
    featuresCol="features",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean",
)

In [49]:
# Silhouette score of the K-Means clustering.
# Assumes `evaluator` is the silhouette ClusteringEvaluator, not the earlier
# RMSE evaluator that shares the same name.
kmeans_silhouette = evaluator.evaluate(kmeans_predictions)
print(f"K-Means Silhouette Score: {kmeans_silhouette}")

K-Means Silhouette Score: 0.7702992874114833


In [55]:
# Silhouette score of the Bisecting K-Means clustering.
# Assumes `evaluator` is the silhouette ClusteringEvaluator, not the earlier
# RMSE evaluator that shares the same name.
bisecting_kmeans_silhouette = evaluator.evaluate(bisecting_kmeans_predictions)
print(f"Bisecting K-Means Silhouette Score: {bisecting_kmeans_silhouette}")

Bisecting K-Means Silhouette Score: 0.588677385322065


In [56]:
import joblib

In [62]:
# Persist the tuned Linear Regression model with Spark ML's own writer.
# write().overwrite() replaces any model already stored at this path, so
# re-running the notebook does not fail because the directory is left over
# from a previous run.
best_lr_model.write().overwrite().save("/content/drive/MyDrive/housing_bda_project/best_lr_model")
print("Best Linear Regression model saved successfully.")

Best Linear Regression model saved successfully.


In [63]:
# Persist the tuned Decision Tree model with Spark ML's own writer.
# write().overwrite() replaces any model already stored at this path, so
# re-running the notebook does not fail because the directory is left over
# from a previous run.
best_dt_model.write().overwrite().save("/content/drive/MyDrive/housing_bda_project/best_dt_model")
print("Best Decision Tree model saved successfully.")

Best Decision Tree model saved successfully.


In [64]:
# Persist both clustering models with Spark ML's own writer.
# write().overwrite() replaces any model already stored at the target path,
# so re-running the notebook does not fail because the directories are left
# over from a previous run.

# Save the K-Means model
kmeans_model.write().overwrite().save("/content/drive/MyDrive/housing_bda_project/best_kmeans_model")

# Save the Bisecting K-Means model
bisecting_kmeans_model.write().overwrite().save("/content/drive/MyDrive/housing_bda_project/best_bisecting_kmeans_model")

print("Unsupervised models saved successfully.")

Unsupervised models saved successfully.
