# Water Potability Pipeline Development Notebook

In [1]:
import sys
sys.path.append('/home/jovyan/work')

from handyspark import *

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import Imputer, VectorAssembler, PolynomialExpansion, StandardScaler

# from pyspark.mllib.stat import Statistics
#from pipeline import feature_pipeline

In [2]:
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

21/07/26 19:05:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Step 2 - Feature Pipeline Development

### Make Test-Train Split

It is important to make the test train split before doing any feature pipeline development. Giving your pipline prior knowledge of the test data is a form of data leakage that can introduce bias into the model.

In [13]:
# Randomly split the the dataset into test and train.
# Set the seed value for reproducability

df = spark.read.csv('../data/water_potability.csv',inferSchema=True, header=True)
df_train, df_test = df.randomSplit([0.8, 0.2], seed = 42)
print("Training Dataset Count: " + str(df_train.count()))
print("Test Dataset Count: " + str(df_test.count()))

Training Dataset Count: 2675
Test Dataset Count: 601


### Impute Missing Values

Imputing missing values based on the value of the target column is somewhat difficult to do with Pyspark, so I am going to use a package called HandySpark which makes it easier.

In [14]:
hdf = df_train.toHandy()


#hdf_filled = hdf.stratify(['Potability']).fill(continuous=['ph'], strategy=['mean'])
#df_train = hdf_filled.notHandy()

# Check for Missing Data
#df_train.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

#imputer = Imputer(
#    inputCols=['ph', 'Sulfate', 'Trihalomethanes'],
#    outputCols=['ph', 'Sulfate', 'Trihalomethanes']
#)
#df_imputed = imputer.setStrategy("mean").fit(df_train).transform(df_train)
#df_imputed.show(10)


[1m[91m---------------------------------------------------------------------------
HANDY EXCEPTION SUMMARY
[0m
Location: "/opt/conda/lib/python3.9/site-packages/pandas/core/indexing.py"
Line	: 1500
Function: _validate_integer
[1m[91mError	: IndexError: single positional indexer is out-of-bounds[0m
[1m[91m---------------------------------------------------------------------------[0m


HandyException: single positional indexer is out-of-bounds

### Create Feature Vector

In [17]:
vec_assembler = VectorAssembler(
    inputCols=['Hardness','Solids','Chloramines','Conductivity','Organic_carbon',
               'Turbidity'],
    outputCol='Features'
)
df_features = vec_assembler.transform(df_imputed)
df_features.select('Features').take(3)

[Row(Features=DenseVector([98.3679, 28415.5758, 10.5589, 505.2403, 12.8826, 4.1191])),
 Row(Features=DenseVector([103.4648, 27420.1674, 8.4173, 485.9745, 11.3511, 4.6208])),
 Row(Features=DenseVector([108.9166, 14476.3357, 5.3982, 512.2323, 15.0138, 3.8956]))]

### Scale the Features

In [18]:
scaler = StandardScaler(
    inputCol='Features', outputCol='ScaledFeatures',
    withStd=True, withMean=True
)

scalerFit = scaler.fit(df_features)

df_features_scaled = scalerFit.transform(df_features)
df_features_scaled.select('ScaledFeatures').take(3)

[Row(ScaledFeatures=DenseVector([-2.9992, 0.7207, 2.1483, 0.9852, -0.4139, 0.1916])),
 Row(ScaledFeatures=DenseVector([-2.844, 0.6065, 0.8047, 0.7464, -0.8819, 0.8329])),
 Row(ScaledFeatures=DenseVector([-2.678, -0.8779, -1.0894, 1.0718, 0.2373, -0.0941]))]

### Perform Polynomial Feature Expansion

In [19]:
poly_feature_exp = PolynomialExpansion(degree=3, inputCol="ScaledFeatures", outputCol="PolynomialFeatures")
poly_features = poly_feature_exp.transform(df_features_scaled)
poly_features.select('PolynomialFeatures').take(1)

[Row(PolynomialFeatures=DenseVector([-2.9992, 8.9953, -26.9789, 0.7207, -2.1615, 6.4829, 0.5194, -1.5578, 0.3743, 2.1483, -6.4432, 19.3247, 1.5483, -4.6436, 1.1158, 4.6152, -13.842, 3.3262, 9.9149, 0.9852, -2.9547, 8.8619, 0.71, -2.1295, 0.5117, 2.1164, -6.3476, 1.5253, 4.5467, 0.9705, -2.9109, 0.6995, 2.085, 0.9561, -0.4139, 1.2414, -3.7232, -0.2983, 0.8947, -0.215, -0.8892, 2.6669, -0.6408, -1.9103, -0.4078, 1.223, -0.2939, -0.876, -0.4017, 0.1713, -0.5138, 0.1235, 0.368, 0.1688, -0.0709, 0.1916, -0.5748, 1.7238, 0.1381, -0.4142, 0.0995, 0.4117, -1.2347, 0.2967, 0.8844, 0.1888, -0.5662, 0.1361, 0.4056, 0.186, -0.0793, 0.2379, -0.0572, -0.1704, -0.0781, 0.0328, 0.0367, -0.1101, 0.0265, 0.0789, 0.0362, -0.0152, 0.007]))]

## Test Pipeline Code

In [20]:
feature_cols = ['ph','Hardness','Solids','Chloramines','Sulfate','Conductivity',
               'Organic_carbon', 'Trihalomethanes', 'Turbidity']
assembler_out_col = 'Features'
scaler_out_col = 'ScaledFeatures'
expander_out_col = 'ExpandedFeatures'
degree = 3

pipeline = feature_pipeline.create_feature_pipeline(df_train, feature_cols, assembler_out_col, 
                                                    scaler_out_col, expander_out_col, degree)

pipeline.transform(df_train).select('ExpandedFeatures').take(1)

[Row(ExpandedFeatures=DenseVector([-0.0, 0.0, -0.0, -2.9992, 0.0, -0.0, 8.9953, -0.0, -26.9789, 0.7207, -0.0, 0.0, -2.1615, 0.0, 6.4829, 0.5194, -0.0, -1.5578, 0.3743, 2.1483, -0.0, 0.0, -6.4432, 0.0, 19.3247, 1.5483, -0.0, -4.6436, 1.1158, 4.6152, -0.0, -13.842, 3.3262, 9.9149, -1.0138, 0.0, -0.0, 3.0405, -0.0, -9.1191, -0.7306, 0.0, 2.1913, -0.5265, -2.1779, 0.0, 6.5319, -1.5696, -4.6787, 1.0277, -0.0, -3.0824, 0.7407, 2.2079, -1.0419, 0.9852, -0.0, 0.0, -2.9547, 0.0, 8.8619, 0.71, -0.0, -2.1295, 0.5117, 2.1164, -0.0, -6.3476, 1.5253, 4.5467, -0.9987, 0.0, 2.9954, -0.7198, -2.1456, 1.0125, 0.9705, -0.0, -2.9109, 0.6995, 2.085, -0.9839, 0.9561, -0.4139, 0.0, -0.0, 1.2414, -0.0, -3.7232, -0.2983, 0.0, 0.8947, -0.215, -0.8892, 0.0, 2.6669, -0.6408, -1.9103, 0.4196, -0.0, -1.2585, 0.3024, 0.9014, -0.4254, -0.4078, 0.0, 1.223, -0.2939, -0.876, 0.4134, -0.4017, 0.1713, -0.0, -0.5138, 0.1235, 0.368, -0.1737, 0.1688, -0.0709, 1.2052, -0.0, 0.0, -3.6145, 0.0, 10.8408, 0.8685, -0.0, -2.605, 0.