# **Diamond Price Prediction Using Apache Spark ML pipeline**

In [None]:
# Import the data
# The gzipped CSV has a header row; column types are inferred by Spark while reading.
diamonds = spark.read.csv("diamonds.csv.gz", header=True, inferSchema=True)

# Collect the Spark DataFrame to the driver as a pandas DataFrame for rich display.
pd_diamonds = diamonds.toPandas()
pd_diamonds

Unnamed: 0,_c0,carat,cut,color,clarity,depth,table,price,x,y,z
0,1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
1,2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
2,3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
3,4,0.29,Premium,I,VS2,62.4,58.0,334,4.20,4.23,2.63
4,5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
...,...,...,...,...,...,...,...,...,...,...,...
53935,53936,0.72,Ideal,D,SI1,60.8,57.0,2757,5.75,5.76,3.50
53936,53937,0.72,Good,D,SI1,63.1,55.0,2757,5.69,5.75,3.61
53937,53938,0.70,Very Good,D,SI1,62.8,60.0,2757,5.66,5.68,3.56
53938,53939,0.86,Premium,H,SI2,61.0,58.0,2757,6.15,6.12,3.74


In [None]:
# Count number of rows
# count() is a Spark action, so this line actually executes the read of the CSV.
num_rows = diamonds.count()
print(f"Number of rows: {num_rows}")

Number of rows: 53940


In [None]:
# Counting nulls in each column
# These helpers are imported here so the cell also runs on a fresh kernel.
from pyspark.sql.functions import col, count, when

# when(...) without otherwise() yields NULL for rows that do not match, and count()
# skips NULLs, so each aggregate is the number of null entries in that column.
# NOTE(review): isNull() does not flag NaN values in floating-point columns.
null_counts = diamonds.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in diamonds.columns]
)
null_counts.show()

+---+-----+---+-----+-------+-----+-----+-----+---+---+---+
|_c0|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+---+-----+---+-----+-------+-----+-----+-----+---+---+---+
|  0|    0|  0|    0|      0|    0|    0|    0|  0|  0|  0|
+---+-----+---+-----+-------+-----+-----+-----+---+---+---+



In [None]:
from pyspark.ml.feature import StringIndexer

# Categorical string columns and the numeric index column each one is mapped to.
categorical_cols = ["cut", "color", "clarity"]
categorical_index_cols = ["cut_index", "color_index", "clarity_index"]

# A single StringIndexer handles all three columns at once.
indexer = StringIndexer(
    inputCols=categorical_cols,
    outputCols=categorical_index_cols,
)

# Fit the label-to-index mapping on the full dataset, then append the index columns.
indexer_model = indexer.fit(diamonds)
diamonds_indexed = indexer_model.transform(diamonds)

diamonds_indexed.show()

+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+---------+-----------+-------------+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|cut_index|color_index|clarity_index|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+---------+-----------+-------------+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|      0.0|        1.0|          2.0|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|      1.0|        1.0|          0.0|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|      3.0|        1.0|          3.0|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|      1.0|        5.0|          1.0|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|      3.0|        6.0|          2.0|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|      2.0|        6.0|          4.0|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns that must not be fed to the model: the label ("price"), the CSV row id
# ("_c0"), and the raw string categoricals (their *_index counterparts are used instead).
nonFeatureCols = ["price", "_c0", "cut", "color", "clarity"]

# Build the feature list from the *indexed* frame so that cut_index, color_index and
# clarity_index are included. Deriving it from `diamonds` silently dropped every
# categorical feature and left only the six numeric columns.
featureCols = [item for item in diamonds_indexed.columns if item not in nonFeatureCols]

# VectorAssembler assembles all of these columns into one single vector column.
assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

# Transform the indexed frame: the *_index columns only exist there.
finalPrep = assembler.transform(diamonds_indexed)

In [None]:
# 70/30 train/test split. The fixed seed keeps the split (and therefore every
# downstream metric) stable across reruns of the notebook.
training, test = finalPrep.randomSplit([0.7, 0.3], seed=42)

#  Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

# cache() is lazy: nothing is stored until an action runs. count() is that action,
# so these two lines materialize both splits in the cache and report their sizes.
print(training.count())
print(test.count())

37897
16043


In [None]:
# Hyperparameter tuning
# Imported here so the cell runs on a fresh kernel (Restart & Run All).
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Random forest regressor predicting "price" from the assembled "features" vector.
# Seeded so that bootstrap sampling and feature subsetting are reproducible.
rfModel = (RandomForestRegressor()
  .setLabelCol("price")
  .setFeaturesCol("features")
  .setSeed(42))

# 2 x 2 grid over tree depth and forest size.
paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [5, 10])
  .addGrid(rfModel.numTrees, [20, 60])
  .build())

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

# Cross-validate every grid point; the evaluator scores predictions against "price"
# using its default metric. Seeded so the fold assignment is reproducible.
cv = (CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("price"))
  .setSeed(42))

pipelineFitted = cv.fit(training)

# The best pipeline has a single stage: the fitted random forest model.
print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

The Best Parameters:
--------------------
RandomForestRegressionModel: uid=RandomForestRegressor_cb4365b9f4d9, numTrees=60, numFeatures=6


{Param(parent='RandomForestRegressor_cb4365b9f4d9', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True,
 Param(parent='RandomForestRegressor_cb4365b9f4d9', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='RandomForestRegressor_cb4365b9f4d9', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='RandomForestRegressor_cb4365b9f4d9', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supporte

In [None]:
# Score the held-out test split with the best cross-validated pipeline.
best_model = pipelineFitted.bestModel
scored_test = best_model.transform(test)

# Keep the raw and rounded predictions next to the true price and the absolute error.
holdout = scored_test.selectExpr(
    "prediction as raw_prediction",
    "double(round(prediction)) as rounded_prediction",
    "price",
    "abs(price - prediction) as absolute_difference",
)

# Collect to pandas for rich display in the notebook.
pd_holdout = holdout.toPandas()
pd_holdout

Unnamed: 0,raw_prediction,rounded_prediction,price,absolute_difference
0,535.921661,536.0,336,199.921661
1,537.335444,537.0,336,201.335444
2,503.245454,503.0,338,165.245454
3,542.260169,542.0,340,202.260169
4,508.477525,508.0,342,166.477525
...,...,...,...,...
16038,3096.301959,3096.0,2756,340.301959
16039,2789.318287,2789.0,2756,33.318287
16040,2499.162123,2499.0,2756,256.837877
16041,2662.291303,2662.0,2757,94.708697


In [None]:
def _holdout_evaluator(metric_name):
    """Build an evaluator comparing `price` with `rounded_prediction` for one metric."""
    return RegressionEvaluator(
        labelCol="price",
        predictionCol="rounded_prediction",
        metricName=metric_name,
    )


# One evaluator per metric; the label and prediction columns are declared once above.
mse_evaluator = _holdout_evaluator("mse")    # Mean Squared Error
rmse_evaluator = _holdout_evaluator("rmse")  # Root Mean Squared Error
mae_evaluator = _holdout_evaluator("mae")    # Mean Absolute Error
r2_evaluator = _holdout_evaluator("r2")      # R-squared

# Score the hold-out predictions with each metric.
mse = mse_evaluator.evaluate(holdout)
rmse = rmse_evaluator.evaluate(holdout)
mae = mae_evaluator.evaluate(holdout)
r2 = r2_evaluator.evaluate(holdout)

# Print the metrics
print(f"Mean Squared Error (MSE): {mse}")
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared: {r2}")

Mean Squared Error (MSE): 1826617.3869600452
Root Mean Squared Error (RMSE): 1351.524097809597
Mean Absolute Error (MAE): 775.1869974443682
R-squared: 0.88546265545127


**Overall, the model captures most of the variance in price (as indicated by the high R-squared), but the error metrics (MSE, RMSE, MAE) are still large, which suggests room for improvement in prediction accuracy.**