In [54]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Imputer, StandardScaler, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [55]:
# Reuse the active Spark session if one exists, otherwise start one named "App".
spark = (
    SparkSession.builder
    .appName("App")
    .getOrCreate()
)

In [56]:
# Folder that holds the three yearly ground-water-quality CSV files.
# NOTE(review): absolute, machine-specific path — change DATA_DIR to run elsewhere.
DATA_DIR = "C:/Users/bishe/OneDrive/Desktop/Personal/Softwerica/Big data"

filePath_2018 = f"{DATA_DIR}/ground_water_quality_2018_post.csv"
filePath_2019 = f"{DATA_DIR}/ground_water_quality_2019_post.csv"
filePath_2020 = f"{DATA_DIR}/ground_water_quality_2020_post.csv"

# header=True takes column names from the first row. No inferSchema is given,
# so every column is read as a string; numeric columns are cast further down.
gw_2018 = spark.read.csv(filePath_2018, header=True)
gw_2019 = spark.read.csv(filePath_2019, header=True)
gw_2020 = spark.read.csv(filePath_2020, header=True)

In [57]:
# Print each year's header on its own line so the spellings can be compared:
# the 2019 file names several columns differently (e.g. 'EC', 'Cl -', 'Na+').
print(*(frame.columns for frame in (gw_2018, gw_2019, gw_2020)), sep="\n")

['sno', 'district', 'mandal', 'village', 'lat_gis', 'long_gis', 'gwl', 'season', 'pH', 'E.C', 'TDS', 'CO3', 'HCO3', 'Cl', 'F', 'NO3 ', 'SO4', 'Na', 'K', 'Ca', 'Mg', 'T.H', 'SAR', 'Classification', 'RSC  meq  / L', 'Classification.1']
['sno', 'district', 'mandal', 'village', 'lat_gis', 'long_gis', 'gwl', 'season', 'pH', 'EC', 'TDS', 'CO_-2 ', 'HCO_ - ', 'Cl -', 'F -', 'NO3- ', 'SO4-2', 'Na+', 'K+', 'Ca+2', 'Mg+2', 'T.H', 'SAR', 'Classification', 'RSC  meq  / L', 'Classification.1']
['sno', 'district', 'mandal', 'village', 'lat_gis', 'long_gis', 'gwl', 'season', 'pH', 'E.C', 'TDS', 'CO3', 'HCO3', 'Cl', 'F', 'NO3 ', 'SO4', 'Na', 'K', 'Ca', 'Mg', 'T.H', 'SAR', 'Classification', 'RSC  meq  / L', 'Classification.1']


In [58]:
# Column types as read from CSV without schema inference. Every column is a
# string here. The other two years have the same column order.
gw_2018.printSchema()


root
 |-- sno: string (nullable = true)
 |-- district: string (nullable = true)
 |-- mandal: string (nullable = true)
 |-- village: string (nullable = true)
 |-- lat_gis: string (nullable = true)
 |-- long_gis: string (nullable = true)
 |-- gwl: string (nullable = true)
 |-- season: string (nullable = true)
 |-- pH: string (nullable = true)
 |-- E.C: string (nullable = true)
 |-- TDS: string (nullable = true)
 |-- CO3: string (nullable = true)
 |-- HCO3: string (nullable = true)
 |-- Cl: string (nullable = true)
 |-- F: string (nullable = true)
 |-- NO3 : string (nullable = true)
 |-- SO4: string (nullable = true)
 |-- Na: string (nullable = true)
 |-- K: string (nullable = true)
 |-- Ca: string (nullable = true)
 |-- Mg: string (nullable = true)
 |-- T.H: string (nullable = true)
 |-- SAR: string (nullable = true)
 |-- Classification: string (nullable = true)
 |-- RSC  meq  / L: string (nullable = true)
 |-- Classification.1: string (nullable = true)



In [8]:
# Optional preview of the first 34 rows of the 2018 data (disabled).
#gw_2018.show(n=34)

Assign the same column names to all three datasets (the 2019 file uses different header spellings).


In [59]:
# One set of names applied positionally to every yearly file.
# - Dots are dropped ('E.C' -> 'EC', 'T.H' -> 'TH',
#   'Classification.1' -> 'Classification1'), because Spark treats a dot
#   in a column reference as nested-field access.
# - 'NO3 ' keeps its trailing space and 'RSC  meq  / L' its double spaces.
#   Later cells refer to these names verbatim.
column_names = (
    ['sno', 'district', 'mandal', 'village', 'lat_gis', 'long_gis', 'gwl', 'season']
    + ['pH', 'EC', 'TDS', 'CO3', 'HCO3', 'Cl', 'F', 'NO3 ', 'SO4',
       'Na', 'K', 'Ca', 'Mg', 'TH', 'SAR']
    + ['Classification', 'RSC  meq  / L', 'Classification1']
)

In [60]:
# Rename every yearly frame positionally (toDF) so all three share one schema.
# Then drop any row that has a null in any column (na.drop defaults to how='any').
gw_2018, gw_2019, gw_2020 = [
    frame.toDF(*column_names).na.drop()
    for frame in (gw_2018, gw_2019, gw_2020)
]

DELETE CLASSES THAT ARE NOT PRESENT IN THE TESTING SET


In [11]:
# Distinct class labels in the 2018 data.
unique_names = (
    gw_2018
    .select("Classification")
    .distinct()
)
unique_names.show()

+--------------+
|Classification|
+--------------+
|          C1S1|
|          C2S2|
|          C3S2|
|          C4S4|
|          C2S1|
|          C4S1|
|          C4S2|
|          C4S3|
|          C3S3|
|          C3S4|
|          C3S1|
+--------------+



In [12]:
# Distinct class labels in the 2019 data.
unique_names = (
    gw_2019
    .select("Classification")
    .distinct()
)
unique_names.show()

+--------------+
|Classification|
+--------------+
|          C1S1|
|          C3S2|
|          C2S1|
|          C4S1|
|          C4S2|
|          C4S3|
|            OG|
|          C3S1|
+--------------+



In [19]:
# Map dotted spellings of the OG class ('O.G' and 'O.G.') in the 2020 data to
# 'OG', the label used in the 2019 data. The combined frame later drops rows
# with Classification == 'OG'.
# NOTE(review): the 2020 distinct-class listing shows no OG rows at all. Confirm
# whether any dotted labels actually occur in the 2020 file.
gw_2020 = gw_2020.withColumn(
    "Classification",
    when(col("Classification").isin("O.G", "O.G."), "OG")
    .otherwise(col("Classification")),
)

In [49]:
# Distinct class labels in the 2020 data, after the OG normalisation.
unique_names = (
    gw_2020
    .select("Classification")
    .distinct()
)
unique_names.show()

+--------------+
|Classification|
+--------------+
|          C3S2|
|          C4S4|
|          C2S1|
|          C4S1|
|          C4S2|
|          C4S3|
|          C3S3|
|          C3S1|
+--------------+



COMBINE THE 2018, 2019 AND 2020 DATASETS

In [52]:
# Stack the three years into one frame. unionByName matches columns by name
# rather than by position. All three frames were renamed with the same toDF
# names, so the result is unchanged, but a reordered column can no longer
# be silently misaligned.
gw_combined = gw_2018.unionByName(gw_2019)
# Drop the OG class. It appears in the 2019 listing but not in the 2018 one.
combined_df = gw_combined.unionByName(gw_2020).filter(col("Classification") != "OG")

In [53]:
def export_to_csv(df, path, **to_csv_kwargs):
    """Collect a Spark DataFrame to the driver and write it as one CSV file.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to export. All rows are pulled into driver memory via
        ``toPandas()``, so this only suits results that fit in memory.
    path : str or os.PathLike
        Destination file. Missing parent directories are created.
    **to_csv_kwargs
        Extra keyword arguments forwarded to ``pandas.DataFrame.to_csv``.
        ``index`` defaults to False so the pandas row index is not written.
    """
    from pathlib import Path

    destination = Path(path)
    # pandas' to_csv does not create directories, so make sure they exist.
    destination.parent.mkdir(parents=True, exist_ok=True)
    to_csv_kwargs.setdefault("index", False)
    df.toPandas().to_csv(destination, **to_csv_kwargs)


# Write the combined dataset to CSV. Its columns are still strings at this point,
# because the numeric cast happens in a later cell.
export_to_csv(df=combined_df, path="C:/hadoop/combined_data.csv")

SEPARATE FEATURES FROM THE CLASSIFICATION TARGET AND CAST FEATURE COLUMNS TO FLOAT

In [26]:
# Label column and the numeric measurement columns used as model features.
# 'NO3 ' (trailing space) and 'RSC  meq  / L' must match the toDF names above.
target_column = 'Classification'
feature_columns = ['gwl', 'pH', 'EC', 'TDS', 'CO3', 'HCO3', 'Cl', 'F', 'NO3 ', 'SO4',
                   'Na', 'K', 'Ca', 'Mg', 'TH', 'SAR', 'RSC  meq  / L']
# Derived from the two definitions above so the lists cannot drift apart.
all_columns = [target_column] + feature_columns

# Cast every feature column to float in one projection instead of one
# withColumn call per column. Column order is unchanged.
# NOTE(review): with non-ANSI casting, unparseable strings become null, and the
# Imputer fills them later. With spark.sql.ansi.enabled the cast raises instead.
combined_df = combined_df.select(
    *[
        col(name).cast("float").alias(name) if name in feature_columns else col(name)
        for name in combined_df.columns
    ]
)

In [27]:
# Keep only the label plus the feature columns for modelling.
combined_data = combined_df.select(*all_columns)

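EXPORT AS CSV FOR TABLEAU (done earlier: `export_to_csv` writes `combined_df` right after the three years are combined)

Creating the preprocessing pipeline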

In [30]:


# Fill nulls in each numeric feature with that column's mean (the Imputer's
# default strategy), writing the results to new "<name>_imputed" columns.
imputer = Imputer(
    inputCols=feature_columns,
    outputCols=[f"{name}_imputed" for name in feature_columns],
)

# Turn the string class labels into numeric indices for the classifiers.
target_indexer = StringIndexer(inputCol=target_column, outputCol=f"{target_column}_indexed")

# Pack the imputed feature columns into a single vector column.
assembler = VectorAssembler(inputCols=imputer.getOutputCols(), outputCol="features")

# Rescale the feature vector. StandardScaler scales to unit standard deviation
# and only centres the data when withMean=True (not set here).
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Stages run in this order on fit/transform.
pipeline = Pipeline(stages=[imputer, target_indexer, assembler, scaler])


In [31]:
# Fit all preprocessing stages on the full dataset and apply them.
# NOTE(review): the imputation means and scaling statistics are computed
# before the train/validation split, so validation rows influence them (mild
# leakage). Fitting on the training split only would also require the label
# indexer to tolerate classes that appear only in the validation split.
pipeline_model = pipeline.fit(combined_data)
transformed_data = pipeline_model.transform(combined_data)

# Keep the scaled features, the numeric label and the original class name.
# Rename them to the "features"/"label" columns the classifiers are configured with.
final_data = (
    transformed_data
    .select("scaledFeatures", f"{target_column}_indexed", target_column)
    .withColumnRenamed("scaledFeatures", "features")
    .withColumnRenamed(f"{target_column}_indexed", "label")
)

# Roughly 80/20 train/validation split. The fixed seed repeats the split for
# the same input partitioning.
train_data, val_data = final_data.randomSplit([0.8, 0.2], seed=42)

RANDOM FOREST

In [41]:


# Random forest with 100 trees on the scaled features.
# A fixed seed makes the bootstrap sampling and feature subsetting reproducible.
# Without it, PySpark derives the default seed from hash() of the class name,
# which can differ between Python sessions when string-hash randomisation is on.
rf = RandomForestClassifier(featuresCol='features', labelCol='label', numTrees=100, seed=42)
rf_model = rf.fit(train_data)

# Predict on the held-out validation split.
rf_predictions = rf_model.transform(val_data)

# Fraction of validation rows whose predicted index equals the true label index.
rf_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = rf_evaluator.evaluate(rf_predictions)

print(f"Validation Accuracy: {accuracy}")

# Show some predictions alongside the original class names.
rf_predictions.select("features", "label", "prediction", "Classification").show()

Validation Accuracy: 0.9408284023668639
+--------------------+-----+----------+--------------+
|            features|label|prediction|Classification|
+--------------------+-----+----------+--------------+
|[0.13858473032769...|  0.0|       0.0|          C3S1|
|[0.21777601154375...|  0.0|       0.0|          C3S1|
|[0.23229441441119...|  1.0|       1.0|          C2S1|
|[0.27980916740693...|  0.0|       0.0|          C3S1|
|[0.33260336537357...|  1.0|       1.0|          C2S1|
|[0.34316219237978...|  0.0|       0.0|          C3S1|
|[0.36296002448422...|  0.0|       0.0|          C3S1|
|[0.40255565722530...|  1.0|       1.0|          C2S1|
|[0.47382780245281...|  0.0|       0.0|          C3S1|
|[0.47382780245281...|  1.0|       1.0|          C2S1|
|[0.48174695417526...|  1.0|       1.0|          C2S1|
|[0.48306680755103...|  0.0|       0.0|          C3S1|
|[0.48834622105414...|  0.0|       0.0|          C3S1|
|[0.52134258691634...|  0.0|       0.0|          C3S1|
|[0.55829851290588...|  0

Decision tree

In [42]:


# Single decision tree on the same features and label, for comparison with the
# random forest. The seed matches the forest's so both runs are pinned the same way.
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', seed=42)
dt_model = dt.fit(train_data)

# Predict on the held-out validation split.
dt_predictions = dt_model.transform(val_data)

# Fraction of validation rows whose predicted index equals the true label index.
dt_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
dt_accuracy = dt_evaluator.evaluate(dt_predictions)

print(f"Validation Accuracy: {dt_accuracy}")

# Show some predictions alongside the original class names.
dt_predictions.select("features", "label", "prediction", "Classification").show()

Validation Accuracy: 0.9408284023668639
+--------------------+-----+----------+--------------+
|            features|label|prediction|Classification|
+--------------------+-----+----------+--------------+
|[0.13858473032769...|  0.0|       0.0|          C3S1|
|[0.21777601154375...|  0.0|       0.0|          C3S1|
|[0.23229441441119...|  1.0|       1.0|          C2S1|
|[0.27980916740693...|  0.0|       0.0|          C3S1|
|[0.33260336537357...|  1.0|       1.0|          C2S1|
|[0.34316219237978...|  0.0|       0.0|          C3S1|
|[0.36296002448422...|  0.0|       2.0|          C3S1|
|[0.40255565722530...|  1.0|       9.0|          C2S1|
|[0.47382780245281...|  0.0|       0.0|          C3S1|
|[0.47382780245281...|  1.0|       1.0|          C2S1|
|[0.48174695417526...|  1.0|       1.0|          C2S1|
|[0.48306680755103...|  0.0|       0.0|          C3S1|
|[0.48834622105414...|  0.0|       0.0|          C3S1|
|[0.52134258691634...|  0.0|       0.0|          C3S1|
|[0.55829851290588...|  0