# Big Data Project: River Water Quality Analysis with PySpark

**Objective:** This notebook demonstrates a basic Big Data analytics project using PySpark. We will load, clean, analyze, and visualize the 'river_water_resources.csv' dataset to uncover insights about water quality across different states and water body types.

## 1. Setup: Install and Import Libraries

First, we need to install `pyspark`, the Python library for Apache Spark.

In [1]:
!pip install pyspark



## 2. Initialize Spark Session

The cell below imports the libraries used in the rest of the notebook and creates a `SparkSession`, the entry point for any PySpark application. It allows us to create DataFrames and execute Spark commands.

In [2]:
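from pyspark.sql import SparkSession
# Import only the functions used below. Importing min/max from
# pyspark.sql.functions would shadow Python's built-ins of the same name.
from pyspark.sql.functions import col, avg, count, desc, expr
import matplotlib.pyplot as plt
import pandas as pd  # needed by .toPandas() in the visualization section
import re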

# Create a SparkSession
spark = SparkSession.builder \
    .appName("WaterQualityAnalysis") \
    .getOrCreate()

print("SparkSession Created successfully")

SparkSession Created successfully


## 3. Load the Data

We will now read the `river_water_resources.csv` file into a Spark DataFrame. We'll specify `header=True` to use the first row as column names and `inferSchema=True` to automatically detect data types.

In [3]:
# Define the file path
# This assumes the file is in the same directory as the notebook.
file_path = 'river_water_resources.csv'

# Read the CSV file into a Spark DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first 5 rows to verify it loaded correctly
df.show(5)

+--------+--------------------+----+---------------+----------+---------------------+---------------------+---------------+---------------+--------+--------+----------------+----------------+---------------------+---------------------+
|STN code| Monitoring Location|Year|Type Water Body|State Name|Temperature (C) - Min|Temperature (C) - Max|Dissolved - Min|Dissolved - Max|pH - Min|pH - Max|BOD (mg/L) - Min|BOD (mg/L) - Max|NitrateN (mg/L) - Min|NitrateN (mg/L) - Max|
+--------+--------------------+----+---------------+----------+---------------------+---------------------+---------------+---------------+--------+--------+----------------+----------------+---------------------+---------------------+
|    4085|RIVER JUMAR AT BI...|2022|          RIVER| JHARKHAND|                 12.0|                 29.0|            3.3|            5.2|     6.5|     6.6|               2|             2.9|                 0.37|                  1.9|
|    2396|RIVER JUMAR AT KA...|2022|          RIVER| JHA

## 4. Data Exploration and Cleaning

Let's start by looking at the schema to understand the data types Spark has inferred.

In [4]:
# Print the schema to understand data types
df.printSchema()

root
 |-- STN code: integer (nullable = true)
 |-- Monitoring Location: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Type Water Body: string (nullable = true)
 |-- State Name: string (nullable = true)
 |-- Temperature (C) - Min: double (nullable = true)
 |-- Temperature (C) - Max: double (nullable = true)
 |-- Dissolved - Min: string (nullable = true)
 |-- Dissolved - Max: double (nullable = true)
 |-- pH - Min: double (nullable = true)
 |-- pH - Max: double (nullable = true)
 |-- BOD (mg/L) - Min: string (nullable = true)
 |-- BOD (mg/L) - Max: double (nullable = true)
 |-- NitrateN (mg/L) - Min: string (nullable = true)
 |-- NitrateN (mg/L) - Max: double (nullable = true)



### 4a. Clean Column Names

The column names contain spaces, parentheses, and other special characters (e.g., `BOD (mg/L) - Max`). This makes querying difficult. We'll clean them by replacing special characters with underscores.

In [5]:
# Function to clean column names
def clean_col_name(name):
    name = re.sub(r'[^a-zA-Z0-9_]', '_', name) # Replace special chars with _
    name = re.sub(r'__+', '_', name) # Replace multiple _ with single _
    name = name.strip('_') # Remove leading/trailing _
    return name

# Create a new DataFrame with cleaned column names
# toDF renames every column in one call, in the original column order
df_cleaned = df.toDF(*[clean_col_name(c) for c in df.columns])

print("Cleaned Schema:")
df_cleaned.printSchema()

print("DataFrame with Cleaned Column Names:")
df_cleaned.show(5)

Cleaned Schema:
root
 |-- STN_code: integer (nullable = true)
 |-- Monitoring_Location: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Type_Water_Body: string (nullable = true)
 |-- State_Name: string (nullable = true)
 |-- Temperature_C_Min: double (nullable = true)
 |-- Temperature_C_Max: double (nullable = true)
 |-- Dissolved_Min: string (nullable = true)
 |-- Dissolved_Max: double (nullable = true)
 |-- pH_Min: double (nullable = true)
 |-- pH_Max: double (nullable = true)
 |-- BOD_mg_L_Min: string (nullable = true)
 |-- BOD_mg_L_Max: double (nullable = true)
 |-- NitrateN_mg_L_Min: string (nullable = true)
 |-- NitrateN_mg_L_Max: double (nullable = true)

DataFrame with Cleaned Column Names:
+--------+--------------------+----+---------------+----------+-----------------+-----------------+-------------+-------------+------+------+------------+------------+-----------------+-----------------+
|STN_code| Monitoring_Location|Year|Type_Water_Body|State_Name|Temper
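
The cleaning rule can be checked on a few of the original column names without starting Spark. The sketch below repeats `clean_col_name` from the cell above and adds a hypothetical helper, `find_name_collisions`, which is not part of the original notebook. It flags cases where two different raw names collapse to the same cleaned name, which can otherwise leave a DataFrame with ambiguous duplicate column names.

```python
import re
from collections import defaultdict


def clean_col_name(name):
    """Same rule as the notebook cell: keep letters, digits, and underscores."""
    name = re.sub(r'[^a-zA-Z0-9_]', '_', name)  # replace special chars with _
    name = re.sub(r'__+', '_', name)            # collapse runs of _
    return name.strip('_')                      # drop leading/trailing _


def find_name_collisions(raw_names):
    """Hypothetical helper: map each cleaned name to the raw names producing it,
    keeping only cleaned names produced by more than one raw name."""
    groups = defaultdict(list)
    for raw in raw_names:
        groups[clean_col_name(raw)].append(raw)
    return {cleaned: raws for cleaned, raws in groups.items() if len(raws) > 1}


# Column names taken from the dataset header shown earlier
raw_columns = ["STN code", "Temperature (C) - Min", "BOD (mg/L) - Max", "NitrateN (mg/L) - Min"]
cleaned_columns = [clean_col_name(c) for c in raw_columns]
print(cleaned_columns)
# ['STN_code', 'Temperature_C_Min', 'BOD_mg_L_Max', 'NitrateN_mg_L_Min']

# Made-up names (not from the dataset) that collide after cleaning
print(find_name_collisions(["pH - Min", "pH (Min)"]))
# {'pH_Min': ['pH - Min', 'pH (Min)']}
```

For this dataset, `find_name_collisions(df.columns)` would be expected to return an empty dictionary, since the cleaned schema above shows fifteen distinct names.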

### 4b. Handle Missing Values and Bad Data

Real-world data is often incomplete or contains bad data (like text in a number column). The schema above shows this: `Dissolved_Min`, `BOD_mg_L_Min`, and `NitrateN_mg_L_Min` were inferred as `string`, which suggests each contains at least one non-numeric value.

1.  **Cast Numeric Columns:** We will explicitly cast our numeric columns using `try_cast`, which turns any malformed text (like 'BDL') into `null` instead of throwing an error.
2.  **Drop Nulls:** After casting, we will drop any rows where our critical analysis columns are `null`.
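
The effect of these two steps can be illustrated in plain Python. The sketch below is only an analogue of the idea, not Spark's implementation: `to_double_or_none` is a hypothetical helper, the sample rows are made up, and Python's `float()` does not accept exactly the same strings as Spark's `try_cast`.

```python
def to_double_or_none(value):
    """Hypothetical stand-in for try_cast(value AS DOUBLE): return a float,
    or None when the value is missing or cannot be parsed as a number."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


# Made-up sample rows; 'BDL' is a non-numeric marker in a numeric column
rows = [
    {"State_Name": "JHARKHAND", "BOD_mg_L_Max": "2.9"},
    {"State_Name": "JHARKHAND", "BOD_mg_L_Max": "BDL"},
    {"State_Name": None, "BOD_mg_L_Max": "1.4"},
]

# Step 1: cast, turning malformed text into None
for row in rows:
    row["BOD_mg_L_Max"] = to_double_or_none(row["BOD_mg_L_Max"])

# Step 2: drop rows with a None in any critical column
critical_cols = ["State_Name", "BOD_mg_L_Max"]
kept = [row for row in rows if all(row[c] is not None for c in critical_cols)]

print(kept)
# [{'State_Name': 'JHARKHAND', 'BOD_mg_L_Max': 2.9}]
```

Only the first row survives: the second fails the cast and the third was already missing a critical value, which mirrors the two sources of dropped rows in the Spark cell below.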

In [None]:
# Get the count of rows before cleaning
original_count = df_cleaned.count()

# Define the columns that should be numeric
# (names must match the cleaned column names, e.g. Temperature_C_Min)
numeric_cols_to_cast = [
    "Temperature_C_Min", "Temperature_C_Max", "Dissolved_Min", "Dissolved_Max",
    "pH_Min", "pH_Max", "BOD_mg_L_Min", "BOD_mg_L_Max",
    "NitrateN_mg_L_Min", "NitrateN_mg_L_Max"
]

# Cast columns to Double. Malformed values like 'BDL' will become null.
for col_name in numeric_cols_to_cast:
    if col_name in df_cleaned.columns:
        # try_cast returns null instead of raising when a value cannot be parsed
        df_cleaned = df_cleaned.withColumn(col_name, expr(f"try_cast({col_name} AS DOUBLE)"))

# Now, define the critical columns for analysis
# We will drop any row that has a null value in any of these columns
critical_cols = [
    "State_Name", "Type_Water_Body", "BOD_mg_L_Max", 
    "Dissolved_Min", "pH_Min", "pH_Max"
]

# Drop rows where any of these critical columns are null
# (this includes original nulls AND values that failed to cast, like 'BDL')
df_cleaned = df_cleaned.dropna(subset=critical_cols)

# Cache the cleaned DataFrame before the first action on it, so the count
# below populates the cache and later queries can reuse it
df_cleaned.cache()

# Get the count after dropping nulls/bad data
cleaned_count = df_cleaned.count()

print(f"Original row count: {original_count}")
print(f"Row count after dropping nulls/bad data: {cleaned_count}")
print(f"Rows dropped: {original_count - cleaned_count}")

print("\nSchema after casting and cleaning:")
df_cleaned.printSchema()

## 5. Big Data Analysis with PySpark

Now we can perform distributed analysis on our cleaned data.

### 5a. Basic Statistics

Let's get some basic summary statistics for the key water quality parameters. All of the selected columns are `double` after the cast in section 4b, so `describe()` reports numeric statistics for each.

In [None]:
# Show summary statistics for important numerical columns
df_cleaned.select("Temperature_C_Max", "Dissolved_Min", "pH_Max", "BOD_mg_L_Max", "NitrateN_mg_L_Max").describe().show()

### 5b. State-wise Analysis

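Let's aggregate the data by state to see which states have the most monitoring locations and what the average water quality is like. The count below is a row count, so it equals the number of locations only if each station appears once in the data.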

In [None]:
# Group by 'State_Name' and count monitoring locations
state_counts = df_cleaned.groupBy("State_Name") \
                         .count() \
                         .orderBy(desc("count"))

print("Top 10 States by Number of Monitoring Locations:")
state_counts.show(10)

# Calculate average max BOD and min Dissolved Oxygen by state
# High BOD (Biochemical Oxygen Demand) is generally bad.
# Low DO (Dissolved Oxygen) is also bad.
state_quality = df_cleaned.groupBy("State_Name") \
                          .agg(
                              avg("BOD_mg_L_Max").alias("Avg_Max_BOD"),
                              avg("Dissolved_Min").alias("Avg_Min_DO"),
                              avg("pH_Max").alias("Avg_Max_pH")
                          ) \
                          .orderBy(desc("Avg_Max_BOD"))

print("Top 10 States by Average Max BOD (Higher is worse):")
state_quality.show(10)

### 5c. Water Body Type Analysis

Let's analyze the data based on the type of water body (e.g., River, Drain, Canal).

In [None]:
# Group by 'Type_Water_Body'
water_body_analysis = df_cleaned.groupBy("Type_Water_Body") \
                                .agg(
                                    count("*").alias("Count"),
                                    avg("pH_Max").alias("Avg_Max_pH"),
                                    avg("BOD_mg_L_Max").alias("Avg_Max_BOD"),
                                    avg("Dissolved_Min").alias("Avg_Min_DO")
                                ) \
                                .orderBy(desc("Count"))

print("Analysis by Water Body Type:")
water_body_analysis.show()

### 5d. Identifying Potential Problem Areas

We can use Spark's powerful filtering to quickly identify locations that fail to meet certain quality standards. 

Let's define a "problem area" as having:
* **High BOD:** `BOD_mg_L_Max > 6.0` (indicates organic pollution)
* **Low DO:** `Dissolved_Min < 4.0` (harmful to aquatic life)
* **Extreme pH:** `pH_Min < 6.5` or `pH_Max > 8.5` (too acidic or alkaline)
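
The thresholds above can also be written as a small plain-Python rule, which makes it easy to see which criterion a given record violates. This is a sketch, not part of the original notebook: `problem_reasons` and the constant names are hypothetical, and the second sample record is made up.

```python
# Thresholds taken from the definition above
BOD_MAX_LIMIT = 6.0
DO_MIN_LIMIT = 4.0
PH_LOW, PH_HIGH = 6.5, 8.5


def problem_reasons(record):
    """Hypothetical helper: list the criteria a record violates (empty if none)."""
    reasons = []
    if record["BOD_mg_L_Max"] > BOD_MAX_LIMIT:
        reasons.append("high BOD")
    if record["Dissolved_Min"] < DO_MIN_LIMIT:
        reasons.append("low DO")
    if record["pH_Min"] < PH_LOW or record["pH_Max"] > PH_HIGH:
        reasons.append("extreme pH")
    return reasons


# Values from the first row of the df.show(5) preview (station 4085)
first_row = {"BOD_mg_L_Max": 2.9, "Dissolved_Min": 3.3, "pH_Min": 6.5, "pH_Max": 6.6}
print(problem_reasons(first_row))
# ['low DO']

# Made-up record that violates two criteria
sample = {"BOD_mg_L_Max": 7.2, "Dissolved_Min": 5.1, "pH_Min": 6.2, "pH_Max": 7.9}
print(problem_reasons(sample))
# ['high BOD', 'extreme pH']
```

A record counts as a problem area whenever the list is non-empty, which corresponds to combining the conditions with OR. Note that the comparisons are strict, so a `pH_Min` of exactly 6.5 does not count as extreme.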

In [None]:
# Filter for locations with poor water quality
problem_areas = df_cleaned.filter(
    (col("BOD_mg_L_Max") > 6.0) | 
    (col("Dissolved_Min") < 4.0) | 
    (col("pH_Min") < 6.5) | 
    (col("pH_Max") > 8.5)
) \
.select("Monitoring_Location", "State_Name", "Type_Water_Body", "BOD_mg_L_Max", "Dissolved_Min", "pH_Min", "pH_Max") \
.orderBy(desc("BOD_mg_L_Max"))

print("Potential Problem Areas (High BOD, Low DO, or Extreme pH):")
problem_areas.show(20)

print(f"Total locations identified as potential problem areas: {problem_areas.count()}")

## 6. Data Visualization

For visualization, we'll use `matplotlib`. PySpark DataFrames can be massive (billions of rows), and `matplotlib` needs its data in local memory, so we can't plot them directly.

The correct pattern is:
1.  Perform heavy aggregation in Spark (as we did with `state_counts`).
2.  The aggregated result is small (e.g., one row per state).
3.  Convert the small Spark DataFrame to a Pandas DataFrame using `.toPandas()`.
4.  Plot the Pandas DataFrame.
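
The conversion is only safe when the aggregated result really is small, because `.toPandas()` collects every row onto the driver. A minimal sketch of a guard follows. `to_pandas_if_small` and its `max_rows` default are hypothetical, not part of the original notebook or of the PySpark API; the function relies only on the DataFrame methods `limit`, `count`, and `toPandas`.

```python
def to_pandas_if_small(sdf, max_rows=10_000):
    """Hypothetical guard: convert a Spark DataFrame to Pandas only if it has
    at most max_rows rows; otherwise raise instead of collecting everything."""
    # limit() bounds the number of rows that have to be counted
    if sdf.limit(max_rows + 1).count() > max_rows:
        raise ValueError(
            f"DataFrame has more than {max_rows} rows; aggregate further before plotting"
        )
    return sdf.toPandas()
```

With this helper, a call such as `to_pandas_if_small(state_counts)` would behave like `state_counts.toPandas()` for a per-state summary, and fail early if it were accidentally applied to an unaggregated DataFrame larger than the limit.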

### Plot 1: Top 15 States by Monitoring Locations

In [None]:
# We already have `state_counts`. Let's take the top 15 states for plotting.
state_counts_pd = state_counts.limit(15).toPandas()

plt.figure(figsize=(12, 7))
plt.bar(state_counts_pd['State_Name'], state_counts_pd['count'], color='skyblue')
plt.xlabel("State", fontsize=12)
plt.ylabel("Number of Monitoring Locations", fontsize=12)
plt.title("Top 15 States by Monitoring Locations", fontsize=16)
plt.xticks(rotation=90)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()

### Plot 2: Distribution of Water Body Types

In [None]:
# We have `water_body_analysis`. Let's use the 'Count' column.
water_body_pd = water_body_analysis.select("Type_Water_Body", "Count").toPandas()

plt.figure(figsize=(10, 8))
plt.pie(water_body_pd['Count'], labels=water_body_pd['Type_Water_Body'], autopct='%1.1f%%', startangle=140)
plt.title("Distribution of Water Body Types", fontsize=16)
plt.axis('equal') # Equal aspect ratio ensures that pie is drawn as a circle.
plt.show()

### Plot 3: Top 15 States by Average Max BOD

In [None]:
# We have `state_quality`. Let's plot the top 15 states with the highest Avg Max BOD.
# We'll sort ascending for the horizontal bar chart (plots from bottom up).
state_quality_pd = state_quality.limit(15).toPandas().sort_values("Avg_Max_BOD", ascending=True)

plt.figure(figsize=(12, 8))
plt.barh(state_quality_pd['State_Name'], state_quality_pd['Avg_Max_BOD'], color='coral')
plt.xlabel("Average Max BOD (mg/L)", fontsize=12)
plt.ylabel("State", fontsize=12)
plt.title("Top 15 States by Average Max BOD (Higher is worse)", fontsize=16)
plt.tight_layout()
plt.show()

## 7. Conclusion

This project successfully demonstrated the use of PySpark for a big data analytics workflow. We loaded a CSV file, cleaned messy column names, handled missing data, performed distributed aggregations, and filtered a large dataset to find insights. Finally, we converted our aggregated Spark DataFrames to Pandas to visualize the results.

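The same DataFrame code can be scaled to run on a cluster with far larger data. Typically only the environment-specific parts change, such as how the `SparkSession` is configured and where the input file lives (a local path like the one used here is generally not visible to every node of a cluster). That portability is the true power of Apache Spark.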

In [None]:
# Stop the SparkSession to release resources
spark.stop()
print("SparkSession stopped.")