In [1]:
from pyspark.sql import SparkSession
from IPython.display import display, HTML
from pyspark.sql.functions import col, lpad

import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 8g pyspark-shell"

spark = SparkSession.builder \
    .appName("VSCodeTest") \
    .master("local[*]") \
    .getOrCreate()

print("✅ Spark session running in VS Code!")
print("Spark version:", spark.version)

✅ Spark session running in VS Code!
Spark version: 4.0.0


In [2]:
from pyspark.sql.functions import col, lpad

# Load the datasets
diamonds = (spark.read \
    .option("header", "true") \
    .csv("C:/Users/jverc/Downloads/diamonds.csv")
)


In [3]:
diamonds.createOrReplaceTempView("diamonds")

In [4]:
spark.table("diamonds").describe().show()

+-------+------------------+------------------+---------+-----+-------+------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|               _c0|             carat|      cut|color|clarity|             depth|             table|            price|                 x|                 y|                 z|
+-------+------------------+------------------+---------+-----+-------+------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|             53940|             53940|    53940|53940|  53940|             53940|             53940|            53940|             53940|             53940|             53940|
|   mean|           26970.5|0.7979397478679852|     NULL| NULL|   NULL| 61.74940489432624| 57.45718390804603|3932.799721913237| 5.731157211716609| 5.734525954764462|3.5387337782723316|
| stddev|15571.281096942537|0.4740112444054196|     NULL| NULL|   NULL|1.43

In [5]:
df_display = spark.sql("""
SELECT 
  CAST(carat AS DOUBLE) AS carat,
  CAST(cut AS STRING) AS cut,
  CAST(color AS STRING) AS color,
  CAST(clarity AS STRING) AS clarity,
  CAST(depth AS DOUBLE) AS depth,
  CAST(table AS DOUBLE) AS table,
  CAST(price AS DOUBLE) AS price,
  CAST(x AS DOUBLE) AS x,
  CAST(y AS DOUBLE) AS y,
  CAST(z AS DOUBLE) AS z
FROM diamonds
WHERE 
  carat IS NOT NULL AND
  CAST(x AS DOUBLE) != 0 AND
  CAST(y AS DOUBLE) != 0 AND
  CAST(z AS DOUBLE) != 0
""")

df_display.show()

df_display.createOrReplaceTempView("cleaned_diamonds")
spark.catalog.cacheTable("cleaned_diamonds")

+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|326.0|3.95|3.98|2.43|
| 0.21|  Premium|    E|    SI1| 59.8| 61.0|326.0|3.89|3.84|2.31|
| 0.23|     Good|    E|    VS1| 56.9| 65.0|327.0|4.05|4.07|2.31|
| 0.29|  Premium|    I|    VS2| 62.4| 58.0|334.0| 4.2|4.23|2.63|
| 0.31|     Good|    J|    SI2| 63.3| 58.0|335.0|4.34|4.35|2.75|
| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|336.0|3.94|3.96|2.48|
| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|336.0|3.95|3.98|2.47|
| 0.26|Very Good|    H|    SI1| 61.9| 55.0|337.0|4.07|4.11|2.53|
| 0.22|     Fair|    E|    VS2| 65.1| 61.0|337.0|3.87|3.78|2.49|
| 0.23|Very Good|    H|    VS1| 59.4| 61.0|338.0| 4.0|4.05|2.39|
|  0.3|     Good|    J|    SI1| 64.0| 55.0|339.0|4.25|4.28|2.73|
| 0.23|    Ideal|    J|    VS1| 62.8| 56.0|340.0|3.93| 3.9|2.46|
| 0.22|  Premium|    F|  

In [6]:
spark.table("cleaned_diamonds").describe().show()

+-------+-------------------+---------+-----+-------+------------------+------------------+-----------------+-----------------+------------------+------------------+
|summary|              carat|      cut|color|clarity|             depth|             table|            price|                x|                 y|                 z|
+-------+-------------------+---------+-----+-------+------------------+------------------+-----------------+-----------------+------------------+------------------+
|  count|              53920|    53920|53920|  53920|             53920|             53920|            53920|            53920|             53920|             53920|
|   mean| 0.7976982566765384|     NULL| NULL|   NULL|  61.7495140949547|57.456834198813105|3930.993230712166|5.731626854599296| 5.734887054896052|3.5400463649853404|
| stddev|0.47379529239063944|     NULL| NULL|   NULL|1.4323310748193272|2.2340642090636664|3987.280445975295|1.119422825791889|1.1401257950841792|0.7025303439301769|
|   

In [7]:
# Double Check for zero values in x, y, z
spark.table("cleaned_diamonds").filter("z = 0").count()

0

In [8]:
cleaned_diamonds = spark.sql("SELECT * FROM cleaned_diamonds")

In [9]:
from pyspark.sql.functions import col, when, log, expr

engineered_diamonds = (
    cleaned_diamonds
    # Create volume field as a product of dimensions
    .withColumn("volume", col("x") * col("y") * col("z"))

    # # Log transform price to help reduce skewness
    # .withColumn("log_price", log(col("price")))

    # Quality flag for high clarity and color
    .withColumn("is_high_quality", when(
        (col("clarity").isin("IF", "VVS1", "VVS2")) & 
        (col("color").isin("D", "E")), 1).otherwise(0))
)

engineered_diamonds.createOrReplaceTempView("engineered_diamonds")

In [10]:
high_quality = engineered_diamonds.filter("is_high_quality = 1").count()
low_quality = engineered_diamonds.filter("is_high_quality = 0").count()

print(f"High Quality Diamonds: {high_quality}")
print(f"Other Diamonds: {low_quality}")

High Quality Diamonds: 2682
Other Diamonds: 51238


In [11]:
engineered_diamonds.filter("is_high_quality = 1").show()

+-----+---------+-----+-------+-----+-----+------+----+----+----+------------------+---------------+
|carat|      cut|color|clarity|depth|table| price|   x|   y|   z|            volume|is_high_quality|
+-----+---------+-----+-------+-----+-----+------+----+----+----+------------------+---------------+
| 0.24|  Premium|    E|   VVS1| 60.7| 58.0| 553.0|4.01|4.03|2.44|         39.431132|              1|
| 0.24|Very Good|    D|   VVS1| 61.5| 60.0| 553.0|3.97| 4.0|2.45|38.906000000000006|              1|
| 0.26|Very Good|    E|   VVS2| 59.9| 58.0| 554.0|4.15|4.23|2.51| 44.06179500000001|              1|
| 0.26|Very Good|    D|   VVS2| 62.4| 54.0| 554.0|4.08|4.13|2.56|43.137024000000004|              1|
| 0.26|Very Good|    D|   VVS2| 62.8| 60.0| 554.0|4.01|4.05|2.53| 41.08846499999999|              1|
| 0.26|Very Good|    E|   VVS1| 62.6| 59.0| 554.0|4.06|4.09|2.55|          42.34377|              1|
| 0.26|Very Good|    E|   VVS1| 63.4| 59.0| 554.0| 4.0|4.04|2.55|            41.208|       

In [12]:
engineered_diamonds = engineered_diamonds.drop("x", "y", "z")

In [13]:
engineered_diamonds.describe().show()

+-------+-------------------+---------+-----+-------+------------------+------------------+-----------------+------------------+-------------------+
|summary|              carat|      cut|color|clarity|             depth|             table|            price|            volume|    is_high_quality|
+-------+-------------------+---------+-----+-------+------------------+------------------+-----------------+------------------+-------------------+
|  count|              53920|    53920|53920|  53920|             53920|             53920|            53920|             53920|              53920|
|   mean| 0.7976982566765384|     NULL| NULL|   NULL|  61.7495140949547|57.456834198813105|3930.993230712166| 129.8975670619965|0.04974035608308605|
| stddev|0.47379529239063944|     NULL| NULL|   NULL|1.4323310748193272|2.2340642090636664|3987.280445975295| 78.21978923213373| 0.2174100496198833|
|    min|                0.2|     Fair|    D|     I1|              43.0|              43.0|            326

In [14]:
# VectorAssembler Assembles all of these columns into one single vector. To do this, set the input columns and output column. Then that assembler will be used to transform the prepped data to the final dataset.
from pyspark.ml.feature import StringIndexer

# Index categorical features
cut_indexer = StringIndexer(inputCol="cut", outputCol="cut_index")
color_indexer = StringIndexer(inputCol="color", outputCol="color_index")
clarity_indexer = StringIndexer(inputCol="clarity", outputCol="clarity_index")

# Transform the DataFrame with the indexers
indexed = cut_indexer.fit(engineered_diamonds).transform(engineered_diamonds)
indexed = color_indexer.fit(indexed).transform(indexed)
indexed = clarity_indexer.fit(indexed).transform(indexed)

# Save final DataFrame with indexed columns
mapping = indexed

indexed = indexed.drop("cut", "color", "clarity")
# indexed = indexed.withColumnRenamed("log_price", "price")
indexed.show()

+-----+-----+-----+-----+------------------+---------------+---------+-----------+-------------+
|carat|depth|table|price|            volume|is_high_quality|cut_index|color_index|clarity_index|
+-----+-----+-----+-----+------------------+---------------+---------+-----------+-------------+
| 0.23| 61.5| 55.0|326.0|          38.20203|              0|      0.0|        1.0|          2.0|
| 0.21| 59.8| 61.0|326.0|         34.505856|              0|      1.0|        1.0|          0.0|
| 0.23| 56.9| 65.0|327.0|         38.076885|              0|      3.0|        1.0|          3.0|
| 0.29| 62.4| 58.0|334.0|          46.72458|              0|      1.0|        5.0|          1.0|
| 0.31| 63.3| 58.0|335.0|51.917249999999996|              0|      3.0|        6.0|          2.0|
| 0.24| 62.8| 57.0|336.0|38.693951999999996|              0|      2.0|        6.0|          4.0|
| 0.24| 62.3| 57.0|336.0|38.830870000000004|              0|      2.0|        5.0|          5.0|
| 0.26| 61.9| 55.0|337.0|     

In [15]:
from pyspark.ml.feature import VectorAssembler

nonFeatureCols = [
    "price"
]
featureCols = [col for col in indexed.columns if col not in nonFeatureCols]

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(indexed)

In [16]:
training, test = finalPrep.randomSplit([0.7, 0.3])

#  Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count()) # Why execute count here??
print(test.count())

37803
16117


In [17]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

rfModel = (RandomForestRegressor()
  .setLabelCol("price")
  .setFeaturesCol("features"))

paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [5, 10])
  .addGrid(rfModel.numTrees, [20, 60])
  .build())
# Note, that this parameter grid will take a long time
# to run in the community edition due to limited number
# of workers available! Be patient for it to run!
# If you want it to run faster, remove some of
# the above parameters and it'll speed right up!

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator() # you can feel free to change the number of folds used in cross validation as well
  .setEstimator(pipeline) # the estimator can also just be an individual model rather than a pipeline
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("price")))

pipelineFitted = cv.fit(training)

In [18]:
print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

The Best Parameters:
--------------------
RandomForestRegressionModel: uid=RandomForestRegressor_c262131eff68, numTrees=20, numFeatures=8


{Param(parent='RandomForestRegressor_c262131eff68', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True,
 Param(parent='RandomForestRegressor_c262131eff68', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='RandomForestRegressor_c262131eff68', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='RandomForestRegressor_c262131eff68', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supporte

In [19]:
pipelineFitted.bestModel

PipelineModel_f7feee4cc687

In [20]:
holdout2 = (pipelineFitted.bestModel
  .transform(test)
  .selectExpr("prediction as raw_prediction", 
              "double(round(prediction)) as prediction",
              "price ", 
              "cut_index",
              "round(abs((prediction - price) / price) * 100, 2) AS pct_off",
              "round(100 - (abs((prediction - price) / price) * 100), 2) AS pct_accuracy"
  ))
  
holdout2.show()

+------------------+----------+-----+---------+-------+------------+
|    raw_prediction|prediction|price|cut_index|pct_off|pct_accuracy|
+------------------+----------+-----+---------+-------+------------+
|509.89059955622287|     510.0|367.0|      0.0|  38.93|       61.07|
| 552.4290700127754|     552.0|367.0|      0.0|  50.53|       49.47|
| 542.0659199708288|     542.0|404.0|      1.0|  34.17|       65.83|
| 516.0763758267157|     516.0|470.0|      1.0|    9.8|        90.2|
|469.13064742024716|     469.0|327.0|      3.0|  43.47|       56.53|
| 563.3670676379622|     563.0|438.0|      2.0|  28.62|       71.38|
| 644.6659498107296|     645.0|550.0|      1.0|  17.21|       82.79|
| 549.3501843743887|     549.0|530.0|      2.0|   3.65|       96.35|
| 464.4631461342551|     464.0|373.0|      2.0|  24.52|       75.48|
| 483.4722606040126|     483.0|402.0|      2.0|  20.27|       79.73|
| 568.6253397117944|     569.0|530.0|      2.0|   7.29|       92.71|
| 570.3690150620312|     570.0|505

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate Mean Squared Error
evaluator = RegressionEvaluator(
    labelCol="price", 
    predictionCol="raw_prediction", 
    metricName="r2"
)
mse = evaluator.evaluate(holdout2)

mae = evaluator.setMetricName("mae").evaluate(holdout2)
rmse = evaluator.setMetricName("rmse").evaluate(holdout2)
r2 = evaluator.setMetricName("r2").evaluate(holdout2)

print(f"MSE: {mse}")
print(f"MAE: {mae}")
print(f"RMSE: {rmse}")
print(f"R^2: {r2}")


MSE: 0.9720007234928092
MAE: 352.8628793743668
RMSE: 665.9413324221837
R^2: 0.9720007234928092


In [22]:
from pyspark.sql.functions import col, abs

# Mean Absolute Percentage Error
mape_df = holdout2.withColumn(
    "pct_error", abs(col("raw_prediction") - col("price")) / col("price")
)

mape = mape_df.selectExpr("avg(pct_error)").first()[0]
print(f"MAPE: {mape * 100:.2f}%")

MAPE: 9.95%


In [23]:
holdout2.selectExpr("avg(pct_accuracy) as avg_prediction_accuracy").show()

+-----------------------+
|avg_prediction_accuracy|
+-----------------------+
|      90.04846125209383|
+-----------------------+



In [24]:
from pyspark.sql.functions import avg

cut_mapping = mapping.select("cut", "cut_index").distinct()
holdout2_with_cut = holdout2.join(cut_mapping, on="cut_index", how="left")

# Group by cut index and calculate average accuracy
holdout2_with_cut.groupBy("cut").agg(avg("pct_accuracy").alias("avg_accuracy")).show()

+---------+-----------------+
|      cut|     avg_accuracy|
+---------+-----------------+
|  Premium|90.00256478566219|
|    Ideal|89.89090993214067|
|     Good|89.83024561403501|
|     Fair|84.95818763326223|
|Very Good|91.13138781163435|
+---------+-----------------+



In [25]:
# spark.stop(