# CSCI316 Group Assignment 1
# Task 2
# Group: G18

In [4]:
pip install pandas numpy matplotlib seaborn pyspark==3.5.1 scikit-learn jupyter

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, RFormula, Bucketizer, StandardScaler, OneHotEncoder
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.stat import Correlation

In [6]:
spark = SparkSession.builder \
    .appName("EnhancedRentalPriceClassification") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

In [7]:
print(f"Spark Version: {spark.version}")

Spark Version: 3.5.1


In [8]:
import os, sys
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.driver.bindAddress","127.0.0.1")
         .config("spark.driver.host","127.0.0.1")
         .getOrCreate())

spark.range(1).show()

+---+
| id|
+---+
|  0|
+---+



In [9]:
from pyspark.sql import functions as F

df_spark = (spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ";")
    .csv("apartments_for_rent_classified_10K.csv"))

# normalize headers (strip spaces, lowercase, replace whitespace with _)
clean_cols = [c.strip().lower().replace(" ", "_") for c in df_spark.columns]
df_spark = df_spark.toDF(*clean_cols)

print(df_spark.columns)  # check exact names

['id', 'category', 'title', 'body', 'amenities', 'bathrooms', 'bedrooms', 'currency', 'fee', 'has_photo', 'pets_allowed', 'price', 'price_display', 'price_type', 'square_feet', 'address', 'cityname', 'state', 'latitude', 'longitude', 'source', 'time']


In [10]:
df_spark = df_spark.withColumn(
    "price_num",
    F.regexp_replace(F.col("price"), r"[^0-9\.-]", "").cast("double")
)

In [11]:
(df_spark
 .select("price_num")
 .summary()
 .show())

print("\n=== DATA EXPLORATION ===")
df_spark.printSchema()

#null counts
df_spark.select([
    F.count(F.when(F.col(c).isNull() | F.isnan(c), c)).alias(c) for c in df_spark.columns
]).show()

+-------+------------------+
|summary|         price_num|
+-------+------------------+
|  count|             10000|
|   mean|         1486.2775|
| stddev|1076.5079675665088|
|    min|             200.0|
|    25%|             949.0|
|    50%|            1270.0|
|    75%|            1695.0|
|    max|           52500.0|
+-------+------------------+


=== DATA EXPLORATION ===
root
 |-- id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- fee: string (nullable = true)
 |-- has_photo: string (nullable = true)
 |-- pets_allowed: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- price_display: string (nullable = true)
 |-- price_type: string (nullable = true)
 |-- square_feet: integer (nullable = true)
 |-- address: string (nu

In [12]:
#prepare the Data for Machine Learning Algorithms
print("\n=== Data Preparation ===")

#start with clean data
print("Basic data cleaning...")
print()
df_clean = df_spark.filter(col('price').isNotNull() & (col('price') > 0))

#convert price to double to ensure it's numeric
df_clean = df_clean.withColumn("price", col("price").cast("double"))

#fill missing values with safe defaults
df_clean = df_clean.fillna({
    'bedrooms': 1.0,
    'bathrooms': 1.0,
    'square_feet': 500.0,
    'latitude': 0.0,
    'longitude': 0.0,
    'cityname': 'Unknown',
    'state': 'Unknown',
    'pets_allowed': 'Unknown',
    'has_photo': 'No',
    'source': 'Unknown'
})

#cast numeric columns to double
numeric_cols = ['bedrooms', 'bathrooms', 'square_feet', 'latitude', 'longitude']
for col_name in numeric_cols:
    if col_name in df_clean.columns:
        df_clean = df_clean.withColumn(col_name, col(col_name).cast("double"))

print("Creating engineered features with RFormula approach")
print()

#safe feature engineering (avoiding division by zero)
df_clean = df_clean.withColumn("price_per_sqft",
    when(col("square_feet") > 0, col("price") / col("square_feet")).otherwise(0.0))

df_clean = df_clean.withColumn("total_rooms",
    col("bedrooms") + col("bathrooms"))

df_clean = df_clean.withColumn("location_score",
    abs(col("latitude")) + abs(col("longitude")))

#room density feature
df_clean = df_clean.withColumn("room_density",
    when(col("square_feet") > 0, col("total_rooms") / col("square_feet")).otherwise(0.0))

print("New features created successfully")
print()

print("Creating price categories for classification")

#create price categories using quantiles
price_quantiles = df_clean.approxQuantile("price", [0.0, 0.33, 0.67, 1.0], 0.05)
print(f"Price boundaries: {price_quantiles}")

#create bucketizer
bucketizer = Bucketizer(
    splits=price_quantiles,
    inputCol="price",
    outputCol="price_category"
)

df_categorized = bucketizer.transform(df_clean)

#add readable labels
df_categorized = df_categorized.withColumn("price_label",
    when(col("price_category") == 0.0, "Low")
    .when(col("price_category") == 1.0, "Medium")
    .otherwise("High")
)

print("Price category distribution:")
df_categorized.groupBy("price_category", "price_label").count().orderBy("price_category").show()


=== Data Preparation ===
Basic data cleaning...

Creating engineered features with RFormula approach

New features created successfully

Creating price categories for classification
Price boundaries: [200.0, 1000.0, 1465.0, 52500.0]
Price category distribution:
+--------------+-----------+-----+
|price_category|price_label|count|
+--------------+-----------+-----+
|           0.0|        Low| 2999|
|           1.0|     Medium| 3322|
|           2.0|       High| 3679|
+--------------+-----------+-----+



In [13]:
print("String indexing for categorical variables")

#simple string indexing for key categorical columns
categorical_cols = ['state', 'has_photo']
indexers = []
indexed_cols = []

for col_name in categorical_cols:
    if col_name in df_categorized.columns:
        indexer = StringIndexer(
            inputCol=col_name,
            outputCol=f"{col_name}_idx",
            handleInvalid="keep"
        )
        indexers.append(indexer)
        indexed_cols.append(f"{col_name}_idx")

print("String indexers created.")

print("Feature selection...")

#select reliable features
final_features = [
    'bedrooms', 'bathrooms', 'square_feet',
    'price_per_sqft', 'total_rooms', 'location_score', 'room_density'
] + indexed_cols

print(f"Selected features: {final_features}")

String indexing for categorical variables
String indexers created.
Feature selection...
Selected features: ['bedrooms', 'bathrooms', 'square_feet', 'price_per_sqft', 'total_rooms', 'location_score', 'room_density', 'state_idx', 'has_photo_idx']


In [14]:
#Configuration
DF_READY    = df_categorized
FEATURES_COL = "final_features"
LABEL_COL    = "price_num"

In [15]:
print("6. Train/test split...")

#split data (80-20)
train_data, test_data = df_categorized.randomSplit([0.8, 0.2], seed=42)

print(f"Training data: {train_data.count()} rows")
print(f"Test data: {test_data.count()} rows")

print("Train set price distribution:")
train_data.groupBy("price_category", "price_label").count().orderBy("price_category").show()

6. Train/test split...
Training data: 8072 rows
Test data: 1928 rows
Train set price distribution:
+--------------+-----------+-----+
|price_category|price_label|count|
+--------------+-----------+-----+
|           0.0|        Low| 2398|
|           1.0|     Medium| 2683|
|           2.0|       High| 2991|
+--------------+-----------+-----+



In [16]:
from pyspark.sql.functions import col, isnan

print("final_features:", final_features)
print("df_categorized columns:", df_categorized.columns)

missing = [c for c in final_features if c not in df_categorized.columns]
print("MISSING features:", missing)

#keep only existing features
final_features = [c for c in final_features if c in df_categorized.columns]

final_features: ['bedrooms', 'bathrooms', 'square_feet', 'price_per_sqft', 'total_rooms', 'location_score', 'room_density', 'state_idx', 'has_photo_idx']
df_categorized columns: ['id', 'category', 'title', 'body', 'amenities', 'bathrooms', 'bedrooms', 'currency', 'fee', 'has_photo', 'pets_allowed', 'price', 'price_display', 'price_type', 'square_feet', 'address', 'cityname', 'state', 'latitude', 'longitude', 'source', 'time', 'price_num', 'price_per_sqft', 'total_rooms', 'location_score', 'room_density', 'price_category', 'price_label']
MISSING features: ['state_idx', 'has_photo_idx']


In [17]:
from pyspark.ml import Pipeline

# assuming `indexers` is your list of StringIndexers you built
idx_pipeline = Pipeline(stages=indexers)
df_idx = idx_pipeline.fit(df_categorized).transform(df_categorized)

df_feat_src = df_idx   # <- use this as the DF to assemble from

In [18]:
from pyspark.sql import functions as F

df_feat_src = df_feat_src.withColumn(
    "price_per_sqft",
    F.when((F.col("square_feet").isNull()) | (F.col("square_feet") <= 0), None)
     .otherwise(F.col("price") / F.col("square_feet"))
)

# (optional) convert booleans to doubles if present
bool_cols = [c for c in ["has_photo", "pets_allowed", "fee"] if c in df_feat_src.columns]
for c in bool_cols:
    df_feat_src = df_feat_src.withColumn(c, F.col(c).cast("double"))

Impute remaining nulls in numeric features

In [19]:
from pyspark.ml.feature import Imputer

num_feats = [c for c in final_features
             if c in df_feat_src.columns
             and dict(df_feat_src.dtypes)[c] in ("double","int","bigint","float","smallint","tinyint")]

imputer = Imputer(strategy="median", inputCols=num_feats, outputCols=num_feats)
df_feat_src = imputer.fit(df_feat_src).transform(df_feat_src)

Assembling and resplitting

In [20]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=final_features, outputCol="features")
df_ready = assembler.transform(df_feat_src)

label_col = "price_num" if "price_num" in df_ready.columns else "price"

train_data, test_data = df_ready.select("features", label_col).randomSplit([0.8, 0.2], seed=42)

train_data.printSchema()
print("Train rows:", train_data.count(), " Test rows:", test_data.count())

root
 |-- features: vector (nullable = true)
 |-- price_num: double (nullable = true)

Train rows: 8072  Test rows: 1928


Imports before creating Pipelines

In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer, VectorAssembler, StandardScaler

## Pipelines creation

In [22]:
def make_spark_pipeline(
    model_type: str,
    num_cols: list,
    cat_cols: list,
    label_col: str = "price_num",
    use_scaler_for_lr: bool = True,
    rf_params: dict = None,
    gbt_params: dict = None,
    lr_params: dict = None,
):
    stages = []

    #impute numeric columns
    if num_cols:
        imputer = Imputer(strategy="median", inputCols=num_cols, outputCols=num_cols)
        stages.append(imputer)

    #index and OneHot categorical columns
    indexers = []
    idx_names = []
    for c in cat_cols:
        idx = StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
        indexers.append(idx)
        idx_names.append(f"{c}_idx")
    if indexers:
        stages += indexers
        encoder = OneHotEncoder(
            inputCols=idx_names,
            outputCols=[f"{c}_oh" for c in cat_cols],
            handleInvalid="keep"
        )
        stages.append(encoder)
        cat_feat_cols = [f"{c}_oh" for c in cat_cols]
    else:
        cat_feat_cols = []

    #assembling features
    feat_cols = num_cols + cat_feat_cols
    assembler = VectorAssembler(inputCols=feat_cols, outputCol="features", handleInvalid="keep")
    stages.append(assembler)

In [23]:
lr  = LinearRegression(featuresCol="features", labelCol=LABEL_COL, predictionCol="prediction")
rf  = RandomForestRegressor(featuresCol="features", labelCol=LABEL_COL, predictionCol="prediction",
                            numTrees=200, maxDepth=12, seed=42)
gbt = GBTRegressor(featuresCol="features", labelCol=LABEL_COL, predictionCol="prediction",
                   maxIter=150, maxDepth=8, stepSize=0.1, seed=42)

In [24]:
pipe_lr  = Pipeline(stages=indexers + [imputer, assembler, LinearRegression])
pipe_rf  = Pipeline(stages=indexers + [imputer, assembler, RandomForestRegressor])
pipe_gbt = Pipeline(stages=indexers + [imputer, assembler, GBTRegressor])

pipes = {
    "LinearRegression": pipe_lr,
    "RandomForest":     pipe_rf,
    "GBT":              pipe_gbt,
}

## Linear Regression Model

In [28]:
lr_model = pipes["LinearRegression"].fit(train_data)

TypeError: Cannot recognize a pipeline stage of type <class 'abc.ABCMeta'>.