In [1]:
import pandas as pd

# Load the CSV file with low_memory=False
file_path = "../data/commericalnj.csv"
df2 = pd.read_csv(file_path, low_memory=False)

# Preview the first few rows (a bare expression only renders when it is the last line of a cell)
df2.head()

# Print the available columns in the DataFrame
print("\nAvailable columns in the DataFrame:")
print(df2.columns)


Available columns in the DataFrame:
Index(['Municipality', 'Block', 'Lot', 'Qual', 'Property Location',
       'Property Class', 'Owner's Name', 'Owner's Mailing Address',
       'City/State/Zip', 'Sq. Ft.', 'Yr. Built', 'Building Class',
       'Prior Block', 'Prior Lot', 'Prior Qual', 'Updated', 'Zone', 'Account',
       'Mortgage Account', 'Bank Code', 'Sp Tax Cd', 'Sp Tax Cd.1',
       'Sp Tax Cd.2', 'Sp Tax Cd.3', 'Map Page', 'Additional Lots',
       'Land Desc', 'Building Desc', 'Class 4 Code', 'Acreage', 'EPL Own',
       'EPL Use', 'EPL Desc', 'EPL Statute', 'EPL Init', 'EPL Further',
       'EPL Facility Name', 'Taxes 1', 'Taxes 2', 'Taxes 3', 'Taxes 4',
       'Sale Date', 'Deed Book', 'Deed Page', 'Sale Price', 'NU Code', 'Ratio',
       'Type/Use', 'Year', 'Owner', 'Street', 'City/State/Zip.1',
       'Land Assmnt', 'Building Assmnt', 'Exempt', 'Total Assmnt', 'Assessed',
       'Year.1', 'Owner.1', 'Street.1', 'City/State/Zip.2', 'Land Assmnt.1',
       'Building Assmnt.


# Scalable Prototype for Commercial Property Price Prediction Using Spark

This notebook shows how we scale our machine learning prototype to larger datasets with Apache Spark. We use Spark ML to build a regression pipeline around a Gradient-Boosted Tree (GBT) model, discuss the trade-offs of distributed computing, and work around schema issues that are common in CSV data.

---

## 1. Setup Spark and Load Data

We first install PySpark and create a SparkSession. We then load our CSV file (which contains our commercial property data) with the header and inferred schema.

```python
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, sin, cos, lit, log1p, expr
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Create a SparkSession
spark = SparkSession.builder.appName("CommercialPropertyScaling").getOrCreate()

# Adjust file path as needed
file_path = "../data/commericalnj.csv"
df_spark = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
```

---

## 2. Select Only the Needed Columns

Our CSV file has many more columns than the model needs, and several header names repeat (the pandas listing above shows suffixed duplicates such as `Land Assmnt.1`). Spark's inferred schema exposes the assessment total as `Total Assmnt55`, while the raw header says `Total Assmnt`; this is the mismatch reported in the `CSVHeaderChecker` warnings when the pipeline runs. We therefore rename the columns that contain spaces or punctuation first, then explicitly select only the columns needed for our model. Renaming first matters because a dot inside a name passed to `select` (as in `Sq. Ft.`) is parsed as nested-field access unless the name is wrapped in backticks.

```python
# Rename problematic columns first to remove spaces and punctuation.
# Spark's inferred schema names the assessment total "Total Assmnt55".
df_spark = df_spark.withColumnRenamed("Sq. Ft.", "Sq_Ft") \
                   .withColumnRenamed("Yr. Built", "Yr_Built") \
                   .withColumnRenamed("Total Assmnt55", "Total_Assmnt") \
                   .withColumnRenamed("Taxes 1", "Taxes_1")

# Select only the necessary columns, using the renamed names.
df_spark = df_spark.select(
    "Municipality",
    "Block",
    "Lot",
    "Qual",
    "Property Location",
    "Property Class",
    "Sq_Ft",
    "Yr_Built",
    "Acreage",
    "Total_Assmnt",
    "Taxes_1",
    "Sale Date",
    "Sale Price",
    "Type/Use",
    "Neigh",
    "Latitude",
    "Longitude"
)
```

*Note:* Spark may still warn that the CSV header does not conform to the schema, because the schema carries `Total Assmnt55` where the file header has `Total Assmnt`. The warning did not stop the run recorded in this notebook; renaming and selecting columns explicitly keeps the names consistent throughout our code.
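
One plausible source of the `Total Assmnt` / `Total Assmnt55` difference is duplicate-name handling: pandas keeps the first occurrence of a repeated header and suffixes later ones with `.1`, `.2`, while the Spark warning suggests that repeated names get the column position appended (in the pandas listing, `Total Assmnt` sits at zero-based position 55). The sketch below imitates both schemes in plain Python. `pandas_style` and `position_style` are hypothetical helpers written for illustration, not library functions, and the position-suffix rule is an assumption inferred from the warning text.

```python
from collections import Counter

def pandas_style(names):
    """Keep the first occurrence; suffix later repeats with .1, .2, ..."""
    seen = Counter()
    out = []
    for name in names:
        out.append(name if seen[name] == 0 else f"{name}.{seen[name]}")
        seen[name] += 1
    return out

def position_style(names):
    """Suffix every occurrence of a repeated name with its column position."""
    counts = Counter(name.lower() for name in names)
    return [
        f"{name}{pos}" if counts[name.lower()] > 1 else name
        for pos, name in enumerate(names)
    ]

# A shortened, made-up header with one repeated name.
header = ["Block", "Total Assmnt", "Year", "Total Assmnt"]
print(pandas_style(header))    # ['Block', 'Total Assmnt', 'Year', 'Total Assmnt.1']
print(position_style(header))  # ['Block', 'Total Assmnt1', 'Year', 'Total Assmnt3']
```

If this reading is right, the safest way to find the exact name is to print `df_spark.columns` after loading and copy the name from there.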

---

## 3. Data Cleaning & Feature Engineering

We now perform several data cleaning and feature engineering steps:

- **Filtering:** We remove transactions with a sale price below \$500K and remove extreme outliers (keeping only properties at or below roughly the 95th percentile of sale price, as estimated by `approxQuantile`).
- **Date Features:** We convert the "Sale Date" column to a timestamp and extract the sale year and month. We then create cyclic features (sine and cosine) for the sale month to capture seasonality.
- **Building Age:** We calculate the building age using the "Yr_Built" column.
- **Target Variable:** We create a log-transformed target variable (`Log_Sale_Price`) to help stabilize variance.

```python
import math

# Filter out transactions with a Sale Price below $500K.
df_spark = df_spark.filter(col("Sale Price") >= 500000)

# Remove extreme outliers: keep properties at or below the approximate 95th percentile.
# The third argument is the relative error; at 0.05 the returned cutoff can fall
# anywhere between the 90th and 100th percentiles, so lower it for a tighter bound.
quantiles = df_spark.approxQuantile("Sale Price", [0.95], 0.05)
if quantiles:
    upper_threshold = quantiles[0]
    df_spark = df_spark.filter(col("Sale Price") <= upper_threshold)
else:
    print("Warning: No quantiles computed; check your 'Sale Price' column.")

# Convert Sale Date to timestamp and extract date features.
df_spark = df_spark.withColumn("Sale_Date", col("Sale Date").cast("timestamp"))
df_spark = df_spark.withColumn("Sale_Year", year(col("Sale_Date")))
df_spark = df_spark.withColumn("Sale_Month", month(col("Sale_Date")))
# Create cyclic features for Sale Month (math.pi instead of a hand-rounded 3.14159).
df_spark = df_spark.withColumn("Sale_Month_Sine", sin(2 * math.pi * col("Sale_Month") / 12))
df_spark = df_spark.withColumn("Sale_Month_Cosine", cos(2 * math.pi * col("Sale_Month") / 12))

# Compute Building Age using the renamed "Yr_Built" column.
current_year = 2023
df_spark = df_spark.withColumn("Building_Age", lit(current_year) - col("Yr_Built"))

# Create the log-transformed target variable.
df_spark = df_spark.withColumn("Log_Sale_Price", log1p(col("Sale Price")))
```
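
To see why the sale month is encoded as a sine/cosine pair rather than a plain integer, the following plain-Python sketch maps each month to a point on the unit circle and compares distances. The helper names `month_to_cyclic` and `distance` are illustrative and not part of the pipeline.

```python
import math

def month_to_cyclic(month):
    """Map a month number (1-12) to a (sine, cosine) point on the unit circle."""
    angle = 2 * math.pi * month / 12
    return math.sin(angle), math.cos(angle)

def distance(month_a, month_b):
    """Euclidean distance between two months in the cyclic encoding."""
    return math.dist(month_to_cyclic(month_a), month_to_cyclic(month_b))

print(round(distance(12, 1), 3))  # December vs January -> 0.518
print(round(distance(6, 7), 3))   # June vs July        -> 0.518
print(round(distance(1, 7), 3))   # January vs July     -> 2.0
```

As a raw integer, December (12) and January (1) are 11 units apart; in the cyclic encoding they are as close as any other pair of adjacent months, which is the seasonality structure the model should see.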

---

## 4. Feature Selection & Categorical Indexing

We define our feature lists:

- **Numerical features:** Use the renamed columns.
- **Categorical features:** We use columns such as "Municipality", "Property Class", "Type/Use", and "Neigh".

We then create a pipeline for encoding the categorical features and assembling all features into a single vector. We set `handleInvalid="skip"` in the `VectorAssembler` to drop any rows that have null values in the feature columns.

```python
# Define numerical features.
numerical_cols = ["Sq_Ft", "Acreage", "Building_Age", "Sale_Year", 
                  "Sale_Month_Sine", "Sale_Month_Cosine", "Latitude", "Longitude", "Total_Assmnt", "Taxes_1"]

# Define categorical features.
categorical_cols = ["Municipality", "Property Class", "Type/Use", "Neigh"]

# Create StringIndexers for categorical features.
# The loop variable is named `c` so it does not read like the imported `col` function.
indexers = [StringIndexer(inputCol=c, outputCol=c + "_Index", handleInvalid="skip") for c in categorical_cols]

# Create OneHotEncoders for the indexed categorical features.
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol() + "_OHE") for indexer in indexers]

# Assemble all features into a single vector.
assembler_inputs = numerical_cols + [encoder.getOutputCol() for encoder in encoders]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features", handleInvalid="skip")
```
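
As a rough picture of what the indexing and encoding stages produce, here is a plain-Python imitation. It assumes the defaults as we understand them (`StringIndexer` ordering labels by descending frequency, `OneHotEncoder` dropping the last category); the helper names and the sample town list are made up for illustration.

```python
from collections import Counter

def fit_string_index(values):
    """Give index 0 to the most frequent label, 1 to the next, and so on.
    Ties are broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot_drop_last(index, n_labels):
    """One-hot vector of length n_labels - 1; the last label becomes all zeros."""
    vec = [0.0] * (n_labels - 1)
    if index < n_labels - 1:
        vec[index] = 1.0
    return vec

# Made-up sample values, for illustration only.
towns = ["Newark", "Trenton", "Newark", "Camden", "Newark", "Trenton"]
mapping = fit_string_index(towns)
print(mapping)  # {'Newark': 0, 'Trenton': 1, 'Camden': 2}
for town in ["Newark", "Camden"]:
    print(town, one_hot_drop_last(mapping[town], len(mapping)))
# Newark [1.0, 0.0]
# Camden [0.0, 0.0]
```

Dropping the last category avoids a redundant column: a row of all zeros already identifies the least frequent label.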

---

## 5. Define the Regression Model

We use Spark’s Gradient-Boosted Tree Regressor (GBTRegressor) to predict the log-transformed sale price.

```python
gbt = GBTRegressor(featuresCol="features", labelCol="Log_Sale_Price", maxIter=100)
```
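
For intuition about what the boosting iterations do, the sketch below implements a toy gradient-boosting loop for squared error in plain Python: each round fits a one-split tree (a stump) to the current residuals and adds a damped copy of it to the ensemble. This is a simplified illustration, not Spark's implementation; `fit_stump`, `fit_boosted`, `step_size`, and the sample data are all hypothetical, and `n_rounds` plays the role that `maxIter` plays above.

```python
def fit_stump(xs, residuals):
    """Find the single threshold split that minimises squared error on the residuals."""
    best = None
    for threshold in sorted(set(xs))[:-1]:  # skip the max so both sides are non-empty
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        error = (sum((r - left_mean) ** 2 for r in left)
                 + sum((r - right_mean) ** 2 for r in right))
        if best is None or error < best[0]:
            best = (error, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean

def fit_boosted(xs, ys, n_rounds, step_size=0.1):
    """Start from the mean, then repeatedly fit a stump to the residuals."""
    base = sum(ys) / len(ys)
    stumps = []
    predictions = [base] * len(ys)
    for _ in range(n_rounds):
        residuals = [y - p for y, p in zip(ys, predictions)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        predictions = [p + step_size * stump(x) for x, p in zip(xs, predictions)]
    return lambda x: base + step_size * sum(s(x) for s in stumps)

def mse(model, xs, ys):
    return sum((model(x) - y) ** 2 for x, y in zip(xs, ys)) / len(ys)

# Made-up one-feature data with a jump between x=4 and x=5.
xs = [1, 2, 3, 4, 5, 6, 7, 8]
ys = [1.0, 1.2, 0.9, 1.1, 3.0, 3.2, 2.9, 3.1]
for rounds in (1, 10, 100):
    print(rounds, mse(fit_boosted(xs, ys, rounds), xs, ys))
```

The training error shrinks as rounds are added, which is also why a large iteration count can overfit; held-out evaluation, as in the later sections, is what tells us whether 100 iterations is too many.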

---

## 6. Create the Pipeline

We build a pipeline that includes the indexing, encoding, feature assembling, and regression model.

```python
pipeline = Pipeline(stages=indexers + encoders + [assembler, gbt])
```

---

## 7. Split the Data into Training and Test Sets

We split our data with an 80/20 ratio.

```python
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=42)
```
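
Note that a random split of this kind assigns each row independently, so the resulting sizes are only approximately 80/20. The plain-Python sketch below imitates that behaviour under this assumption; `random_split` is a hypothetical helper and does not reproduce Spark's actual sampling or its row assignments for a given seed.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing a uniform number per row."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:  # cumulative boundaries after normalising the weights
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 800 / 200, but usually not exact
```

Fixing the seed makes the split repeatable for the same input, which keeps evaluation numbers comparable between runs.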

---

## 8. Train the Model

We train our pipeline on the training data.

```python
model = pipeline.fit(train_data)
```

---

## 9. Evaluate the Model

We evaluate our model by computing the RMSE on the log scale and then converting predictions back to the dollar scale. We also compute the R² value.

```python
# Generate predictions on the test data.
predictions = model.transform(test_data)

# Evaluate RMSE in log scale.
evaluator_log = RegressionEvaluator(labelCol="Log_Sale_Price", predictionCol="prediction", metricName="rmse")
rmse_log = evaluator_log.evaluate(predictions)
print("Test RMSE (log scale):", rmse_log)

# Convert predictions back to dollar scale.
predictions = predictions.withColumn("Predicted_Sale_Price", expr("exp(prediction) - 1"))
predictions = predictions.withColumn("Actual_Sale_Price", col("Sale Price"))
evaluator_dollar = RegressionEvaluator(labelCol="Actual_Sale_Price", predictionCol="Predicted_Sale_Price", metricName="rmse")
rmse_dollar = evaluator_dollar.evaluate(predictions)
print("Test RMSE (dollar scale):", rmse_dollar)

# Optionally, compute R² on the log scale.
evaluator_r2 = RegressionEvaluator(labelCol="Log_Sale_Price", predictionCol="prediction", metricName="r2")
r2_log = evaluator_r2.evaluate(predictions)
print("Test R² (log scale):", r2_log)
```
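
The log-scale and dollar-scale RMSE answer different questions, which the following plain-Python sketch tries to make concrete. The sale prices and predictions are made up for illustration, and `rmse` and `r2` are small helper functions written here, not the Spark evaluator.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

def r2(actual, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean) ** 2 for a in actual)
    return 1 - ss_res / ss_tot

# Made-up values: the first two predictions are 10% off, the last is 20% off.
actual_dollars = [600_000, 1_200_000, 5_000_000]
predicted_dollars = [660_000, 1_080_000, 4_000_000]

actual_log = [math.log1p(v) for v in actual_dollars]
predicted_log = [math.log1p(v) for v in predicted_dollars]

# expm1 is the inverse of log1p, matching exp(prediction) - 1 in the pipeline.
assert all(math.isclose(math.expm1(l), v) for l, v in zip(actual_log, actual_dollars))

print("RMSE (log scale):   ", rmse(actual_log, predicted_log))
print("RMSE (dollar scale):", rmse(actual_dollars, predicted_dollars))
print("R2 (log scale):     ", r2(actual_log, predicted_log))
```

On the log scale every property contributes according to its relative error, whereas the dollar-scale RMSE is dominated by the most expensive property. Read this way, a log-scale RMSE near 0.51 corresponds to a typical multiplicative error of about exp(0.51) ≈ 1.67.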

---

## 10. Trade-offs and Discussion

The main trade-offs of this design are:
- **Scalability:**  
  Spark distributes the work across executors, so the same pipeline can run on datasets too large for a single machine. However, it introduces overhead (e.g., data partitioning and shuffling) compared to a single-machine approach, and on small datasets that overhead can outweigh the benefit.
- **Schema Challenges:**  
  Real-world CSV files often have inconsistent or repeated headers. We explicitly selected and renamed columns to ensure a consistent schema.
- **Algorithm Choice:**  
  We selected the GBTRegressor from Spark ML because it can scale across large datasets, even though its tuning options may differ from those available in single-machine libraries.

---

## 11. Stop the SparkSession

Always stop your SparkSession at the end of your processing to release resources.

```python
spark.stop()
```

---



In [6]:
# Set JAVA_HOME and update PATH so Spark can find the Java executable.
import os
os.environ["JAVA_HOME"] = "/opt/conda"  # Use /opt/conda since 'which java' returns /opt/conda/bin/java
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
print("JAVA_HOME is set to:", os.environ.get("JAVA_HOME"))
!which java
!java -version

# Install pyspark if needed
!pip install pyspark

# Import required libraries from pyspark and Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, sin, cos, lit, log1p, expr
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Create a SparkSession
spark = SparkSession.builder.appName("CommercialPropertyScaling").getOrCreate()

# Adjust file path as needed and load the CSV file
file_path = "../data/commericalnj.csv"
df_spark = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)

# Rename problematic columns to remove spaces and punctuation
df_spark = df_spark.withColumnRenamed("Sq. Ft.", "Sq_Ft") \
                   .withColumnRenamed("Yr. Built", "Yr_Built") \
                   .withColumnRenamed("Total Assmnt55", "Total_Assmnt") \
                   .withColumnRenamed("Taxes 1", "Taxes_1")

# Select only the needed columns using the renamed names
df_spark = df_spark.select(
    "Municipality", 
    "Block", 
    "Lot", 
    "Qual", 
    "Property Location", 
    "Property Class", 
    "Sq_Ft", 
    "Yr_Built", 
    "Acreage", 
    "Total_Assmnt", 
    "Taxes_1", 
    "Sale Date", 
    "Sale Price", 
    "Type/Use", 
    "Neigh", 
    "Latitude", 
    "Longitude"
)

# Data Cleaning & Feature Engineering
# Filter out transactions with a Sale Price below $500K.
df_spark = df_spark.filter(col("Sale Price") >= 500000)

# Remove extreme outliers: keep properties at or below the 95th percentile.
quantiles = df_spark.approxQuantile("Sale Price", [0.95], 0.05)
if quantiles:
    upper_threshold = quantiles[0]
    df_spark = df_spark.filter(col("Sale Price") <= upper_threshold)
else:
    print("Warning: No quantiles computed; check your 'Sale Price' column.")

# Convert 'Sale Date' to timestamp and extract year and month.
df_spark = df_spark.withColumn("Sale_Date", col("Sale Date").cast("timestamp"))
df_spark = df_spark.withColumn("Sale_Year", year(col("Sale_Date")))
df_spark = df_spark.withColumn("Sale_Month", month(col("Sale_Date")))
# Create cyclic features for the sale month.
df_spark = df_spark.withColumn("Sale_Month_Sine", sin(2 * 3.14159 * col("Sale_Month") / 12))
df_spark = df_spark.withColumn("Sale_Month_Cosine", cos(2 * 3.14159 * col("Sale_Month") / 12))

# Compute Building Age using the renamed "Yr_Built" column.
current_year = 2023  # Adjust programmatically if needed
df_spark = df_spark.withColumn("Building_Age", lit(current_year) - col("Yr_Built"))

# Create the log-transformed target variable for stability.
df_spark = df_spark.withColumn("Log_Sale_Price", log1p(col("Sale Price")))

# Feature Selection & Categorical Indexing
# Define numerical and categorical columns.
numerical_cols = ["Sq_Ft", "Acreage", "Building_Age", "Sale_Year", 
                  "Sale_Month_Sine", "Sale_Month_Cosine", "Latitude", "Longitude", "Total_Assmnt", "Taxes_1"]
categorical_cols = ["Municipality", "Property Class", "Type/Use", "Neigh"]

# Create StringIndexers for categorical features.
indexers = [StringIndexer(inputCol=col, outputCol=col + "_Index", handleInvalid="skip") for col in categorical_cols]
# Create OneHotEncoders for indexed features.
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol() + "_OHE") for indexer in indexers]

# Assemble all features into a single vector.
assembler_inputs = numerical_cols + [encoder.getOutputCol() for encoder in encoders]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features", handleInvalid="skip")

# Define the regression model using Spark's GBTRegressor.
gbt = GBTRegressor(featuresCol="features", labelCol="Log_Sale_Price", maxIter=100)

# Create the pipeline by combining all stages.
pipeline = Pipeline(stages=indexers + encoders + [assembler, gbt])

# Split the data into training and testing sets.
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=42)

# Train the model.
model = pipeline.fit(train_data)

# Evaluate the model.
predictions = model.transform(test_data)
evaluator_log = RegressionEvaluator(labelCol="Log_Sale_Price", predictionCol="prediction", metricName="rmse")
rmse_log = evaluator_log.evaluate(predictions)
print("Test RMSE (log scale):", rmse_log)

# Convert predictions back to the original sale price scale.
predictions = predictions.withColumn("Predicted_Sale_Price", expr("exp(prediction) - 1"))
predictions = predictions.withColumn("Actual_Sale_Price", col("Sale Price"))
evaluator_dollar = RegressionEvaluator(labelCol="Actual_Sale_Price", predictionCol="Predicted_Sale_Price", metricName="rmse")
rmse_dollar = evaluator_dollar.evaluate(predictions)
print("Test RMSE (dollar scale):", rmse_dollar)

# Optionally, compute R² on the log scale.
evaluator_r2 = RegressionEvaluator(labelCol="Log_Sale_Price", predictionCol="prediction", metricName="r2")
r2_log = evaluator_r2.evaluate(predictions)
print("Test R² (log scale):", r2_log)

# Trade-offs and Discussion (this section is printed to explain your design choices)
print("\nTrade-offs and Discussion:")
print(" - Distributed computing with Spark enables processing large datasets but introduces overhead from data partitioning.")
print(" - The chosen GBTRegressor, while scalable, may have tuning trade-offs compared to single-machine algorithms.")
print(" - Explicit column renaming and feature engineering help maintain schema consistency.")

# Stop the SparkSession to free resources.
spark.stop()


JAVA_HOME is set to: /opt/conda
/opt/conda/bin/java
openjdk version "11.0.26-internal" 2025-01-21
OpenJDK Runtime Environment (build 11.0.26-internal+0-adhoc..src)
OpenJDK 64-Bit Server VM (build 11.0.26-internal+0-adhoc..src, mixed mode)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/20 23:31:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/20 23:31:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Municipality, Block, Lot, Qual, Property Location, Property Class, Sq. Ft., Yr. Built, Acreage, Taxes 1, Sale Date, Sale Price, Type/Use, Total Assmnt, Latitude, Longitude, Neigh
 Schema: Municipality, Block, Lot, Qual, Property Location, Property Class, Sq. Ft., Yr. Built, Acreage, Taxes 1, Sale Date, Sale Price, Type/Use, Total Assmnt55, Latitude, Longitude, Neigh
Expected: Total Assmnt55 but found: Total Assmnt
CSV file: file:///home/sagemaker-user/realestate-tool-final/data/commericalnj.csv

Test RMSE (log scale): 0.5123740278419254



Test RMSE (dollar scale): 3643790.7587949852



Test R² (log scale): 0.6477173695635902

Trade-offs and Discussion:
 - Distributed computing with Spark enables processing large datasets but introduces overhead from data partitioning.
 - The chosen GBTRegressor, while scalable, may have tuning trade-offs compared to single-machine algorithms.
 - Explicit column renaming and feature engineering help maintain schema consistency.
