Question: Can we use the predicted turbidity levels to determine the optimal chemical dosage (chlorine and alum) required for efficient treatment of water at the plant?

# 1. XGBoost

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=3355d19a57bb5d62a04a8ede63d57903ae41436e7e5f97c3e5c386323d6ad62f
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

### Predicting Chlorine

Reads the CSV file into a Spark DataFrame, keeps the turbidity and chlorine columns, drops rows with missing or zero values in either column, and splits the data 80/20 into training and test sets.

In [22]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from xgboost.spark import SparkXGBRegressor
from pyspark.sql import SparkSession
from sklearn.metrics import r2_score

spark = SparkSession.builder.getOrCreate()


sparkDF = spark.read.csv("/content/data.csv", header=True, inferSchema=True)
sparkDF = sparkDF.select(sparkDF.columns[2:4]).dropna()

# Remove rows with missing or zero values in the turbidity and chlorine columns
sparkDF = sparkDF.dropna(subset=["turbidity"]).filter(col("turbidity") != 0)
sparkDF = sparkDF.dropna(subset=["chlorine"]).filter(col("chlorine") != 0)
# Split the data into training and test sets (80% training, 20% test)

(train_cl, test_cl) = sparkDF.randomSplit([0.8, 0.2], seed = 123)

sparkDF.show()
sparkDF.dtypes

+---------+------------------+
|turbidity|          chlorine|
+---------+------------------+
|    0.401|2.8764551519644184|
|    0.374| 4.180292307692309|
|    0.361|3.1440257723955907|
|    0.351| 2.930638991845812|
|    0.339| 2.828991513437058|
|    0.374|3.6329153225806454|
|    0.373| 2.846970024721879|
|    0.369|3.8199298752191404|
|    0.334| 2.640179351921628|
|    0.336|2.9609897974722093|
|    0.368|3.8589270799871502|
|    0.321|2.4871304347826086|
|    0.307| 2.693291314373559|
|    0.306|2.7784936234058515|
|    0.289| 2.519682684973303|
|    0.279|2.6954764397905757|
|    0.254|2.7801233328552994|
|    0.347| 2.697494011976048|
|    0.346| 2.768611695086818|
|    0.371| 2.818426525998493|
+---------+------------------+
only showing top 20 rows



[('turbidity', 'double'), ('chlorine', 'double')]
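
Before fitting any model, it can help to check how strongly the two columns move together. The sketch below computes the Pearson correlation on only the 20 rows displayed above, so it is a quick sanity check and not a statistic for the full dataset. The helper name `pearson` is introduced here for illustration.

```python
import math

# The 20 (turbidity, chlorine) rows printed by sparkDF.show() above.
rows = [
    (0.401, 2.8764551519644184), (0.374, 4.180292307692309),
    (0.361, 3.1440257723955907), (0.351, 2.930638991845812),
    (0.339, 2.828991513437058), (0.374, 3.6329153225806454),
    (0.373, 2.846970024721879), (0.369, 3.8199298752191404),
    (0.334, 2.640179351921628), (0.336, 2.9609897974722093),
    (0.368, 3.8589270799871502), (0.321, 2.4871304347826086),
    (0.307, 2.693291314373559), (0.306, 2.7784936234058515),
    (0.289, 2.519682684973303), (0.279, 2.6954764397905757),
    (0.254, 2.7801233328552994), (0.347, 2.697494011976048),
    (0.346, 2.768611695086818), (0.371, 2.818426525998493),
]


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


turbidity = [t for t, _ in rows]
chlorine = [c for _, c in rows]
r = pearson(turbidity, chlorine)
print("Pearson r on the 20 displayed rows:", r)
```

On the full DataFrame, the equivalent check would likely be `sparkDF.stat.corr("turbidity", "chlorine")`.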

Trains a SparkXGBRegressor to predict the chlorine dosage from the turbidity values. VectorAssembler packs the turbidity column into the feature vector that the estimator expects. The model is fitted on the training data, used to predict chlorine on the test data, and evaluated with RMSE and R2; the standard deviation of the test labels is printed for comparison.

In [23]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import stddev

assembler = VectorAssembler().setInputCols(['turbidity']).setOutputCol('features')
assembledTrainingData = assembler.transform(train_cl)

# Create an XGBoost PySpark regressor estimator that trains with 2 workers
regressor = SparkXGBRegressor(
  features_col="features",
  label_col="chlorine",
  num_workers=2,
)
# train and return the model
model = regressor.fit(assembledTrainingData)

# predict on test data
assembledTestingData = assembler.transform(test_cl)

predict_df = model.transform(assembledTestingData)

# Evaluate the model on the test set using RMSE and R2 for the target variable chlorine
evaluatorChlorine = RegressionEvaluator(labelCol="chlorine", predictionCol="prediction", metricName="rmse")
rmseChlorine = evaluatorChlorine.evaluate(predict_df)
evaluatorChlorine = RegressionEvaluator(labelCol="chlorine", predictionCol="prediction", metricName="r2")
r2Chlorine = evaluatorChlorine.evaluate(predict_df)
print("RMSE for Chemical Dosages_Chlorine: ", rmseChlorine)
print("Standard Deviation for Chemical Dosages_Chlorine: ", predict_df.agg(stddev("chlorine")).collect()[0][0])
print("R2 Score for Chemical Dosages_Chlorine: ", r2Chlorine)




RMSE for Chemical Dosages_Chlorine:  0.31906414236447145
Standard Deviation for Chemical Dosages_Chlorine:  0.3143491900859291
R2 Score for Chemical Dosages_Chlorine:  -0.03474167844625997
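
An RMSE slightly above the standard deviation and a negative R2 are two views of the same result: R2 is 1 minus the ratio of the squared prediction error to the squared deviation from the label mean, so a model that does worse than always predicting the mean scores below zero. The sketch below shows this with made-up labels (not plant data) and then applies the same relation to the figures printed above. The helper names are illustrative.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


# Made-up labels for illustration only.
y_true = [2.5, 2.8, 3.1, 3.6, 2.9]
mean_pred = [sum(y_true) / len(y_true)] * len(y_true)  # always predict the mean
off_pred = [p + 0.2 for p in mean_pred]                # a constant that misses the mean

print("R2 of mean predictor:  ", r2(y_true, mean_pred))
print("R2 of biased predictor:", r2(y_true, off_pred))

# Same relation applied to the reported chlorine figures. The printed
# standard deviation is the sample (n - 1) version, so this only
# approximates the R2 that the evaluator reported.
approx_r2 = 1.0 - (0.31906414236447145 / 0.3143491900859291) ** 2
print("Approximate R2 from reported RMSE and std:", approx_r2)
```

Read this way, the chlorine model above predicts no better than a constant equal to the mean chlorine dose.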


### Predicting Alum

Reads the turbidity and alum columns from the CSV file and removes rows with missing or zero values in either column. It then splits the remaining data into training and test sets using an 80-20 split.

In [24]:
sparkDF = spark.read.csv("/content/data.csv", header=True, inferSchema=True)
sparkDF = sparkDF.select(["turbidity", "alum"]).dropna()

# Remove rows with missing or zero values in the turbidity and alum columns
sparkDF = sparkDF.dropna(subset=["turbidity"]).filter(col("turbidity") != 0)
sparkDF = sparkDF.dropna(subset=["alum"]).filter(col("alum") != 0)
# Split the data into training and test sets (80% training, 20% test)

(train_al, test_al) = sparkDF.randomSplit([0.8, 0.2], seed = 123)
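
Note that `randomSplit` assigns each row to a split at random, so the 80/20 proportions are approximate rather than exact, and the `seed` is what makes the split repeatable. The following is a conceptual sketch of that behaviour in plain Python. It is not Spark's implementation, and `bernoulli_split` is a hypothetical helper.

```python
import random


def bernoulli_split(rows, weights, seed):
    """Assign each row independently to a split with probability
    proportional to its weight (conceptual sketch only)."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split

    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any floating-point leftover.
            if u < bound or i == len(bounds) - 1:
                parts[i].append(row)
                break
    return parts


rows = list(range(1000))
train, test = bernoulli_split(rows, [0.8, 0.2], seed=123)
print(len(train), len(test))  # close to 800 and 200, but usually not exact
```

Because the assignment is random, two runs with different seeds (or no seed) evaluate on different test rows, which makes their metrics only loosely comparable.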

Uses Spark's VectorAssembler to pack the turbidity column into the feature vector for the SparkXGBRegressor estimator. The estimator is trained on the assembled training data, and the resulting model predicts on the assembled test data. Finally, RMSE and R2 are computed between the predicted and actual values of the target variable "alum", and the standard deviation of the test labels is printed for comparison.

In [26]:
assembler = VectorAssembler().setInputCols(['turbidity']).setOutputCol('features')
assembledTrainingData = assembler.transform(train_al)

# Create an XGBoost PySpark regressor estimator that trains with 2 workers
regressor = SparkXGBRegressor(
  features_col="features",
  label_col="alum",
  num_workers=2,
)
# train and return the model
model = regressor.fit(assembledTrainingData)

# predict on test data
assembledTestingData = assembler.transform(test_al)

predict_df = model.transform(assembledTestingData)

# Evaluate the model on the test set using RMSE and R2 for the target variable alum
evaluatorAlum = RegressionEvaluator(labelCol="alum", predictionCol="prediction", metricName="rmse")
rmseAlum = evaluatorAlum.evaluate(predict_df)
evaluatorAlum = RegressionEvaluator(labelCol="alum", predictionCol="prediction", metricName="r2")
r2Alum = evaluatorAlum.evaluate(predict_df)
print("RMSE for Chemical Dosages_Alum: ", rmseAlum)
print("Standard Deviation for Chemical Dosages_Alum: ", predict_df.agg(stddev("alum")).collect()[0][0])
print("R2 Score for Chemical Dosages_Alum: ", r2Alum)



RMSE for Chemical Dosages_Alum:  2.8535596692548126
Standard Deviation for Chemical Dosages_Alum:  2.7065028138676683
R2 Score for Chemical Dosages_Alum:  -0.11631191880055902


### Predicting Chlorine and Alum with XGBoost + k-fold

Fits a SparkXGBRegressor for each of the target variables "chlorine" and "alum" separately, using 3-fold cross-validation to choose `max_depth` from 3, 6, and 9. Each selected model is then evaluated on the held-out test data using RMSE and R2, with the standard deviation of the test labels printed for comparison.

In [27]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Reuse the Spark session created earlier
spark = SparkSession.builder.getOrCreate()

# Define the feature and target columns
featuresCol = "features"
targetCol_Chlorine = "chlorine"
targetCol_Alum = "alum"

# Create a VectorAssembler to combine the features into a single vector column
assembler = VectorAssembler().setInputCols(['turbidity']).setOutputCol('features')


# Assemble the training data into feature vectors (VectorAssembler is a transformer, so there is no fit step)
assembledTrainingData = assembler.transform(train_cl)

# Train a SparkXGBRegressor on the training data using k-fold cross-validation
regressor = SparkXGBRegressor(
  features_col=featuresCol,
  label_col=targetCol_Chlorine,
  num_workers=2,
)
paramGrid = ParamGridBuilder().addGrid(regressor.max_depth, [3, 6, 9]).build()
crossval = CrossValidator(estimator=regressor, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol=targetCol_Chlorine, predictionCol="prediction", metricName="rmse"), numFolds=3)
model = crossval.fit(assembledTrainingData)

# Apply the trained model to the test data
assembledTestData = assembler.transform(test_cl)
predictions = model.transform(assembledTestData)

# Evaluate the model using RMSE for Chemical Dosages_Chlorine
evaluator_Chlorine = RegressionEvaluator(labelCol=targetCol_Chlorine, predictionCol="prediction", metricName="rmse")
rmse_Chlorine = evaluator_Chlorine.evaluate(predictions)
evaluator_Chlorine = RegressionEvaluator(labelCol=targetCol_Chlorine, predictionCol="prediction", metricName="r2")
r2_Chlorine = evaluator_Chlorine.evaluate(predictions)
print("RMSE for Chemical Dosages_Chlorine: ", rmse_Chlorine)
print("Standard Deviation for Chemical Dosages_Chlorine: ", predictions.agg(stddev("chlorine")).collect()[0][0])
print("R2 Score for Chemical Dosages_Chlorine: ", r2_Chlorine)

# Train a SparkXGBRegressor on the alum training data using k-fold cross-validation
assembledTrainingData = assembler.transform(train_al)
regressor = SparkXGBRegressor(
  features_col=featuresCol,
  label_col=targetCol_Alum,
  num_workers=2,
)
paramGrid = ParamGridBuilder().addGrid(regressor.max_depth, [3, 6, 9]).build()
crossval = CrossValidator(estimator=regressor, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol=targetCol_Alum, predictionCol="prediction", metricName="rmse"), numFolds=3)
model = crossval.fit(assembledTrainingData)

# Apply the trained model to the test data
assembledTestData = assembler.transform(test_al)
predictions = model.transform(assembledTestData)

# Evaluate the model using RMSE for Chemical Dosages_Alum
evaluator_Alum = RegressionEvaluator(labelCol=targetCol_Alum, predictionCol="prediction", metricName="rmse")
rmse_Alum = evaluator_Alum.evaluate(predictions)
evaluator_Alum = RegressionEvaluator(labelCol=targetCol_Alum, predictionCol="prediction", metricName="r2")
r2_Alum = evaluator_Alum.evaluate(predictions)
print()
print("RMSE for Chemical Dosages_Alum: ", rmse_Alum)
print("Standard Deviation for Chemical Dosages_Alum: ", predictions.agg(stddev("alum")).collect()[0][0])
print("R2 Score for Chemical Dosages_Alum: ", r2_Alum)


RMSE for Chemical Dosages_Chlorine:  0.30633394977368344
Standard Deviation for Chemical Dosages_Chlorine:  0.3143491900859291
R2 Score for Chemical Dosages_Chlorine:  0.04618048217202242

RMSE for Chemical Dosages_Alum:  2.7802192371476444
Standard Deviation for Chemical Dosages_Alum:  2.7065028138676683
R2 Score for Chemical Dosages_Alum:  -0.05966778904424408


# 2. RandomForestRegressor

**Predicting chemical dosage for chlorine and alum using Random Forest Regressor and evaluating the model using RMSE and R2**

Trains a Random Forest Regressor on the dataset to predict the chemical dosages of chlorine and alum from turbidity. It splits the dataset 80/20 into training and test sets (without a fixed seed), fits one model per target on the training set, and evaluates each model on the test set using RMSE and R2, with the standard deviation of the test labels printed for comparison.

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import stddev
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("RandomForestRegressor").getOrCreate()
data = spark.read.csv("/content/data.csv", header=True, inferSchema=True).dropna()
# Convert the turbidity column to DoubleType
data = data.withColumn("turbidity", col("turbidity").cast(DoubleType()))
# Remove rows with missing or zero values in the turbidity column
data = data.dropna(subset=["turbidity"]).filter(col("turbidity") != 0)
# Split the data into training and test sets (80% training, 20% test)
(trainingData, testData) = data.randomSplit([0.8, 0.2])
# Define the feature and target columns
featuresCol = ["turbidity"]
targetColChlorine = "chlorine"
targetColAlum = "alum"
# Create a VectorAssembler to combine the features into a single vector column
assembler = VectorAssembler(inputCols=featuresCol, outputCol="features")
# Assemble the training data into feature vectors (VectorAssembler is a transformer, so there is no fit step)
assembledTrainingData = assembler.transform(trainingData)
# Train a RandomForestRegressor on the training data for target column Chemical Dosages_Chlorine
rfChlorine = RandomForestRegressor(featuresCol="features", labelCol=targetColChlorine, numTrees=60, maxDepth=5)
modelChlorine = rfChlorine.fit(assembledTrainingData)
# Apply the trained model to the test data
assembledTestData = assembler.transform(testData)
# Predict for the target variable Chemical Dosages_Chlorine using the trained model
predictionsChlorine = modelChlorine.transform(assembledTestData)
# Evaluate the model using RMSE for the target variable Chemical Dosages_Chlorine
evaluatorChlorine = RegressionEvaluator(labelCol=targetColChlorine, predictionCol="prediction", metricName="rmse")
rmseChlorine = evaluatorChlorine.evaluate(predictionsChlorine)
print("RMSE for Chemical Dosages_Chlorine: ", rmseChlorine)

# Train a RandomForestRegressor on the training data for target column Chemical Dosages_Alum
rfAlum = RandomForestRegressor(featuresCol="features", labelCol=targetColAlum, numTrees=60, maxDepth=5)
modelAlum = rfAlum.fit(assembledTrainingData)
# Predict for the target variable Chemical Dosages_Alum using the trained model
predictionsAlum = modelAlum.transform(assembledTestData)
# Evaluate the model using RMSE for the target variable Chemical Dosages_Alum
evaluatorAlum = RegressionEvaluator(labelCol=targetColAlum, predictionCol="prediction", metricName="rmse")
rmseAlum = evaluatorAlum.evaluate(predictionsAlum)
print("RMSE for Chemical Dosages_Alum: ", rmseAlum)

stdDevChlorine = testData.select(targetColChlorine).rdd.map(lambda x: x[0]).stdev()
stdDevAlum = testData.select(targetColAlum).rdd.map(lambda x: x[0]).stdev()

#print("RMSE for Chemical Dosages_Chlorine: ", rmseChlorine)
print("Standard deviation for Chemical Dosages_Chlorine: ", stdDevChlorine)
# if rmseChlorine < stdDevChlorine:
#     print("The model for Chemical Dosages_Chlorine is performing well as RMSE is less than the standard deviation.")
# else:
#     print("The model for Chemical Dosages_Chlorine is not performing well as RMSE is greater than or equal to the standard deviation.")

#print("RMSE for Chemical Dosages_Alum: ", rmseAlum)
print("Standard deviation for Chemical Dosages_Alum: ", stdDevAlum)
# if rmseAlum < stdDevAlum:
#     print("The model for Chemical Dosages_Alum is performing well as RMSE is less than the standard deviation.")
# else:
#     print("The model for Chemical Dosages_Alum is not performing well as RMSE is greater than or equal to the standard deviation.")
evaluatorChlorine = RegressionEvaluator(labelCol=targetColChlorine, predictionCol="prediction", metricName="r2")
r2Chlorine = evaluatorChlorine.evaluate(predictionsChlorine)
print("R2 score for Chemical Dosages_Chlorine: ", r2Chlorine)
evaluatorAlum = RegressionEvaluator(labelCol=targetColAlum, predictionCol="prediction", metricName="r2")
r2Alum = evaluatorAlum.evaluate(predictionsAlum)
print("R2 score for Chemical Dosages_Alum: ", r2Alum)


RMSE for Chemical Dosages_Chlorine:  0.2872971110811574
RMSE for Chemical Dosages_Alum:  4.344632352246219
Standard deviation for Chemical Dosages_Chlorine:  0.30792591039179934
Standard deviation for Chemical Dosages_Alum:  4.374064473496737
R2 score for Chemical Dosages_Chlorine:  0.1294974458577488
R2 score for Chemical Dosages_Alum:  0.0134122852012728
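
One detail to keep in mind when comparing these figures with the XGBoost cells: the standard deviations in this cell come from `RDD.stdev()`, which is the population standard deviation (divides by n), while `pyspark.sql.functions.stddev` used elsewhere is the sample standard deviation (divides by n - 1). The sketch below shows the difference on made-up values using the standard library.

```python
import math
import statistics

# Made-up values for illustration only.
values = [2.5, 2.8, 3.1, 3.6, 2.9]
n = len(values)

pop_std = statistics.pstdev(values)   # divides by n
sample_std = statistics.stdev(values)  # divides by n - 1

print("population std:", pop_std)
print("sample std:    ", sample_std)

# The two differ by a factor of sqrt(n / (n - 1)).
print("ratio:", sample_std / pop_std, "expected:", math.sqrt(n / (n - 1)))
```

For test sets with a few hundred rows the gap is small, but using one definition throughout keeps the RMSE-versus-standard-deviation comparison consistent.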


**Predicting chemical dosage for chlorine and alum using Random Forest Regressor and evaluating the model using K Fold Cross Validation**

**K Fold Cross Validation Technique**

Trains two Random Forest Regressor models to predict the chemical dosages of chlorine and alum in water treatment based on the turbidity level. The data is split 70/30 into training and test sets. For each model, a grid search over `numTrees` (30, 60, 90) and `maxDepth` (3, 5, 7) with 3-fold cross-validation selects the best hyperparameters, and performance is evaluated on the test set using RMSE and R2.

In [32]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import stddev

spark = SparkSession.builder.appName("RandomForestRegressor").getOrCreate()
data = spark.read.csv("/content/data.csv", header=True, inferSchema=True).dropna()

# Convert the turbidity column to DoubleType
data = data.withColumn("turbidity", col("turbidity").cast(DoubleType()))

# Remove rows with missing or zero values in the turbidity column
data = data.dropna(subset=["turbidity"]).filter(col("turbidity") != 0)

# Split the data into training and test sets (70% training, 30% test)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Define the feature and target columns
featuresCol = ["turbidity"]
targetCol_Chlorine = "chlorine"
targetCol_Alum = "alum"

# Create a VectorAssembler to combine the features into a single vector column
assembler = VectorAssembler(inputCols=featuresCol, outputCol="features")

# Assemble the training data into feature vectors (VectorAssembler is a transformer, so there is no fit step)
assembledTrainingData = assembler.transform(trainingData)

# Train a RandomForestRegressor on the training data using k-fold cross-validation
rf = RandomForestRegressor(featuresCol="features", labelCol=targetCol_Chlorine, numTrees=60, maxDepth=5)
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [30, 60, 90]).addGrid(rf.maxDepth, [3, 5, 7]).build()
crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol=targetCol_Chlorine, predictionCol="prediction", metricName="rmse"), numFolds=3)
model = crossval.fit(assembledTrainingData)

# Apply the trained model to the test data
assembledTestData = assembler.transform(testData)
predictions = model.transform(assembledTestData)

# Evaluate the model using RMSE for Chemical Dosages_Chlorine
evaluator_Chlorine = RegressionEvaluator(labelCol=targetCol_Chlorine, predictionCol="prediction", metricName="rmse")
rmse_Chlorine = evaluator_Chlorine.evaluate(predictions)
evaluator_Chlorine = RegressionEvaluator(labelCol=targetCol_Chlorine, predictionCol="prediction", metricName="r2")
r2_Chlorine = evaluator_Chlorine.evaluate(predictions)
print("RMSE for Chemical Dosages_Chlorine: ", rmse_Chlorine)

# Train a RandomForestRegressor on the training data using k-fold cross-validation
rf = RandomForestRegressor(featuresCol="features", labelCol=targetCol_Alum, numTrees=100, maxDepth=5)
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [30, 60, 90]).addGrid(rf.maxDepth, [3, 5, 7]).build()
crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol=targetCol_Alum, predictionCol="prediction", metricName="rmse"), numFolds=3)
model = crossval.fit(assembledTrainingData)

# Apply the trained model to the test data
assembledTestData = assembler.transform(testData)
predictions = model.transform(assembledTestData)

# Evaluate the model using RMSE for Chemical Dosages_Alum
evaluator_Alum = RegressionEvaluator(labelCol=targetCol_Alum, predictionCol="prediction", metricName="rmse")
rmse_Alum = evaluator_Alum.evaluate(predictions)
print("RMSE for Chemical Dosages_Alum: ", rmse_Alum)

# Calculate the standard deviation of the target column in the test data
stddev_Chlorine = testData.agg(stddev(targetCol_Chlorine)).collect()[0][0]
stddev_Alum = testData.agg(stddev(targetCol_Alum)).collect()[0][0]

print("Standard deviation for Chemical Dosages_Chlorine: ", stddev_Chlorine)
print("Standard deviation for Chemical Dosages_Alum: ", stddev_Alum)

print("R2 score for Chemical Dosages_Chlorine: ", r2_Chlorine)

evaluator_Alum = RegressionEvaluator(labelCol=targetCol_Alum, predictionCol="prediction", metricName="r2")
r2_Alum = evaluator_Alum.evaluate(predictions)
print("R2 score for Chemical Dosages_Alum: ", r2_Alum)

# Compare the RMSE values with the standard deviation
# if rmse_Chlorine < stddev_Chlorine:
#     print("RMSE for Chemical Dosages_Chlorine is less than the standard deviation.")
# else:
#     print("RMSE for Chemical Dosages_Chlorine is greater than or equal to the standard deviation.")

# if rmse_Alum < stddev_Alum:
#     print("RMSE for Chemical Dosages_Alum is less than the standard deviation.")
# else:
#     print("RMSE for Chemical Dosages_Alum is greater than or equal to the standard deviation.")

RMSE for Chemical Dosages_Chlorine:  0.28664328288192314
RMSE for Chemical Dosages_Alum:  3.14956166920326
Standard deviation for Chemical Dosages_Chlorine:  0.31651539040239485
Standard deviation for Chemical Dosages_Alum:  3.1604213835981416
R2 score for Chemical Dosages_Chlorine:  0.17727788955402002
R2 score for Chemical Dosages_Alum:  0.003747223415838885
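
To relate these results back to the opening question, the sketch below collects the metrics printed in the cells above and computes the RMSE-to-standard-deviation ratio for each run. The numbers are copied from the outputs. The runs use different splits (the Random Forest cells are unseeded and use a different row filter), so the comparison is indicative only.

```python
# (rmse, std of test labels, r2) copied from the printed outputs above.
reported = {
    "XGBoost / chlorine": (0.31906414236447145, 0.3143491900859291, -0.03474167844625997),
    "XGBoost / alum": (2.8535596692548126, 2.7065028138676683, -0.11631191880055902),
    "XGBoost + k-fold / chlorine": (0.30633394977368344, 0.3143491900859291, 0.04618048217202242),
    "XGBoost + k-fold / alum": (2.7802192371476444, 2.7065028138676683, -0.05966778904424408),
    "RandomForest / chlorine": (0.2872971110811574, 0.30792591039179934, 0.1294974458577488),
    "RandomForest / alum": (4.344632352246219, 4.374064473496737, 0.0134122852012728),
    "RandomForest + k-fold / chlorine": (0.28664328288192314, 0.31651539040239485, 0.17727788955402002),
    "RandomForest + k-fold / alum": (3.14956166920326, 3.1604213835981416, 0.003747223415838885),
}

for name, (rmse, std, r2) in reported.items():
    # A ratio near 1.0 means the model is about as good as predicting the mean.
    print(f"{name:34s} RMSE/std = {rmse / std:.3f}   R2 = {r2:+.3f}")

best = max(reported, key=lambda k: reported[k][2])
print("Highest R2:", best, reported[best][2])
```

Every ratio sits close to 1 and the highest R2 is below 0.2, which suggests that turbidity alone is a weak predictor of either dosage in this dataset.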
