# **Water Quality Prediction using PySpark and MLlib**

Installing PySpark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=e4ebbceebf533dcebae4fad81693756af1f18548ecbbb6b898939ad1fa58aee1
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


Importing Necessary Libraries

In [None]:
from pyspark import SparkFiles

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler
from pyspark.ml.regression import LinearRegression,RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

Create a SparkSession

In [None]:
# Start (or reuse) the Spark session that every later cell reads through.
session_builder = SparkSession.builder.appName("Water Quality Prediction")
spark = session_builder.getOrCreate()
spark

Load the Dataset

In [None]:
# Read the CSV with its header row as column names and let Spark infer column types.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/content/Water_Quality_Prediction.csv')
)
df.show()

+-----+-----------+-----------+-----------+-----------+---------+-----------+--------------+-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+----------------------+---------+-----------------+---------------+---------+---+-----------+------+
|Index|         pH|       Iron|    Nitrate|   Chloride|     Lead|       Zinc|         Color|  Turbidity|   Fluoride|     Copper|       Odor|    Sulfate|Conductivity|   Chlorine|  Manganese|Total Dissolved Solids|   Source|Water Temperature|Air Temperature|    Month|Day|Time of Day|Target|
+-----+-----------+-----------+-----------+-----------+---------+-----------+--------------+-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+----------------------+---------+-----------------+---------------+---------+---+-----------+------+
|    0|8.332988427|    8.35E-5|8.605777156|122.7997723| 3.71E-52|3.434827042|     Colorless|0.022683282|0.607283431|  0.1445987|1.

In [None]:
# Print the column names and the types Spark inferred while loading the CSV.
df.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- pH: double (nullable = true)
 |-- Iron: double (nullable = true)
 |-- Nitrate: double (nullable = true)
 |-- Chloride: double (nullable = true)
 |-- Lead: double (nullable = true)
 |-- Zinc: double (nullable = true)
 |-- Color: string (nullable = true)
 |-- Turbidity: double (nullable = true)
 |-- Fluoride: double (nullable = true)
 |-- Copper: double (nullable = true)
 |-- Odor: double (nullable = true)
 |-- Sulfate: double (nullable = true)
 |-- Conductivity: double (nullable = true)
 |-- Chlorine: double (nullable = true)
 |-- Manganese: double (nullable = true)
 |-- Total Dissolved Solids: double (nullable = true)
 |-- Source: string (nullable = true)
 |-- Water Temperature: double (nullable = true)
 |-- Air Temperature: double (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Time of Day: integer (nullable = true)
 |-- Target: integer (nullable = true)



In [None]:
# Summary statistics for every column of the loaded frame.
summary_stats = df.describe()
summary_stats.show()

+-------+-----------------+------------------+-------------------+-----------------+-----------------+--------------------+------------------+---------+-------------------+------------------+-------------------+------------------+------------------+-----------------+------------------+--------------------+----------------------+-------+------------------+------------------+---------+------------------+------------------+------+
|summary|            Index|                pH|               Iron|          Nitrate|         Chloride|                Lead|              Zinc|    Color|          Turbidity|          Fluoride|             Copper|              Odor|           Sulfate|     Conductivity|          Chlorine|           Manganese|Total Dissolved Solids| Source| Water Temperature|   Air Temperature|    Month|               Day|       Time of Day|Target|
+-------+-----------------+------------------+-------------------+-----------------+-----------------+--------------------+-------------

Displaying the shape of the data

In [None]:
# Row count needs a Spark job; the column count is read from the schema.
n_rows, n_cols = df.count(), len(df.columns)
print("Shape:", (n_rows, n_cols))

Shape: (13490, 24)


Checking for null values

In [None]:
from pyspark.sql.functions import col,isnan, when, count

# One aggregate per column: how many rows are null or NaN in that column.
null_or_nan_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_or_nan_counts).show()

+-----+---+----+-------+--------+----+----+-----+---------+--------+------+----+-------+------------+--------+---------+----------------------+------+-----------------+---------------+-----+---+-----------+------+
|Index| pH|Iron|Nitrate|Chloride|Lead|Zinc|Color|Turbidity|Fluoride|Copper|Odor|Sulfate|Conductivity|Chlorine|Manganese|Total Dissolved Solids|Source|Water Temperature|Air Temperature|Month|Day|Time of Day|Target|
+-----+---+----+-------+--------+----+----+-----+---------+--------+------+----+-------+------------+--------+---------+----------------------+------+-----------------+---------------+-----+---+-----------+------+
|    0|263|  99|    243|     376|  71| 340|   18|      120|     467|   434| 406|    415|         376|     135|      245|                     5|   194|              385|             67|  205|218|        224|     0|
+-----+---+----+-------+--------+----+----+-----+---------+--------+------+----+-------+------------+--------+---------+----------------------+-

In [None]:
# Discard incomplete rows instead of imputing them.
# Note: this rebinds `df`, so re-running earlier cells afterwards shows the cleaned frame.
df = df.dropna()

In [None]:
# Re-count missing values after the drop, using the same null-or-NaN test as
# the diagnostic cell above so that NaN entries cannot slip through unnoticed.
missing_counts = df.select(
    [count(when(isnan(q) | col(q).isNull(), q)).alias(q) for q in df.columns]
)
missing_counts.show()

+-----+---+----+-------+--------+----+----+-----+---------+--------+------+----+-------+------------+--------+---------+----------------------+------+-----------------+---------------+-----+---+-----------+------+
|Index| pH|Iron|Nitrate|Chloride|Lead|Zinc|Color|Turbidity|Fluoride|Copper|Odor|Sulfate|Conductivity|Chlorine|Manganese|Total Dissolved Solids|Source|Water Temperature|Air Temperature|Month|Day|Time of Day|Target|
+-----+---+----+-------+--------+----+----+-----+---------+--------+------+----+-------+------------+--------+---------+----------------------+------+-----------------+---------------+-----+---+-----------+------+
|    0|  0|   0|      0|       0|   0|   0|    0|        0|       0|     0|   0|      0|           0|       0|        0|                     0|     0|                0|              0|    0|  0|          0|     0|
+-----+---+----+-------+--------+----+----+-----+---------+--------+------+----+-------+------------+--------+---------+----------------------+-

In [None]:
# Shape of the frame after incomplete rows were dropped.
row_total = df.count()
column_total = len(df.columns)
print("Shape:", (row_total, column_total))

Shape: (9047, 24)


In [None]:
from pyspark.ml.feature import Normalizer, VectorAssembler

# Numeric columns packed into the single vector column the model consumes.
feature_columns = ["Chloride","Lead","Zinc","Turbidity","Fluoride"]
vectorAssembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
vectorAssembler

VectorAssembler_229ca6c25ef5

In [None]:
# Rescale each assembled feature vector to unit L2 norm (p=2.0 is Normalizer's default).
normalizer = Normalizer(p=2.0, outputCol="features_norm", inputCol="features")
normalizer

Normalizer_429b35951961

In [None]:
# 80/20 random split; the fixed seed keeps the partition reproducible across runs.
train_data, test_data = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Name of the label column handed to the regressor.
target_col = 'Target'

In [None]:
# Linear regression over the normalized feature vector, predicting the label column.
lr = LinearRegression(labelCol=target_col, featuresCol="features_norm")

In [None]:
# Stage order matters: assemble raw columns -> normalize the vector -> fit the regressor.
stages = [vectorAssembler, normalizer, lr]

# Wire the stages into one estimator so fit/transform run them end to end.
pipeline = Pipeline().setStages(stages)

In [None]:
from pyspark.sql.functions import col

# "Color" holds text categories (e.g. "Colorless"); casting such strings straight
# to int yields null for every row. Encode the categories with StringIndexer
# (most frequent label -> 0) and store the result as an integer column.
color_indexer = StringIndexer(inputCol="Color", outputCol="color_idx")
df = (
    color_indexer.fit(df)
    .transform(df)
    .withColumn("color_int", col("color_idx").cast("int"))
    .drop("color_idx")  # drop the temporary column so the cell can be re-run safely
)

In [None]:
# Fit on the training split only, so the held-out rows stay unseen by the model.
pipeline_model = pipeline.fit(train_data)

In [None]:
# Score the held-out split rather than the full frame, so the metrics computed
# from these predictions reflect rows set aside by randomSplit.
predictions_from_pipeline = pipeline_model.transform(test_data)

In [None]:
# Show the true label next to the model output for the first rows.
predictions_from_pipeline.select("Target", "prediction").show()

+------+----------+
|Target|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
+------+----------+
only showing top 20 rows



In [None]:
# Score the pipeline output as a classifier.
# LinearRegression emits a continuous value, while accuracy only counts exact
# label matches, so the raw prediction is first snapped to a class label.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when

# Threshold at 0.5 to turn the regression output into a 0/1 label.
# NOTE(review): assumes Target is a binary 0/1 label; confirm against the dataset.
labelled_predictions = predictions_from_pipeline.withColumn(
    "predicted_label", when(col("prediction") >= 0.5, 1.0).otherwise(0.0)
)

# Define the evaluator on the thresholded label, not the raw regression output.
evaluator = MulticlassClassificationEvaluator(labelCol="Target", predictionCol="predicted_label", metricName="accuracy")

# Calculate the accuracy
accuracy = evaluator.evaluate(labelled_predictions)
print(f"Accuracy: {accuracy}")


Accuracy: 1.0


In [None]:
# Root-mean-squared error between the label and the raw regression output.
# This rebinds `evaluator`, replacing whatever evaluator was defined earlier.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Target",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions_from_pipeline)
print(f'RMSE without hyperParamater Tuning: {rmse}')

RMSE without hyperParamater Tuning: 0.0
